kitsune2_api/
transport.rs

1//! Kitsune2 transport related types.
2
3use crate::{protocol::*, *};
4#[cfg(feature = "mockall")]
5use mockall::automock;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::sync::{Arc, Mutex, Weak};
9
10/// This is the low-level backend transport handler designed to work
11/// with [DefaultTransport].
12/// Construct using ([TxImpHnd::new]), with a high-level [DynTxHandler],
13/// then call [DefaultTransport::create] to return the high-level handler
14/// from the [TransportFactory].
15pub struct TxImpHnd {
16    handler: DynTxHandler,
17    space_map: Arc<Mutex<HashMap<SpaceId, DynTxSpaceHandler>>>,
18    mod_map: Arc<Mutex<HashMap<(SpaceId, String), DynTxModuleHandler>>>,
19}
20
21impl TxImpHnd {
22    /// When constructing a [Transport] from a [TransportFactory],
23    /// you need a [TxImpHnd] for calling transport events.
24    /// Pass the handler into here to construct one.
25    pub fn new(handler: DynTxHandler) -> Arc<Self> {
26        Arc::new(Self {
27            handler,
28            space_map: Arc::new(Mutex::new(HashMap::new())),
29            mod_map: Arc::new(Mutex::new(HashMap::new())),
30        })
31    }
32
33    /// Call this when you receive or bind a new address at which
34    /// this local node can be reached by peers
35    pub fn new_listening_address(&self, this_url: Url) -> BoxFut<'static, ()> {
36        self.handler.new_listening_address(this_url)
37    }
38
39    /// Call this when you establish an outgoing connection and
40    /// when you establish an incoming connection. If this call
41    /// returns an error, the connection should be closed immediately.
42    /// On success, this function returns bytes that should be
43    /// sent as a preflight message for additional connection validation.
44    /// (The preflight data should be sent even if it is zero length).
45    pub fn peer_connect(&self, peer: Url) -> K2Result<bytes::Bytes> {
46        for mod_handler in self.mod_map.lock().unwrap().values() {
47            mod_handler.peer_connect(peer.clone())?;
48        }
49        for space_handler in self.space_map.lock().unwrap().values() {
50            space_handler.peer_connect(peer.clone())?;
51        }
52        self.handler.peer_connect(peer.clone())?;
53        let preflight = self.handler.preflight_gather_outgoing(peer)?;
54        let enc = (K2Proto {
55            ty: K2WireType::Preflight as i32,
56            data: preflight,
57            space: None,
58            module: None,
59        })
60        .encode()?;
61        Ok(enc)
62    }
63
64    /// Call this whenever a connection is closed.
65    pub fn peer_disconnect(&self, peer: Url, reason: Option<String>) {
66        for h in self.mod_map.lock().unwrap().values() {
67            h.peer_disconnect(peer.clone(), reason.clone());
68        }
69        for h in self.space_map.lock().unwrap().values() {
70            h.peer_disconnect(peer.clone(), reason.clone());
71        }
72        self.handler.peer_disconnect(peer, reason);
73    }
74
75    /// Call this whenever data is received on an open connection.
76    pub fn recv_data(&self, peer: Url, data: bytes::Bytes) -> K2Result<()> {
77        let data = K2Proto::decode(&data)?;
78        let ty = data.ty();
79        let K2Proto {
80            space,
81            module,
82            data,
83            ..
84        } = data;
85
86        match ty {
87            K2WireType::Unspecified => Ok(()),
88            K2WireType::Preflight => {
89                self.handler.preflight_validate_incoming(peer, data)
90            }
91            K2WireType::Notify => {
92                if let Some(space) = space {
93                    let space = SpaceId::from(space);
94                    if let Some(h) = self.space_map.lock().unwrap().get(&space)
95                    {
96                        h.recv_space_notify(peer, space, data)?;
97                    }
98                }
99                Ok(())
100            }
101            K2WireType::Module => {
102                if let (Some(space), Some(module)) = (space, module) {
103                    let space = SpaceId::from(space);
104                    if let Some(h) = self
105                        .mod_map
106                        .lock()
107                        .unwrap()
108                        .get(&(space.clone(), module.clone()))
109                    {
110                        h.recv_module_msg(peer, space, module, data)?;
111                    }
112                }
113                Ok(())
114            }
115            K2WireType::Disconnect => {
116                let reason = String::from_utf8_lossy(&data).to_string();
117                Err(K2Error::other(format!("Remote Disconnect: {reason}")))
118            }
119        }
120    }
121}
122
123/// A low-level transport implementation.
124pub trait TxImp: 'static + Send + Sync + std::fmt::Debug {
125    /// Get the current url if any.
126    fn url(&self) -> Option<Url>;
127
128    /// Indicates that the implementation should close any open connections to
129    /// the given peer. If a payload is provided, the implementation can
130    /// make a best effort to send it to the remote first on a short timeout.
131    /// Regardless of the success of the payload send, the connection should
132    /// be closed.
133    fn disconnect(
134        &self,
135        peer: Url,
136        payload: Option<(String, bytes::Bytes)>,
137    ) -> BoxFut<'_, ()>;
138
139    /// Indicates that the implementation should send the payload to the remote
140    /// peer, opening a connection if needed.
141    fn send(&self, peer: Url, data: bytes::Bytes) -> BoxFut<'_, K2Result<()>>;
142
143    /// Dump network stats.
144    fn dump_network_stats(&self) -> BoxFut<'_, K2Result<TransportStats>>;
145}
146
147/// Trait-object [TxImp].
148pub type DynTxImp = Arc<dyn TxImp>;
149
150/// A high-level wrapper around a low-level [DynTxImp] transport implementation.
151#[cfg_attr(any(test, feature = "mockall"), automock)]
152pub trait Transport: 'static + Send + Sync + std::fmt::Debug {
153    /// Register a space handler for receiving incoming notifications.
154    ///
155    /// Panics if you attempt to register a duplicate handler for
156    /// a space.
157    ///
158    /// Returns the current url if any.
159    fn register_space_handler(
160        &self,
161        space: SpaceId,
162        handler: DynTxSpaceHandler,
163    ) -> Option<Url>;
164
165    /// Register a module handler for receiving incoming module messages.
166    ///
167    /// Panics if you attempt to register a duplicate handler for the
168    /// same (space, module).
169    fn register_module_handler(
170        &self,
171        space: SpaceId,
172        module: String,
173        handler: DynTxModuleHandler,
174    );
175
176    /// Make a best effort to notify a peer that we are disconnecting and why.
177    /// After a short time out, the connection will be closed even if the
178    /// disconnect reason message is still pending.
179    fn disconnect(&self, peer: Url, reason: Option<String>) -> BoxFut<'_, ()>;
180
181    /// Notify a remote peer within a space. This is a fire-and-forget
182    /// type message. The future this call returns will indicate any errors
183    /// that occur up to the point where the message is handed off to
184    /// the transport backend. After that, the future will return `Ok(())`
185    /// but the remote peer may or may not actually receive the message.
186    fn send_space_notify(
187        &self,
188        peer: Url,
189        space: SpaceId,
190        data: bytes::Bytes,
191    ) -> BoxFut<'_, K2Result<()>>;
192
193    /// Notify a remote peer module within a space. This is a fire-and-forget
194    /// type message. The future this call returns will indicate any errors
195    /// that occur up to the point where the message is handed off to
196    /// the transport backend. After that, the future will return `Ok(())`
197    /// but the remote peer may or may not actually receive the message.
198    fn send_module(
199        &self,
200        peer: Url,
201        space: SpaceId,
202        module: String,
203        data: bytes::Bytes,
204    ) -> BoxFut<'_, K2Result<()>>;
205
206    /// Dump network stats.
207    fn dump_network_stats(&self) -> BoxFut<'_, K2Result<TransportStats>>;
208}
209
210/// Trait-object [Transport].
211pub type DynTransport = Arc<dyn Transport>;
212
213/// A weak trait-object [Transport].
214///
215/// This is provided in the API as a suggestion for modules that store a reference to the transport
216/// for sending messages but also implement [`TxModuleHandler`]. When registering as a module
217/// handler, the transport keeps a reference to your module. If you then store an owned reference
218/// to the transport, you create a circular reference. By using a weak reference instead, you can
219/// create a well-behaved module that will be dropped when a space shuts down.
220pub type WeakDynTransport = Weak<dyn Transport>;
221
222/// A high-level wrapper around a low-level [DynTxImp] transport implementation.
223#[derive(Clone, Debug)]
224pub struct DefaultTransport {
225    imp: DynTxImp,
226    space_map: Arc<Mutex<HashMap<SpaceId, DynTxSpaceHandler>>>,
227    mod_map: Arc<Mutex<HashMap<(SpaceId, String), DynTxModuleHandler>>>,
228}
229
230impl DefaultTransport {
231    /// When constructing a [Transport] from a [TransportFactory],
232    /// this function does the actual wrapping of your implemementation
233    /// to produce the [Transport] struct.
234    ///
235    /// [DefaultTransport] is built to be used with the provided [TxImpHnd].
236    pub fn create(hnd: &TxImpHnd, imp: DynTxImp) -> DynTransport {
237        let out: DynTransport = Arc::new(DefaultTransport {
238            imp,
239            space_map: hnd.space_map.clone(),
240            mod_map: hnd.mod_map.clone(),
241        });
242        out
243    }
244}
245
246impl Transport for DefaultTransport {
247    fn register_space_handler(
248        &self,
249        space: SpaceId,
250        handler: DynTxSpaceHandler,
251    ) -> Option<Url> {
252        let mut lock = self.space_map.lock().unwrap();
253        if lock.insert(space.clone(), handler).is_some() {
254            panic!("Attempted to register duplicate space handler! {space}");
255        }
256        // keep the lock locked while we fetch the url for atomicity.
257        self.imp.url()
258    }
259
260    fn register_module_handler(
261        &self,
262        space: SpaceId,
263        module: String,
264        handler: DynTxModuleHandler,
265    ) {
266        if self
267            .mod_map
268            .lock()
269            .unwrap()
270            .insert((space.clone(), module.clone()), handler)
271            .is_some()
272        {
273            panic!(
274                "Attempted to register duplicate module handler! {space} {module}"
275            );
276        }
277    }
278
279    fn disconnect(&self, peer: Url, reason: Option<String>) -> BoxFut<'_, ()> {
280        Box::pin(async move {
281            let payload = match reason {
282                None => None,
283                Some(reason) => match (K2Proto {
284                    ty: K2WireType::Disconnect as i32,
285                    data: bytes::Bytes::copy_from_slice(reason.as_bytes()),
286                    space: None,
287                    module: None,
288                })
289                .encode()
290                {
291                    Ok(payload) => Some((reason, payload)),
292                    Err(_) => None,
293                },
294            };
295
296            self.imp.disconnect(peer, payload).await;
297        })
298    }
299
300    fn send_space_notify(
301        &self,
302        peer: Url,
303        space: SpaceId,
304        data: bytes::Bytes,
305    ) -> BoxFut<'_, K2Result<()>> {
306        Box::pin(async move {
307            let enc = (K2Proto {
308                ty: K2WireType::Notify as i32,
309                data,
310                space: Some(space.into()),
311                module: None,
312            })
313            .encode()?;
314            self.imp.send(peer, enc).await
315        })
316    }
317
318    fn send_module(
319        &self,
320        peer: Url,
321        space: SpaceId,
322        module: String,
323        data: bytes::Bytes,
324    ) -> BoxFut<'_, K2Result<()>> {
325        Box::pin(async move {
326            let enc = (K2Proto {
327                ty: K2WireType::Module as i32,
328                data,
329                space: Some(space.into()),
330                module: Some(module),
331            })
332            .encode()?;
333            self.imp.send(peer, enc).await
334        })
335    }
336
337    fn dump_network_stats(&self) -> BoxFut<'_, K2Result<TransportStats>> {
338        self.imp.dump_network_stats()
339    }
340}
341
342/// Base trait for transport handler events.
343/// The other three handler types are all based on this trait.
344pub trait TxBaseHandler: 'static + Send + Sync + std::fmt::Debug {
345    /// A notification that a new listening address has been bound.
346    /// Peers should now go to this new address to reach this node.
347    fn new_listening_address(&self, this_url: Url) -> BoxFut<'static, ()> {
348        drop(this_url);
349        Box::pin(async move {})
350    }
351
352    /// A peer has connected to us. In addition to the preflight
353    /// logic in [TxHandler], this callback allows space and module
354    /// logic to block connections to peers. Simply return an Err here.
355    fn peer_connect(&self, peer: Url) -> K2Result<()> {
356        drop(peer);
357        Ok(())
358    }
359
360    /// A peer has disconnected from us. If they did so gracefully
361    /// the reason will be is_some().
362    fn peer_disconnect(&self, peer: Url, reason: Option<String>) {
363        drop((peer, reason));
364    }
365}
366
367/// Handler for whole transport-level events.
368pub trait TxHandler: TxBaseHandler {
369    /// Gather preflight data to send to a new opening connection.
370    /// Returning an Err result will close this connection.
371    ///
372    /// The default implementation sends an empty preflight message.
373    fn preflight_gather_outgoing(
374        &self,
375        peer_url: Url,
376    ) -> K2Result<bytes::Bytes> {
377        drop(peer_url);
378        Ok(bytes::Bytes::new())
379    }
380
381    /// Validate preflight data sent by a remote peer on a new connection.
382    /// Returning an Err result will close this connection.
383    ///
384    /// The default implementation ignores the preflight data,
385    /// and considers it valid.
386    fn preflight_validate_incoming(
387        &self,
388        peer_url: Url,
389        data: bytes::Bytes,
390    ) -> K2Result<()> {
391        drop((peer_url, data));
392        Ok(())
393    }
394}
395
396/// Trait-object [TxHandler].
397pub type DynTxHandler = Arc<dyn TxHandler>;
398
399/// Handler for space-related events.
400pub trait TxSpaceHandler: TxBaseHandler {
401    /// The sync handler for receiving notifications sent by a remote
402    /// peer in reference to a particular space. If this callback returns
403    /// an error, then the connection which sent the message will be closed.
404    fn recv_space_notify(
405        &self,
406        peer: Url,
407        space: SpaceId,
408        data: bytes::Bytes,
409    ) -> K2Result<()> {
410        drop((peer, space, data));
411        Ok(())
412    }
413}
414
415/// Trait-object [TxSpaceHandler].
416pub type DynTxSpaceHandler = Arc<dyn TxSpaceHandler>;
417
418/// Handler for module-related events.
419pub trait TxModuleHandler: TxBaseHandler {
420    /// The sync handler for receiving module messages sent by a remote
421    /// peer in reference to a particular space. If this callback returns
422    /// an error, then the connection which sent the message will be closed.
423    fn recv_module_msg(
424        &self,
425        peer: Url,
426        space: SpaceId,
427        module: String,
428        data: bytes::Bytes,
429    ) -> K2Result<()> {
430        drop((peer, space, module, data));
431        Ok(())
432    }
433}
434
435/// Trait-object [TxModuleHandler].
436pub type DynTxModuleHandler = Arc<dyn TxModuleHandler>;
437
438/// A factory for constructing Transport instances.
439pub trait TransportFactory: 'static + Send + Sync + std::fmt::Debug {
440    /// Help the builder construct a default config from the chosen
441    /// module factories.
442    fn default_config(&self, config: &mut config::Config) -> K2Result<()>;
443
444    /// Validate configuration.
445    fn validate_config(&self, config: &config::Config) -> K2Result<()>;
446
447    /// Construct a transport instance.
448    fn create(
449        &self,
450        builder: Arc<builder::Builder>,
451        handler: DynTxHandler,
452    ) -> BoxFut<'static, K2Result<DynTransport>>;
453}
454
455/// Trait-object [TransportFactory].
456pub type DynTransportFactory = Arc<dyn TransportFactory>;
457
458/// Stats for a transport connection.
459///
460/// This is intended to be a state dump that gives some insight into what the transport is doing.
461#[derive(Debug, Clone, Serialize, Deserialize)]
462pub struct TransportStats {
463    /// The networking backend that is in use.
464    pub backend: String,
465
466    /// The list of peer urls that this Kitsune2 instance can currently be reached at.
467    pub peer_urls: Vec<Url>,
468
469    /// The list of current connections.
470    pub connections: Vec<TransportConnectionStats>,
471}
472
473/// Stats for a single transport connection.
474#[derive(Debug, Clone, Serialize, Deserialize)]
475pub struct TransportConnectionStats {
476    /// The public key of the remote peer.
477    pub pub_key: String,
478
479    /// The message count sent on this connection.
480    pub send_message_count: u64,
481
482    /// The bytes sent on this connection.
483    pub send_bytes: u64,
484
485    /// The message count received on this connection.
486    pub recv_message_count: u64,
487
488    /// The bytes received on this connection
489    pub recv_bytes: u64,
490
491    /// UNIX epoch timestamp in seconds when this connection was opened.
492    pub opened_at_s: u64,
493
494    /// True if this connection has successfully upgraded to webrtc.
495    pub is_webrtc: bool,
496}