tab_websocket/service/
connection.rs

1use crate::{
2    bus::WebsocketConnectionBus,
3    message::connection::{WebsocketRecv, WebsocketSend},
4    resource::connection::WebsocketResource,
5};
6use lifeline::prelude::*;
7use log::debug;
8use tokio::select;
9
10use crate::common::{self, should_terminate};
11
12use futures::{SinkExt, StreamExt};
13use log::{error, trace};
14
15use anyhow::Context;
16use std::fmt::Debug;
17
18use lifeline::error::{
19    ResourceTakenError, ResourceUninitializedError, TakeChannelError, TakeResourceError,
20};
21use thiserror::Error;
22use tungstenite::Error;
23
24/// A service which clients & servers use to drive the websocket connection.
25/// Handles connection status & close frames, as well as logging & protocol errors.
26#[derive(Debug)]
27pub struct WebsocketService {
28    _runloop: Lifeline,
29}
30
31impl Service for WebsocketService {
32    type Bus = WebsocketConnectionBus;
33    type Lifeline = Result<Self, WebsocketSpawnError>;
34
35    fn spawn(bus: &WebsocketConnectionBus) -> Result<Self, WebsocketSpawnError> {
36        // let mut websocket = parse_bincode(websocket);
37
38        // let (mut tx_request, rx_request) = mpsc::channel::<Request>(4);
39        // let (tx_response, mut rx_response) = mpsc::channel::<Response>(4);
40        let websocket = bus
41            .resource::<WebsocketResource>()
42            .map_err(WebsocketSpawnError::socket_error)?;
43
44        let rx = bus
45            .rx::<WebsocketSend>()
46            .map_err(WebsocketSpawnError::bus_failure)?;
47
48        let tx = bus
49            .tx::<WebsocketRecv>()
50            .map_err(WebsocketSpawnError::bus_failure)?;
51
52        let _runloop = Self::try_task("run", runloop(websocket, rx, tx));
53
54        Ok(Self { _runloop })
55    }
56}
57
58async fn runloop(
59    mut websocket_drop: WebsocketResource,
60    mut rx: impl Receiver<WebsocketSend>,
61    mut tx: impl Sender<WebsocketRecv>,
62) -> anyhow::Result<()> {
63    let websocket = &mut websocket_drop.0;
64    loop {
65        select!(
66            message = websocket.next() => {
67                if let None = message {
68                    debug!("terminating - websocket disconnected");
69                    break;
70                }
71
72                trace!("message received: {:?}", &message);
73
74                let message = message.unwrap();
75                if let Err(e) = message {
76                    match e {
77                        Error::ConnectionClosed | Error::AlreadyClosed | Error::Protocol(_)=> {
78                            break;
79                        },
80                        _ => {
81                            error!("message error: {}", e);
82                            continue;
83                        }
84                    }
85                }
86
87                let message = message.unwrap();
88                if should_terminate(&message) {
89                    debug!("terminating - received close");
90                    break;
91                }
92
93                tx.send(WebsocketRecv(message)).await.context("send WebsocketRecv")?;
94            },
95            message = rx.recv() => {
96                if !message.is_some()  {
97                    common::send_close(websocket).await;
98
99                    debug!("terminating - channel disconnected");
100                    break;
101                }
102
103                let message = message.unwrap();
104
105                trace!("send message: {:?}", &message);
106                websocket.send(message.0).await.context("wire send Tungstenite::Message")?;
107            },
108        );
109    }
110
111    debug!("server loop terminated");
112    Ok(())
113}
114#[derive(Error, Debug)]
115pub enum WebsocketSpawnError {
116    #[error("resource taken: {0}")]
117    SocketTaken(ResourceTakenError),
118
119    #[error("socket uninitialized: {0}")]
120    SocketUninitialized(ResourceUninitializedError),
121
122    #[error("websocket channel taken: {0}")]
123    BusFailure(TakeChannelError),
124}
125
126impl WebsocketSpawnError {
127    pub fn socket_error(err: TakeResourceError) -> Self {
128        match err {
129            TakeResourceError::Uninitialized(uninit) => Self::SocketUninitialized(uninit),
130            TakeResourceError::Taken(taken) => Self::SocketTaken(taken),
131        }
132    }
133
134    pub fn bus_failure(err: TakeChannelError) -> Self {
135        Self::BusFailure(err)
136    }
137}
138
139#[cfg(test)]
140mod test {
141    use super::WebsocketService;
142    use crate::bus::WebsocketConnectionBus;
143    use crate::{
144        connect_authorized,
145        message::{
146            connection::{WebsocketRecv, WebsocketSend},
147            listener::WebsocketConnectionMessage,
148        },
149        resource::{connection::WebsocketResource, listener::WebsocketAuthToken},
150        service::listener,
151    };
152    use lifeline::prelude::*;
153    use lifeline::{assert_completes, dyn_bus::DynBus};
154    use tungstenite::Message;
155
156    #[tokio::test]
157    async fn connect_authenticated() -> anyhow::Result<()> {
158        let (listener_bus, _lifeline, addr) = listener::serve("TOKEN").await?;
159
160        let url = format!("ws://{}", addr);
161        let connect = connect_authorized(url, "TOKEN".to_string()).await?;
162
163        let bus = WebsocketConnectionBus::default();
164        bus.store_resource::<WebsocketAuthToken>("TOKEN".into());
165        bus.store_resource::<WebsocketResource>(WebsocketResource(connect));
166
167        let mut tx_request = bus.tx::<WebsocketSend>()?;
168        let mut rx_conn = listener_bus.rx::<WebsocketConnectionMessage>()?;
169
170        let _service = WebsocketService::spawn(&bus)?;
171
172        tx_request
173            .send(WebsocketSend(Message::Text("request".to_string())))
174            .await?;
175
176        assert_completes!(async move {
177            let conn = rx_conn.recv().await;
178            assert!(conn.is_some());
179            let conn = conn.unwrap();
180            let conn_bus = conn.bus;
181            let _service = WebsocketService::spawn(&conn_bus);
182
183            let mut rx_request = conn_bus
184                .rx::<WebsocketRecv>()
185                .expect("conn_bus rx WebsocketRecv");
186            let request_recv = rx_request.recv().await.expect("rx_request recv");
187            let request_recv = request_recv.0.into_text().expect("into text");
188            assert_eq!("request", request_recv);
189        });
190
191        Ok(())
192    }
193}