radicle_node/wire/
protocol.rs

1//! Implementation of the transport protocol.
2//!
3//! We use the Noise XK handshake pattern to establish an encrypted stream with a remote peer.
4//! The handshake itself is implemented in the external [`cyphernet`] and [`netservices`] crates.
5use std::collections::hash_map::Entry;
6use std::collections::VecDeque;
7use std::os::unix::io::{AsRawFd, RawFd};
8use std::sync::Arc;
9use std::{io, net, time};
10
11use amplify::Wrapper as _;
12use crossbeam_channel as chan;
13use cyphernet::addr::{HostName, InetHost, NetAddr};
14use cyphernet::encrypt::noise::{HandshakePattern, Keyset, NoiseState};
15use cyphernet::proxy::socks5;
16use cyphernet::{Digest, EcSk, Ecdh, Sha256};
17use localtime::LocalTime;
18use netservices::resource::{ListenerEvent, NetAccept, NetTransport, SessionEvent};
19use netservices::session::{NoiseSession, ProtocolArtifact, Socks5Session};
20use netservices::{NetConnection, NetReader, NetWriter};
21use radicle::node::device::Device;
22use reactor::{ResourceId, ResourceType, Timestamp};
23
24use radicle::collections::RandomMap;
25use radicle::crypto;
26use radicle::node::config::AddressConfig;
27use radicle::node::NodeId;
28use radicle::storage::WriteStorage;
29
30use crate::prelude::Deserializer;
31use crate::service;
32use crate::service::io::Io;
33use crate::service::FETCH_TIMEOUT;
34use crate::service::{session, DisconnectReason, Metrics, Service};
35use crate::wire::frame;
36use crate::wire::frame::{Frame, FrameData, StreamId};
37use crate::wire::Encode;
38use crate::worker;
39use crate::worker::{ChannelEvent, ChannelsConfig, FetchRequest, FetchResult, Task, TaskResult};
40use crate::Link;
41
/// NoiseXK handshake pattern.
///
/// Combines the `Xmitted` initiator pattern with the `Known` responder pattern.
pub const NOISE_XK: HandshakePattern = HandshakePattern {
    initiator: cyphernet::encrypt::noise::InitiatorPattern::Xmitted,
    responder: cyphernet::encrypt::noise::OneWayPattern::Known,
};
47
/// How long, by default, we wait before a network connection is considered inactive.
pub const DEFAULT_CONNECTION_TIMEOUT: time::Duration = time::Duration::from_secs(6);

/// How long, by default, we wait for an outgoing dial to complete before the remote is
/// considered unreachable.
pub const DEFAULT_DIAL_TIMEOUT: time::Duration = time::Duration::from_secs(6);

/// Upper bound on the size of a peer inbox, in bytes (2 MiB).
pub const MAX_INBOX_SIZE: usize = 2 * 1024 * 1024;
56
/// Control message used internally between workers, users, and the service.
///
/// This is the command type delivered to the wire handler by the reactor.
// NOTE(review): the `large_enum_variant` lint is silenced here without a stated reason;
// presumably boxing the larger variant was judged not worth it -- confirm.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Control {
    /// Message from the user to the service.
    User(service::Command),
    /// Message from a worker to the service.
    Worker(TaskResult),
    /// Flush data in the given stream to the remote.
    ///
    /// `remote` identifies the peer and `stream` the stream whose pending channel events
    /// should be turned into frames and sent.
    Flush { remote: NodeId, stream: StreamId },
}
68
/// Peer session type.
///
/// A Noise session (hashing with SHA-256) layered over a SOCKS5 session, itself layered over
/// a TCP stream. `G` is the key type used for the Noise handshake.
pub type WireSession<G> = NoiseSession<G, Sha256, Socks5Session<net::TcpStream>>;
/// Peer session type (read-only).
pub type WireReader = NetReader<Socks5Session<net::TcpStream>>;
/// Peer session type (write-only).
pub type WireWriter<G> = NetWriter<NoiseState<G, Sha256>, Socks5Session<net::TcpStream>>;

/// Reactor action.
///
/// Specialization of [`reactor::Action`] to the listener and transport resource types
/// used by this module.
type Action<G> = reactor::Action<NetAccept<WireSession<G>>, NetTransport<WireSession<G>>>;
78
/// A worker stream.
///
/// Pairs the channels used to exchange data with a worker with byte counters for the stream.
struct Stream {
    /// Channels.
    channels: worker::Channels,
    /// Data sent, in bytes. Starts at zero for a new stream.
    sent_bytes: usize,
    /// Data received, in bytes. Starts at zero for a new stream.
    received_bytes: usize,
}
88
89impl Stream {
90    fn new(channels: worker::Channels) -> Self {
91        Self {
92            channels,
93            sent_bytes: 0,
94            received_bytes: 0,
95        }
96    }
97}
98
/// Streams associated with a connected peer.
struct Streams {
    /// Active streams and their associated worker channels.
    /// Note that the gossip and control streams are not included here as they are always
    /// implied to exist.
    streams: RandomMap<StreamId, Stream>,
    /// Connection direction.
    /// Used when deriving the ids of streams opened from our side.
    link: Link,
    /// Sequence number used to compute the next stream id.
    /// Starts at zero and is incremented before each locally opened stream.
    seq: u64,
}
110
111impl Streams {
112    /// Create a new [`Streams`] object, passing the connection link.
113    fn new(link: Link) -> Self {
114        Self {
115            streams: RandomMap::default(),
116            link,
117            seq: 0,
118        }
119    }
120
121    /// Get a known stream.
122    fn get(&self, stream: &StreamId) -> Option<&Stream> {
123        self.streams.get(stream)
124    }
125
126    /// Get a known stream, mutably.
127    fn get_mut(&mut self, stream: &StreamId) -> Option<&mut Stream> {
128        self.streams.get_mut(stream)
129    }
130
131    /// Open a new stream.
132    fn open(&mut self, config: ChannelsConfig) -> (StreamId, worker::Channels) {
133        self.seq += 1;
134
135        let id = StreamId::git(self.link)
136            .nth(self.seq)
137            .expect("Streams::open: too many streams");
138        let channels = self
139            .register(id, config)
140            .expect("Streams::open: stream was already open");
141
142        (id, channels)
143    }
144
145    /// Register an open stream.
146    fn register(&mut self, stream: StreamId, config: ChannelsConfig) -> Option<worker::Channels> {
147        let (wire, worker) = worker::Channels::pair(config)
148            .expect("Streams::register: fatal: unable to create channels");
149
150        match self.streams.entry(stream) {
151            Entry::Vacant(e) => {
152                e.insert(Stream::new(worker));
153                Some(wire)
154            }
155            Entry::Occupied(_) => None,
156        }
157    }
158
159    /// Unregister an open stream.
160    fn unregister(&mut self, stream: &StreamId) -> Option<Stream> {
161        self.streams.remove(stream)
162    }
163
164    /// Close all streams.
165    fn shutdown(&mut self) {
166        for (sid, stream) in self.streams.drain() {
167            log::debug!(target: "wire", "Closing worker stream {sid}");
168            stream.channels.close().ok();
169        }
170    }
171}
172
/// The initial state of an outbound peer before handshake is completed.
#[derive(Debug)]
struct Outbound {
    /// Resource ID, if registered.
    /// `None` until the reactor reports the transport as registered.
    id: Option<ResourceId>,
    /// Remote address.
    addr: NetAddr<HostName>,
    /// Remote Node ID.
    /// Known up-front for outbound connections, unlike inbound ones.
    nid: NodeId,
}
183
/// The initial state of an inbound peer before handshake is completed.
///
/// There is no node id here: the remote's identity is only learned once the session is
/// established.
#[derive(Debug)]
struct Inbound {
    /// Resource ID, if registered.
    /// `None` until the reactor reports the transport as registered.
    id: Option<ResourceId>,
    /// Remote address.
    addr: NetAddr<HostName>,
}
192
/// Peer connection state machine.
///
/// Only covers peers whose session has been established; peers still in the handshake are
/// tracked separately as [`Inbound`] or [`Outbound`].
enum Peer {
    /// The state after handshake is completed.
    /// Peers in this state are handled by the underlying service.
    Connected {
        /// Remote address. Currently stored but not read.
        #[allow(dead_code)]
        addr: NetAddr<HostName>,
        /// Connection direction.
        link: Link,
        /// Remote Node ID.
        nid: NodeId,
        /// Buffer of bytes received from the peer, from which frames are deserialized.
        /// Bounded by [`MAX_INBOX_SIZE`].
        inbox: Deserializer<MAX_INBOX_SIZE, Frame>,
        /// Worker streams currently open with this peer.
        streams: Streams,
    },
    /// The peer was scheduled for disconnection. Once the transport is handed over
    /// by the reactor, we can consider it disconnected.
    Disconnecting {
        /// Connection direction.
        link: Link,
        /// Remote Node ID, if it is known.
        nid: Option<NodeId>,
        /// Why the peer is being disconnected.
        reason: DisconnectReason,
    },
}
213
214impl std::fmt::Debug for Peer {
215    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
216        match self {
217            Self::Connected { link, nid, .. } => write!(f, "Connected({link:?}, {nid})"),
218            Self::Disconnecting { .. } => write!(f, "Disconnecting"),
219        }
220    }
221}
222
223impl Peer {
224    /// Return the peer's id, if any.
225    fn id(&self) -> Option<&NodeId> {
226        match self {
227            Peer::Connected { nid, .. } | Peer::Disconnecting { nid: Some(nid), .. } => Some(nid),
228            Peer::Disconnecting { nid: None, .. } => None,
229        }
230    }
231
232    fn link(&self) -> Link {
233        match self {
234            Peer::Connected { link, .. } => *link,
235            Peer::Disconnecting { link, .. } => *link,
236        }
237    }
238
239    /// Connected peer.
240    fn connected(nid: NodeId, addr: NetAddr<HostName>, link: Link) -> Self {
241        Self::Connected {
242            link,
243            addr,
244            nid,
245            inbox: Deserializer::default(),
246            streams: Streams::new(link),
247        }
248    }
249}
250
/// Holds connected peers.
///
/// Peers are keyed by the resource id of their transport. Entries may also be in the
/// [`Peer::Disconnecting`] state.
struct Peers(RandomMap<ResourceId, Peer>);
253
254impl Peers {
255    fn get_mut(&mut self, id: &ResourceId) -> Option<&mut Peer> {
256        self.0.get_mut(id)
257    }
258
259    fn entry(&mut self, id: ResourceId) -> Entry<ResourceId, Peer> {
260        self.0.entry(id)
261    }
262
263    fn insert(&mut self, id: ResourceId, peer: Peer) {
264        if self.0.insert(id, peer).is_some() {
265            log::warn!(target: "wire", "Replacing existing peer id={id}");
266        }
267    }
268
269    fn remove(&mut self, id: &ResourceId) -> Option<Peer> {
270        self.0.remove(id)
271    }
272
273    fn lookup(&self, node_id: &NodeId) -> Option<(ResourceId, &Peer)> {
274        self.0
275            .iter()
276            .find(|(_, peer)| peer.id() == Some(node_id))
277            .map(|(fd, peer)| (*fd, peer))
278    }
279
280    fn lookup_mut(&mut self, node_id: &NodeId) -> Option<(ResourceId, &mut Peer)> {
281        self.0
282            .iter_mut()
283            .find(|(_, peer)| peer.id() == Some(node_id))
284            .map(|(fd, peer)| (*fd, peer))
285    }
286
287    fn active(&self) -> impl Iterator<Item = (ResourceId, &NodeId, Link)> {
288        self.0.iter().filter_map(|(id, peer)| match peer {
289            Peer::Connected { nid, link, .. } => Some((*id, nid, *link)),
290            Peer::Disconnecting { .. } => None,
291        })
292    }
293
294    fn connected(&self) -> impl Iterator<Item = (ResourceId, &NodeId)> {
295        self.0.iter().filter_map(|(id, peer)| {
296            if let Peer::Connected { nid, .. } = peer {
297                Some((*id, nid))
298            } else {
299                None
300            }
301        })
302    }
303
304    fn iter(&self) -> impl Iterator<Item = &Peer> {
305        self.0.values()
306    }
307}
308
/// Wire protocol implementation for a set of peers.
///
/// Sits between the reactor and the [`Service`]: it tracks connections through their
/// handshake, keeps per-peer session state, and queues actions for the reactor.
pub struct Wire<D, S, G: crypto::signature::Signer<crypto::Signature> + Ecdh> {
    /// Backing service instance.
    service: Service<D, S, G>,
    /// Worker pool interface.
    /// Fetch tasks are submitted to workers through this channel.
    worker: chan::Sender<Task>,
    /// Used for authentication.
    signer: Device<G>,
    /// Node metrics.
    metrics: service::Metrics,
    /// Internal queue of actions to send to the reactor.
    actions: VecDeque<Action<G>>,
    /// Outbound attempted peers without a session.
    /// Keyed by the raw file descriptor of the connection.
    outbound: RandomMap<RawFd, Outbound>,
    /// Inbound peers without a session.
    /// Keyed by the raw file descriptor of the connection.
    inbound: RandomMap<RawFd, Inbound>,
    /// Listening addresses that are not yet registered.
    /// Keyed by the raw file descriptor of the listening socket.
    listening: RandomMap<RawFd, net::SocketAddr>,
    /// Peer (established) sessions.
    peers: Peers,
}
330
331impl<D, S, G> Wire<D, S, G>
332where
333    D: service::Store,
334    S: WriteStorage + 'static,
335    G: crypto::signature::Signer<crypto::Signature> + Ecdh<Pk = NodeId>,
336{
337    pub fn new(service: Service<D, S, G>, worker: chan::Sender<Task>, signer: Device<G>) -> Self {
338        assert!(service.started().is_some(), "Service must be initialized");
339
340        Self {
341            service,
342            worker,
343            signer,
344            metrics: Metrics::default(),
345            actions: VecDeque::new(),
346            inbound: RandomMap::default(),
347            outbound: RandomMap::default(),
348            listening: RandomMap::default(),
349            peers: Peers(RandomMap::default()),
350        }
351    }
352
353    pub fn listen(&mut self, socket: NetAccept<WireSession<G>>) {
354        self.listening
355            .insert(socket.as_raw_fd(), socket.local_addr());
356        self.actions.push_back(Action::RegisterListener(socket));
357    }
358
    /// Schedule the transport with the given resource id for disconnection.
    ///
    /// Returns the node id and link direction of the affected peer when they are known.
    /// Three cases are handled:
    ///
    /// * No established session for `id`: the transport is unregistered, and the node id is
    ///   taken from a matching pending outbound attempt, if there is one.
    /// * The peer is already disconnecting: an error is logged and nothing else is queued.
    /// * The peer is connected: its streams are shut down, it is moved to the
    ///   `Disconnecting` state with the given reason, and the transport is unregistered.
    fn disconnect(&mut self, id: ResourceId, reason: DisconnectReason) -> Option<(NodeId, Link)> {
        match self.peers.entry(id) {
            Entry::Vacant(_) => {
                // Connecting peer with no session.
                log::debug!(target: "wire", "Disconnecting pending peer with id={id}: {reason}");
                self.actions.push_back(Action::UnregisterTransport(id));

                // Check for attempted outbound connections. Unestablished inbound connections don't
                // have an NID yet.
                // Note that the pending entry is only looked up here, not removed.
                self.outbound
                    .values()
                    .find(|o| o.id == Some(id))
                    .map(|o| (o.nid, Link::Outbound))
            }
            Entry::Occupied(mut e) => match e.get_mut() {
                Peer::Disconnecting { nid, link, .. } => {
                    // A disconnection was already scheduled; we don't queue a second
                    // unregistration for the same transport.
                    log::error!(target: "wire", "Peer with id={id} is already disconnecting");

                    nid.map(|n| (n, *link))
                }
                Peer::Connected {
                    nid, streams, link, ..
                } => {
                    log::debug!(target: "wire", "Disconnecting peer with id={id}: {reason}");
                    // Copy these out before the entry is overwritten below.
                    let nid = *nid;
                    let link = *link;

                    // Close all worker streams before dropping the connected state.
                    streams.shutdown();
                    e.insert(Peer::Disconnecting {
                        nid: Some(nid),
                        link,
                        reason,
                    });
                    self.actions.push_back(Action::UnregisterTransport(id));

                    Some((nid, link))
                }
            },
        }
    }
399
    /// Handle the result of a fetch task reported by a worker.
    ///
    /// If the remote peer is still connected, the task's stream is unregistered and a
    /// `close` control frame is queued for it (unless the stream was already gone). The
    /// service is then notified, but only for fetches that we initiated; for fetches we
    /// served as responder, the outcome is just logged.
    ///
    /// If the peer is unknown or not connected, the result is dropped after logging a
    /// warning.
    fn worker_result(&mut self, task: TaskResult) {
        log::debug!(
            target: "wire",
            "Received fetch result from worker for stream {}, remote {}: {:?}",
            task.stream, task.remote, task.result
        );

        let nid = task.remote;
        let Some((fd, peer)) = self.peers.lookup_mut(&nid) else {
            log::warn!(target: "wire", "Peer {nid} not found; ignoring fetch result");
            return;
        };

        if let Peer::Connected { link, streams, .. } = peer {
            // Nb. It's possible that the stream would already be unregistered if we received an
            // early "close" from the remote. Otherwise, we unregister it here and send the "close"
            // ourselves.
            if let Some(s) = streams.unregister(&task.stream) {
                log::debug!(
                    target: "wire", "Stream {} of {} closing with {} byte(s) sent and {} byte(s) received",
                    task.stream, task.remote, s.sent_bytes, s.received_bytes
                );
                let frame = Frame::<service::Message>::control(
                    *link,
                    frame::Control::Close {
                        stream: task.stream,
                    },
                );
                self.actions.push_back(Action::Send(fd, frame.to_bytes()));
            }
        } else {
            // The peer is known but no longer in the `Connected` state. We bail out here, so
            // the service is *not* told about this fetch result.
            // NOTE(review): this comment previously claimed the opposite ("we don't return
            // here"), contradicting the `return` below; confirm that skipping the service
            // callback for disconnecting peers is the intended behavior.
            log::warn!(target: "wire", "Peer {nid} is not connected; ignoring fetch result");
            return;
        };

        // Only call into the service if we initiated this fetch.
        match task.result {
            FetchResult::Initiator { rid, result } => {
                self.service.fetched(rid, nid, result);
            }
            FetchResult::Responder { rid, result } => {
                // Nothing is logged when the repository id is not available.
                if let Some(rid) = rid {
                    if let Some(err) = result.err() {
                        log::info!(target: "wire", "Peer {nid} failed to fetch {rid} from us: {err}");
                    } else {
                        log::info!(target: "wire", "Peer {nid} fetched {rid} from us successfully");
                    }
                }
            }
        }
    }
453
454    fn flush(&mut self, remote: NodeId, stream: StreamId) {
455        let Some((fd, peer)) = self.peers.lookup_mut(&remote) else {
456            log::warn!(target: "wire", "Peer {remote} is not known; ignoring flush");
457            return;
458        };
459        let Peer::Connected { streams, link, .. } = peer else {
460            log::warn!(target: "wire", "Peer {remote} is not connected; ignoring flush");
461            return;
462        };
463        let Some(s) = streams.get_mut(&stream) else {
464            log::debug!(target: "wire", "Stream {stream} cannot be found; ignoring flush");
465            return;
466        };
467        let metrics = self.metrics.peer(remote);
468
469        for data in s.channels.try_iter() {
470            let frame = match data {
471                ChannelEvent::Data(data) => {
472                    metrics.sent_git_bytes += data.len();
473                    metrics.sent_bytes += data.len();
474                    Frame::<service::Message>::git(stream, data)
475                }
476                ChannelEvent::Close => Frame::control(*link, frame::Control::Close { stream }),
477                ChannelEvent::Eof => Frame::control(*link, frame::Control::Eof { stream }),
478            };
479            self.actions
480                .push_back(reactor::Action::Send(fd, frame.to_bytes()));
481        }
482    }
483
484    fn cleanup(&mut self, id: ResourceId, fd: RawFd) {
485        if self.inbound.remove(&fd).is_some() {
486            log::debug!(target: "wire", "Cleaning up inbound peer state with id={id} (fd={fd})");
487        } else if let Some(outbound) = self.outbound.remove(&fd) {
488            log::debug!(target: "wire", "Cleaning up outbound peer state with id={id} (fd={fd})");
489            self.service.disconnected(
490                outbound.nid,
491                Link::Outbound,
492                &DisconnectReason::connection(),
493            );
494        } else {
495            log::debug!(target: "wire", "Tried to cleanup unknown peer with id={id} (fd={fd})");
496        }
497    }
498}
499
500impl<D, S, G> reactor::Handler for Wire<D, S, G>
501where
502    D: service::Store + Send,
503    S: WriteStorage + Send + 'static,
504    G: crypto::signature::Signer<crypto::Signature> + Ecdh<Pk = NodeId> + Clone + Send,
505{
    /// Listener resource: accepts incoming connections as [`WireSession`]s.
    type Listener = NetAccept<WireSession<G>>;
    /// Transport resource: a connection carrying a [`WireSession`].
    type Transport = NetTransport<WireSession<G>>;
    /// Commands delivered to this handler.
    type Command = Control;
509
510    fn tick(&mut self, time: Timestamp) {
511        self.metrics.open_channels = self
512            .peers
513            .iter()
514            .filter_map(|p| {
515                if let Peer::Connected { streams, .. } = p {
516                    Some(streams.streams.len())
517                } else {
518                    None
519                }
520            })
521            .sum();
522        self.metrics.worker_queue_size = self.worker.len();
523        self.service.tick(
524            LocalTime::from_millis(time.as_millis() as u128),
525            &self.metrics,
526        );
527    }
528
    /// Called when the reactor timer fires; wakes the service.
    fn handle_timer(&mut self) {
        self.service.wake();
    }
532
    /// Handle an event from a listening socket.
    ///
    /// For an accepted connection, the remote address is validated, the service is asked
    /// whether to accept it, a session and transport are created, and the transport is
    /// queued for registration with the reactor. The connection is tracked as inbound,
    /// keyed by its file descriptor, until its session is established. Any failure along
    /// the way drops the connection.
    ///
    /// Listener failures are only logged.
    fn handle_listener_event(
        &mut self,
        _: ResourceId, // Nb. This is the ID of the listener socket.
        event: ListenerEvent<WireSession<G>>,
        _: Timestamp,
    ) {
        match event {
            ListenerEvent::Accepted(connection) => {
                let Ok(remote) = connection.remote_addr() else {
                    log::warn!(target: "wire", "Accepted connection doesn't have remote address; dropping..");
                    drop(connection);

                    return;
                };
                // Only IP hosts are expected here; anything else is treated as an error.
                let InetHost::Ip(ip) = remote.host else {
                    log::error!(target: "wire", "Unexpected host type for inbound connection {remote}; dropping..");
                    drop(connection);

                    return;
                };
                // Grab the descriptor now, since the connection is moved into the session below.
                let fd = connection.as_raw_fd();
                log::debug!(target: "wire", "Inbound connection from {remote} (fd={fd})..");

                // If the service doesn't want to accept this connection,
                // we drop the connection here, which disconnects the socket.
                if !self.service.accepted(ip) {
                    log::debug!(target: "wire", "Rejecting inbound connection from {ip} (fd={fd})..");
                    drop(connection);

                    return;
                }

                // On failure, the connection was moved into `accept` and is not kept.
                let session = match accept::<G>(
                    remote.clone().into(),
                    connection,
                    self.signer.clone().into_inner(),
                ) {
                    Ok(s) => s,
                    Err(e) => {
                        log::error!(target: "wire", "Error creating session for {ip}: {e}");
                        return;
                    }
                };
                let transport = match NetTransport::with_session(session, Link::Inbound) {
                    Ok(transport) => transport,
                    Err(err) => {
                        log::error!(target: "wire", "Failed to create transport for accepted connection: {err}");
                        return;
                    }
                };
                log::debug!(target: "wire", "Accepted inbound connection from {remote} (fd={fd})..");

                // The resource id is not known until the reactor registers the transport.
                self.inbound.insert(
                    fd,
                    Inbound {
                        id: None,
                        addr: remote.into(),
                    },
                );
                self.actions
                    .push_back(reactor::Action::RegisterTransport(transport))
            }
            ListenerEvent::Failure(err) => {
                log::error!(target: "wire", "Error listening for inbound connections: {err}");
            }
        }
    }
600
601    fn handle_registered(&mut self, fd: RawFd, id: ResourceId, typ: ResourceType) {
602        match typ {
603            ResourceType::Listener => {
604                if let Some(local_addr) = self.listening.remove(&fd) {
605                    self.service.listening(local_addr);
606                }
607            }
608            ResourceType::Transport => {
609                if let Some(outbound) = self.outbound.get_mut(&fd) {
610                    log::debug!(target: "wire", "Outbound peer resource registered for {} with id={id} (fd={fd})", outbound.nid);
611                    outbound.id = Some(id);
612                } else if let Some(inbound) = self.inbound.get_mut(&fd) {
613                    log::debug!(target: "wire", "Inbound peer resource registered with id={id} (fd={fd})");
614                    inbound.id = Some(id);
615                } else {
616                    log::warn!(target: "wire", "Unknown peer registered with fd={fd} and id={id}");
617                }
618            }
619        }
620    }
621
622    fn handle_transport_event(
623        &mut self,
624        id: ResourceId,
625        event: SessionEvent<WireSession<G>>,
626        _: Timestamp,
627    ) {
628        match event {
629            SessionEvent::Established(fd, ProtocolArtifact { state, .. }) => {
630                // SAFETY: With the NoiseXK protocol, there is always a remote static key.
631                let nid: NodeId = state.remote_static_key.unwrap();
632                // Make sure we don't try to connect to ourselves by mistake.
633                if &nid == self.signer.public_key() {
634                    log::error!(target: "wire", "Self-connection detected, disconnecting..");
635                    self.disconnect(id, DisconnectReason::SelfConnection);
636
637                    return;
638                }
639                let (addr, link) = if let Some(peer) = self.inbound.remove(&fd) {
640                    self.metrics.peer(nid).inbound_connection_attempts += 1;
641                    (peer.addr, Link::Inbound)
642                } else if let Some(peer) = self.outbound.remove(&fd) {
643                    assert_eq!(nid, peer.nid);
644                    (peer.addr, Link::Outbound)
645                } else {
646                    log::error!(target: "wire", "Session for {nid} (id={id}) not found");
647                    return;
648                };
649                log::debug!(
650                    target: "wire",
651                    "Session established with {nid} (id={id}) (fd={fd}) ({})",
652                    if link.is_inbound() { "inbound" } else { "outbound" }
653                );
654
655                // Connections to close.
656                let mut disconnect = Vec::new();
657
658                // Handle conflicting connections.
659                // This is typical when nodes have mutually configured their nodes to connect to
660                // each other on startup. We handle this by deterministically choosing one node
                // whose outbound connection is the one that is kept. The other connections are
662                // dropped.
663                {
664                    // Whether we have precedence in case of conflicting connections.
665                    // Having precedence means that our outbound connection will win over
666                    // the other node's outbound connection.
667                    let precedence = *self.signer.public_key() > nid;
668
669                    // Pre-existing connections that conflict with this newly established session.
670                    // Note that we can't know whether a connection is conflicting before we get the
671                    // remote static key.
672                    let mut conflicting = Vec::new();
673
674                    // Active sessions with the same NID but a different Resource ID are conflicting.
675                    conflicting.extend(
676                        self.peers
677                            .active()
678                            .filter(|(c_id, d, _)| **d == nid && *c_id != id)
679                            .map(|(c_id, _, link)| (c_id, link)),
680                    );
681
682                    // Outbound connection attempts with the same remote key but a different file
683                    // descriptor are conflicting.
684                    conflicting.extend(self.outbound.iter().filter_map(|(c_fd, other)| {
685                        if other.nid == nid && *c_fd != fd {
686                            other.id.map(|c_id| (c_id, Link::Outbound))
687                        } else {
688                            None
689                        }
690                    }));
691
692                    for (c_id, c_link) in conflicting {
693                        // If we have precedence, the inbound connection is closed.
694                        // In the case where both connections are inbound or outbound,
695                        // we close the newer connection, ie. the one with the higher
696                        // resource id.
697                        let close = match (link, c_link) {
698                            (Link::Inbound, Link::Outbound) => {
699                                if precedence {
700                                    id
701                                } else {
702                                    c_id
703                                }
704                            }
705                            (Link::Outbound, Link::Inbound) => {
706                                if precedence {
707                                    c_id
708                                } else {
709                                    id
710                                }
711                            }
712                            (Link::Inbound, Link::Inbound) => id.max(c_id),
713                            (Link::Outbound, Link::Outbound) => id.max(c_id),
714                        };
715
716                        log::warn!(
717                            target: "wire", "Established session (id={id}) conflicts with existing session for {nid} (id={c_id})"
718                        );
719                        disconnect.push(close);
720                    }
721                }
722                for id in &disconnect {
723                    log::warn!(
724                        target: "wire", "Closing conflicting session (id={id}) with {nid}.."
725                    );
726                    // Disconnect and return the associated NID of the peer, if available.
727                    if let Some((nid, link)) = self.disconnect(*id, DisconnectReason::Conflict) {
728                        // We disconnect the session eagerly because otherwise we will get the new
729                        // `connected` event before the `disconnect`, resulting in a duplicate
730                        // connection.
731                        self.service
732                            .disconnected(nid, link, &DisconnectReason::Conflict);
733                    }
734                }
735                if !disconnect.contains(&id) {
736                    self.peers
737                        .insert(id, Peer::connected(nid, addr.clone(), link));
738                    self.service.connected(nid, addr.into(), link);
739                }
740            }
741            SessionEvent::Data(data) => {
742                if let Some(Peer::Connected {
743                    nid,
744                    inbox,
745                    streams,
746                    ..
747                }) = self.peers.get_mut(&id)
748                {
749                    let metrics = self.metrics.peer(*nid);
750                    metrics.received_bytes += data.len();
751
752                    if inbox.input(&data).is_err() {
753                        log::error!(target: "wire", "Maximum inbox size ({MAX_INBOX_SIZE}) reached for peer {nid}");
754                        log::error!(target: "wire", "Unable to process messages fast enough for peer {nid}; disconnecting..");
755                        self.disconnect(id, DisconnectReason::Session(session::Error::Misbehavior));
756
757                        return;
758                    }
759
760                    loop {
761                        match inbox.deserialize_next() {
762                            Ok(Some(Frame {
763                                data: FrameData::Control(frame::Control::Open { stream }),
764                                ..
765                            })) => {
766                                log::debug!(target: "wire", "Received `open` command for stream {stream} from {nid}");
767                                metrics.streams_opened += 1;
768                                metrics.received_fetch_requests += 1;
769                                let reader_limit = self.service.config().limits.fetch_pack_receive;
770                                let Some(channels) = streams.register(
771                                    stream,
772                                    ChannelsConfig::new(FETCH_TIMEOUT)
773                                        .with_reader_limit(reader_limit),
774                                ) else {
775                                    log::warn!(target: "wire", "Peer attempted to open already-open stream stream {stream}");
776                                    continue;
777                                };
778
779                                let task = Task {
780                                    fetch: FetchRequest::Responder {
781                                        remote: *nid,
782                                        emitter: self.service.emitter(),
783                                    },
784                                    stream,
785                                    channels,
786                                };
787                                if let Err(e) = self.worker.try_send(task) {
788                                    log::error!(
789                                        target: "wire",
790                                        "Worker pool failed to accept incoming fetch request: {e}"
791                                    );
792                                }
793                            }
794                            Ok(Some(Frame {
795                                data: FrameData::Control(frame::Control::Eof { stream }),
796                                ..
797                            })) => {
798                                if let Some(s) = streams.get(&stream) {
799                                    log::debug!(target: "wire", "Received `end-of-file` on stream {stream} from {nid}");
800
801                                    if s.channels.send(ChannelEvent::Eof).is_err() {
802                                        log::error!(target: "wire", "Worker is disconnected; cannot send `EOF`");
803                                    }
804                                } else {
805                                    log::debug!(target: "wire", "Ignoring frame on closed or unknown stream {stream}");
806                                }
807                            }
808                            Ok(Some(Frame {
809                                data: FrameData::Control(frame::Control::Close { stream }),
810                                ..
811                            })) => {
812                                log::debug!(target: "wire", "Received `close` command for stream {stream} from {nid}");
813
814                                if let Some(s) = streams.unregister(&stream) {
815                                    log::debug!(
816                                        target: "wire",
817                                        "Stream {stream} of {nid} closed with {} byte(s) sent and {} byte(s) received",
818                                        s.sent_bytes, s.received_bytes
819                                    );
820                                    s.channels.close().ok();
821                                }
822                            }
823                            Ok(Some(Frame {
824                                data: FrameData::Gossip(msg),
825                                ..
826                            })) => {
827                                metrics.received_gossip_messages += 1;
828                                self.service.received_message(*nid, msg);
829                            }
830                            Ok(Some(Frame {
831                                stream,
832                                data: FrameData::Git(data),
833                                ..
834                            })) => {
835                                if let Some(s) = streams.get_mut(&stream) {
836                                    metrics.received_git_bytes += data.len();
837
838                                    if s.channels.send(ChannelEvent::Data(data)).is_err() {
839                                        log::error!(target: "wire", "Worker is disconnected; cannot send data");
840                                    }
841                                } else {
842                                    log::debug!(target: "wire", "Ignoring frame on closed or unknown stream {stream}");
843                                }
844                            }
845                            Ok(None) => {
846                                // Buffer is empty, or message isn't complete.
847                                break;
848                            }
849                            Err(e) => {
850                                log::error!(target: "wire", "Invalid gossip message from {nid}: {e}");
851
852                                if !inbox.is_empty() {
853                                    log::debug!(target: "wire", "Dropping read buffer for {nid} with {} bytes", inbox.len());
854                                }
855                                self.disconnect(
856                                    id,
857                                    DisconnectReason::Session(session::Error::Misbehavior),
858                                );
859                                break;
860                            }
861                        }
862                    }
863                } else {
864                    log::warn!(target: "wire", "Dropping message from unconnected peer (id={id})");
865                }
866            }
867            SessionEvent::Terminated(err) => {
868                self.disconnect(id, DisconnectReason::Connection(Arc::new(err)));
869            }
870        }
871    }
872
873    fn handle_command(&mut self, cmd: Self::Command) {
874        match cmd {
875            Control::User(cmd) => self.service.command(cmd),
876            Control::Worker(result) => self.worker_result(result),
877            Control::Flush { remote, stream } => self.flush(remote, stream),
878        }
879    }
880
881    fn handle_error(
882        &mut self,
883        err: reactor::Error<NetAccept<WireSession<G>>, NetTransport<WireSession<G>>>,
884    ) {
885        match err {
886            reactor::Error::Poll(err) => {
887                // TODO: This should be a fatal error, there's nothing we can do here.
888                log::error!(target: "wire", "Can't poll connections: {err}");
889            }
890            reactor::Error::ListenerDisconnect(id, _) => {
891                // TODO: This should be a fatal error, there's nothing we can do here.
892                log::error!(target: "wire", "Listener {id} disconnected");
893            }
894            reactor::Error::TransportDisconnect(id, transport) => {
895                let fd = transport.as_raw_fd();
896                log::error!(target: "wire", "Peer id={id} (fd={fd}) disconnected");
897
898                // We're dropping the TCP connection here.
899                drop(transport);
900
901                // The peer transport is already disconnected and removed from the reactor;
902                // therefore there is no need to initiate a disconnection. We simply remove
903                // the peer from the map.
904                match self.peers.remove(&id) {
905                    Some(mut peer) => {
906                        if let Peer::Connected { streams, .. } = &mut peer {
907                            streams.shutdown();
908                        }
909
910                        if let Some(id) = peer.id() {
911                            self.service.disconnected(
912                                *id,
913                                peer.link(),
914                                &DisconnectReason::connection(),
915                            );
916                        } else {
917                            log::debug!(target: "wire", "Inbound disconnection before handshake; ignoring..")
918                        }
919                    }
920                    None => self.cleanup(id, fd),
921                }
922            }
923        }
924    }
925
    /// Handle a listener being handed over to the wire layer.
    ///
    /// Listener handover is not supported: the listener argument is left unused and
    /// an error mentioning the resource id is logged.
    fn handover_listener(&mut self, id: ResourceId, _listener: Self::Listener) {
        log::error!(target: "wire", "Listener handover is not supported (id={id})");
    }
929
930    fn handover_transport(&mut self, id: ResourceId, transport: Self::Transport) {
931        let fd = transport.as_raw_fd();
932
933        match self.peers.entry(id) {
934            Entry::Occupied(e) => {
935                match e.get() {
936                    Peer::Disconnecting {
937                        nid, reason, link, ..
938                    } => {
939                        log::debug!(target: "wire", "Transport handover for disconnecting peer with id={id} (fd={fd})");
940
941                        // Disconnect TCP stream.
942                        drop(transport);
943
944                        // If there is no NID, the service is not aware of the peer.
945                        if let Some(nid) = nid {
946                            // In the case of a conflicting connection, there will be two resources
947                            // for the peer. However, at the service level, there is only one, and
948                            // it is identified by NID.
949                            //
950                            // Therefore, we specify which of the connections we're closing by
951                            // passing the `link`.
952                            self.service.disconnected(*nid, *link, reason);
953                        }
954                        e.remove();
955                    }
956                    Peer::Connected { nid, .. } => {
957                        panic!("Wire::handover_transport: Unexpected handover of connected peer {} with id={id} (fd={fd})", nid);
958                    }
959                }
960            }
961            Entry::Vacant(_) => self.cleanup(id, fd),
962        }
963    }
964}
965
/// Iterating over the wire handler yields the reactor actions it wants performed.
///
/// Each call to `next` first drains the service's pending I/O events, turning them
/// into queued actions (or handling them directly), and then pops one queued action.
impl<D, S, G> Iterator for Wire<D, S, G>
where
    D: service::Store,
    S: WriteStorage + 'static,
    G: crypto::signature::Signer<crypto::Signature> + Ecdh<Pk = NodeId> + Clone,
{
    type Item = Action<G>;

    /// Process every I/O event the service currently has, then return the action at
    /// the front of the queue, or `None` if the queue is empty.
    fn next(&mut self) -> Option<Self::Item> {
        while let Some(ev) = self.service.next() {
            match ev {
                Io::Write(node_id, msgs) => {
                    // Messages can only be written to peers in the `Connected` state.
                    let (fd, link) = match self.peers.lookup(&node_id) {
                        Some((fd, Peer::Connected { link, .. })) => (fd, *link),
                        Some((_, peer)) => {
                            // If the peer is disconnected by the wire protocol, the service may
                            // not be aware of this yet, and may continue to write messages to it.
                            log::debug!(target: "wire", "Dropping {} message(s) to {node_id} ({peer:?})", msgs.len());
                            continue;
                        }
                        None => {
                            log::error!(target: "wire", "Dropping {} message(s) to {node_id}: unknown peer", msgs.len());
                            continue;
                        }
                    };
                    log::trace!(
                        target: "wire", "Writing {} message(s) to {}", msgs.len(), node_id
                    );
                    // All messages are framed into one buffer and queued as a single send.
                    let mut data = Vec::new();
                    let metrics = self.metrics.peer(node_id);
                    metrics.sent_gossip_messages += msgs.len();

                    for msg in msgs {
                        Frame::gossip(link, msg)
                            .encode(&mut data)
                            .expect("in-memory writes never fail");
                    }
                    metrics.sent_bytes += data.len();

                    self.actions.push_back(reactor::Action::Send(fd, data));
                }
                Io::Connect(node_id, addr) => {
                    if self.peers.connected().any(|(_, id)| id == &node_id) {
                        log::error!(
                            target: "wire",
                            "Attempt to connect to already connected peer {node_id}"
                        );
                        // FIXME: The problem here is the session will stay in "initial" state,
                        // because it can't transition to attempted.
                        continue;
                    }
                    // The attempt is recorded with the service and in the metrics before
                    // dialing, so it is counted whether or not the dial succeeds.
                    self.service.attempted(node_id, addr.clone());
                    self.metrics.peer(node_id).outbound_connection_attempts += 1;

                    match dial::<G>(
                        addr.to_inner(),
                        node_id,
                        self.signer.clone().into_inner(),
                        self.service.config(),
                    )
                    .and_then(|session| {
                        NetTransport::<WireSession<G>>::with_session(session, Link::Outbound)
                    }) {
                        Ok(transport) => {
                            // Track the pending outbound connection by file descriptor.
                            // NOTE(review): `id` starts as `None`; presumably it is filled in
                            // once the reactor has registered the transport — confirm.
                            self.outbound.insert(
                                transport.as_raw_fd(),
                                Outbound {
                                    id: None,
                                    nid: node_id,
                                    addr: addr.to_inner(),
                                },
                            );
                            log::debug!(
                                target: "wire",
                                "Registering outbound transport for {node_id} (fd={})..",
                                transport.as_raw_fd()
                            );
                            self.actions
                                .push_back(reactor::Action::RegisterTransport(transport));
                        }
                        Err(err) => {
                            log::error!(target: "wire", "Error establishing connection to {addr}: {err}");

                            // A failed dial is reported to the service as a disconnection
                            // of the outbound link.
                            self.service.disconnected(
                                node_id,
                                Link::Outbound,
                                &DisconnectReason::Dial(Arc::new(err)),
                            );
                        }
                    }
                }
                Io::Disconnect(nid, reason) => {
                    // Only peers in the `Connected` state are disconnected here; the
                    // metric is bumped only when `disconnect` returns a peer.
                    if let Some((id, Peer::Connected { .. })) = self.peers.lookup(&nid) {
                        if let Some((nid, _)) = self.disconnect(id, reason) {
                            self.metrics.peer(nid).disconnects += 1;
                        }
                    } else {
                        log::warn!(target: "wire", "Peer {nid} is not connected: ignoring disconnect");
                    }
                }
                Io::Wakeup(d) => {
                    self.actions.push_back(reactor::Action::SetTimer(d.into()));
                }
                Io::Fetch {
                    rid,
                    remote,
                    timeout,
                    reader_limit,
                    refs_at,
                } => {
                    log::trace!(target: "wire", "Processing fetch for {rid} from {remote}..");

                    let Some((fd, Peer::Connected { link, streams, .. })) =
                        self.peers.lookup_mut(&remote)
                    else {
                        // Nb. It's possible that a peer is disconnected while an `Io::Fetch`
                        // is in the service's i/o buffer. Since the service may not purge the
                        // buffer on disconnect, we should just ignore i/o actions that don't
                        // have a connected peer.
                        log::error!(target: "wire", "Peer {remote} is not connected: dropping fetch");
                        continue;
                    };
                    // Open a fresh stream on this peer for the fetch.
                    let (stream, channels) =
                        streams.open(ChannelsConfig::new(timeout).with_reader_limit(reader_limit));

                    log::debug!(target: "wire", "Opened new stream with id {stream} for {rid} and remote {remote}");

                    // Copy the link out so it can still be used for the `Open` frame below.
                    let link = *link;
                    let task = Task {
                        fetch: FetchRequest::Initiator {
                            rid,
                            remote,
                            refs_at,
                        },
                        stream,
                        channels,
                    };

                    if !self.worker.is_empty() {
                        log::warn!(
                            target: "wire",
                            "Worker pool is busy: {} tasks pending, fetch requests may be delayed", self.worker.len()
                        );
                    }
                    // NOTE(review): when the worker pool rejects the task, the failure is
                    // only logged; the metrics below are still updated and the `Open`
                    // frame is still sent to the remote. Confirm this is intended.
                    if let Err(e) = self.worker.try_send(task) {
                        log::error!(
                            target: "wire",
                            "Worker pool failed to accept outgoing fetch request: {e}"
                        );
                    }
                    let metrics = self.metrics.peer(remote);
                    metrics.streams_opened += 1;
                    metrics.sent_fetch_requests += 1;

                    // Send the `Open` control frame for the new stream to the remote.
                    self.actions.push_back(Action::Send(
                        fd,
                        Frame::<service::Message>::control(link, frame::Control::Open { stream })
                            .to_bytes(),
                    ));
                }
            }
        }
        self.actions.pop_front()
    }
}
1131
1132/// Establish a new outgoing connection.
1133pub fn dial<G: Ecdh<Pk = NodeId>>(
1134    remote_addr: NetAddr<HostName>,
1135    remote_id: <G as EcSk>::Pk,
1136    signer: G,
1137    config: &service::Config,
1138) -> io::Result<WireSession<G>> {
1139    // Determine what address to establish a TCP connection with, given the remote peer
1140    // address and our node configuration.
1141    let inet_addr: NetAddr<InetHost> = match (&remote_addr.host, config.proxy) {
1142        // For IP and DNS addresses, use the global proxy if set, otherwise use the address as-is.
1143        (HostName::Ip(_), Some(proxy)) => proxy.into(),
1144        (HostName::Ip(ip), None) => NetAddr::new(InetHost::Ip(*ip), remote_addr.port),
1145        (HostName::Dns(_), Some(proxy)) => proxy.into(),
1146        (HostName::Dns(dns), None) => NetAddr::new(InetHost::Dns(dns.clone()), remote_addr.port),
1147        // For onion addresses, handle with care.
1148        (HostName::Tor(onion), proxy) => match config.onion {
1149            // In onion proxy mode, simply use the configured proxy address.
1150            // This takes precedence over any global proxy.
1151            Some(AddressConfig::Proxy { address }) => address.into(),
1152            // In "forward" mode, if a global proxy is set, we use that, otherwise
1153            // we treat `.onion` addresses as regular DNS names.
1154            Some(AddressConfig::Forward) => {
1155                if let Some(proxy) = proxy {
1156                    proxy.into()
1157                } else {
1158                    NetAddr::new(InetHost::Dns(onion.to_string()), remote_addr.port)
1159                }
1160            }
1161            // If onion address support isn't configured, refuse to connect.
1162            None => {
1163                return Err(io::Error::new(
1164                    io::ErrorKind::Unsupported,
1165                    "no configuration found for .onion addresses",
1166                ));
1167            }
1168        },
1169        _ => {
1170            return Err(io::Error::new(
1171                io::ErrorKind::Unsupported,
1172                "unsupported remote address type",
1173            ));
1174        }
1175    };
1176    // Nb. This timeout is currently not used by the underlying library due to the
1177    // `socket2` library not supporting non-blocking connect with timeout.
1178    let connection = net::TcpStream::connect_nonblocking(inet_addr, DEFAULT_DIAL_TIMEOUT)?;
1179    // Whether to tunnel regular connections through the proxy.
1180    let force_proxy = config.proxy.is_some();
1181
1182    session::<G>(
1183        remote_addr,
1184        Some(remote_id),
1185        connection,
1186        force_proxy,
1187        signer,
1188    )
1189}
1190
1191/// Accept a new connection.
1192pub fn accept<G: Ecdh<Pk = NodeId>>(
1193    remote_addr: NetAddr<HostName>,
1194    connection: net::TcpStream,
1195    signer: G,
1196) -> io::Result<WireSession<G>> {
1197    session::<G>(remote_addr, None, connection, false, signer)
1198}
1199
1200/// Create a new [`WireSession`].
1201fn session<G: Ecdh<Pk = NodeId>>(
1202    remote_addr: NetAddr<HostName>,
1203    remote_id: Option<NodeId>,
1204    connection: net::TcpStream,
1205    force_proxy: bool,
1206    signer: G,
1207) -> io::Result<WireSession<G>> {
1208    // There are issues with setting TCP_NODELAY on WSL. Not a big deal.
1209    if let Err(e) = connection.set_nodelay(true) {
1210        log::warn!(target: "wire", "Unable to set TCP_NODELAY on fd {}: {e}", connection.as_raw_fd());
1211    }
1212    connection.set_read_timeout(Some(DEFAULT_CONNECTION_TIMEOUT))?;
1213    connection.set_write_timeout(Some(DEFAULT_CONNECTION_TIMEOUT))?;
1214
1215    let sock = socket2::Socket::from(connection);
1216    let ka = socket2::TcpKeepalive::new()
1217        .with_time(time::Duration::from_secs(30))
1218        .with_interval(time::Duration::from_secs(10))
1219        .with_retries(3);
1220    if let Err(e) = sock.set_tcp_keepalive(&ka) {
1221        log::warn!(target: "wire", "Unable to set TCP_KEEPALIVE on fd {}: {e}", sock.as_raw_fd());
1222    }
1223
1224    let socks5 = socks5::Socks5::with(remote_addr, force_proxy);
1225    let proxy = Socks5Session::with(sock.into(), socks5);
1226    let pair = G::generate_keypair();
1227    let keyset = Keyset {
1228        e: pair.0,
1229        s: Some(signer),
1230        re: None,
1231        rs: remote_id,
1232    };
1233    let noise = NoiseState::initialize::<{ Sha256::OUTPUT_LEN }>(
1234        NOISE_XK,
1235        remote_id.is_some(),
1236        &[],
1237        keyset,
1238    );
1239    Ok(WireSession::with(proxy, noise))
1240}
1241
#[cfg(test)]
mod test {
    use super::*;
    use crate::deserializer;
    use crate::service::{Message, ZeroBytes};
    use crate::wire;
    use crate::wire::varint;

    /// A gossip payload with unknown trailing bytes still decodes as the plain message.
    #[test]
    fn test_pong_message_with_extension() {
        let pong = Message::Pong {
            zeroes: ZeroBytes::new(42),
        };

        // Assemble the frame by hand: protocol version, then the gossip stream id.
        let mut bytes = Vec::new();
        frame::PROTOCOL_VERSION_STRING.encode(&mut bytes).unwrap();
        frame::StreamId::gossip(Link::Outbound)
            .encode(&mut bytes)
            .unwrap();

        // The payload is the serialized message followed by some extension fields.
        let mut payload = wire::serialize(&pong);
        String::from("extra").encode(&mut payload).unwrap();
        48u8.encode(&mut payload).unwrap();

        // The payload goes into the frame using the varint-prefix format.
        varint::payload::encode(&payload, &mut bytes).unwrap();

        let mut decoder = deserializer::Deserializer::<1024, Frame>::new(1024);
        decoder.input(&bytes).unwrap();

        // Decoding succeeds despite the trailing data, and nothing is left over.
        let decoded = decoder.deserialize_next().unwrap().unwrap();
        assert_eq!(decoded, Frame::gossip(Link::Outbound, pong));
        assert!(decoder.deserialize_next().unwrap().is_none());
        assert!(decoder.is_empty());
    }

    /// A stream holding an extended announcement followed by a plain pong decodes
    /// both with a message type that knows the extension and with one that doesn't.
    #[test]
    fn test_inventory_ann_with_extension() {
        /// A message followed by an optional string extension.
        #[derive(Debug)]
        struct MessageWithExt {
            msg: Message,
            ext: String,
        }

        impl wire::Encode for MessageWithExt {
            fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
                // Bytes written for the message, then for the extension.
                Ok(self.msg.encode(writer)? + self.ext.encode(writer)?)
            }
        }

        impl wire::Decode for MessageWithExt {
            fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
                let msg = Message::decode(reader)?;
                // A missing or undecodable extension falls back to the empty string.
                let ext = String::decode(reader).unwrap_or_default();

                Ok(MessageWithExt { msg, ext })
            }
        }

        let rid = radicle::test::arbitrary::gen(1);
        let pk = radicle::test::arbitrary::gen(1);
        let sig: [u8; 64] = radicle::test::arbitrary::gen(1);

        let ann = Message::announcement(
            pk,
            service::gossip::inventory(radicle::node::Timestamp::MAX, [rid]),
            radicle::crypto::Signature::from(sig),
        );
        let pong = Message::Pong {
            zeroes: ZeroBytes::new(42),
        };

        // The stream starts with the announcement, framed together with its extension...
        let mut bytes = Vec::new();
        let extended = MessageWithExt {
            msg: ann.clone(),
            ext: String::from("extra"),
        };
        Frame::gossip(Link::Outbound, extended)
            .encode(&mut bytes)
            .unwrap();
        // ...and continues with a pong that carries no extension.
        Frame::gossip(Link::Outbound, pong.clone())
            .encode(&mut bytes)
            .unwrap();

        // Pass one: decode with the message type that understands the extension.
        {
            let mut decoder =
                deserializer::Deserializer::<1024, Frame<MessageWithExt>>::new(1024);
            decoder.input(&bytes).unwrap();

            radicle::assert_matches!(
                decoder.deserialize_next().unwrap().unwrap().data,
                FrameData::Gossip(MessageWithExt {
                    msg,
                    ext,
                }) if msg == ann && ext == *"extra"
            );
            radicle::assert_matches!(
                decoder.deserialize_next().unwrap().unwrap().data,
                FrameData::Gossip(MessageWithExt {
                    msg,
                    ext,
                }) if msg == pong && ext.is_empty()
            );
            assert!(decoder.deserialize_next().unwrap().is_none());
            assert!(decoder.is_empty());
        }

        // Pass two: decode with the current message type, which has no extension.
        {
            let mut decoder = deserializer::Deserializer::<1024, Frame<Message>>::new(1024);
            decoder.input(&bytes).unwrap();

            radicle::assert_matches!(
                decoder.deserialize_next().unwrap().unwrap().data,
                FrameData::Gossip(msg)
                if msg == ann
            );
            radicle::assert_matches!(
                decoder.deserialize_next().unwrap().unwrap().data,
                FrameData::Gossip(msg)
                if msg == pong
            );
            assert!(decoder.deserialize_next().unwrap().is_none());
            assert!(decoder.is_empty());
        }
    }
}