kitsune2_api/transport.rs
1//! Kitsune2 transport related types.
2
3use crate::{protocol::*, *};
4#[cfg(feature = "mockall")]
5use mockall::automock;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::sync::{Arc, Mutex, Weak};
9
10/// This is the low-level backend transport handler designed to work
11/// with [DefaultTransport].
12/// Construct using ([TxImpHnd::new]), with a high-level [DynTxHandler],
13/// then call [DefaultTransport::create] to return the high-level handler
14/// from the [TransportFactory].
15pub struct TxImpHnd {
16 handler: DynTxHandler,
17 space_map: Arc<Mutex<HashMap<SpaceId, DynTxSpaceHandler>>>,
18 mod_map: Arc<Mutex<HashMap<(SpaceId, String), DynTxModuleHandler>>>,
19}
20
21impl TxImpHnd {
22 /// When constructing a [Transport] from a [TransportFactory],
23 /// you need a [TxImpHnd] for calling transport events.
24 /// Pass the handler into here to construct one.
25 pub fn new(handler: DynTxHandler) -> Arc<Self> {
26 Arc::new(Self {
27 handler,
28 space_map: Arc::new(Mutex::new(HashMap::new())),
29 mod_map: Arc::new(Mutex::new(HashMap::new())),
30 })
31 }
32
33 /// Call this when you receive or bind a new address at which
34 /// this local node can be reached by peers
35 pub fn new_listening_address(&self, this_url: Url) -> BoxFut<'static, ()> {
36 self.handler.new_listening_address(this_url)
37 }
38
39 /// Call this when you establish an outgoing connection and
40 /// when you establish an incoming connection. If this call
41 /// returns an error, the connection should be closed immediately.
42 /// On success, this function returns bytes that should be
43 /// sent as a preflight message for additional connection validation.
44 /// (The preflight data should be sent even if it is zero length).
45 pub fn peer_connect(&self, peer: Url) -> K2Result<bytes::Bytes> {
46 for mod_handler in self.mod_map.lock().unwrap().values() {
47 mod_handler.peer_connect(peer.clone())?;
48 }
49 for space_handler in self.space_map.lock().unwrap().values() {
50 space_handler.peer_connect(peer.clone())?;
51 }
52 self.handler.peer_connect(peer.clone())?;
53 let preflight = self.handler.preflight_gather_outgoing(peer)?;
54 let enc = (K2Proto {
55 ty: K2WireType::Preflight as i32,
56 data: preflight,
57 space: None,
58 module: None,
59 })
60 .encode()?;
61 Ok(enc)
62 }
63
64 /// Call this whenever a connection is closed.
65 pub fn peer_disconnect(&self, peer: Url, reason: Option<String>) {
66 for h in self.mod_map.lock().unwrap().values() {
67 h.peer_disconnect(peer.clone(), reason.clone());
68 }
69 for h in self.space_map.lock().unwrap().values() {
70 h.peer_disconnect(peer.clone(), reason.clone());
71 }
72 self.handler.peer_disconnect(peer, reason);
73 }
74
75 /// Call this whenever data is received on an open connection.
76 pub fn recv_data(&self, peer: Url, data: bytes::Bytes) -> K2Result<()> {
77 let data = K2Proto::decode(&data)?;
78 let ty = data.ty();
79 let K2Proto {
80 space,
81 module,
82 data,
83 ..
84 } = data;
85
86 match ty {
87 K2WireType::Unspecified => Ok(()),
88 K2WireType::Preflight => {
89 self.handler.preflight_validate_incoming(peer, data)
90 }
91 K2WireType::Notify => {
92 if let Some(space) = space {
93 let space = SpaceId::from(space);
94 if let Some(h) = self.space_map.lock().unwrap().get(&space)
95 {
96 h.recv_space_notify(peer, space, data)?;
97 }
98 }
99 Ok(())
100 }
101 K2WireType::Module => {
102 if let (Some(space), Some(module)) = (space, module) {
103 let space = SpaceId::from(space);
104 if let Some(h) = self
105 .mod_map
106 .lock()
107 .unwrap()
108 .get(&(space.clone(), module.clone()))
109 {
110 h.recv_module_msg(peer, space, module, data)?;
111 }
112 }
113 Ok(())
114 }
115 K2WireType::Disconnect => {
116 let reason = String::from_utf8_lossy(&data).to_string();
117 Err(K2Error::other(format!("Remote Disconnect: {reason}")))
118 }
119 }
120 }
121}
122
123/// A low-level transport implementation.
124pub trait TxImp: 'static + Send + Sync + std::fmt::Debug {
125 /// Get the current url if any.
126 fn url(&self) -> Option<Url>;
127
128 /// Indicates that the implementation should close any open connections to
129 /// the given peer. If a payload is provided, the implementation can
130 /// make a best effort to send it to the remote first on a short timeout.
131 /// Regardless of the success of the payload send, the connection should
132 /// be closed.
133 fn disconnect(
134 &self,
135 peer: Url,
136 payload: Option<(String, bytes::Bytes)>,
137 ) -> BoxFut<'_, ()>;
138
139 /// Indicates that the implementation should send the payload to the remote
140 /// peer, opening a connection if needed.
141 fn send(&self, peer: Url, data: bytes::Bytes) -> BoxFut<'_, K2Result<()>>;
142
143 /// Dump network stats.
144 fn dump_network_stats(&self) -> BoxFut<'_, K2Result<TransportStats>>;
145}
146
147/// Trait-object [TxImp].
148pub type DynTxImp = Arc<dyn TxImp>;
149
150/// A high-level wrapper around a low-level [DynTxImp] transport implementation.
151#[cfg_attr(any(test, feature = "mockall"), automock)]
152pub trait Transport: 'static + Send + Sync + std::fmt::Debug {
153 /// Register a space handler for receiving incoming notifications.
154 ///
155 /// Panics if you attempt to register a duplicate handler for
156 /// a space.
157 ///
158 /// Returns the current url if any.
159 fn register_space_handler(
160 &self,
161 space: SpaceId,
162 handler: DynTxSpaceHandler,
163 ) -> Option<Url>;
164
165 /// Register a module handler for receiving incoming module messages.
166 ///
167 /// Panics if you attempt to register a duplicate handler for the
168 /// same (space, module).
169 fn register_module_handler(
170 &self,
171 space: SpaceId,
172 module: String,
173 handler: DynTxModuleHandler,
174 );
175
176 /// Make a best effort to notify a peer that we are disconnecting and why.
177 /// After a short time out, the connection will be closed even if the
178 /// disconnect reason message is still pending.
179 fn disconnect(&self, peer: Url, reason: Option<String>) -> BoxFut<'_, ()>;
180
181 /// Notify a remote peer within a space. This is a fire-and-forget
182 /// type message. The future this call returns will indicate any errors
183 /// that occur up to the point where the message is handed off to
184 /// the transport backend. After that, the future will return `Ok(())`
185 /// but the remote peer may or may not actually receive the message.
186 fn send_space_notify(
187 &self,
188 peer: Url,
189 space: SpaceId,
190 data: bytes::Bytes,
191 ) -> BoxFut<'_, K2Result<()>>;
192
193 /// Notify a remote peer module within a space. This is a fire-and-forget
194 /// type message. The future this call returns will indicate any errors
195 /// that occur up to the point where the message is handed off to
196 /// the transport backend. After that, the future will return `Ok(())`
197 /// but the remote peer may or may not actually receive the message.
198 fn send_module(
199 &self,
200 peer: Url,
201 space: SpaceId,
202 module: String,
203 data: bytes::Bytes,
204 ) -> BoxFut<'_, K2Result<()>>;
205
206 /// Dump network stats.
207 fn dump_network_stats(&self) -> BoxFut<'_, K2Result<TransportStats>>;
208}
209
210/// Trait-object [Transport].
211pub type DynTransport = Arc<dyn Transport>;
212
213/// A weak trait-object [Transport].
214///
215/// This is provided in the API as a suggestion for modules that store a reference to the transport
216/// for sending messages but also implement [`TxModuleHandler`]. When registering as a module
217/// handler, the transport keeps a reference to your module. If you then store an owned reference
218/// to the transport, you create a circular reference. By using a weak reference instead, you can
219/// create a well-behaved module that will be dropped when a space shuts down.
220pub type WeakDynTransport = Weak<dyn Transport>;
221
222/// A high-level wrapper around a low-level [DynTxImp] transport implementation.
223#[derive(Clone, Debug)]
224pub struct DefaultTransport {
225 imp: DynTxImp,
226 space_map: Arc<Mutex<HashMap<SpaceId, DynTxSpaceHandler>>>,
227 mod_map: Arc<Mutex<HashMap<(SpaceId, String), DynTxModuleHandler>>>,
228}
229
230impl DefaultTransport {
231 /// When constructing a [Transport] from a [TransportFactory],
232 /// this function does the actual wrapping of your implemementation
233 /// to produce the [Transport] struct.
234 ///
235 /// [DefaultTransport] is built to be used with the provided [TxImpHnd].
236 pub fn create(hnd: &TxImpHnd, imp: DynTxImp) -> DynTransport {
237 let out: DynTransport = Arc::new(DefaultTransport {
238 imp,
239 space_map: hnd.space_map.clone(),
240 mod_map: hnd.mod_map.clone(),
241 });
242 out
243 }
244}
245
246impl Transport for DefaultTransport {
247 fn register_space_handler(
248 &self,
249 space: SpaceId,
250 handler: DynTxSpaceHandler,
251 ) -> Option<Url> {
252 let mut lock = self.space_map.lock().unwrap();
253 if lock.insert(space.clone(), handler).is_some() {
254 panic!("Attempted to register duplicate space handler! {space}");
255 }
256 // keep the lock locked while we fetch the url for atomicity.
257 self.imp.url()
258 }
259
260 fn register_module_handler(
261 &self,
262 space: SpaceId,
263 module: String,
264 handler: DynTxModuleHandler,
265 ) {
266 if self
267 .mod_map
268 .lock()
269 .unwrap()
270 .insert((space.clone(), module.clone()), handler)
271 .is_some()
272 {
273 panic!(
274 "Attempted to register duplicate module handler! {space} {module}"
275 );
276 }
277 }
278
279 fn disconnect(&self, peer: Url, reason: Option<String>) -> BoxFut<'_, ()> {
280 Box::pin(async move {
281 let payload = match reason {
282 None => None,
283 Some(reason) => match (K2Proto {
284 ty: K2WireType::Disconnect as i32,
285 data: bytes::Bytes::copy_from_slice(reason.as_bytes()),
286 space: None,
287 module: None,
288 })
289 .encode()
290 {
291 Ok(payload) => Some((reason, payload)),
292 Err(_) => None,
293 },
294 };
295
296 self.imp.disconnect(peer, payload).await;
297 })
298 }
299
300 fn send_space_notify(
301 &self,
302 peer: Url,
303 space: SpaceId,
304 data: bytes::Bytes,
305 ) -> BoxFut<'_, K2Result<()>> {
306 Box::pin(async move {
307 let enc = (K2Proto {
308 ty: K2WireType::Notify as i32,
309 data,
310 space: Some(space.into()),
311 module: None,
312 })
313 .encode()?;
314 self.imp.send(peer, enc).await
315 })
316 }
317
318 fn send_module(
319 &self,
320 peer: Url,
321 space: SpaceId,
322 module: String,
323 data: bytes::Bytes,
324 ) -> BoxFut<'_, K2Result<()>> {
325 Box::pin(async move {
326 let enc = (K2Proto {
327 ty: K2WireType::Module as i32,
328 data,
329 space: Some(space.into()),
330 module: Some(module),
331 })
332 .encode()?;
333 self.imp.send(peer, enc).await
334 })
335 }
336
337 fn dump_network_stats(&self) -> BoxFut<'_, K2Result<TransportStats>> {
338 self.imp.dump_network_stats()
339 }
340}
341
342/// Base trait for transport handler events.
343/// The other three handler types are all based on this trait.
344pub trait TxBaseHandler: 'static + Send + Sync + std::fmt::Debug {
345 /// A notification that a new listening address has been bound.
346 /// Peers should now go to this new address to reach this node.
347 fn new_listening_address(&self, this_url: Url) -> BoxFut<'static, ()> {
348 drop(this_url);
349 Box::pin(async move {})
350 }
351
352 /// A peer has connected to us. In addition to the preflight
353 /// logic in [TxHandler], this callback allows space and module
354 /// logic to block connections to peers. Simply return an Err here.
355 fn peer_connect(&self, peer: Url) -> K2Result<()> {
356 drop(peer);
357 Ok(())
358 }
359
360 /// A peer has disconnected from us. If they did so gracefully
361 /// the reason will be is_some().
362 fn peer_disconnect(&self, peer: Url, reason: Option<String>) {
363 drop((peer, reason));
364 }
365}
366
367/// Handler for whole transport-level events.
368pub trait TxHandler: TxBaseHandler {
369 /// Gather preflight data to send to a new opening connection.
370 /// Returning an Err result will close this connection.
371 ///
372 /// The default implementation sends an empty preflight message.
373 fn preflight_gather_outgoing(
374 &self,
375 peer_url: Url,
376 ) -> K2Result<bytes::Bytes> {
377 drop(peer_url);
378 Ok(bytes::Bytes::new())
379 }
380
381 /// Validate preflight data sent by a remote peer on a new connection.
382 /// Returning an Err result will close this connection.
383 ///
384 /// The default implementation ignores the preflight data,
385 /// and considers it valid.
386 fn preflight_validate_incoming(
387 &self,
388 peer_url: Url,
389 data: bytes::Bytes,
390 ) -> K2Result<()> {
391 drop((peer_url, data));
392 Ok(())
393 }
394}
395
396/// Trait-object [TxHandler].
397pub type DynTxHandler = Arc<dyn TxHandler>;
398
399/// Handler for space-related events.
400pub trait TxSpaceHandler: TxBaseHandler {
401 /// The sync handler for receiving notifications sent by a remote
402 /// peer in reference to a particular space. If this callback returns
403 /// an error, then the connection which sent the message will be closed.
404 fn recv_space_notify(
405 &self,
406 peer: Url,
407 space: SpaceId,
408 data: bytes::Bytes,
409 ) -> K2Result<()> {
410 drop((peer, space, data));
411 Ok(())
412 }
413}
414
415/// Trait-object [TxSpaceHandler].
416pub type DynTxSpaceHandler = Arc<dyn TxSpaceHandler>;
417
418/// Handler for module-related events.
419pub trait TxModuleHandler: TxBaseHandler {
420 /// The sync handler for receiving module messages sent by a remote
421 /// peer in reference to a particular space. If this callback returns
422 /// an error, then the connection which sent the message will be closed.
423 fn recv_module_msg(
424 &self,
425 peer: Url,
426 space: SpaceId,
427 module: String,
428 data: bytes::Bytes,
429 ) -> K2Result<()> {
430 drop((peer, space, module, data));
431 Ok(())
432 }
433}
434
435/// Trait-object [TxModuleHandler].
436pub type DynTxModuleHandler = Arc<dyn TxModuleHandler>;
437
438/// A factory for constructing Transport instances.
439pub trait TransportFactory: 'static + Send + Sync + std::fmt::Debug {
440 /// Help the builder construct a default config from the chosen
441 /// module factories.
442 fn default_config(&self, config: &mut config::Config) -> K2Result<()>;
443
444 /// Validate configuration.
445 fn validate_config(&self, config: &config::Config) -> K2Result<()>;
446
447 /// Construct a transport instance.
448 fn create(
449 &self,
450 builder: Arc<builder::Builder>,
451 handler: DynTxHandler,
452 ) -> BoxFut<'static, K2Result<DynTransport>>;
453}
454
455/// Trait-object [TransportFactory].
456pub type DynTransportFactory = Arc<dyn TransportFactory>;
457
458/// Stats for a transport connection.
459///
460/// This is intended to be a state dump that gives some insight into what the transport is doing.
461#[derive(Debug, Clone, Serialize, Deserialize)]
462pub struct TransportStats {
463 /// The networking backend that is in use.
464 pub backend: String,
465
466 /// The list of peer urls that this Kitsune2 instance can currently be reached at.
467 pub peer_urls: Vec<Url>,
468
469 /// The list of current connections.
470 pub connections: Vec<TransportConnectionStats>,
471}
472
473/// Stats for a single transport connection.
474#[derive(Debug, Clone, Serialize, Deserialize)]
475pub struct TransportConnectionStats {
476 /// The public key of the remote peer.
477 pub pub_key: String,
478
479 /// The message count sent on this connection.
480 pub send_message_count: u64,
481
482 /// The bytes sent on this connection.
483 pub send_bytes: u64,
484
485 /// The message count received on this connection.
486 pub recv_message_count: u64,
487
488 /// The bytes received on this connection
489 pub recv_bytes: u64,
490
491 /// UNIX epoch timestamp in seconds when this connection was opened.
492 pub opened_at_s: u64,
493
494 /// True if this connection has successfully upgraded to webrtc.
495 pub is_webrtc: bool,
496}