tab_websocket/service/
listener.rs1use crate::bus::WebsocketConnectionBus;
2use crate::{
3 bind,
4 bus::WebsocketListenerBus,
5 message::listener::WebsocketConnectionMessage,
6 resource::{
7 connection::WebsocketResource,
8 listener::{WebsocketAuthToken, WebsocketListenerResource},
9 },
10};
11use log::{debug, error};
12
13use lifeline::prelude::*;
14use lifeline::{dyn_bus::DynBus, request::Request as LifelineRequest};
15use tokio::net::TcpListener;
16
17pub struct WebsocketListenerService {
19 _accept: Lifeline,
20}
21
22impl Service for WebsocketListenerService {
23 type Bus = WebsocketListenerBus;
24 type Lifeline = anyhow::Result<Self>;
25
26 fn spawn(bus: &Self::Bus) -> Self::Lifeline {
27 let listener = bus.resource::<WebsocketListenerResource>()?;
28 let auth_token = bus.resource::<WebsocketAuthToken>()?;
29
30 let tx = bus.tx::<WebsocketConnectionMessage>()?;
31 let _accept = Self::try_task("accept", accept_connections(listener.0, tx, auth_token));
32
33 Ok(Self { _accept })
34 }
35}
36
37async fn accept_connections(
39 mut listener: TcpListener,
40 mut tx: impl Sender<WebsocketConnectionMessage>,
41 auth_token: WebsocketAuthToken,
42) -> anyhow::Result<()> {
43 loop {
44 let (stream, addr) = listener.accept().await?;
45
46 debug!("connection opened from {:?}", addr);
48
49 let conn_bus = WebsocketConnectionBus::default();
50 let (request, recv_metadata) = LifelineRequest::send(());
51 let bound = match bind(stream, auth_token.clone(), request).await {
52 Ok(res) => res,
53 Err(e) => {
54 error!("error binding websocket: {}", e);
55 continue;
56 }
57 };
58
59 conn_bus.store_resource(WebsocketResource(bound));
60 let message = WebsocketConnectionMessage {
63 bus: conn_bus,
64 request: recv_metadata.await?,
65 };
66
67 tx.send(message)
68 .await
69 .map_err(|_| anyhow::Error::msg("send WebsocketConnectionMessage"))?;
70 }
71}
72
/// Test helper: starts a `WebsocketListenerService` on an OS-assigned loopback port.
///
/// Stores `token` as the `WebsocketAuthToken` on a new listener bus, binds a TCP listener
/// to `127.0.0.1:0`, stores it on the bus, and spawns the service.
///
/// Returns the bus, the running service, and the address the listener is bound to.
///
/// # Errors
///
/// Fails if the socket cannot be bound, its local address cannot be read, or the
/// service cannot be spawned.
#[cfg(test)]
pub(crate) async fn serve(
    token: &str,
) -> anyhow::Result<(
    WebsocketListenerBus,
    WebsocketListenerService,
    std::net::SocketAddr,
)> {
    let listener_bus = WebsocketListenerBus::default();
    listener_bus.store_resource::<WebsocketAuthToken>(token.into());

    // Port 0 lets the OS choose a free port; the chosen address is read back below.
    let socket = TcpListener::bind("127.0.0.1:0").await?;
    let local_addr = socket.local_addr()?;
    listener_bus.store_resource(WebsocketListenerResource(socket));

    let service = WebsocketListenerService::spawn(&listener_bus)?;
    Ok((listener_bus, service, local_addr))
}
92
/// Tests for the listener service: spawning, accepting a client, and exchanging
/// messages in both directions over an accepted connection.
#[cfg(test)]
mod tests {
    use super::{serve, WebsocketListenerService};
    use crate::{
        bus::*,
        message::{
            connection::{WebsocketRecv, WebsocketSend},
            listener::WebsocketConnectionMessage,
        },
        resource::{
            connection::WebsocketResource,
            listener::{WebsocketAuthToken, WebsocketListenerResource},
        },
        service::WebsocketService,
    };
    use lifeline::{assert_completes, dyn_bus::DynBus, prelude::*};
    use std::net::SocketAddr;
    use tokio::net::TcpListener;

    /// Opens an authorized client connection to `addr` and spawns a `WebsocketService`
    /// for it. Returns the client bus together with the running service.
    async fn connect(
        addr: SocketAddr,
        token: &str,
    ) -> anyhow::Result<(WebsocketConnectionBus, WebsocketService)> {
        let client_bus = WebsocketConnectionBus::default();
        client_bus.store_resource::<WebsocketAuthToken>(token.into());

        let url = format!("ws://{}", addr);
        let websocket = crate::connect_authorized(url, token.to_string()).await?;
        client_bus.store_resource(WebsocketResource(websocket));

        let service = WebsocketService::spawn(&client_bus)?;
        Ok((client_bus, service))
    }

    /// The service spawns when the bus holds a token and a bound listener.
    #[tokio::test]
    async fn test_listener_spawn() -> anyhow::Result<()> {
        let listener_bus = WebsocketListenerBus::default();
        listener_bus.store_resource(WebsocketAuthToken::unauthenticated());

        let socket = TcpListener::bind("127.0.0.1:0").await?;
        listener_bus.store_resource(WebsocketListenerResource(socket));

        let _listener = WebsocketListenerService::spawn(&listener_bus)?;

        Ok(())
    }

    /// A client connecting with the matching token produces a connection message
    /// on the listener bus.
    #[tokio::test]
    async fn test_listener_accepts_connection() -> anyhow::Result<()> {
        let (listener_bus, _listener, addr) = serve("TOKEN").await?;

        let client_bus = WebsocketConnectionBus::default();
        let url = format!("ws://{}", addr);
        let websocket = crate::connect_authorized(url, "TOKEN".to_string()).await?;
        client_bus.store_resource(WebsocketResource(websocket));

        let _client_service = WebsocketService::spawn(&client_bus)?;

        let mut accepted_rx = listener_bus
            .rx::<WebsocketConnectionMessage>()?
            .into_inner();
        let accepted = accepted_rx.try_recv();

        assert!(accepted.is_ok());

        Ok(())
    }

    /// A text message sent by the client arrives on the accepted (server-side) bus.
    #[tokio::test]
    async fn test_send_request() -> anyhow::Result<()> {
        let (listener_bus, _listener_service, addr) = serve("TOKEN").await?;
        let (client_bus, _client_service) = connect(addr, "TOKEN").await?;

        let mut accepted_rx = listener_bus
            .rx::<WebsocketConnectionMessage>()?
            .into_inner();
        let accepted = accepted_rx.try_recv()?;
        let _server_service = WebsocketService::spawn(&accepted.bus)?;

        let mut client_tx = client_bus.tx::<WebsocketSend>()?;
        let mut server_rx = accepted.bus.rx::<WebsocketRecv>()?;

        let outgoing = tungstenite::Message::Text("request".to_string());
        client_tx.send(WebsocketSend(outgoing)).await?;

        assert_completes!(async move {
            let received = server_rx.recv().await.expect("rx_request recv");
            let text = received.0.into_text().expect("into text");
            assert_eq!("request", text);
        });

        Ok(())
    }

    /// A text message sent from the accepted (server-side) bus arrives at the client.
    #[tokio::test]
    async fn test_send_response() -> anyhow::Result<()> {
        let (listener_bus, _listener_service, addr) = serve("TOKEN").await?;
        let (client_bus, _client_service) = connect(addr, "TOKEN").await?;

        let mut accepted_rx = listener_bus
            .rx::<WebsocketConnectionMessage>()?
            .into_inner();
        let accepted = accepted_rx.try_recv()?;
        let _server_service = WebsocketService::spawn(&accepted.bus)?;

        let mut client_rx = client_bus.rx::<WebsocketRecv>()?;

        let mut server_tx = accepted.bus.tx::<WebsocketSend>()?;

        let outgoing = tungstenite::Message::Text("response".to_string());
        server_tx.send(WebsocketSend(outgoing)).await?;

        assert_completes!(async move {
            let received = client_rx.recv().await;
            assert!(received.is_some());
            let text = received.unwrap().0.into_text().expect("into text");
            assert_eq!("response", text);
        });

        Ok(())
    }
}