tab_websocket/bus/
connection_carrier.rs1use super::connection::WebsocketConnectionBus;
2use crate::{
3 message::connection::{WebsocketRecv, WebsocketSend},
4 service::WebsocketService,
5};
6use lifeline::{dyn_bus::DynBus, prelude::*};
7use log::*;
8use serde::{de::DeserializeOwned, Serialize};
9use tokio::{
10 stream::StreamExt,
11 sync::{broadcast, mpsc},
12};
13
14pub struct WebsocketCarrier {
16 _websocket: WebsocketService,
17 _websocket_send: Lifeline,
18 _websocket_recv: Lifeline,
19}
20
21pub trait WebsocketMessageBus: Sized {
24 type Send: Message<Self, Channel = mpsc::Sender<Self::Send>>
25 + Clone
26 + Send
27 + Sync
28 + Serialize
29 + 'static;
30
31 type Recv: Message<Self, Channel = broadcast::Sender<Self::Recv>>
32 + Clone
33 + DeserializeOwned
34 + Send
35 + Sync
36 + 'static;
37}
38
39impl<B: DynBus> CarryFrom<B> for WebsocketConnectionBus
41where
42 B: WebsocketMessageBus,
43{
44 type Lifeline = anyhow::Result<WebsocketCarrier>;
45
46 fn carry_from(&self, bus: &B) -> Self::Lifeline {
47 use tungstenite::Message as TungsteniteMessage;
48
49 let _websocket = WebsocketService::spawn(&self)?;
50 self.capacity::<WebsocketSend>(512)?;
51 self.capacity::<WebsocketRecv>(512)?;
52
53 let _websocket_send = {
54 let mut rx = bus.rx::<B::Send>()?;
55 let mut tx = self.tx::<WebsocketSend>()?;
56
57 Self::try_task("forward_send", async move {
58 while let Some(msg) = rx.recv().await {
59 trace!("send message: {:?}", &msg);
60 match bincode::serialize(&msg) {
61 Ok(vec) => {
62 let send = tx
63 .send(WebsocketSend(TungsteniteMessage::Binary(vec)))
64 .await;
65
66 if let Err(_e) = send {
67 debug!("sender disconnected - aborting carry.");
68 break;
69 }
70 }
71 Err(e) => error!("failed to send websocket msg: {}", e),
72 };
73 }
74
75 tx.send(WebsocketSend(TungsteniteMessage::Close(None)))
76 .await
77 .ok();
78
79 Ok(())
80 })
81 };
82
83 let _websocket_recv = {
84 let mut rx = self.rx::<WebsocketRecv>()?;
85 let mut tx = bus.tx::<B::Recv>()?;
86
87 Self::try_task("forward_recv", async move {
88 while let Some(msg) = rx.next().await {
89 let data = msg.0.into_data();
90 match bincode::deserialize(data.as_slice()) {
91 Ok(message) => {
92 trace!("recv message: {:?}", &message);
93 tx.send(message).await?;
94 }
95 Err(e) => error!("failed to recv websocket msg: {}", e),
96 };
97 }
98
99 Ok(())
100 })
101 };
102
103 Ok(WebsocketCarrier {
104 _websocket,
105 _websocket_send,
106 _websocket_recv,
107 })
108 }
109}