tab_websocket/service/
connection.rs1use crate::{
2 bus::WebsocketConnectionBus,
3 message::connection::{WebsocketRecv, WebsocketSend},
4 resource::connection::WebsocketResource,
5};
6use lifeline::prelude::*;
7use log::debug;
8use tokio::select;
9
10use crate::common::{self, should_terminate};
11
12use futures::{SinkExt, StreamExt};
13use log::{error, trace};
14
15use anyhow::Context;
16use std::fmt::Debug;
17
18use lifeline::error::{
19 ResourceTakenError, ResourceUninitializedError, TakeChannelError, TakeResourceError,
20};
21use thiserror::Error;
22use tungstenite::Error;
23
24#[derive(Debug)]
27pub struct WebsocketService {
28 _runloop: Lifeline,
29}
30
31impl Service for WebsocketService {
32 type Bus = WebsocketConnectionBus;
33 type Lifeline = Result<Self, WebsocketSpawnError>;
34
35 fn spawn(bus: &WebsocketConnectionBus) -> Result<Self, WebsocketSpawnError> {
36 let websocket = bus
41 .resource::<WebsocketResource>()
42 .map_err(WebsocketSpawnError::socket_error)?;
43
44 let rx = bus
45 .rx::<WebsocketSend>()
46 .map_err(WebsocketSpawnError::bus_failure)?;
47
48 let tx = bus
49 .tx::<WebsocketRecv>()
50 .map_err(WebsocketSpawnError::bus_failure)?;
51
52 let _runloop = Self::try_task("run", runloop(websocket, rx, tx));
53
54 Ok(Self { _runloop })
55 }
56}
57
58async fn runloop(
59 mut websocket_drop: WebsocketResource,
60 mut rx: impl Receiver<WebsocketSend>,
61 mut tx: impl Sender<WebsocketRecv>,
62) -> anyhow::Result<()> {
63 let websocket = &mut websocket_drop.0;
64 loop {
65 select!(
66 message = websocket.next() => {
67 if let None = message {
68 debug!("terminating - websocket disconnected");
69 break;
70 }
71
72 trace!("message received: {:?}", &message);
73
74 let message = message.unwrap();
75 if let Err(e) = message {
76 match e {
77 Error::ConnectionClosed | Error::AlreadyClosed | Error::Protocol(_)=> {
78 break;
79 },
80 _ => {
81 error!("message error: {}", e);
82 continue;
83 }
84 }
85 }
86
87 let message = message.unwrap();
88 if should_terminate(&message) {
89 debug!("terminating - received close");
90 break;
91 }
92
93 tx.send(WebsocketRecv(message)).await.context("send WebsocketRecv")?;
94 },
95 message = rx.recv() => {
96 if !message.is_some() {
97 common::send_close(websocket).await;
98
99 debug!("terminating - channel disconnected");
100 break;
101 }
102
103 let message = message.unwrap();
104
105 trace!("send message: {:?}", &message);
106 websocket.send(message.0).await.context("wire send Tungstenite::Message")?;
107 },
108 );
109 }
110
111 debug!("server loop terminated");
112 Ok(())
113}
114#[derive(Error, Debug)]
115pub enum WebsocketSpawnError {
116 #[error("resource taken: {0}")]
117 SocketTaken(ResourceTakenError),
118
119 #[error("socket uninitialized: {0}")]
120 SocketUninitialized(ResourceUninitializedError),
121
122 #[error("websocket channel taken: {0}")]
123 BusFailure(TakeChannelError),
124}
125
126impl WebsocketSpawnError {
127 pub fn socket_error(err: TakeResourceError) -> Self {
128 match err {
129 TakeResourceError::Uninitialized(uninit) => Self::SocketUninitialized(uninit),
130 TakeResourceError::Taken(taken) => Self::SocketTaken(taken),
131 }
132 }
133
134 pub fn bus_failure(err: TakeChannelError) -> Self {
135 Self::BusFailure(err)
136 }
137}
138
139#[cfg(test)]
140mod test {
141 use super::WebsocketService;
142 use crate::bus::WebsocketConnectionBus;
143 use crate::{
144 connect_authorized,
145 message::{
146 connection::{WebsocketRecv, WebsocketSend},
147 listener::WebsocketConnectionMessage,
148 },
149 resource::{connection::WebsocketResource, listener::WebsocketAuthToken},
150 service::listener,
151 };
152 use lifeline::prelude::*;
153 use lifeline::{assert_completes, dyn_bus::DynBus};
154 use tungstenite::Message;
155
156 #[tokio::test]
157 async fn connect_authenticated() -> anyhow::Result<()> {
158 let (listener_bus, _lifeline, addr) = listener::serve("TOKEN").await?;
159
160 let url = format!("ws://{}", addr);
161 let connect = connect_authorized(url, "TOKEN".to_string()).await?;
162
163 let bus = WebsocketConnectionBus::default();
164 bus.store_resource::<WebsocketAuthToken>("TOKEN".into());
165 bus.store_resource::<WebsocketResource>(WebsocketResource(connect));
166
167 let mut tx_request = bus.tx::<WebsocketSend>()?;
168 let mut rx_conn = listener_bus.rx::<WebsocketConnectionMessage>()?;
169
170 let _service = WebsocketService::spawn(&bus)?;
171
172 tx_request
173 .send(WebsocketSend(Message::Text("request".to_string())))
174 .await?;
175
176 assert_completes!(async move {
177 let conn = rx_conn.recv().await;
178 assert!(conn.is_some());
179 let conn = conn.unwrap();
180 let conn_bus = conn.bus;
181 let _service = WebsocketService::spawn(&conn_bus);
182
183 let mut rx_request = conn_bus
184 .rx::<WebsocketRecv>()
185 .expect("conn_bus rx WebsocketRecv");
186 let request_recv = rx_request.recv().await.expect("rx_request recv");
187 let request_recv = request_recv.0.into_text().expect("into text");
188 assert_eq!("request", request_recv);
189 });
190
191 Ok(())
192 }
193}