hyperlane_plugin_websocket/websocket/
impl.rs1use crate::*;
2
3impl BroadcastTypeTrait for String {}
4impl BroadcastTypeTrait for &str {}
5impl BroadcastTypeTrait for char {}
6impl BroadcastTypeTrait for bool {}
7impl BroadcastTypeTrait for i8 {}
8impl BroadcastTypeTrait for i16 {}
9impl BroadcastTypeTrait for i32 {}
10impl BroadcastTypeTrait for i64 {}
11impl BroadcastTypeTrait for i128 {}
12impl BroadcastTypeTrait for isize {}
13impl BroadcastTypeTrait for u8 {}
14impl BroadcastTypeTrait for u16 {}
15impl BroadcastTypeTrait for u32 {}
16impl BroadcastTypeTrait for u64 {}
17impl BroadcastTypeTrait for u128 {}
18impl BroadcastTypeTrait for usize {}
19impl BroadcastTypeTrait for f32 {}
20impl BroadcastTypeTrait for f64 {}
21impl BroadcastTypeTrait for IpAddr {}
22impl BroadcastTypeTrait for Ipv4Addr {}
23impl BroadcastTypeTrait for Ipv6Addr {}
24impl BroadcastTypeTrait for SocketAddr {}
25impl BroadcastTypeTrait for NonZeroU8 {}
26impl BroadcastTypeTrait for NonZeroU16 {}
27impl BroadcastTypeTrait for NonZeroU32 {}
28impl BroadcastTypeTrait for NonZeroU64 {}
29impl BroadcastTypeTrait for NonZeroU128 {}
30impl BroadcastTypeTrait for NonZeroUsize {}
31impl BroadcastTypeTrait for NonZeroI8 {}
32impl BroadcastTypeTrait for NonZeroI16 {}
33impl BroadcastTypeTrait for NonZeroI32 {}
34impl BroadcastTypeTrait for NonZeroI64 {}
35impl BroadcastTypeTrait for NonZeroI128 {}
36impl BroadcastTypeTrait for NonZeroIsize {}
37impl BroadcastTypeTrait for Infallible {}
38
39impl BroadcastTypeTrait for &String {}
40impl BroadcastTypeTrait for &&str {}
41impl BroadcastTypeTrait for &char {}
42impl BroadcastTypeTrait for &bool {}
43impl BroadcastTypeTrait for &i8 {}
44impl BroadcastTypeTrait for &i16 {}
45impl BroadcastTypeTrait for &i32 {}
46impl BroadcastTypeTrait for &i64 {}
47impl BroadcastTypeTrait for &i128 {}
48impl BroadcastTypeTrait for &isize {}
49impl BroadcastTypeTrait for &u8 {}
50impl BroadcastTypeTrait for &u16 {}
51impl BroadcastTypeTrait for &u32 {}
52impl BroadcastTypeTrait for &u64 {}
53impl BroadcastTypeTrait for &u128 {}
54impl BroadcastTypeTrait for &usize {}
55impl BroadcastTypeTrait for &f32 {}
56impl BroadcastTypeTrait for &f64 {}
57impl BroadcastTypeTrait for &IpAddr {}
58impl BroadcastTypeTrait for &Ipv4Addr {}
59impl BroadcastTypeTrait for &Ipv6Addr {}
60impl BroadcastTypeTrait for &SocketAddr {}
61impl BroadcastTypeTrait for &NonZeroU8 {}
62impl BroadcastTypeTrait for &NonZeroU16 {}
63impl BroadcastTypeTrait for &NonZeroU32 {}
64impl BroadcastTypeTrait for &NonZeroU64 {}
65impl BroadcastTypeTrait for &NonZeroU128 {}
66impl BroadcastTypeTrait for &NonZeroUsize {}
67impl BroadcastTypeTrait for &NonZeroI8 {}
68impl BroadcastTypeTrait for &NonZeroI16 {}
69impl BroadcastTypeTrait for &NonZeroI32 {}
70impl BroadcastTypeTrait for &NonZeroI64 {}
71impl BroadcastTypeTrait for &NonZeroI128 {}
72impl BroadcastTypeTrait for &NonZeroIsize {}
73impl BroadcastTypeTrait for &Infallible {}
74
75impl<B: BroadcastTypeTrait> BroadcastType<B> {
76 pub fn get_key(broadcast_type: BroadcastType<B>) -> String {
77 match broadcast_type {
78 BroadcastType::PointToPoint(key1, key2) => {
79 let (first_key, second_key) = if key1 <= key2 {
80 (key1, key2)
81 } else {
82 (key2, key1)
83 };
84 format!(
85 "{}-{}-{}",
86 POINT_TO_POINT_KEY,
87 first_key.to_string(),
88 second_key.to_string()
89 )
90 }
91 BroadcastType::PointToGroup(key) => {
92 format!("{}-{}", POINT_TO_GROUP_KEY, key.to_string())
93 }
94 }
95 }
96}
97
98impl WebSocket {
99 pub fn new() -> Self {
100 Self {
101 broadcast_map: BroadcastMap::default(),
102 }
103 }
104
105 fn subscribe_unwrap_or_insert<B: BroadcastTypeTrait>(
106 &self,
107 broadcast_type: BroadcastType<B>,
108 ) -> BroadcastMapReceiver<Vec<u8>> {
109 let key: String = BroadcastType::get_key(broadcast_type);
110 self.broadcast_map.subscribe_unwrap_or_insert(&key)
111 }
112
113 fn point_to_point<B: BroadcastTypeTrait>(
114 &self,
115 key1: &B,
116 key2: &B,
117 ) -> BroadcastMapReceiver<Vec<u8>> {
118 self.subscribe_unwrap_or_insert(BroadcastType::PointToPoint(key1.clone(), key2.clone()))
119 }
120
121 fn point_to_group<B: BroadcastTypeTrait>(&self, key: &B) -> BroadcastMapReceiver<Vec<u8>> {
122 self.subscribe_unwrap_or_insert(BroadcastType::PointToGroup(key.clone()))
123 }
124
125 pub fn receiver_count<'a, B: BroadcastTypeTrait>(
126 &self,
127 broadcast_type: BroadcastType<B>,
128 ) -> ReceiverCount {
129 let key: String = BroadcastType::get_key(broadcast_type);
130 self.broadcast_map.receiver_count(&key).unwrap_or(0)
131 }
132
133 pub fn receiver_count_after_increment<B: BroadcastTypeTrait>(
134 &self,
135 broadcast_type: BroadcastType<B>,
136 ) -> ReceiverCount {
137 let count: ReceiverCount = self.receiver_count(broadcast_type);
138 count.max(0).min(ReceiverCount::MAX - 1) + 1
139 }
140
141 pub fn receiver_count_after_decrement<B: BroadcastTypeTrait>(
142 &self,
143 broadcast_type: BroadcastType<B>,
144 ) -> ReceiverCount {
145 let count: ReceiverCount = self.receiver_count(broadcast_type);
146 count.max(1).min(ReceiverCount::MAX) - 1
147 }
148
149 pub fn send<T, B>(
150 &self,
151 broadcast_type: BroadcastType<B>,
152 data: T,
153 ) -> BroadcastMapSendResult<Vec<u8>>
154 where
155 T: Into<Vec<u8>>,
156 B: BroadcastTypeTrait,
157 {
158 let key: String = BroadcastType::get_key(broadcast_type);
159 self.broadcast_map.send(&key, data.into())
160 }
161
162 pub async fn run<'a, F1, Fut1, F2, Fut2, F3, Fut3, B>(
163 &self,
164 ctx: &Context,
165 buffer_size: usize,
166 broadcast_type: BroadcastType<B>,
167 request_handler: F1,
168 on_sended: F2,
169 on_client_closed: F3,
170 ) where
171 F1: FnSendSyncStatic<Fut1>,
172 Fut1: FutureSendStatic,
173 F2: FnSendSyncStatic<Fut2>,
174 Fut2: FutureSendStatic,
175 F3: FnSendSyncStatic<Fut3>,
176 Fut3: FutureSendStatic,
177 B: BroadcastTypeTrait,
178 {
179 let mut receiver: Receiver<Vec<u8>> = match &broadcast_type {
180 BroadcastType::PointToPoint(key1, key2) => self.point_to_point(key1, key2),
181 BroadcastType::PointToGroup(key) => self.point_to_group(key),
182 };
183 let key: String = BroadcastType::get_key(broadcast_type);
184 let result_handle = || async {
185 ctx.aborted().await;
186 ctx.closed().await;
187 };
188 loop {
189 tokio::select! {
190 request_res = ctx.ws_request_from_stream(buffer_size) => {
191 let mut need_break = false;
192 if request_res.is_ok() {
193 request_handler(ctx.clone()).await;
194 } else {
195 need_break = true;
196 on_client_closed(ctx.clone()).await;
197 }
198 let body: ResponseBody = ctx.get_response_body().await;
199 let send_res: BroadcastMapSendResult<_> = self.broadcast_map.send(&key, body);
200 on_sended(ctx.clone()).await;
201 if need_break || send_res.is_err() {
202 break;
203 }
204 },
205 msg_res = receiver.recv() => {
206 if let Ok(msg) = msg_res {
207 if ctx.set_response_body(msg).await.send_body().await.is_err() {
208 break;
209 }
210 } else {
211 break;
212 }
213 }
214 }
215 }
216 result_handle().await;
217 }
218}