tab_websocket/bus/
connection_carrier.rs

1use super::connection::WebsocketConnectionBus;
2use crate::{
3    message::connection::{WebsocketRecv, WebsocketSend},
4    service::WebsocketService,
5};
6use lifeline::{dyn_bus::DynBus, prelude::*};
7use log::*;
8use serde::{de::DeserializeOwned, Serialize};
9use tokio::{
10    stream::StreamExt,
11    sync::{broadcast, mpsc},
12};
13
14/// Carries requests & responses between the websocket, and the attached bus (which must implement WebsocketMessageBus).
15pub struct WebsocketCarrier {
16    _websocket: WebsocketService,
17    _websocket_send: Lifeline,
18    _websocket_recv: Lifeline,
19}
20
21/// Defines a Send and Receive message type, and constrains the Message implementation on the bus.
22/// Allows the WebsocketConnectionBus to carry messages onto the bus.
23pub trait WebsocketMessageBus: Sized {
24    type Send: Message<Self, Channel = mpsc::Sender<Self::Send>>
25        + Clone
26        + Send
27        + Sync
28        + Serialize
29        + 'static;
30
31    type Recv: Message<Self, Channel = broadcast::Sender<Self::Recv>>
32        + Clone
33        + DeserializeOwned
34        + Send
35        + Sync
36        + 'static;
37}
38
39// TODO: why is dynbus required here?? super confusing
40impl<B: DynBus> CarryFrom<B> for WebsocketConnectionBus
41where
42    B: WebsocketMessageBus,
43{
44    type Lifeline = anyhow::Result<WebsocketCarrier>;
45
46    fn carry_from(&self, bus: &B) -> Self::Lifeline {
47        use tungstenite::Message as TungsteniteMessage;
48
49        let _websocket = WebsocketService::spawn(&self)?;
50        self.capacity::<WebsocketSend>(512)?;
51        self.capacity::<WebsocketRecv>(512)?;
52
53        let _websocket_send = {
54            let mut rx = bus.rx::<B::Send>()?;
55            let mut tx = self.tx::<WebsocketSend>()?;
56
57            Self::try_task("forward_send", async move {
58                while let Some(msg) = rx.recv().await {
59                    trace!("send message: {:?}", &msg);
60                    match bincode::serialize(&msg) {
61                        Ok(vec) => {
62                            let send = tx
63                                .send(WebsocketSend(TungsteniteMessage::Binary(vec)))
64                                .await;
65
66                            if let Err(_e) = send {
67                                debug!("sender disconnected - aborting carry.");
68                                break;
69                            }
70                        }
71                        Err(e) => error!("failed to send websocket msg: {}", e),
72                    };
73                }
74
75                tx.send(WebsocketSend(TungsteniteMessage::Close(None)))
76                    .await
77                    .ok();
78
79                Ok(())
80            })
81        };
82
83        let _websocket_recv = {
84            let mut rx = self.rx::<WebsocketRecv>()?;
85            let mut tx = bus.tx::<B::Recv>()?;
86
87            Self::try_task("forward_recv", async move {
88                while let Some(msg) = rx.next().await {
89                    let data = msg.0.into_data();
90                    match bincode::deserialize(data.as_slice()) {
91                        Ok(message) => {
92                            trace!("recv message: {:?}", &message);
93                            tx.send(message).await?;
94                        }
95                        Err(e) => error!("failed to recv websocket msg: {}", e),
96                    };
97                }
98
99                Ok(())
100            })
101        };
102
103        Ok(WebsocketCarrier {
104            _websocket,
105            _websocket_send,
106            _websocket_recv,
107        })
108    }
109}