1use crate::{protocol::*, *};
4#[cfg(feature = "mockall")]
5use mockall::automock;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::sync::{Arc, Mutex, Weak};
9
/// Dispatch hub that routes transport events to registered handlers.
///
/// It owns one top-level handler plus two shared maps: one handler per
/// space, and one handler per `(space, module name)` pair. Both maps sit
/// behind `Arc<Mutex<..>>`; `DefaultTransport::create` clones those `Arc`s,
/// so handlers registered through the transport become visible here.
pub struct TxImpHnd {
    // Top-level handler: receives connection events and preflight data.
    handler: DynTxHandler,
    // Space handlers keyed by space id (shared with `DefaultTransport`).
    space_map: Arc<Mutex<HashMap<SpaceId, DynTxSpaceHandler>>>,
    // Module handlers keyed by (space id, module name) (shared with
    // `DefaultTransport`).
    mod_map: Arc<Mutex<HashMap<(SpaceId, String), DynTxModuleHandler>>>,
}
20
21impl TxImpHnd {
22 pub fn new(handler: DynTxHandler) -> Arc<Self> {
26 Arc::new(Self {
27 handler,
28 space_map: Arc::new(Mutex::new(HashMap::new())),
29 mod_map: Arc::new(Mutex::new(HashMap::new())),
30 })
31 }
32
33 pub fn new_listening_address(&self, this_url: Url) -> BoxFut<'static, ()> {
36 let handler = self.handler.clone();
37 let space_map = self
38 .space_map
39 .clone()
40 .lock()
41 .unwrap()
42 .values()
43 .cloned()
44 .collect::<Vec<_>>();
45
46 Box::pin(async move {
47 handler.new_listening_address(this_url.clone()).await;
48 for s in space_map {
49 s.new_listening_address(this_url.clone()).await;
50 }
51 })
52 }
53
54 pub fn peer_connect(&self, peer: Url) -> K2Result<bytes::Bytes> {
61 for mod_handler in self.mod_map.lock().unwrap().values() {
62 mod_handler.peer_connect(peer.clone())?;
63 }
64 for space_handler in self.space_map.lock().unwrap().values() {
65 space_handler.peer_connect(peer.clone())?;
66 }
67 self.handler.peer_connect(peer.clone())?;
68 let preflight = self.handler.preflight_gather_outgoing(peer)?;
69 let enc = (K2Proto {
70 ty: K2WireType::Preflight as i32,
71 data: preflight,
72 space: None,
73 module: None,
74 })
75 .encode()?;
76 Ok(enc)
77 }
78
79 pub fn peer_disconnect(&self, peer: Url, reason: Option<String>) {
81 for h in self.mod_map.lock().unwrap().values() {
82 h.peer_disconnect(peer.clone(), reason.clone());
83 }
84 for h in self.space_map.lock().unwrap().values() {
85 h.peer_disconnect(peer.clone(), reason.clone());
86 }
87 self.handler.peer_disconnect(peer, reason);
88 }
89
90 pub fn recv_data(&self, peer: Url, data: bytes::Bytes) -> K2Result<()> {
92 let data = K2Proto::decode(&data)?;
93 let ty = data.ty();
94 let K2Proto {
95 space,
96 module,
97 data,
98 ..
99 } = data;
100
101 match ty {
102 K2WireType::Unspecified => Ok(()),
103 K2WireType::Preflight => {
104 self.handler.preflight_validate_incoming(peer, data)
105 }
106 K2WireType::Notify => {
107 if let Some(space) = space {
108 let space = SpaceId::from(space);
109 if let Some(h) = self.space_map.lock().unwrap().get(&space)
110 {
111 h.recv_space_notify(peer, space, data)?;
112 }
113 }
114 Ok(())
115 }
116 K2WireType::Module => {
117 if let (Some(space), Some(module)) = (space, module) {
118 let space = SpaceId::from(space);
119 if let Some(h) = self
120 .mod_map
121 .lock()
122 .unwrap()
123 .get(&(space.clone(), module.clone()))
124 {
125 h.recv_module_msg(peer, space, module.clone(), data).inspect_err(|e| {
126 tracing::warn!(?module, "Error in recv_module_msg, peer connection will be closed: {e}");
127 })?;
128 }
129 }
130 Ok(())
131 }
132 K2WireType::Disconnect => {
133 let reason = String::from_utf8_lossy(&data).to_string();
134 Err(K2Error::other(format!("Remote Disconnect: {reason}")))
135 }
136 }
137 }
138
139 pub fn set_unresponsive(
143 &self,
144 peer: Url,
145 when: Timestamp,
146 ) -> BoxFut<'_, K2Result<()>> {
147 let space_map = self.space_map.lock().unwrap().clone();
148 Box::pin(async move {
149 for (space_id, space_handler) in space_map.iter() {
150 if let Err(e) =
151 space_handler.set_unresponsive(peer.clone(), when).await
152 {
153 tracing::error!("Failed to mark peer with url {peer} as unresponsive in space {space_id}: {e}");
154 };
155 }
156 Ok(())
157 })
158 }
159}
160
161impl std::fmt::Debug for TxImpHnd {
162 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163 write!(f,"TxImpHnd {{ handler: {:?}, space_map: [{} entries], mod_map: [{} entries] }}", self.handler, self.space_map.lock().unwrap().len(), self.mod_map.lock().unwrap().len() )
164 }
165}
166
/// Low-level transport backend driven by `DefaultTransport`.
///
/// `DefaultTransport` takes care of encoding messages; an implementation of
/// this trait only deals with already-encoded bytes.
pub trait TxImp: 'static + Send + Sync + std::fmt::Debug {
    /// The URL of this transport, if it has one.
    // NOTE(review): presumably the address peers use to reach this node;
    // `None` likely means no address is available yet - confirm.
    fn url(&self) -> Option<Url>;

    /// Close the connection to `peer`.
    ///
    /// When called by `DefaultTransport`, `payload` carries the reason text
    /// together with an encoded `Disconnect` message built from it, or
    /// `None` when no reason was given or encoding failed.
    fn disconnect(
        &self,
        peer: Url,
        payload: Option<(String, bytes::Bytes)>,
    ) -> BoxFut<'_, ()>;

    /// Send already-encoded `data` to `peer`.
    fn send(&self, peer: Url, data: bytes::Bytes) -> BoxFut<'_, K2Result<()>>;

    /// Produce a snapshot of this backend's network statistics.
    fn dump_network_stats(&self) -> BoxFut<'_, K2Result<TransportStats>>;
}
190
/// Shared, reference-counted trait object for a [`TxImp`] implementation.
pub type DynTxImp = Arc<dyn TxImp>;
193
/// High-level transport API: handler registration plus sending of
/// space-level and module-level messages to peers.
///
/// A mock is generated through `automock` in test builds or when the
/// `mockall` feature is enabled.
#[cfg_attr(any(test, feature = "mockall"), automock)]
pub trait Transport: 'static + Send + Sync + std::fmt::Debug {
    /// Register the handler for `space` and return the transport's current
    /// URL, if any.
    ///
    /// The `DefaultTransport` implementation panics if a handler is already
    /// registered for `space`.
    fn register_space_handler(
        &self,
        space: SpaceId,
        handler: DynTxSpaceHandler,
    ) -> Option<Url>;

    /// Register the handler for the `(space, module)` pair.
    ///
    /// The `DefaultTransport` implementation panics if a handler is already
    /// registered for that pair.
    fn register_module_handler(
        &self,
        space: SpaceId,
        module: String,
        handler: DynTxModuleHandler,
    );

    /// Disconnect from `peer`, optionally passing along a textual `reason`.
    fn disconnect(&self, peer: Url, reason: Option<String>) -> BoxFut<'_, ()>;

    /// Send a notify message carrying `data` to `peer` within `space`.
    fn send_space_notify(
        &self,
        peer: Url,
        space: SpaceId,
        data: bytes::Bytes,
    ) -> BoxFut<'_, K2Result<()>>;

    /// Send a message carrying `data` to the named `module` of `peer`
    /// within `space`.
    fn send_module(
        &self,
        peer: Url,
        space: SpaceId,
        module: String,
        data: bytes::Bytes,
    ) -> BoxFut<'_, K2Result<()>>;

    /// Produce a snapshot of the transport's network statistics.
    fn dump_network_stats(&self) -> BoxFut<'_, K2Result<TransportStats>>;
}
253
/// Shared, reference-counted trait object for a [`Transport`].
pub type DynTransport = Arc<dyn Transport>;
256
/// Non-owning (`Weak`) reference to a [`Transport`] trait object; it must be
/// upgraded to an `Arc` before use.
pub type WeakDynTransport = Weak<dyn Transport>;
265
/// [`Transport`] implementation that encodes messages as `K2Proto` and
/// forwards the resulting bytes to a [`TxImp`] backend.
///
/// The handler maps are the same `Arc`s held by the `TxImpHnd` passed to
/// `DefaultTransport::create`, so registrations made here are seen there.
#[derive(Clone, Debug)]
pub struct DefaultTransport {
    // Backend that moves the encoded bytes.
    imp: DynTxImp,
    // Space handlers keyed by space id (shared with `TxImpHnd`).
    space_map: Arc<Mutex<HashMap<SpaceId, DynTxSpaceHandler>>>,
    // Module handlers keyed by (space id, module name) (shared with
    // `TxImpHnd`).
    mod_map: Arc<Mutex<HashMap<(SpaceId, String), DynTxModuleHandler>>>,
}
273
274impl DefaultTransport {
275 pub fn create(hnd: &TxImpHnd, imp: DynTxImp) -> DynTransport {
281 let out: DynTransport = Arc::new(DefaultTransport {
282 imp,
283 space_map: hnd.space_map.clone(),
284 mod_map: hnd.mod_map.clone(),
285 });
286 out
287 }
288}
289
290impl Transport for DefaultTransport {
291 fn register_space_handler(
292 &self,
293 space: SpaceId,
294 handler: DynTxSpaceHandler,
295 ) -> Option<Url> {
296 let mut lock = self.space_map.lock().unwrap();
297 if lock.insert(space.clone(), handler).is_some() {
298 panic!("Attempted to register duplicate space handler! {space}");
299 }
300 self.imp.url()
302 }
303
304 fn register_module_handler(
305 &self,
306 space: SpaceId,
307 module: String,
308 handler: DynTxModuleHandler,
309 ) {
310 if self
311 .mod_map
312 .lock()
313 .unwrap()
314 .insert((space.clone(), module.clone()), handler)
315 .is_some()
316 {
317 panic!(
318 "Attempted to register duplicate module handler! {space} {module}"
319 );
320 }
321 }
322
323 fn disconnect(&self, peer: Url, reason: Option<String>) -> BoxFut<'_, ()> {
324 Box::pin(async move {
325 let payload = match reason {
326 None => None,
327 Some(reason) => match (K2Proto {
328 ty: K2WireType::Disconnect as i32,
329 data: bytes::Bytes::copy_from_slice(reason.as_bytes()),
330 space: None,
331 module: None,
332 })
333 .encode()
334 {
335 Ok(payload) => Some((reason, payload)),
336 Err(_) => None,
337 },
338 };
339
340 self.imp.disconnect(peer, payload).await;
341 })
342 }
343
344 fn send_space_notify(
345 &self,
346 peer: Url,
347 space: SpaceId,
348 data: bytes::Bytes,
349 ) -> BoxFut<'_, K2Result<()>> {
350 Box::pin(async move {
351 let enc = (K2Proto {
352 ty: K2WireType::Notify as i32,
353 data,
354 space: Some(space.into()),
355 module: None,
356 })
357 .encode()?;
358 self.imp.send(peer, enc).await
359 })
360 }
361
362 fn send_module(
363 &self,
364 peer: Url,
365 space: SpaceId,
366 module: String,
367 data: bytes::Bytes,
368 ) -> BoxFut<'_, K2Result<()>> {
369 Box::pin(async move {
370 let enc = (K2Proto {
371 ty: K2WireType::Module as i32,
372 data,
373 space: Some(space.into()),
374 module: Some(module),
375 })
376 .encode()?;
377 self.imp.send(peer, enc).await
378 })
379 }
380
381 fn dump_network_stats(&self) -> BoxFut<'_, K2Result<TransportStats>> {
382 self.imp.dump_network_stats()
383 }
384}
385
386pub trait TxBaseHandler: 'static + Send + Sync + std::fmt::Debug {
389 fn new_listening_address(&self, this_url: Url) -> BoxFut<'static, ()> {
392 drop(this_url);
393 Box::pin(async move {})
394 }
395
396 fn peer_connect(&self, peer: Url) -> K2Result<()> {
400 drop(peer);
401 Ok(())
402 }
403
404 fn peer_disconnect(&self, peer: Url, reason: Option<String>) {
407 drop((peer, reason));
408 }
409}
410
411pub trait TxHandler: TxBaseHandler {
413 fn preflight_gather_outgoing(
418 &self,
419 peer_url: Url,
420 ) -> K2Result<bytes::Bytes> {
421 drop(peer_url);
422 Ok(bytes::Bytes::new())
423 }
424
425 fn preflight_validate_incoming(
431 &self,
432 peer_url: Url,
433 data: bytes::Bytes,
434 ) -> K2Result<()> {
435 drop((peer_url, data));
436 Ok(())
437 }
438}
439
/// Shared, reference-counted trait object for a [`TxHandler`].
pub type DynTxHandler = Arc<dyn TxHandler>;
442
443pub trait TxSpaceHandler: TxBaseHandler {
445 fn recv_space_notify(
449 &self,
450 peer: Url,
451 space: SpaceId,
452 data: bytes::Bytes,
453 ) -> K2Result<()> {
454 drop((peer, space, data));
455 Ok(())
456 }
457
458 fn set_unresponsive(
460 &self,
461 peer: Url,
462 when: Timestamp,
463 ) -> BoxFut<'_, K2Result<()>> {
464 drop((peer, when));
465 Box::pin(async move { Ok(()) })
466 }
467}
468
/// Shared, reference-counted trait object for a [`TxSpaceHandler`].
pub type DynTxSpaceHandler = Arc<dyn TxSpaceHandler>;
471
472pub trait TxModuleHandler: TxBaseHandler {
474 fn recv_module_msg(
478 &self,
479 peer: Url,
480 space: SpaceId,
481 module: String,
482 data: bytes::Bytes,
483 ) -> K2Result<()> {
484 drop((peer, space, module, data));
485 Ok(())
486 }
487}
488
/// Shared, reference-counted trait object for a [`TxModuleHandler`].
pub type DynTxModuleHandler = Arc<dyn TxModuleHandler>;
491
/// Factory that constructs [`Transport`] instances.
pub trait TransportFactory: 'static + Send + Sync + std::fmt::Debug {
    /// Hook giving the factory mutable access to `config`.
    // NOTE(review): presumably used to write this factory's default
    // settings into the config - confirm against implementors.
    fn default_config(&self, config: &mut config::Config) -> K2Result<()>;

    /// Check `config`, returning an error if the factory rejects it.
    fn validate_config(&self, config: &config::Config) -> K2Result<()>;

    /// Asynchronously construct a transport from `builder`, wiring in
    /// `handler` as the top-level handler.
    fn create(
        &self,
        builder: Arc<builder::Builder>,
        handler: DynTxHandler,
    ) -> BoxFut<'static, K2Result<DynTransport>>;
}
508
/// Shared, reference-counted trait object for a [`TransportFactory`].
pub type DynTransportFactory = Arc<dyn TransportFactory>;
511
/// Serializable snapshot of transport-level network statistics, as returned
/// by `dump_network_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransportStats {
    /// Backend identifier.
    // NOTE(review): presumably the name of the transport backend in use -
    // confirm against implementors.
    pub backend: String,

    /// Peer URLs reported by the backend.
    // NOTE(review): unclear from here whether these are this node's own
    // addresses or those of remote peers - confirm.
    pub peer_urls: Vec<Url>,

    /// Per-connection statistics.
    pub connections: Vec<TransportConnectionStats>,
}
526
/// Serializable statistics for a single transport connection.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransportConnectionStats {
    /// Public key associated with the connection.
    // NOTE(review): presumably the remote peer's key; encoding not visible
    // here - confirm.
    pub pub_key: String,

    /// Number of messages sent on this connection.
    pub send_message_count: u64,

    /// Number of bytes sent on this connection.
    pub send_bytes: u64,

    /// Number of messages received on this connection.
    pub recv_message_count: u64,

    /// Number of bytes received on this connection.
    pub recv_bytes: u64,

    /// When the connection was opened, in seconds.
    // NOTE(review): presumably seconds since the Unix epoch - confirm.
    pub opened_at_s: u64,

    /// Whether the connection is a WebRTC connection.
    // NOTE(review): what a `false` value implies (e.g. a relayed
    // connection) is not visible here - confirm.
    pub is_webrtc: bool,
}