kitsune2_api/
transport.rs

1//! Kitsune2 transport related types.
2
3use crate::{protocol::*, *};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::sync::{Arc, Mutex, Weak};
7
8/// This is the low-level backend transport handler designed to work
9/// with [DefaultTransport].
10/// Construct using ([TxImpHnd::new]), with a high-level [DynTxHandler],
11/// then call [DefaultTransport::create] to return the high-level handler
12/// from the [TransportFactory].
13pub struct TxImpHnd {
14    handler: DynTxHandler,
15    space_map: Arc<Mutex<HashMap<SpaceId, DynTxSpaceHandler>>>,
16    mod_map: Arc<Mutex<HashMap<(SpaceId, String), DynTxModuleHandler>>>,
17}
18
19impl TxImpHnd {
20    /// When constructing a [Transport] from a [TransportFactory],
21    /// you need a [TxImpHnd] for calling transport events.
22    /// Pass the handler into here to construct one.
23    pub fn new(handler: DynTxHandler) -> Arc<Self> {
24        Arc::new(Self {
25            handler,
26            space_map: Arc::new(Mutex::new(HashMap::new())),
27            mod_map: Arc::new(Mutex::new(HashMap::new())),
28        })
29    }
30
31    /// Call this when you receive or bind a new address at which
32    /// this local node can be reached by peers
33    pub fn new_listening_address(&self, this_url: Url) -> BoxFut<'static, ()> {
34        let handler = self.handler.clone();
35        let space_map = self
36            .space_map
37            .clone()
38            .lock()
39            .unwrap()
40            .values()
41            .cloned()
42            .collect::<Vec<_>>();
43
44        Box::pin(async move {
45            handler.new_listening_address(this_url.clone()).await;
46            for s in space_map {
47                s.new_listening_address(this_url.clone()).await;
48            }
49        })
50    }
51
52    /// Call this when you establish an outgoing connection and
53    /// when you establish an incoming connection. If this call
54    /// returns an error, the connection should be closed immediately.
55    /// On success, this function returns bytes that should be
56    /// sent as a preflight message for additional connection validation.
57    /// (The preflight data should be sent even if it is zero length).
58    pub fn peer_connect(&self, peer: Url) -> K2Result<bytes::Bytes> {
59        for mod_handler in self.mod_map.lock().unwrap().values() {
60            mod_handler.peer_connect(peer.clone())?;
61        }
62        for space_handler in self.space_map.lock().unwrap().values() {
63            space_handler.peer_connect(peer.clone())?;
64        }
65        self.handler.peer_connect(peer.clone())?;
66        let preflight = self.handler.preflight_gather_outgoing(peer)?;
67        let enc = (K2Proto {
68            ty: K2WireType::Preflight as i32,
69            data: preflight,
70            space: None,
71            module: None,
72        })
73        .encode()?;
74        Ok(enc)
75    }
76
77    /// Call this whenever a connection is closed.
78    pub fn peer_disconnect(&self, peer: Url, reason: Option<String>) {
79        for h in self.mod_map.lock().unwrap().values() {
80            h.peer_disconnect(peer.clone(), reason.clone());
81        }
82        for h in self.space_map.lock().unwrap().values() {
83            h.peer_disconnect(peer.clone(), reason.clone());
84        }
85        self.handler.peer_disconnect(peer, reason);
86    }
87
88    /// Call this whenever data is received on an open connection.
89    pub fn recv_data(&self, peer: Url, data: bytes::Bytes) -> K2Result<()> {
90        let data = K2Proto::decode(&data)?;
91        let ty = data.ty();
92        let K2Proto {
93            space,
94            module,
95            data,
96            ..
97        } = data;
98
99        match ty {
100            K2WireType::Unspecified => Ok(()),
101            K2WireType::Preflight => {
102                self.handler.preflight_validate_incoming(peer, data)
103            }
104            K2WireType::Notify => {
105                if let Some(space) = space {
106                    let space = SpaceId::from(space);
107                    if let Some(h) = self.space_map.lock().unwrap().get(&space)
108                    {
109                        h.recv_space_notify(peer, space, data)?;
110                    }
111                }
112                Ok(())
113            }
114            K2WireType::Module => {
115                if let (Some(space), Some(module)) = (space, module) {
116                    let space = SpaceId::from(space);
117                    if let Some(h) = self
118                        .mod_map
119                        .lock()
120                        .unwrap()
121                        .get(&(space.clone(), module.clone()))
122                    {
123                        h.recv_module_msg(peer, space, module.clone(), data).inspect_err(|e| {
124                            tracing::warn!(?module, "Error in recv_module_msg, peer connection will be closed: {e}");
125                        })?;
126                    }
127                }
128                Ok(())
129            }
130            K2WireType::Disconnect => {
131                let reason = String::from_utf8_lossy(&data).to_string();
132                Err(K2Error::other(format!("Remote Disconnect: {reason}")))
133            }
134        }
135    }
136
137    /// Call this whenever a connection to a peer fails to get established,
138    /// sending a message to a peer fails or when we get a disconnected
139    /// event from a peer.
140    pub fn set_unresponsive(
141        &self,
142        peer: Url,
143        when: Timestamp,
144    ) -> BoxFut<'_, K2Result<()>> {
145        let space_map = self.space_map.lock().unwrap().clone();
146        Box::pin(async move {
147            for (space_id, space_handler) in space_map.iter() {
148                if let Err(e) =
149                    space_handler.set_unresponsive(peer.clone(), when).await
150                {
151                    tracing::error!("Failed to mark peer with url {peer} as unresponsive in space {space_id}: {e}");
152                };
153            }
154            Ok(())
155        })
156    }
157}
158
159impl std::fmt::Debug for TxImpHnd {
160    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161        write!(f,"TxImpHnd {{ handler: {:?}, space_map: [{} entries], mod_map: [{} entries] }}", self.handler, self.space_map.lock().unwrap().len(), self.mod_map.lock().unwrap().len() )
162    }
163}
164
165/// A low-level transport implementation.
166pub trait TxImp: 'static + Send + Sync + std::fmt::Debug {
167    /// Get the current url if any.
168    fn url(&self) -> Option<Url>;
169
170    /// Indicates that the implementation should close any open connections to
171    /// the given peer. If a payload is provided, the implementation can
172    /// make a best effort to send it to the remote first on a short timeout.
173    /// Regardless of the success of the payload send, the connection should
174    /// be closed.
175    fn disconnect(
176        &self,
177        peer: Url,
178        payload: Option<(String, bytes::Bytes)>,
179    ) -> BoxFut<'_, ()>;
180
181    /// Indicates that the implementation should send the payload to the remote
182    /// peer, opening a connection if needed.
183    fn send(&self, peer: Url, data: bytes::Bytes) -> BoxFut<'_, K2Result<()>>;
184
185    /// Get the list of connected peers.
186    fn get_connected_peers(&self) -> BoxFut<'_, K2Result<Vec<Url>>>;
187
188    /// Dump network stats.
189    fn dump_network_stats(&self) -> BoxFut<'_, K2Result<TransportStats>>;
190}
191
192/// Trait-object [TxImp].
193pub type DynTxImp = Arc<dyn TxImp>;
194
195/// A high-level wrapper around a low-level [DynTxImp] transport implementation.
196#[cfg_attr(any(test, feature = "mockall"), mockall::automock)]
197pub trait Transport: 'static + Send + Sync + std::fmt::Debug {
198    /// Register a space handler for receiving incoming notifications.
199    ///
200    /// Panics if you attempt to register a duplicate handler for
201    /// a space.
202    ///
203    /// Returns the current url if any.
204    fn register_space_handler(
205        &self,
206        space: SpaceId,
207        handler: DynTxSpaceHandler,
208    ) -> Option<Url>;
209
210    /// Register a module handler for receiving incoming module messages.
211    ///
212    /// Panics if you attempt to register a duplicate handler for the
213    /// same (space, module).
214    fn register_module_handler(
215        &self,
216        space: SpaceId,
217        module: String,
218        handler: DynTxModuleHandler,
219    );
220
221    /// Make a best effort to notify a peer that we are disconnecting and why.
222    /// After a short time out, the connection will be closed even if the
223    /// disconnect reason message is still pending.
224    fn disconnect(&self, peer: Url, reason: Option<String>) -> BoxFut<'_, ()>;
225
226    /// Notify a remote peer within a space. This is a fire-and-forget
227    /// type message. The future this call returns will indicate any errors
228    /// that occur up to the point where the message is handed off to
229    /// the transport backend. After that, the future will return `Ok(())`
230    /// but the remote peer may or may not actually receive the message.
231    fn send_space_notify(
232        &self,
233        peer: Url,
234        space: SpaceId,
235        data: bytes::Bytes,
236    ) -> BoxFut<'_, K2Result<()>>;
237
238    /// Notify a remote peer module within a space. This is a fire-and-forget
239    /// type message. The future this call returns will indicate any errors
240    /// that occur up to the point where the message is handed off to
241    /// the transport backend. After that, the future will return `Ok(())`
242    /// but the remote peer may or may not actually receive the message.
243    fn send_module(
244        &self,
245        peer: Url,
246        space: SpaceId,
247        module: String,
248        data: bytes::Bytes,
249    ) -> BoxFut<'_, K2Result<()>>;
250
251    /// Get the list of connected peers.
252    fn get_connected_peers(&self) -> BoxFut<'_, K2Result<Vec<Url>>>;
253
254    /// Unregister a space handler and all module handlers for that space.
255    fn unregister_space(&self, space: SpaceId) -> BoxFut<'_, ()>;
256
257    /// Dump network stats.
258    fn dump_network_stats(&self) -> BoxFut<'_, K2Result<TransportStats>>;
259}
260
261/// Trait-object [Transport].
262pub type DynTransport = Arc<dyn Transport>;
263
264/// A weak trait-object [Transport].
265///
266/// This is provided in the API as a suggestion for modules that store a reference to the transport
267/// for sending messages but also implement [`TxModuleHandler`]. When registering as a module
268/// handler, the transport keeps a reference to your module. If you then store an owned reference
269/// to the transport, you create a circular reference. By using a weak reference instead, you can
270/// create a well-behaved module that will be dropped when a space shuts down.
271pub type WeakDynTransport = Weak<dyn Transport>;
272
273/// A high-level wrapper around a low-level [DynTxImp] transport implementation.
274#[derive(Clone, Debug)]
275pub struct DefaultTransport {
276    imp: DynTxImp,
277    space_map: Arc<Mutex<HashMap<SpaceId, DynTxSpaceHandler>>>,
278    mod_map: Arc<Mutex<HashMap<(SpaceId, String), DynTxModuleHandler>>>,
279}
280
281impl DefaultTransport {
282    /// When constructing a [Transport] from a [TransportFactory],
283    /// this function does the actual wrapping of your implemementation
284    /// to produce the [Transport] struct.
285    ///
286    /// [DefaultTransport] is built to be used with the provided [TxImpHnd].
287    pub fn create(hnd: &TxImpHnd, imp: DynTxImp) -> DynTransport {
288        let out: DynTransport = Arc::new(DefaultTransport {
289            imp,
290            space_map: hnd.space_map.clone(),
291            mod_map: hnd.mod_map.clone(),
292        });
293        out
294    }
295}
296
297impl Transport for DefaultTransport {
298    fn register_space_handler(
299        &self,
300        space: SpaceId,
301        handler: DynTxSpaceHandler,
302    ) -> Option<Url> {
303        let mut lock = self.space_map.lock().unwrap();
304        if lock.insert(space.clone(), handler).is_some() {
305            panic!("Attempted to register duplicate space handler! {space}");
306        }
307        // keep the lock locked while we fetch the url for atomicity.
308        self.imp.url()
309    }
310
311    fn register_module_handler(
312        &self,
313        space: SpaceId,
314        module: String,
315        handler: DynTxModuleHandler,
316    ) {
317        if self
318            .mod_map
319            .lock()
320            .unwrap()
321            .insert((space.clone(), module.clone()), handler)
322            .is_some()
323        {
324            panic!(
325                "Attempted to register duplicate module handler! {space} {module}"
326            );
327        }
328    }
329
330    fn disconnect(&self, peer: Url, reason: Option<String>) -> BoxFut<'_, ()> {
331        Box::pin(async move {
332            let payload = match reason {
333                None => None,
334                Some(reason) => match (K2Proto {
335                    ty: K2WireType::Disconnect as i32,
336                    data: bytes::Bytes::copy_from_slice(reason.as_bytes()),
337                    space: None,
338                    module: None,
339                })
340                .encode()
341                {
342                    Ok(payload) => Some((reason, payload)),
343                    Err(_) => None,
344                },
345            };
346
347            self.imp.disconnect(peer, payload).await;
348        })
349    }
350
351    fn send_space_notify(
352        &self,
353        peer: Url,
354        space: SpaceId,
355        data: bytes::Bytes,
356    ) -> BoxFut<'_, K2Result<()>> {
357        Box::pin(async move {
358            let enc = (K2Proto {
359                ty: K2WireType::Notify as i32,
360                data,
361                space: Some(space.into()),
362                module: None,
363            })
364            .encode()?;
365            self.imp.send(peer, enc).await
366        })
367    }
368
369    fn send_module(
370        &self,
371        peer: Url,
372        space: SpaceId,
373        module: String,
374        data: bytes::Bytes,
375    ) -> BoxFut<'_, K2Result<()>> {
376        Box::pin(async move {
377            let enc = (K2Proto {
378                ty: K2WireType::Module as i32,
379                data,
380                space: Some(space.into()),
381                module: Some(module),
382            })
383            .encode()?;
384            self.imp.send(peer, enc).await
385        })
386    }
387
388    fn get_connected_peers(&self) -> BoxFut<'_, K2Result<Vec<Url>>> {
389        self.imp.get_connected_peers()
390    }
391
392    fn unregister_space(&self, space: SpaceId) -> BoxFut<'_, ()> {
393        Box::pin(async move {
394            // Remove the space handler.
395            self.space_map.lock().unwrap().remove(&space);
396
397            // Remove all module handlers for this space.
398            // Done by keeping all module handlers that are not for this space.
399            self.mod_map.lock().unwrap().retain(|(s, _), _| s != &space);
400        })
401    }
402
403    fn dump_network_stats(&self) -> BoxFut<'_, K2Result<TransportStats>> {
404        self.imp.dump_network_stats()
405    }
406}
407
408/// Base trait for transport handler events.
409/// The other three handler types are all based on this trait.
410pub trait TxBaseHandler: 'static + Send + Sync + std::fmt::Debug {
411    /// A notification that a new listening address has been bound.
412    /// Peers should now go to this new address to reach this node.
413    fn new_listening_address(&self, this_url: Url) -> BoxFut<'static, ()> {
414        drop(this_url);
415        Box::pin(async move {})
416    }
417
418    /// A peer has connected to us. In addition to the preflight
419    /// logic in [TxHandler], this callback allows space and module
420    /// logic to block connections to peers. Simply return an Err here.
421    fn peer_connect(&self, peer: Url) -> K2Result<()> {
422        drop(peer);
423        Ok(())
424    }
425
426    /// A peer has disconnected from us. If they did so gracefully
427    /// the reason will be is_some().
428    fn peer_disconnect(&self, peer: Url, reason: Option<String>) {
429        drop((peer, reason));
430    }
431}
432
433/// Handler for whole transport-level events.
434pub trait TxHandler: TxBaseHandler {
435    /// Gather preflight data to send to a new opening connection.
436    /// Returning an Err result will close this connection.
437    ///
438    /// The default implementation sends an empty preflight message.
439    fn preflight_gather_outgoing(
440        &self,
441        peer_url: Url,
442    ) -> K2Result<bytes::Bytes> {
443        drop(peer_url);
444        Ok(bytes::Bytes::new())
445    }
446
447    /// Validate preflight data sent by a remote peer on a new connection.
448    /// Returning an Err result will close this connection.
449    ///
450    /// The default implementation ignores the preflight data,
451    /// and considers it valid.
452    fn preflight_validate_incoming(
453        &self,
454        peer_url: Url,
455        data: bytes::Bytes,
456    ) -> K2Result<()> {
457        drop((peer_url, data));
458        Ok(())
459    }
460}
461
462/// Trait-object [TxHandler].
463pub type DynTxHandler = Arc<dyn TxHandler>;
464
465/// Handler for space-related events.
466pub trait TxSpaceHandler: TxBaseHandler {
467    /// The sync handler for receiving notifications sent by a remote
468    /// peer in reference to a particular space. If this callback returns
469    /// an error, then the connection which sent the message will be closed.
470    fn recv_space_notify(
471        &self,
472        peer: Url,
473        space: SpaceId,
474        data: bytes::Bytes,
475    ) -> K2Result<()> {
476        drop((peer, space, data));
477        Ok(())
478    }
479
480    /// Mark a peer as unresponsive in the space's peer meta store
481    fn set_unresponsive(
482        &self,
483        peer: Url,
484        when: Timestamp,
485    ) -> BoxFut<'_, K2Result<()>> {
486        drop((peer, when));
487        Box::pin(async move { Ok(()) })
488    }
489}
490
491/// Trait-object [TxSpaceHandler].
492pub type DynTxSpaceHandler = Arc<dyn TxSpaceHandler>;
493
494/// Handler for module-related events.
495pub trait TxModuleHandler: TxBaseHandler {
496    /// The sync handler for receiving module messages sent by a remote
497    /// peer in reference to a particular space. If this callback returns
498    /// an error, then the connection which sent the message will be closed.
499    fn recv_module_msg(
500        &self,
501        peer: Url,
502        space: SpaceId,
503        module: String,
504        data: bytes::Bytes,
505    ) -> K2Result<()> {
506        drop((peer, space, module, data));
507        Ok(())
508    }
509}
510
511/// Trait-object [TxModuleHandler].
512pub type DynTxModuleHandler = Arc<dyn TxModuleHandler>;
513
514/// A factory for constructing Transport instances.
515pub trait TransportFactory: 'static + Send + Sync + std::fmt::Debug {
516    /// Help the builder construct a default config from the chosen
517    /// module factories.
518    fn default_config(&self, config: &mut config::Config) -> K2Result<()>;
519
520    /// Validate configuration.
521    fn validate_config(&self, config: &config::Config) -> K2Result<()>;
522
523    /// Construct a transport instance.
524    fn create(
525        &self,
526        builder: Arc<builder::Builder>,
527        handler: DynTxHandler,
528    ) -> BoxFut<'static, K2Result<DynTransport>>;
529}
530
531/// Trait-object [TransportFactory].
532pub type DynTransportFactory = Arc<dyn TransportFactory>;
533
534/// Stats for a transport connection.
535///
536/// This is intended to be a state dump that gives some insight into what the transport is doing.
537#[derive(Debug, Clone, Serialize, Deserialize)]
538pub struct TransportStats {
539    /// The networking backend that is in use.
540    pub backend: String,
541
542    /// The list of peer urls that this Kitsune2 instance can currently be reached at.
543    pub peer_urls: Vec<Url>,
544
545    /// The list of current connections.
546    pub connections: Vec<TransportConnectionStats>,
547}
548
549/// Stats for a single transport connection.
550#[derive(Debug, Clone, Serialize, Deserialize)]
551pub struct TransportConnectionStats {
552    /// The public key of the remote peer.
553    pub pub_key: String,
554
555    /// The message count sent on this connection.
556    pub send_message_count: u64,
557
558    /// The bytes sent on this connection.
559    pub send_bytes: u64,
560
561    /// The message count received on this connection.
562    pub recv_message_count: u64,
563
564    /// The bytes received on this connection
565    pub recv_bytes: u64,
566
567    /// UNIX epoch timestamp in seconds when this connection was opened.
568    pub opened_at_s: u64,
569
570    /// True if this connection has successfully upgraded to webrtc.
571    pub is_webrtc: bool,
572}