kitsune2_api/
transport.rs

1//! Kitsune2 transport related types.
2
3use crate::{protocol::*, *};
4#[cfg(feature = "mockall")]
5use mockall::automock;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::sync::{Arc, Mutex, Weak};
9
/// This is the low-level backend transport handler designed to work
/// with [DefaultTransport].
/// Construct using ([TxImpHnd::new]), with a high-level [DynTxHandler],
/// then call [DefaultTransport::create] to return the high-level handler
/// from the [TransportFactory].
///
/// The two handler maps are held behind `Arc<Mutex<..>>` so that
/// [DefaultTransport::create] can clone the `Arc`s: handlers registered
/// through the resulting [Transport] land in the very same maps that the
/// event methods on this type read from.
pub struct TxImpHnd {
    /// The transport-level handler that was passed to [TxImpHnd::new].
    handler: DynTxHandler,
    /// Registered space handlers, keyed by space id.
    space_map: Arc<Mutex<HashMap<SpaceId, DynTxSpaceHandler>>>,
    /// Registered module handlers, keyed by `(space id, module name)`.
    mod_map: Arc<Mutex<HashMap<(SpaceId, String), DynTxModuleHandler>>>,
}
20
21impl TxImpHnd {
22    /// When constructing a [Transport] from a [TransportFactory],
23    /// you need a [TxImpHnd] for calling transport events.
24    /// Pass the handler into here to construct one.
25    pub fn new(handler: DynTxHandler) -> Arc<Self> {
26        Arc::new(Self {
27            handler,
28            space_map: Arc::new(Mutex::new(HashMap::new())),
29            mod_map: Arc::new(Mutex::new(HashMap::new())),
30        })
31    }
32
33    /// Call this when you receive or bind a new address at which
34    /// this local node can be reached by peers
35    pub fn new_listening_address(&self, this_url: Url) -> BoxFut<'static, ()> {
36        let handler = self.handler.clone();
37        let space_map = self
38            .space_map
39            .clone()
40            .lock()
41            .unwrap()
42            .values()
43            .cloned()
44            .collect::<Vec<_>>();
45
46        Box::pin(async move {
47            handler.new_listening_address(this_url.clone()).await;
48            for s in space_map {
49                s.new_listening_address(this_url.clone()).await;
50            }
51        })
52    }
53
54    /// Call this when you establish an outgoing connection and
55    /// when you establish an incoming connection. If this call
56    /// returns an error, the connection should be closed immediately.
57    /// On success, this function returns bytes that should be
58    /// sent as a preflight message for additional connection validation.
59    /// (The preflight data should be sent even if it is zero length).
60    pub fn peer_connect(&self, peer: Url) -> K2Result<bytes::Bytes> {
61        for mod_handler in self.mod_map.lock().unwrap().values() {
62            mod_handler.peer_connect(peer.clone())?;
63        }
64        for space_handler in self.space_map.lock().unwrap().values() {
65            space_handler.peer_connect(peer.clone())?;
66        }
67        self.handler.peer_connect(peer.clone())?;
68        let preflight = self.handler.preflight_gather_outgoing(peer)?;
69        let enc = (K2Proto {
70            ty: K2WireType::Preflight as i32,
71            data: preflight,
72            space: None,
73            module: None,
74        })
75        .encode()?;
76        Ok(enc)
77    }
78
79    /// Call this whenever a connection is closed.
80    pub fn peer_disconnect(&self, peer: Url, reason: Option<String>) {
81        for h in self.mod_map.lock().unwrap().values() {
82            h.peer_disconnect(peer.clone(), reason.clone());
83        }
84        for h in self.space_map.lock().unwrap().values() {
85            h.peer_disconnect(peer.clone(), reason.clone());
86        }
87        self.handler.peer_disconnect(peer, reason);
88    }
89
90    /// Call this whenever data is received on an open connection.
91    pub fn recv_data(&self, peer: Url, data: bytes::Bytes) -> K2Result<()> {
92        let data = K2Proto::decode(&data)?;
93        let ty = data.ty();
94        let K2Proto {
95            space,
96            module,
97            data,
98            ..
99        } = data;
100
101        match ty {
102            K2WireType::Unspecified => Ok(()),
103            K2WireType::Preflight => {
104                self.handler.preflight_validate_incoming(peer, data)
105            }
106            K2WireType::Notify => {
107                if let Some(space) = space {
108                    let space = SpaceId::from(space);
109                    if let Some(h) = self.space_map.lock().unwrap().get(&space)
110                    {
111                        h.recv_space_notify(peer, space, data)?;
112                    }
113                }
114                Ok(())
115            }
116            K2WireType::Module => {
117                if let (Some(space), Some(module)) = (space, module) {
118                    let space = SpaceId::from(space);
119                    if let Some(h) = self
120                        .mod_map
121                        .lock()
122                        .unwrap()
123                        .get(&(space.clone(), module.clone()))
124                    {
125                        h.recv_module_msg(peer, space, module.clone(), data).inspect_err(|e| {
126                            tracing::warn!(?module, "Error in recv_module_msg, peer connection will be closed: {e}");
127                        })?;
128                    }
129                }
130                Ok(())
131            }
132            K2WireType::Disconnect => {
133                let reason = String::from_utf8_lossy(&data).to_string();
134                Err(K2Error::other(format!("Remote Disconnect: {reason}")))
135            }
136        }
137    }
138
139    /// Call this whenever a connection to a peer fails to get established,
140    /// sending a message to a peer fails or when we get a disconnected
141    /// event from a peer.
142    pub fn set_unresponsive(
143        &self,
144        peer: Url,
145        when: Timestamp,
146    ) -> BoxFut<'_, K2Result<()>> {
147        let space_map = self.space_map.lock().unwrap().clone();
148        Box::pin(async move {
149            for (space_id, space_handler) in space_map.iter() {
150                if let Err(e) =
151                    space_handler.set_unresponsive(peer.clone(), when).await
152                {
153                    tracing::error!("Failed to mark peer with url {peer} as unresponsive in space {space_id}: {e}");
154                };
155            }
156            Ok(())
157        })
158    }
159}
160
161impl std::fmt::Debug for TxImpHnd {
162    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163        write!(f,"TxImpHnd {{ handler: {:?}, space_map: [{} entries], mod_map: [{} entries] }}", self.handler, self.space_map.lock().unwrap().len(), self.mod_map.lock().unwrap().len() )
164    }
165}
166
/// A low-level transport implementation.
///
/// [DefaultTransport] wraps a [DynTxImp] and forwards its encoded
/// messages to the methods of this trait.
pub trait TxImp: 'static + Send + Sync + std::fmt::Debug {
    /// Get the current url if any.
    fn url(&self) -> Option<Url>;

    /// Indicates that the implementation should close any open connections to
    /// the given peer. If a payload is provided, the implementation can
    /// make a best effort to send it to the remote first on a short timeout.
    /// Regardless of the success of the payload send, the connection should
    /// be closed.
    ///
    /// When called through [DefaultTransport], the payload tuple holds the
    /// plain-text disconnect reason and the already encoded disconnect
    /// message carrying that reason.
    fn disconnect(
        &self,
        peer: Url,
        payload: Option<(String, bytes::Bytes)>,
    ) -> BoxFut<'_, ()>;

    /// Indicates that the implementation should send the payload to the remote
    /// peer, opening a connection if needed.
    fn send(&self, peer: Url, data: bytes::Bytes) -> BoxFut<'_, K2Result<()>>;

    /// Dump network stats.
    fn dump_network_stats(&self) -> BoxFut<'_, K2Result<TransportStats>>;
}

/// Trait-object [TxImp], shared via [Arc].
pub type DynTxImp = Arc<dyn TxImp>;
193
/// A high-level wrapper around a low-level [DynTxImp] transport implementation.
///
/// [DefaultTransport] is the implementation of this trait provided in
/// this module.
#[cfg_attr(any(test, feature = "mockall"), automock)]
pub trait Transport: 'static + Send + Sync + std::fmt::Debug {
    /// Register a space handler for receiving incoming notifications.
    ///
    /// Returns the current url if any.
    ///
    /// # Panics
    ///
    /// Panics if you attempt to register a duplicate handler for
    /// a space.
    fn register_space_handler(
        &self,
        space: SpaceId,
        handler: DynTxSpaceHandler,
    ) -> Option<Url>;

    /// Register a module handler for receiving incoming module messages.
    ///
    /// # Panics
    ///
    /// Panics if you attempt to register a duplicate handler for the
    /// same (space, module).
    fn register_module_handler(
        &self,
        space: SpaceId,
        module: String,
        handler: DynTxModuleHandler,
    );

    /// Make a best effort to notify a peer that we are disconnecting and why.
    /// After a short time out, the connection will be closed even if the
    /// disconnect reason message is still pending.
    fn disconnect(&self, peer: Url, reason: Option<String>) -> BoxFut<'_, ()>;

    /// Notify a remote peer within a space. This is a fire-and-forget
    /// type message. The future this call returns will indicate any errors
    /// that occur up to the point where the message is handed off to
    /// the transport backend. After that, the future will return `Ok(())`
    /// but the remote peer may or may not actually receive the message.
    fn send_space_notify(
        &self,
        peer: Url,
        space: SpaceId,
        data: bytes::Bytes,
    ) -> BoxFut<'_, K2Result<()>>;

    /// Notify a remote peer module within a space. This is a fire-and-forget
    /// type message. The future this call returns will indicate any errors
    /// that occur up to the point where the message is handed off to
    /// the transport backend. After that, the future will return `Ok(())`
    /// but the remote peer may or may not actually receive the message.
    fn send_module(
        &self,
        peer: Url,
        space: SpaceId,
        module: String,
        data: bytes::Bytes,
    ) -> BoxFut<'_, K2Result<()>>;

    /// Dump network stats.
    fn dump_network_stats(&self) -> BoxFut<'_, K2Result<TransportStats>>;
}

/// Trait-object [Transport], shared via [Arc].
pub type DynTransport = Arc<dyn Transport>;

/// A weak trait-object [Transport].
///
/// This is provided in the API as a suggestion for modules that store a reference to the transport
/// for sending messages but also implement [`TxModuleHandler`]. When registering as a module
/// handler, the transport keeps a reference to your module. If you then store an owned reference
/// to the transport, you create a circular reference. By using a weak reference instead, you can
/// create a well-behaved module that will be dropped when a space shuts down.
pub type WeakDynTransport = Weak<dyn Transport>;
265
/// A high-level wrapper around a low-level [DynTxImp] transport implementation.
///
/// Instances are built with [DefaultTransport::create], which shares the
/// handler maps of the given [TxImpHnd] with this struct.
#[derive(Clone, Debug)]
pub struct DefaultTransport {
    /// The low-level implementation that messages are handed to.
    imp: DynTxImp,
    /// Registered space handlers, keyed by space id.
    space_map: Arc<Mutex<HashMap<SpaceId, DynTxSpaceHandler>>>,
    /// Registered module handlers, keyed by `(space id, module name)`.
    mod_map: Arc<Mutex<HashMap<(SpaceId, String), DynTxModuleHandler>>>,
}
273
274impl DefaultTransport {
275    /// When constructing a [Transport] from a [TransportFactory],
276    /// this function does the actual wrapping of your implemementation
277    /// to produce the [Transport] struct.
278    ///
279    /// [DefaultTransport] is built to be used with the provided [TxImpHnd].
280    pub fn create(hnd: &TxImpHnd, imp: DynTxImp) -> DynTransport {
281        let out: DynTransport = Arc::new(DefaultTransport {
282            imp,
283            space_map: hnd.space_map.clone(),
284            mod_map: hnd.mod_map.clone(),
285        });
286        out
287    }
288}
289
290impl Transport for DefaultTransport {
291    fn register_space_handler(
292        &self,
293        space: SpaceId,
294        handler: DynTxSpaceHandler,
295    ) -> Option<Url> {
296        let mut lock = self.space_map.lock().unwrap();
297        if lock.insert(space.clone(), handler).is_some() {
298            panic!("Attempted to register duplicate space handler! {space}");
299        }
300        // keep the lock locked while we fetch the url for atomicity.
301        self.imp.url()
302    }
303
304    fn register_module_handler(
305        &self,
306        space: SpaceId,
307        module: String,
308        handler: DynTxModuleHandler,
309    ) {
310        if self
311            .mod_map
312            .lock()
313            .unwrap()
314            .insert((space.clone(), module.clone()), handler)
315            .is_some()
316        {
317            panic!(
318                "Attempted to register duplicate module handler! {space} {module}"
319            );
320        }
321    }
322
323    fn disconnect(&self, peer: Url, reason: Option<String>) -> BoxFut<'_, ()> {
324        Box::pin(async move {
325            let payload = match reason {
326                None => None,
327                Some(reason) => match (K2Proto {
328                    ty: K2WireType::Disconnect as i32,
329                    data: bytes::Bytes::copy_from_slice(reason.as_bytes()),
330                    space: None,
331                    module: None,
332                })
333                .encode()
334                {
335                    Ok(payload) => Some((reason, payload)),
336                    Err(_) => None,
337                },
338            };
339
340            self.imp.disconnect(peer, payload).await;
341        })
342    }
343
344    fn send_space_notify(
345        &self,
346        peer: Url,
347        space: SpaceId,
348        data: bytes::Bytes,
349    ) -> BoxFut<'_, K2Result<()>> {
350        Box::pin(async move {
351            let enc = (K2Proto {
352                ty: K2WireType::Notify as i32,
353                data,
354                space: Some(space.into()),
355                module: None,
356            })
357            .encode()?;
358            self.imp.send(peer, enc).await
359        })
360    }
361
362    fn send_module(
363        &self,
364        peer: Url,
365        space: SpaceId,
366        module: String,
367        data: bytes::Bytes,
368    ) -> BoxFut<'_, K2Result<()>> {
369        Box::pin(async move {
370            let enc = (K2Proto {
371                ty: K2WireType::Module as i32,
372                data,
373                space: Some(space.into()),
374                module: Some(module),
375            })
376            .encode()?;
377            self.imp.send(peer, enc).await
378        })
379    }
380
381    fn dump_network_stats(&self) -> BoxFut<'_, K2Result<TransportStats>> {
382        self.imp.dump_network_stats()
383    }
384}
385
386/// Base trait for transport handler events.
387/// The other three handler types are all based on this trait.
388pub trait TxBaseHandler: 'static + Send + Sync + std::fmt::Debug {
389    /// A notification that a new listening address has been bound.
390    /// Peers should now go to this new address to reach this node.
391    fn new_listening_address(&self, this_url: Url) -> BoxFut<'static, ()> {
392        drop(this_url);
393        Box::pin(async move {})
394    }
395
396    /// A peer has connected to us. In addition to the preflight
397    /// logic in [TxHandler], this callback allows space and module
398    /// logic to block connections to peers. Simply return an Err here.
399    fn peer_connect(&self, peer: Url) -> K2Result<()> {
400        drop(peer);
401        Ok(())
402    }
403
404    /// A peer has disconnected from us. If they did so gracefully
405    /// the reason will be is_some().
406    fn peer_disconnect(&self, peer: Url, reason: Option<String>) {
407        drop((peer, reason));
408    }
409}
410
411/// Handler for whole transport-level events.
412pub trait TxHandler: TxBaseHandler {
413    /// Gather preflight data to send to a new opening connection.
414    /// Returning an Err result will close this connection.
415    ///
416    /// The default implementation sends an empty preflight message.
417    fn preflight_gather_outgoing(
418        &self,
419        peer_url: Url,
420    ) -> K2Result<bytes::Bytes> {
421        drop(peer_url);
422        Ok(bytes::Bytes::new())
423    }
424
425    /// Validate preflight data sent by a remote peer on a new connection.
426    /// Returning an Err result will close this connection.
427    ///
428    /// The default implementation ignores the preflight data,
429    /// and considers it valid.
430    fn preflight_validate_incoming(
431        &self,
432        peer_url: Url,
433        data: bytes::Bytes,
434    ) -> K2Result<()> {
435        drop((peer_url, data));
436        Ok(())
437    }
438}
439
440/// Trait-object [TxHandler].
441pub type DynTxHandler = Arc<dyn TxHandler>;
442
443/// Handler for space-related events.
444pub trait TxSpaceHandler: TxBaseHandler {
445    /// The sync handler for receiving notifications sent by a remote
446    /// peer in reference to a particular space. If this callback returns
447    /// an error, then the connection which sent the message will be closed.
448    fn recv_space_notify(
449        &self,
450        peer: Url,
451        space: SpaceId,
452        data: bytes::Bytes,
453    ) -> K2Result<()> {
454        drop((peer, space, data));
455        Ok(())
456    }
457
458    /// Mark a peer as unresponsive in the space's peer meta store
459    fn set_unresponsive(
460        &self,
461        peer: Url,
462        when: Timestamp,
463    ) -> BoxFut<'_, K2Result<()>> {
464        drop((peer, when));
465        Box::pin(async move { Ok(()) })
466    }
467}
468
469/// Trait-object [TxSpaceHandler].
470pub type DynTxSpaceHandler = Arc<dyn TxSpaceHandler>;
471
472/// Handler for module-related events.
473pub trait TxModuleHandler: TxBaseHandler {
474    /// The sync handler for receiving module messages sent by a remote
475    /// peer in reference to a particular space. If this callback returns
476    /// an error, then the connection which sent the message will be closed.
477    fn recv_module_msg(
478        &self,
479        peer: Url,
480        space: SpaceId,
481        module: String,
482        data: bytes::Bytes,
483    ) -> K2Result<()> {
484        drop((peer, space, module, data));
485        Ok(())
486    }
487}
488
489/// Trait-object [TxModuleHandler].
490pub type DynTxModuleHandler = Arc<dyn TxModuleHandler>;
491
/// A factory for constructing Transport instances.
pub trait TransportFactory: 'static + Send + Sync + std::fmt::Debug {
    /// Help the builder construct a default config from the chosen
    /// module factories.
    ///
    /// The config is passed mutably so the factory can write into it.
    fn default_config(&self, config: &mut config::Config) -> K2Result<()>;

    /// Validate configuration.
    ///
    /// The config is only read; an `Err` reports an invalid configuration.
    fn validate_config(&self, config: &config::Config) -> K2Result<()>;

    /// Construct a transport instance.
    ///
    /// `handler` is the high-level [DynTxHandler]; implementations are
    /// expected to pass it to [TxImpHnd::new] and then wrap their
    /// low-level implementation with [DefaultTransport::create].
    fn create(
        &self,
        builder: Arc<builder::Builder>,
        handler: DynTxHandler,
    ) -> BoxFut<'static, K2Result<DynTransport>>;
}

/// Trait-object [TransportFactory], shared via [Arc].
pub type DynTransportFactory = Arc<dyn TransportFactory>;
511
/// Stats for a transport connection.
///
/// This is intended to be a state dump that gives some insight into what the transport is doing.
/// It is the value produced by the `dump_network_stats` methods of [TxImp] and [Transport].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransportStats {
    /// The networking backend that is in use.
    pub backend: String,

    /// The list of peer urls that this Kitsune2 instance can currently be reached at.
    pub peer_urls: Vec<Url>,

    /// The list of current connections.
    pub connections: Vec<TransportConnectionStats>,
}
526
/// Stats for a single transport connection.
///
/// One entry per connection is reported in [TransportStats::connections].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransportConnectionStats {
    /// The public key of the remote peer.
    pub pub_key: String,

    /// The message count sent on this connection.
    pub send_message_count: u64,

    /// The bytes sent on this connection.
    pub send_bytes: u64,

    /// The message count received on this connection.
    pub recv_message_count: u64,

    /// The bytes received on this connection.
    pub recv_bytes: u64,

    /// UNIX epoch timestamp in seconds when this connection was opened.
    pub opened_at_s: u64,

    /// True if this connection has successfully upgraded to webrtc.
    pub is_webrtc: bool,
}