tab_websocket/service/
listener.rs

1use crate::bus::WebsocketConnectionBus;
2use crate::{
3    bind,
4    bus::WebsocketListenerBus,
5    message::listener::WebsocketConnectionMessage,
6    resource::{
7        connection::WebsocketResource,
8        listener::{WebsocketAuthToken, WebsocketListenerResource},
9    },
10};
11use log::{debug, error};
12
13use lifeline::prelude::*;
14use lifeline::{dyn_bus::DynBus, request::Request as LifelineRequest};
15use tokio::net::TcpListener;
16
17/// An established listener service, which transmits WebsocketConnectionMessages over the listener bus.
18pub struct WebsocketListenerService {
19    _accept: Lifeline,
20}
21
22impl Service for WebsocketListenerService {
23    type Bus = WebsocketListenerBus;
24    type Lifeline = anyhow::Result<Self>;
25
26    fn spawn(bus: &Self::Bus) -> Self::Lifeline {
27        let listener = bus.resource::<WebsocketListenerResource>()?;
28        let auth_token = bus.resource::<WebsocketAuthToken>()?;
29
30        let tx = bus.tx::<WebsocketConnectionMessage>()?;
31        let _accept = Self::try_task("accept", accept_connections(listener.0, tx, auth_token));
32
33        Ok(Self { _accept })
34    }
35}
36
37/// The main runloop for the WebsocketListenerService
38async fn accept_connections(
39    mut listener: TcpListener,
40    mut tx: impl Sender<WebsocketConnectionMessage>,
41    auth_token: WebsocketAuthToken,
42) -> anyhow::Result<()> {
43    loop {
44        let (stream, addr) = listener.accept().await?;
45
46        // TODO: only accept connections from loopback address
47        debug!("connection opened from {:?}", addr);
48
49        let conn_bus = WebsocketConnectionBus::default();
50        let (request, recv_metadata) = LifelineRequest::send(());
51        let bound = match bind(stream, auth_token.clone(), request).await {
52            Ok(res) => res,
53            Err(e) => {
54                error!("error binding websocket: {}", e);
55                continue;
56            }
57        };
58
59        conn_bus.store_resource(WebsocketResource(bound));
60        // let service = WebsocketService::spawn(&conn_bus)?;
61
62        let message = WebsocketConnectionMessage {
63            bus: conn_bus,
64            request: recv_metadata.await?,
65        };
66
67        tx.send(message)
68            .await
69            .map_err(|_| anyhow::Error::msg("send WebsocketConnectionMessage"))?;
70    }
71}
72
/// Test helper: starts a listener service on an OS-assigned loopback port.
///
/// Stores `token` (converted into a `WebsocketAuthToken`) and a `TcpListener`
/// bound to `127.0.0.1:0` on a fresh `WebsocketListenerBus`, then spawns a
/// `WebsocketListenerService` on that bus.
///
/// Returns the bus, the spawned service, and the listener's local address.
///
/// # Errors
///
/// Fails if the socket cannot be bound, its local address cannot be read, or
/// the service cannot be spawned.
#[cfg(test)]
pub(crate) async fn serve(
    token: &str,
) -> anyhow::Result<(
    WebsocketListenerBus,
    WebsocketListenerService,
    std::net::SocketAddr,
)> {
    let listener_bus = WebsocketListenerBus::default();
    listener_bus.store_resource::<WebsocketAuthToken>(token.into());

    let tcp = TcpListener::bind("127.0.0.1:0").await?;
    let local_addr = tcp.local_addr()?;
    listener_bus.store_resource(WebsocketListenerResource(tcp));

    let service = WebsocketListenerService::spawn(&listener_bus)?;
    Ok((listener_bus, service, local_addr))
}
92
#[cfg(test)]
mod tests {
    use super::{serve, WebsocketListenerService};
    use crate::{
        bus::*,
        message::{
            connection::{WebsocketRecv, WebsocketSend},
            listener::WebsocketConnectionMessage,
        },
        resource::{
            connection::WebsocketResource,
            listener::{WebsocketAuthToken, WebsocketListenerResource},
        },
        service::WebsocketService,
    };
    use lifeline::{assert_completes, dyn_bus::DynBus, prelude::*};
    use std::net::SocketAddr;
    use tokio::net::TcpListener;

    /// Opens an authorized client connection to `addr` and spawns a
    /// `WebsocketService` for it, returning the client bus and the service.
    async fn connect(
        addr: SocketAddr,
        token: &str,
    ) -> anyhow::Result<(WebsocketConnectionBus, WebsocketService)> {
        let client_bus = WebsocketConnectionBus::default();
        client_bus.store_resource::<WebsocketAuthToken>(token.into());

        let url = format!("ws://{}", addr);
        let socket = crate::connect_authorized(url, token.to_string()).await?;
        client_bus.store_resource(WebsocketResource(socket));

        let service = WebsocketService::spawn(&client_bus)?;
        Ok((client_bus, service))
    }

    /// The listener service spawns once a token and a bound listener are on the bus.
    #[tokio::test]
    async fn test_listener_spawn() -> anyhow::Result<()> {
        let listener_bus = WebsocketListenerBus::default();
        listener_bus.store_resource(WebsocketAuthToken::unauthenticated());

        let tcp = TcpListener::bind("127.0.0.1:0").await?;
        listener_bus.store_resource(WebsocketListenerResource(tcp));

        let _listener = WebsocketListenerService::spawn(&listener_bus)?;

        Ok(())
    }

    /// An authorized client connection shows up as a message on the listener bus.
    #[tokio::test]
    async fn test_listener_accepts_connection() -> anyhow::Result<()> {
        let (listener_bus, _listener, addr) = serve("TOKEN").await?;

        let client_bus = WebsocketConnectionBus::default();
        let url = format!("ws://{}", addr);
        let socket = crate::connect_authorized(url, "TOKEN".to_string()).await?;
        client_bus.store_resource(WebsocketResource(socket));

        let _client = WebsocketService::spawn(&client_bus)?;

        let mut rx_accepted = listener_bus.rx::<WebsocketConnectionMessage>()?.into_inner();
        let accepted = rx_accepted.try_recv();

        assert!(accepted.is_ok());

        Ok(())
    }

    /// A text message sent by the client is received on the accepted connection's bus.
    #[tokio::test]
    async fn test_send_request() -> anyhow::Result<()> {
        let (listener_bus, _listener, addr) = serve("TOKEN").await?;
        let (client_bus, _client) = connect(addr, "TOKEN").await?;

        let mut rx_accepted = listener_bus.rx::<WebsocketConnectionMessage>()?.into_inner();
        let accepted = rx_accepted.try_recv()?;
        let _server_side = WebsocketService::spawn(&accepted.bus)?;

        let mut tx_client = client_bus.tx::<WebsocketSend>()?;
        let mut rx_server = accepted.bus.rx::<WebsocketRecv>()?;

        let outgoing = WebsocketSend(tungstenite::Message::Text("request".to_string()));
        tx_client.send(outgoing).await?;

        assert_completes!(async move {
            let received = rx_server.recv().await.expect("rx_request recv");
            let text = received.0.into_text().expect("into text");
            assert_eq!("request", text);
        });

        Ok(())
    }

    /// A text message sent from the accepted connection's bus is received by the client.
    #[tokio::test]
    async fn test_send_response() -> anyhow::Result<()> {
        let (listener_bus, _listener, addr) = serve("TOKEN").await?;
        let (client_bus, _client) = connect(addr, "TOKEN").await?;

        let mut rx_accepted = listener_bus.rx::<WebsocketConnectionMessage>()?.into_inner();
        let accepted = rx_accepted.try_recv()?;
        let _server_side = WebsocketService::spawn(&accepted.bus)?;

        let mut rx_client = client_bus.rx::<WebsocketRecv>()?;

        let mut tx_server = accepted.bus.tx::<WebsocketSend>()?;

        let outgoing = WebsocketSend(tungstenite::Message::Text("response".to_string()));
        tx_server.send(outgoing).await?;

        assert_completes!(async move {
            let received = rx_client.recv().await;
            assert!(received.is_some());
            let text = received.unwrap().0.into_text().expect("into text");
            assert_eq!("response", text);
        });

        Ok(())
    }
}