1use crate::{protocol::*, *};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::sync::{Arc, Mutex, Weak};
7
8pub struct TxImpHnd {
14 handler: DynTxHandler,
15 space_map: Arc<Mutex<HashMap<SpaceId, DynTxSpaceHandler>>>,
16 mod_map: Arc<Mutex<HashMap<(SpaceId, String), DynTxModuleHandler>>>,
17}
18
19impl TxImpHnd {
20 pub fn new(handler: DynTxHandler) -> Arc<Self> {
24 Arc::new(Self {
25 handler,
26 space_map: Arc::new(Mutex::new(HashMap::new())),
27 mod_map: Arc::new(Mutex::new(HashMap::new())),
28 })
29 }
30
31 pub fn new_listening_address(&self, this_url: Url) -> BoxFut<'static, ()> {
34 let handler = self.handler.clone();
35 let space_map = self
36 .space_map
37 .clone()
38 .lock()
39 .unwrap()
40 .values()
41 .cloned()
42 .collect::<Vec<_>>();
43
44 Box::pin(async move {
45 handler.new_listening_address(this_url.clone()).await;
46 for s in space_map {
47 s.new_listening_address(this_url.clone()).await;
48 }
49 })
50 }
51
52 pub fn peer_connect(&self, peer: Url) -> K2Result<bytes::Bytes> {
59 for mod_handler in self.mod_map.lock().unwrap().values() {
60 mod_handler.peer_connect(peer.clone())?;
61 }
62 for space_handler in self.space_map.lock().unwrap().values() {
63 space_handler.peer_connect(peer.clone())?;
64 }
65 self.handler.peer_connect(peer.clone())?;
66 let preflight = self.handler.preflight_gather_outgoing(peer)?;
67 let enc = (K2Proto {
68 ty: K2WireType::Preflight as i32,
69 data: preflight,
70 space: None,
71 module: None,
72 })
73 .encode()?;
74 Ok(enc)
75 }
76
77 pub fn peer_disconnect(&self, peer: Url, reason: Option<String>) {
79 for h in self.mod_map.lock().unwrap().values() {
80 h.peer_disconnect(peer.clone(), reason.clone());
81 }
82 for h in self.space_map.lock().unwrap().values() {
83 h.peer_disconnect(peer.clone(), reason.clone());
84 }
85 self.handler.peer_disconnect(peer, reason);
86 }
87
88 pub fn recv_data(&self, peer: Url, data: bytes::Bytes) -> K2Result<()> {
90 let data = K2Proto::decode(&data)?;
91 let ty = data.ty();
92 let K2Proto {
93 space,
94 module,
95 data,
96 ..
97 } = data;
98
99 match ty {
100 K2WireType::Unspecified => Ok(()),
101 K2WireType::Preflight => {
102 self.handler.preflight_validate_incoming(peer, data)
103 }
104 K2WireType::Notify => {
105 if let Some(space) = space {
106 let space = SpaceId::from(space);
107 if let Some(h) = self.space_map.lock().unwrap().get(&space)
108 {
109 h.recv_space_notify(peer, space, data)?;
110 }
111 }
112 Ok(())
113 }
114 K2WireType::Module => {
115 if let (Some(space), Some(module)) = (space, module) {
116 let space = SpaceId::from(space);
117 if let Some(h) = self
118 .mod_map
119 .lock()
120 .unwrap()
121 .get(&(space.clone(), module.clone()))
122 {
123 h.recv_module_msg(peer, space, module.clone(), data).inspect_err(|e| {
124 tracing::warn!(?module, "Error in recv_module_msg, peer connection will be closed: {e}");
125 })?;
126 }
127 }
128 Ok(())
129 }
130 K2WireType::Disconnect => {
131 let reason = String::from_utf8_lossy(&data).to_string();
132 Err(K2Error::other(format!("Remote Disconnect: {reason}")))
133 }
134 }
135 }
136
137 pub fn set_unresponsive(
141 &self,
142 peer: Url,
143 when: Timestamp,
144 ) -> BoxFut<'_, K2Result<()>> {
145 let space_map = self.space_map.lock().unwrap().clone();
146 Box::pin(async move {
147 for (space_id, space_handler) in space_map.iter() {
148 if let Err(e) =
149 space_handler.set_unresponsive(peer.clone(), when).await
150 {
151 tracing::error!("Failed to mark peer with url {peer} as unresponsive in space {space_id}: {e}");
152 };
153 }
154 Ok(())
155 })
156 }
157}
158
159impl std::fmt::Debug for TxImpHnd {
160 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161 write!(f,"TxImpHnd {{ handler: {:?}, space_map: [{} entries], mod_map: [{} entries] }}", self.handler, self.space_map.lock().unwrap().len(), self.mod_map.lock().unwrap().len() )
162 }
163}
164
165pub trait TxImp: 'static + Send + Sync + std::fmt::Debug {
167 fn url(&self) -> Option<Url>;
169
170 fn disconnect(
176 &self,
177 peer: Url,
178 payload: Option<(String, bytes::Bytes)>,
179 ) -> BoxFut<'_, ()>;
180
181 fn send(&self, peer: Url, data: bytes::Bytes) -> BoxFut<'_, K2Result<()>>;
184
185 fn get_connected_peers(&self) -> BoxFut<'_, K2Result<Vec<Url>>>;
187
188 fn dump_network_stats(&self) -> BoxFut<'_, K2Result<TransportStats>>;
190}
191
192pub type DynTxImp = Arc<dyn TxImp>;
194
195#[cfg_attr(any(test, feature = "mockall"), mockall::automock)]
197pub trait Transport: 'static + Send + Sync + std::fmt::Debug {
198 fn register_space_handler(
205 &self,
206 space: SpaceId,
207 handler: DynTxSpaceHandler,
208 ) -> Option<Url>;
209
210 fn register_module_handler(
215 &self,
216 space: SpaceId,
217 module: String,
218 handler: DynTxModuleHandler,
219 );
220
221 fn disconnect(&self, peer: Url, reason: Option<String>) -> BoxFut<'_, ()>;
225
226 fn send_space_notify(
232 &self,
233 peer: Url,
234 space: SpaceId,
235 data: bytes::Bytes,
236 ) -> BoxFut<'_, K2Result<()>>;
237
238 fn send_module(
244 &self,
245 peer: Url,
246 space: SpaceId,
247 module: String,
248 data: bytes::Bytes,
249 ) -> BoxFut<'_, K2Result<()>>;
250
251 fn get_connected_peers(&self) -> BoxFut<'_, K2Result<Vec<Url>>>;
253
254 fn unregister_space(&self, space: SpaceId) -> BoxFut<'_, ()>;
256
257 fn dump_network_stats(&self) -> BoxFut<'_, K2Result<TransportStats>>;
259}
260
261pub type DynTransport = Arc<dyn Transport>;
263
264pub type WeakDynTransport = Weak<dyn Transport>;
272
273#[derive(Clone, Debug)]
275pub struct DefaultTransport {
276 imp: DynTxImp,
277 space_map: Arc<Mutex<HashMap<SpaceId, DynTxSpaceHandler>>>,
278 mod_map: Arc<Mutex<HashMap<(SpaceId, String), DynTxModuleHandler>>>,
279}
280
281impl DefaultTransport {
282 pub fn create(hnd: &TxImpHnd, imp: DynTxImp) -> DynTransport {
288 let out: DynTransport = Arc::new(DefaultTransport {
289 imp,
290 space_map: hnd.space_map.clone(),
291 mod_map: hnd.mod_map.clone(),
292 });
293 out
294 }
295}
296
297impl Transport for DefaultTransport {
298 fn register_space_handler(
299 &self,
300 space: SpaceId,
301 handler: DynTxSpaceHandler,
302 ) -> Option<Url> {
303 let mut lock = self.space_map.lock().unwrap();
304 if lock.insert(space.clone(), handler).is_some() {
305 panic!("Attempted to register duplicate space handler! {space}");
306 }
307 self.imp.url()
309 }
310
311 fn register_module_handler(
312 &self,
313 space: SpaceId,
314 module: String,
315 handler: DynTxModuleHandler,
316 ) {
317 if self
318 .mod_map
319 .lock()
320 .unwrap()
321 .insert((space.clone(), module.clone()), handler)
322 .is_some()
323 {
324 panic!(
325 "Attempted to register duplicate module handler! {space} {module}"
326 );
327 }
328 }
329
330 fn disconnect(&self, peer: Url, reason: Option<String>) -> BoxFut<'_, ()> {
331 Box::pin(async move {
332 let payload = match reason {
333 None => None,
334 Some(reason) => match (K2Proto {
335 ty: K2WireType::Disconnect as i32,
336 data: bytes::Bytes::copy_from_slice(reason.as_bytes()),
337 space: None,
338 module: None,
339 })
340 .encode()
341 {
342 Ok(payload) => Some((reason, payload)),
343 Err(_) => None,
344 },
345 };
346
347 self.imp.disconnect(peer, payload).await;
348 })
349 }
350
351 fn send_space_notify(
352 &self,
353 peer: Url,
354 space: SpaceId,
355 data: bytes::Bytes,
356 ) -> BoxFut<'_, K2Result<()>> {
357 Box::pin(async move {
358 let enc = (K2Proto {
359 ty: K2WireType::Notify as i32,
360 data,
361 space: Some(space.into()),
362 module: None,
363 })
364 .encode()?;
365 self.imp.send(peer, enc).await
366 })
367 }
368
369 fn send_module(
370 &self,
371 peer: Url,
372 space: SpaceId,
373 module: String,
374 data: bytes::Bytes,
375 ) -> BoxFut<'_, K2Result<()>> {
376 Box::pin(async move {
377 let enc = (K2Proto {
378 ty: K2WireType::Module as i32,
379 data,
380 space: Some(space.into()),
381 module: Some(module),
382 })
383 .encode()?;
384 self.imp.send(peer, enc).await
385 })
386 }
387
388 fn get_connected_peers(&self) -> BoxFut<'_, K2Result<Vec<Url>>> {
389 self.imp.get_connected_peers()
390 }
391
392 fn unregister_space(&self, space: SpaceId) -> BoxFut<'_, ()> {
393 Box::pin(async move {
394 self.space_map.lock().unwrap().remove(&space);
396
397 self.mod_map.lock().unwrap().retain(|(s, _), _| s != &space);
400 })
401 }
402
403 fn dump_network_stats(&self) -> BoxFut<'_, K2Result<TransportStats>> {
404 self.imp.dump_network_stats()
405 }
406}
407
408pub trait TxBaseHandler: 'static + Send + Sync + std::fmt::Debug {
411 fn new_listening_address(&self, this_url: Url) -> BoxFut<'static, ()> {
414 drop(this_url);
415 Box::pin(async move {})
416 }
417
418 fn peer_connect(&self, peer: Url) -> K2Result<()> {
422 drop(peer);
423 Ok(())
424 }
425
426 fn peer_disconnect(&self, peer: Url, reason: Option<String>) {
429 drop((peer, reason));
430 }
431}
432
433pub trait TxHandler: TxBaseHandler {
435 fn preflight_gather_outgoing(
440 &self,
441 peer_url: Url,
442 ) -> K2Result<bytes::Bytes> {
443 drop(peer_url);
444 Ok(bytes::Bytes::new())
445 }
446
447 fn preflight_validate_incoming(
453 &self,
454 peer_url: Url,
455 data: bytes::Bytes,
456 ) -> K2Result<()> {
457 drop((peer_url, data));
458 Ok(())
459 }
460}
461
462pub type DynTxHandler = Arc<dyn TxHandler>;
464
465pub trait TxSpaceHandler: TxBaseHandler {
467 fn recv_space_notify(
471 &self,
472 peer: Url,
473 space: SpaceId,
474 data: bytes::Bytes,
475 ) -> K2Result<()> {
476 drop((peer, space, data));
477 Ok(())
478 }
479
480 fn set_unresponsive(
482 &self,
483 peer: Url,
484 when: Timestamp,
485 ) -> BoxFut<'_, K2Result<()>> {
486 drop((peer, when));
487 Box::pin(async move { Ok(()) })
488 }
489}
490
491pub type DynTxSpaceHandler = Arc<dyn TxSpaceHandler>;
493
494pub trait TxModuleHandler: TxBaseHandler {
496 fn recv_module_msg(
500 &self,
501 peer: Url,
502 space: SpaceId,
503 module: String,
504 data: bytes::Bytes,
505 ) -> K2Result<()> {
506 drop((peer, space, module, data));
507 Ok(())
508 }
509}
510
511pub type DynTxModuleHandler = Arc<dyn TxModuleHandler>;
513
514pub trait TransportFactory: 'static + Send + Sync + std::fmt::Debug {
516 fn default_config(&self, config: &mut config::Config) -> K2Result<()>;
519
520 fn validate_config(&self, config: &config::Config) -> K2Result<()>;
522
523 fn create(
525 &self,
526 builder: Arc<builder::Builder>,
527 handler: DynTxHandler,
528 ) -> BoxFut<'static, K2Result<DynTransport>>;
529}
530
531pub type DynTransportFactory = Arc<dyn TransportFactory>;
533
534#[derive(Debug, Clone, Serialize, Deserialize)]
538pub struct TransportStats {
539 pub backend: String,
541
542 pub peer_urls: Vec<Url>,
544
545 pub connections: Vec<TransportConnectionStats>,
547}
548
549#[derive(Debug, Clone, Serialize, Deserialize)]
551pub struct TransportConnectionStats {
552 pub pub_key: String,
554
555 pub send_message_count: u64,
557
558 pub send_bytes: u64,
560
561 pub recv_message_count: u64,
563
564 pub recv_bytes: u64,
566
567 pub opened_at_s: u64,
569
570 pub is_webrtc: bool,
572}