// kitsune2_api 0.5.0-dev.1
//
// p2p / dht communication framework api
//! Kitsune2 transport related types.

use crate::{protocol::*, *};
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, Mutex, Weak};

// Shared, lock-protected registry holding the space handler registered
// for each space id.
type SpaceMap = Arc<Mutex<HashMap<SpaceId, DynTxSpaceHandler>>>;
// Shared, lock-protected registry of module handlers, keyed by
// `(space id, module name)`.
type ModMap = Arc<Mutex<HashMap<(SpaceId, String), DynTxModuleHandler>>>;
// Shared, lock-protected counters of messages dropped because of blocks,
// grouped first by peer url and then by space id.
type MessageBlocksMap =
    Arc<Mutex<HashMap<Url, HashMap<SpaceId, MessageBlockCount>>>>;

/// This is the low-level backend transport handler designed to work
/// with [DefaultTransport].
///
/// Construct one using [TxImpHnd::new] with a high-level [DynTxHandler],
/// then call [DefaultTransport::create] to return the high-level handler
/// from the [TransportFactory].
pub struct TxImpHnd {
    // Top-level handler that receives transport-wide events
    // (connect, disconnect, preflight, new listening address).
    handler: DynTxHandler,
    // Space handlers by space id. The same map instance is shared with the
    // transport built by `DefaultTransport::create`.
    space_map: SpaceMap,
    // Module handlers by (space id, module name). Shared in the same way.
    mod_map: ModMap,
    // Per peer url and space counters of messages dropped due to blocks.
    // Shared in the same way.
    blocked_message_counts: MessageBlocksMap,
}

impl TxImpHnd {
    /// When constructing a [Transport] from a [TransportFactory],
    /// you need a [TxImpHnd] for calling transport events.
    /// Pass the handler into here to construct one.
    pub fn new(handler: DynTxHandler) -> Arc<Self> {
        Arc::new(Self {
            handler,
            space_map: Arc::new(Mutex::new(HashMap::new())),
            mod_map: Arc::new(Mutex::new(HashMap::new())),
            blocked_message_counts: Arc::new(Mutex::new(HashMap::new())),
        })
    }

    /// Announce a newly received or bound address at which this local node
    /// can be reached by peers.
    ///
    /// The returned future first notifies the top-level handler and then
    /// every space handler that was registered when this method was called.
    pub fn new_listening_address(&self, this_url: Url) -> BoxFut<'static, ()> {
        let handler = self.handler.clone();
        // Snapshot the registered space handlers so the lock is released
        // before the future is ever polled.
        let space_handlers: Vec<_> =
            self.space_map.lock().unwrap().values().cloned().collect();

        Box::pin(async move {
            handler.new_listening_address(this_url.clone()).await;
            for space_handler in space_handlers {
                space_handler.new_listening_address(this_url.clone()).await;
            }
        })
    }

    /// Run the connection-time checks for `peer` and produce the preflight
    /// message to send to it.
    ///
    /// Call this for every newly established connection, outgoing as well
    /// as incoming. If an error is returned, the connection should be
    /// closed immediately. On success the returned bytes should be sent as
    /// a preflight message for additional connection validation; they
    /// should be sent even when the preflight data is zero length.
    pub fn peer_connect(
        &self,
        peer: Url,
    ) -> BoxFut<'_, K2Result<bytes::Bytes>> {
        Box::pin(async move {
            // Any handler may veto the connection by returning an error:
            // module handlers are asked first, then space handlers, then
            // the top-level handler.
            for mod_handler in self.mod_map.lock().unwrap().values() {
                mod_handler.peer_connect(peer.clone())?;
            }
            for space_handler in self.space_map.lock().unwrap().values() {
                space_handler.peer_connect(peer.clone())?;
            }
            self.handler.peer_connect(peer.clone())?;

            // A meaningful preflight requires at least one space with local
            // agents. When spaces are registered but none of them has a
            // local agent yet (a temporary state, e.g. an incoming
            // connection arriving before any agent has joined), fail
            // instead of producing a preflight. With no registered spaces
            // at all the check passes trivially.
            let space_handlers: Vec<_> =
                self.space_map.lock().unwrap().values().cloned().collect();
            let mut local_agents_ok = space_handlers.is_empty();
            for space_handler in &space_handlers {
                if space_handler.has_local_agents().await? {
                    local_agents_ok = true;
                    break;
                }
            }
            if !local_agents_ok {
                return Err(K2Error::NoLocalAgentsDuringPreflight);
            }

            let preflight =
                self.handler.preflight_gather_outgoing(peer).await?;
            let message = K2Proto {
                ty: K2WireType::Preflight as i32,
                data: preflight,
                space_id: None,
                module_id: None,
            };
            Ok(message.encode()?)
        })
    }

    /// Report that the connection to `peer` has been closed.
    ///
    /// Every module handler, then every space handler and finally the
    /// top-level handler is notified with the same peer url and reason.
    pub fn peer_disconnect(&self, peer: Url, reason: Option<String>) {
        {
            let module_handlers = self.mod_map.lock().unwrap();
            module_handlers.values().for_each(|module_handler| {
                module_handler.peer_disconnect(peer.clone(), reason.clone())
            });
        }
        {
            let space_handlers = self.space_map.lock().unwrap();
            space_handlers.values().for_each(|space_handler| {
                space_handler.peer_disconnect(peer.clone(), reason.clone())
            });
        }
        self.handler.peer_disconnect(peer, reason);
    }

    /// Call this whenever data is received on an open connection.
    ///
    /// The bytes are decoded as a [K2Proto] message and dispatched by wire
    /// type: preflight messages go to the top-level handler, notify
    /// messages to the space handler registered for the message's space and
    /// module messages to the handler registered for the (space, module)
    /// pair. Messages for which no handler is registered are ignored.
    ///
    /// Messages that are not permitted according to
    /// [TxImpHnd::check_message_permitted] are dropped and `Ok(())` is
    /// returned.
    ///
    /// An error is returned if decoding fails, if the permission check
    /// fails, if the invoked handler returns an error, or if the message is
    /// a disconnect message. The log messages and handler documentation in
    /// this file state that the peer connection is closed on such errors.
    pub fn recv_data(
        &self,
        peer: Url,
        data: bytes::Bytes,
    ) -> BoxFut<'_, K2Result<()>> {
        Box::pin(async move {
            let data = K2Proto::decode(&data)?;
            let message_type = data.ty();
            // Split the decoded message into its parts; from here on `data`
            // is the message payload only.
            let K2Proto {
                space_id,
                module_id,
                data,
                ..
            } = data;

            // Except for preflight, unspecified and disconnect messages we
            // should drop messages if any agent is blocked for the given
            // peer URL and space id. We do not close the connection because
            // agents in other spaces may not be blocked on the same kitsune
            // instance.
            //
            // The duration of the check is measured for the debug logs below.
            let start = std::time::Instant::now();
            if !self.check_message_permitted(
                &peer,
                &space_id,
                &module_id,
                &message_type,
            )? {
                let elapsed = start.elapsed();
                tracing::debug!(
                    ?peer,
                    "Checked message not permitted in {:?}",
                    elapsed
                );
                return Ok(());
            }

            let elapsed = start.elapsed();
            tracing::debug!(
                ?peer,
                "Checked message permitted in {:?}",
                elapsed
            );

            match message_type {
                // Unspecified messages are accepted but ignored.
                K2WireType::Unspecified => Ok(()),
                K2WireType::Preflight => {
                    self.handler.preflight_validate_incoming(peer, data).await
                }
                K2WireType::Notify => {
                    if let Some(space_id) = space_id {
                        let space_id = SpaceId::from(space_id);
                        // The space map lock is held while the synchronous
                        // handler callback runs.
                        if let Some(h) =
                            self.space_map.lock().unwrap().get(&space_id)
                        {
                            h.recv_space_notify(peer, space_id, data)?;
                        }
                    }
                    Ok(())
                }
                K2WireType::Module => {
                    if let (Some(space_id), Some(module)) =
                        (space_id, module_id)
                    {
                        let space_id = SpaceId::from(space_id);
                        // The module map lock is held while the synchronous
                        // handler callback runs.
                        if let Some(h) = self
                            .mod_map
                            .lock()
                            .unwrap()
                            .get(&(space_id.clone(), module.clone()))
                        {
                            h.recv_module_msg(peer, space_id, module.clone(), data).inspect_err(|e| {
                            tracing::warn!(?module, "Error in recv_module_msg, peer connection will be closed: {e}");
                        })?;
                        }
                    }
                    Ok(())
                }
                K2WireType::Disconnect => {
                    // The payload of a disconnect message is the reason
                    // text; it is surfaced to the caller as an error.
                    let reason = String::from_utf8_lossy(&data).to_string();
                    Err(K2Error::other(format!("Remote Disconnect: {reason}")))
                }
            }
        })
    }

    /// Mark `peer` as unresponsive in every registered space.
    ///
    /// Call this whenever a connection to a peer fails to get established,
    /// sending a message to a peer fails or when we get a disconnected
    /// event from a peer.
    ///
    /// Failures of individual space handlers are logged and do not abort
    /// the remaining updates; the returned future always resolves to
    /// `Ok(())`.
    pub fn set_unresponsive(
        &self,
        peer: Url,
        when: Timestamp,
    ) -> BoxFut<'_, K2Result<()>> {
        // Work on a copy of the registry so the lock is not held while the
        // space handlers are awaited.
        let handlers_by_space = self.space_map.lock().unwrap().clone();
        Box::pin(async move {
            for (space_id, space_handler) in &handlers_by_space {
                let outcome =
                    space_handler.set_unresponsive(peer.clone(), when).await;
                if let Err(e) = outcome {
                    tracing::error!(
                        "Failed to mark peer with url {peer} as unresponsive in space {space_id}: {e}"
                    );
                }
            }
            Ok(())
        })
    }

    /// Decide whether an incoming message from a peer may be processed.
    ///
    /// Returns `Ok(false)` if any agent associated with the given peer url
    /// is blocked in the given space and the message type is not one of the
    /// always-allowed types; in that case the count of blocked incoming
    /// messages is increased by one. Returns an error if a space-scoped
    /// message carries no space id.
    pub fn check_message_permitted(
        &self,
        peer_url: &Url,
        space_id: &Option<Bytes>,
        module_id: &Option<String>,
        message_type: &K2WireType,
    ) -> K2Result<bool> {
        // Some message types are accepted even from peers at whose url
        // agents are blocked:
        //
        // - Preflight: agent infos travel in preflight messages and a
        //   blocked agent cannot gossip, so this is the only reliable way
        //   to learn about new agents at that peer URL.
        // - Unspecified: accepting them keeps the networking protocol free
        //   to evolve, and since they are ignored anyway there is little to
        //   gain from blocking them.
        // - Disconnect: we disconnect in response anyway, and such a
        //   message carries no space id to check blocks against.
        match message_type {
            K2WireType::Preflight
            | K2WireType::Unspecified
            | K2WireType::Disconnect => return Ok(true),
            K2WireType::Notify | K2WireType::Module => {}
        }

        // Notify and module messages are scoped to a space and therefore
        // must carry a space id. A missing one means the remote kitsune
        // instance is not following the protocol and must have been
        // modified, so none of its agents should be trusted: the message
        // is rejected with an error, which causes the connection to be
        // closed.
        //
        // NOTE: This may need revisiting if a message type is ever added
        // that is not scoped to a space but still needs a (then not
        // space-scoped) check for blocks.
        let Some(raw_space_id) = space_id else {
            tracing::warn!(
                ?peer_url,
                "Received a message of type {:?} without space id which is violating the protocol. Dropping the message and closing the connection.",
                message_type
            );
            return Err(K2Error::other(
                "Received a message without space id.",
            ));
        };
        let space_id = SpaceId::from(raw_space_id.clone());

        is_peer_blocked(
            self.space_map.clone(),
            self.blocked_message_counts.clone(),
            peer_url,
            &space_id,
            module_id,
            false,
        )
        .map(|blocked| !blocked)
    }
}

impl std::fmt::Debug for TxImpHnd {
    /// Print the top-level handler and only the number of entries of the
    /// space and module handler maps.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let space_count = self.space_map.lock().unwrap().len();
        let module_count = self.mod_map.lock().unwrap().len();
        write!(
            f,
            "TxImpHnd {{ handler: {:?}, space_map: [{} entries], mod_map: [{} entries] }}",
            self.handler, space_count, module_count
        )
    }
}

/// Check whether any agent associated with the given peer url is blocked
/// for a space and increase the message blocks count by one if they are.
fn is_peer_blocked(
    space_map: SpaceMap,
    message_blocks_map: MessageBlocksMap,
    peer_url: &Url,
    space_id: &SpaceId,
    module_id: &Option<String>,
    outgoing: bool,
) -> K2Result<bool> {
    let space_handler =
        space_map.lock().expect("poisoned").get(space_id).cloned();
    match space_handler {
        Some(space_handler) => {
            let all_blocked = space_handler.is_any_agent_at_url_blocked(peer_url).inspect_err(|e| tracing::warn!(?space_id, ?peer_url, ?module_id, "Failed to check whether any agent is blocked, peer connection will be closed: {e}"))?;
            if all_blocked {
                tracing::debug!(
                    ?space_id,
                    ?peer_url,
                    ?module_id,
                    "At least one agent at peer is blocked, message will be dropped."
                );
                if outgoing {
                    incr_blocked_message_count_outgoing(
                        message_blocks_map,
                        peer_url.clone(),
                        space_id,
                    );
                } else {
                    incr_blocked_message_count_incoming(
                        message_blocks_map,
                        peer_url.clone(),
                        space_id,
                    );
                }
                return Ok(true);
            }
            Ok(false)
        }
        None => {
            tracing::error!(
                ?space_id,
                ?peer_url,
                ?module_id,
                "No space handler found. Message will be dropped."
            );
            Ok(true)
        }
    }
}

/// Add one to the count of blocked incoming messages for the given peer url
/// and space, creating the counter entries if they do not exist yet.
fn incr_blocked_message_count_incoming(
    message_blocks_map: MessageBlocksMap,
    peer_url: Url,
    space_id: &SpaceId,
) {
    let mut counts_by_peer = message_blocks_map.lock().expect("poisoned");
    let count = counts_by_peer
        .entry(peer_url)
        .or_default()
        .entry(space_id.clone())
        .or_insert(MessageBlockCount {
            incoming: 0,
            outgoing: 0,
        });
    count.incoming += 1;
}

/// Add one to the count of blocked outgoing messages for the given peer url
/// and space, creating the counter entries if they do not exist yet.
fn incr_blocked_message_count_outgoing(
    message_blocks_map: MessageBlocksMap,
    peer_url: Url,
    space_id: &SpaceId,
) {
    let mut counts_by_peer = message_blocks_map.lock().expect("poisoned");
    let count = counts_by_peer
        .entry(peer_url)
        .or_default()
        .entry(space_id.clone())
        .or_insert(MessageBlockCount {
            incoming: 0,
            outgoing: 0,
        });
    count.outgoing += 1;
}
/// A low-level transport implementation.
///
/// [DefaultTransport] wraps an implementation of this trait and forwards
/// already encoded messages to it.
pub trait TxImp: 'static + Send + Sync + std::fmt::Debug {
    /// Get the current url if any.
    fn url(&self) -> Option<Url>;

    /// Indicates that the implementation should close any open connections to
    /// the given peer. If a payload is provided, the implementation can
    /// make a best effort to send it to the remote first on a short timeout.
    /// Regardless of the success of the payload send, the connection should
    /// be closed.
    ///
    /// When called through [DefaultTransport], the payload tuple holds the
    /// disconnect reason text and the encoded disconnect message.
    fn disconnect(
        &self,
        peer: Url,
        payload: Option<(String, bytes::Bytes)>,
    ) -> BoxFut<'_, ()>;

    /// Indicates that the implementation should send the payload to the remote
    /// peer, opening a connection if needed.
    fn send(&self, peer: Url, data: bytes::Bytes) -> BoxFut<'_, K2Result<()>>;

    /// Get the list of connected peers.
    fn get_connected_peers(&self) -> BoxFut<'_, K2Result<Vec<Url>>>;

    /// Dump network stats.
    fn dump_network_stats(&self) -> BoxFut<'_, K2Result<TransportStats>>;
}

/// Trait-object [TxImp].
pub type DynTxImp = Arc<dyn TxImp>;

/// A high-level wrapper around a low-level [DynTxImp] transport implementation.
#[cfg_attr(any(test, feature = "mockall"), mockall::automock)]
pub trait Transport: 'static + Send + Sync + std::fmt::Debug {
    /// Register a space handler for receiving incoming notifications.
    ///
    /// Returns the current url if any.
    ///
    /// # Panics
    ///
    /// Panics if you attempt to register a duplicate handler for
    /// a space.
    fn register_space_handler(
        &self,
        space_id: SpaceId,
        handler: DynTxSpaceHandler,
    ) -> Option<Url>;

    /// Register a module handler for receiving incoming module messages.
    ///
    /// # Panics
    ///
    /// Panics if you attempt to register a duplicate handler for the
    /// same (space_id, module).
    fn register_module_handler(
        &self,
        space_id: SpaceId,
        module: String,
        handler: DynTxModuleHandler,
    );

    /// Make a best effort to notify a peer that we are disconnecting and why.
    /// After a short time out, the connection will be closed even if the
    /// disconnect reason message is still pending.
    fn disconnect(&self, peer: Url, reason: Option<String>) -> BoxFut<'_, ()>;

    /// Notify a remote peer within a space. This is a fire-and-forget
    /// type message. The future this call returns will indicate any errors
    /// that occur up to the point where the message is handed off to
    /// the transport backend. After that, the future will return `Ok(())`
    /// but the remote peer may or may not actually receive the message.
    fn send_space_notify(
        &self,
        peer: Url,
        space_id: SpaceId,
        data: bytes::Bytes,
    ) -> BoxFut<'_, K2Result<()>>;

    /// Notify a remote peer module within a space. This is a fire-and-forget
    /// type message. The future this call returns will indicate any errors
    /// that occur up to the point where the message is handed off to
    /// the transport backend. After that, the future will return `Ok(())`
    /// but the remote peer may or may not actually receive the message.
    fn send_module(
        &self,
        peer: Url,
        space_id: SpaceId,
        module: String,
        data: bytes::Bytes,
    ) -> BoxFut<'_, K2Result<()>>;

    /// Get the list of connected peers.
    fn get_connected_peers(&self) -> BoxFut<'_, K2Result<Vec<Url>>>;

    /// Unregister a space handler and all module handlers for that space.
    fn unregister_space(&self, space_id: SpaceId) -> BoxFut<'_, ()>;

    /// Dump network stats.
    ///
    /// The result combines the stats of the low-level transport with the
    /// counts of messages that were dropped because of blocks.
    fn dump_network_stats(&self) -> BoxFut<'_, K2Result<ApiTransportStats>>;
}

/// Trait-object [Transport].
pub type DynTransport = Arc<dyn Transport>;

/// A weak trait-object [Transport].
///
/// This is provided in the API as a suggestion for modules that store a reference to the transport
/// for sending messages but also implement [`TxModuleHandler`]. When registering as a module
/// handler, the transport keeps a reference to your module. If you then store an owned reference
/// to the transport, you create a circular reference. By using a weak reference instead, you can
/// create a well-behaved module that will be dropped when a space shuts down.
pub type WeakDynTransport = Weak<dyn Transport>;

/// A high-level wrapper around a low-level [DynTxImp] transport implementation.
///
/// Instances are built by [DefaultTransport::create] and share their handler
/// registries and blocked message counters with the [TxImpHnd] passed to it.
#[derive(Clone, Debug)]
pub struct DefaultTransport {
    // The low-level transport that encoded messages are handed to.
    imp: DynTxImp,
    // Space handlers by space id.
    space_map: SpaceMap,
    // Module handlers by (space id, module name).
    mod_map: ModMap,
    // Per peer url and space counters of messages dropped due to blocks.
    blocked_message_counts: MessageBlocksMap,
}

impl DefaultTransport {
    /// When constructing a [Transport] from a [TransportFactory],
    /// this function does the actual wrapping of your implemementation
    /// to produce the [Transport] struct.
    ///
    /// [DefaultTransport] is built to be used with the provided [TxImpHnd].
    pub fn create(hnd: &TxImpHnd, imp: DynTxImp) -> DynTransport {
        let out: DynTransport = Arc::new(DefaultTransport {
            imp,
            space_map: hnd.space_map.clone(),
            mod_map: hnd.mod_map.clone(),
            blocked_message_counts: hnd.blocked_message_counts.clone(),
        });
        out
    }

    // Guard run before sending: fail with `NoLocalAgentsDuringPreflight`
    // when the space is registered but reports no local agents, because no
    // meaningful preflight can be generated in that state. This can happen
    // when writing to our own peer store (which is async) takes longer than
    // other code reaching a send. A space without a registered handler
    // passes the check.
    async fn error_if_no_local_agents(
        &self,
        space_id: SpaceId,
    ) -> K2Result<()> {
        // Clone the handler out so the lock is released before awaiting.
        let space_handler =
            self.space_map.lock().unwrap().get(&space_id).cloned();
        let Some(handler) = space_handler else {
            return Ok(());
        };
        if handler.has_local_agents().await? {
            Ok(())
        } else {
            Err(K2Error::NoLocalAgentsDuringPreflight)
        }
    }
}

impl Transport for DefaultTransport {
    /// Register `handler` for `space_id` and return the current url of the
    /// low-level transport, if any.
    ///
    /// # Panics
    ///
    /// Panics if a handler is already registered for `space_id`. The
    /// existing registration is left untouched and the shared map stays
    /// usable afterwards.
    fn register_space_handler(
        &self,
        space_id: SpaceId,
        handler: DynTxSpaceHandler,
    ) -> Option<Url> {
        let mut lock = self.space_map.lock().unwrap();
        if lock.contains_key(&space_id) {
            // Release the guard before panicking: panicking while it is held
            // would poison the mutex shared with `TxImpHnd`, turning every
            // later `.lock().unwrap()` on it into a panic.
            drop(lock);
            panic!("Attempted to register duplicate space handler! {space_id}");
        }
        lock.insert(space_id, handler);
        // keep the lock locked while we fetch the url for atomicity.
        self.imp.url()
    }

    /// Register `handler` for the `(space_id, module)` pair.
    ///
    /// # Panics
    ///
    /// Panics if a handler is already registered for the same
    /// `(space_id, module)`. The existing registration is left untouched.
    fn register_module_handler(
        &self,
        space_id: SpaceId,
        module: String,
        handler: DynTxModuleHandler,
    ) {
        // Only insert into a vacant slot so that a duplicate registration
        // does not replace the handler that is already in place. The lock
        // guard is a temporary of this statement and is released before the
        // panic below.
        let is_duplicate = match self
            .mod_map
            .lock()
            .unwrap()
            .entry((space_id.clone(), module.clone()))
        {
            std::collections::hash_map::Entry::Occupied(_) => true,
            std::collections::hash_map::Entry::Vacant(slot) => {
                slot.insert(handler);
                false
            }
        };
        if is_duplicate {
            panic!(
                "Attempted to register duplicate module handler! {space_id} {module}"
            );
        }
    }

    /// Ask the low-level transport to close the connection to `peer`,
    /// passing along an encoded disconnect message when a reason is given.
    fn disconnect(&self, peer: Url, reason: Option<String>) -> BoxFut<'_, ()> {
        Box::pin(async move {
            // No reason means no payload. If the disconnect message cannot
            // be encoded, the disconnect still goes ahead without a payload.
            let payload = reason.and_then(|reason| {
                let message = K2Proto {
                    ty: K2WireType::Disconnect as i32,
                    data: bytes::Bytes::copy_from_slice(reason.as_bytes()),
                    space_id: None,
                    module_id: None,
                };
                message.encode().ok().map(|encoded| (reason, encoded))
            });

            self.imp.disconnect(peer, payload).await;
        })
    }

    /// Encode `data` as a notify message for `space_id` and hand it to the
    /// low-level transport.
    ///
    /// Fails if the space is registered but has no local agents. If any
    /// agent at the peer url is blocked in the space, the message is
    /// dropped, counted as a blocked outgoing message and `Ok(())` is
    /// returned.
    fn send_space_notify(
        &self,
        peer_url: Url,
        space_id: SpaceId,
        data: bytes::Bytes,
    ) -> BoxFut<'_, K2Result<()>> {
        Box::pin(async move {
            self.error_if_no_local_agents(space_id.clone()).await?;

            let blocked = is_peer_blocked(
                self.space_map.clone(),
                self.blocked_message_counts.clone(),
                &peer_url,
                &space_id,
                &None,
                true,
            )?;
            if blocked {
                tracing::warn!(
                    ?peer_url,
                    ?space_id,
                    "Attempted to send space notify message to a peer that is blocked in that space. Dropping message."
                );
                return Ok(());
            }

            let message = K2Proto {
                ty: K2WireType::Notify as i32,
                data,
                space_id: Some(space_id.into()),
                module_id: None,
            };
            let enc = message.encode()?;
            self.imp.send(peer_url, enc).await
        })
    }

    /// Encode `data` as a module message for `(space_id, module)` and hand
    /// it to the low-level transport.
    ///
    /// Fails if the space is registered but has no local agents. If any
    /// agent at the peer url is blocked in the space, the message is
    /// dropped, counted as a blocked outgoing message and `Ok(())` is
    /// returned.
    fn send_module(
        &self,
        peer_url: Url,
        space_id: SpaceId,
        module: String,
        data: bytes::Bytes,
    ) -> BoxFut<'_, K2Result<()>> {
        Box::pin(async move {
            self.error_if_no_local_agents(space_id.clone()).await?;
            // Pass the module id along so that the diagnostics emitted by
            // the block check identify which module attempted the send.
            if is_peer_blocked(
                self.space_map.clone(),
                self.blocked_message_counts.clone(),
                &peer_url,
                &space_id,
                &Some(module.clone()),
                true,
            )? {
                tracing::warn!(
                    ?peer_url,
                    ?space_id,
                    ?module,
                    "Attempted to send module message to a peer that is blocked in the associated space. Dropping message."
                );
                return Ok(());
            }
            let enc = (K2Proto {
                ty: K2WireType::Module as i32,
                data,
                space_id: Some(space_id.into()),
                module_id: Some(module),
            })
            .encode()?;
            self.imp.send(peer_url, enc).await
        })
    }

    /// Get the list of connected peers by delegating to the low-level
    /// transport implementation.
    fn get_connected_peers(&self) -> BoxFut<'_, K2Result<Vec<Url>>> {
        self.imp.get_connected_peers()
    }

    /// Remove the handler registered for `space_id` together with every
    /// module handler registered under that space.
    fn unregister_space(&self, space_id: SpaceId) -> BoxFut<'_, ()> {
        Box::pin(async move {
            // Drop the handler of the space itself.
            self.space_map.lock().unwrap().remove(&space_id);

            // Drop the module handlers of the space by keeping only those
            // whose key refers to a different space.
            let mut module_handlers = self.mod_map.lock().unwrap();
            module_handlers
                .retain(|(owner, _module), _handler| *owner != space_id);
        })
    }

    /// Collect the stats of the low-level transport and combine them with a
    /// copy of the blocked message counts.
    fn dump_network_stats(&self) -> BoxFut<'_, K2Result<ApiTransportStats>> {
        Box::pin(async {
            let transport_stats = self.imp.dump_network_stats().await?;
            // The lock is only taken after the await above has completed.
            let blocked_message_counts = self
                .blocked_message_counts
                .lock()
                .expect("poisoned")
                .clone();
            Ok(ApiTransportStats {
                transport_stats,
                blocked_message_counts,
            })
        })
    }
}

/// Base trait for transport handler events.
/// The other three handler types are all based on this trait.
///
/// Every method has a default implementation that ignores the event.
pub trait TxBaseHandler: 'static + Send + Sync + std::fmt::Debug {
    /// A notification that a new listening address has been bound.
    /// Peers should now go to this new address to reach this node.
    ///
    /// The default implementation does nothing.
    fn new_listening_address(&self, this_url: Url) -> BoxFut<'static, ()> {
        // Explicitly consume the argument; the default ignores it.
        drop(this_url);
        Box::pin(async move {})
    }

    /// A peer has connected to us. In addition to the preflight
    /// logic in [TxHandler], this callback allows space and module
    /// logic to block connections to peers. Simply return an Err here.
    ///
    /// The default implementation accepts every connection.
    fn peer_connect(&self, peer: Url) -> K2Result<()> {
        // Explicitly consume the argument; the default ignores it.
        drop(peer);
        Ok(())
    }

    /// A peer has disconnected from us. If they did so gracefully
    /// the reason will be is_some().
    ///
    /// The default implementation does nothing.
    fn peer_disconnect(&self, peer: Url, reason: Option<String>) {
        // Explicitly consume the arguments; the default ignores them.
        drop((peer, reason));
    }
}

/// Handler for whole transport-level events.
pub trait TxHandler: TxBaseHandler {
    /// Gather preflight data to send to a new opening connection.
    /// Returning an Err result will close this connection.
    ///
    /// The default implementation sends an empty preflight message.
    fn preflight_gather_outgoing(
        &self,
        peer_url: Url,
    ) -> BoxFut<'_, K2Result<bytes::Bytes>> {
        drop(peer_url);
        Box::pin(async { Ok(bytes::Bytes::new()) })
    }

    /// Validate preflight data sent by a remote peer on a new connection.
    /// Returning an Err result will close this connection.
    ///
    /// The default implementation ignores the preflight data,
    /// and considers it valid.
    fn preflight_validate_incoming(
        &self,
        peer_url: Url,
        data: bytes::Bytes,
    ) -> BoxFut<'_, K2Result<()>> {
        drop((peer_url, data));
        Box::pin(async { Ok(()) })
    }

    /// Check if this handler has any local agents joined.
    ///
    /// This is used by the transport to prevent sending messages before
    /// a local agent has joined, which would result in an empty preflight
    /// being sent to peers, causing them to block all messages from us.
    ///
    /// The default implementation returns true (assumes agents exist).
    fn has_local_agents(&self) -> BoxFut<'_, K2Result<bool>> {
        Box::pin(async { Ok(true) })
    }
}

/// Trait-object [TxHandler], shared behind an `Arc`.
pub type DynTxHandler = Arc<dyn TxHandler>;

/// Handler for space-related events.
pub trait TxSpaceHandler: TxBaseHandler {
    /// Synchronous handler for notifications sent by a remote peer in
    /// reference to a particular space.
    ///
    /// If this callback returns an error, the connection which sent the
    /// message will be closed.
    ///
    /// The default implementation discards the notification.
    fn recv_space_notify(
        &self,
        peer: Url,
        space_id: SpaceId,
        data: bytes::Bytes,
    ) -> K2Result<()> {
        let unused = (peer, space_id, data);
        drop(unused);
        Ok(())
    }

    /// Mark a peer as unresponsive in the space's peer meta store.
    ///
    /// The default implementation records nothing and reports success.
    fn set_unresponsive(
        &self,
        peer: Url,
        when: Timestamp,
    ) -> BoxFut<'_, K2Result<()>> {
        // Kept as a single tuple drop, exactly like the arguments arrived.
        std::mem::drop((peer, when));
        Box::pin(async { Ok(()) })
    }

    /// Return `true` if any agent using the passed peer [`Url`] is blocked.
    fn is_any_agent_at_url_blocked(&self, peer_url: &Url) -> K2Result<bool>;

    /// Report whether this space has any local agents joined.
    ///
    /// This is used to avoid sending messages before a local agent has
    /// joined. Sending that early would result in an empty preflight being
    /// sent to peers, causing them to block all messages from us.
    ///
    /// The default implementation returns `true` (assumes agents exist).
    fn has_local_agents(&self) -> BoxFut<'_, K2Result<bool>> {
        Box::pin(async move { Ok(true) })
    }
}

/// Trait-object [TxSpaceHandler], shared behind an `Arc`.
pub type DynTxSpaceHandler = Arc<dyn TxSpaceHandler>;

/// Handler for module-related events.
pub trait TxModuleHandler: TxBaseHandler {
    /// Synchronous handler for module messages sent by a remote peer in
    /// reference to a particular space.
    ///
    /// If this callback returns an error, the connection which sent the
    /// message will be closed.
    ///
    /// The default implementation discards the message.
    fn recv_module_msg(
        &self,
        peer: Url,
        space_id: SpaceId,
        module: String,
        data: bytes::Bytes,
    ) -> K2Result<()> {
        let unused = (peer, space_id, module, data);
        std::mem::drop(unused);
        Ok(())
    }
}

/// Trait-object [TxModuleHandler], shared behind an `Arc`.
pub type DynTxModuleHandler = Arc<dyn TxModuleHandler>;

/// A factory for constructing Transport instances.
///
/// Besides building the transport itself, a factory takes part in
/// configuration: it contributes defaults and validates the final config.
pub trait TransportFactory: 'static + Send + Sync + std::fmt::Debug {
    /// Help the builder construct a default config from the chosen
    /// module factories.
    ///
    /// `config` is passed mutably so the factory can contribute its own
    /// defaults. An `Err` reports that this could not be done.
    fn default_config(&self, config: &mut config::Config) -> K2Result<()>;

    /// Validate configuration.
    ///
    /// The implementation should check only for values which are set and
    /// mandatory at first, while missing values that can be provided later,
    /// such as during space creation, should be ignored.
    ///
    /// Returns an `Err` if the configuration is not valid.
    fn validate_config(&self, config: &config::Config) -> K2Result<()>;

    /// Construct a transport instance.
    ///
    /// Takes the shared `builder` and the top-level `handler`, and returns a
    /// future that resolves to the new [`DynTransport`] or to an error.
    fn create(
        &self,
        builder: Arc<builder::Builder>,
        handler: DynTxHandler,
    ) -> BoxFut<'static, K2Result<DynTransport>>;
}

/// Trait-object [TransportFactory], shared behind an `Arc`.
pub type DynTransportFactory = Arc<dyn TransportFactory>;

/// A count for incoming and outgoing blocked messages.
///
/// Both counters are plain integers, so equality is total: the type
/// implements [`Eq`] in addition to [`PartialEq`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct MessageBlockCount {
    /// Count of incoming messages that have been blocked and dropped.
    pub incoming: u32,

    /// Count of outgoing messages that have been blocked and dropped.
    pub outgoing: u32,
}

/// Extended transport stats exposed via the API.
///
/// Combines the stats reported by the transport implementation with the
/// blocked-message counters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApiTransportStats {
    /// Stats from the configured transport implementation.
    pub transport_stats: TransportStats,

    /// Blocked message counts, keyed by [`Url`] and then by [`SpaceId`].
    pub blocked_message_counts:
        HashMap<Url, HashMap<SpaceId, MessageBlockCount>>,
}

/// Stats for a transport connection.
///
/// This is intended to be a state dump that gives some insight into what
/// the transport is doing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransportStats {
    /// The networking backend that is in use.
    pub backend: String,

    /// The list of peer urls that this Kitsune2 instance can currently be
    /// reached at.
    pub peer_urls: Vec<Url>,

    /// The list of current connections, one entry per connection.
    pub connections: Vec<TransportConnectionStats>,
}

/// Stats for a single transport connection.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransportConnectionStats {
    /// The public key of the remote peer.
    pub pub_key: String,

    /// The number of messages sent on this connection.
    pub send_message_count: u64,

    /// The number of bytes sent on this connection.
    pub send_bytes: u64,

    /// The number of messages received on this connection.
    pub recv_message_count: u64,

    /// The number of bytes received on this connection.
    pub recv_bytes: u64,

    /// UNIX epoch timestamp in seconds when this connection was opened.
    pub opened_at_s: u64,

    /// True if this connection has successfully upgraded to a direct peer
    /// connection.
    pub is_direct: bool,
}