radicle_node/
wire.rs

1//! Implementation of the transport protocol.
2//!
3//! We use the Noise XK handshake pattern to establish an encrypted stream with a remote peer.
4//! The handshake itself is implemented in the external [`cyphernet`] and [`netservices`] crates.
5use std::collections::hash_map::Entry;
6use std::collections::VecDeque;
7use std::os::unix::io::{AsRawFd, RawFd};
8use std::sync::Arc;
9use std::{io, net, time};
10
11use amplify::Wrapper as _;
12use crossbeam_channel as chan;
13use cyphernet::addr::{HostName, InetHost, NetAddr};
14use cyphernet::encrypt::noise::{HandshakePattern, Keyset, NoiseState};
15use cyphernet::proxy::socks5;
16use cyphernet::{Digest, EcSk, Ecdh, Sha256};
17use localtime::LocalTime;
18use netservices::resource::{ListenerEvent, NetAccept, NetTransport, SessionEvent};
19use netservices::session::{NoiseSession, ProtocolArtifact, Socks5Session};
20use netservices::{NetConnection, NetReader, NetWriter};
21use radicle::node::device::Device;
22use reactor::{ResourceId, ResourceType, Timestamp};
23
24use radicle::collections::RandomMap;
25use radicle::crypto;
26use radicle::node::config::AddressConfig;
27use radicle::node::Link;
28use radicle::node::NodeId;
29use radicle::storage::WriteStorage;
30use radicle_protocol::deserializer::Deserializer;
31pub use radicle_protocol::wire::frame;
32pub use radicle_protocol::wire::frame::{Frame, FrameData, StreamId};
33pub use radicle_protocol::wire::*;
34use radicle_protocol::worker::{FetchRequest, FetchResult};
35
36use crate::service;
37use crate::service::io::Io;
38use crate::service::FETCH_TIMEOUT;
39use crate::service::{session, DisconnectReason, Metrics, Service};
40use crate::worker;
41use crate::worker::{ChannelEvent, ChannelsConfig};
42use crate::worker::{Task, TaskResult};
43
44/// NoiseXK handshake pattern.
45pub const NOISE_XK: HandshakePattern = HandshakePattern {
46    initiator: cyphernet::encrypt::noise::InitiatorPattern::Xmitted,
47    responder: cyphernet::encrypt::noise::OneWayPattern::Known,
48};
49
50/// Default time to wait until a network connection is considered inactive.
51pub const DEFAULT_CONNECTION_TIMEOUT: time::Duration = time::Duration::from_secs(6);
52
53/// Default time to wait when dialing a connection, before the remote is considered unreachable.
54pub const DEFAULT_DIAL_TIMEOUT: time::Duration = time::Duration::from_secs(6);
55
56/// Maximum size of a peer inbox, in bytes.
57pub const MAX_INBOX_SIZE: usize = 1024 * 1024 * 2;
58
59/// Control message used internally between workers, users, and the service.
60#[allow(clippy::large_enum_variant)]
61#[derive(Debug)]
62pub enum Control {
63    /// Message from the user to the service.
64    User(service::Command),
65    /// Message from a worker to the service.
66    Worker(TaskResult),
67    /// Flush data in the given stream to the remote.
68    Flush { remote: NodeId, stream: StreamId },
69}
70
71/// Peer session type.
72pub type WireSession<G> = NoiseSession<G, Sha256, Socks5Session<net::TcpStream>>;
73/// Peer session type (read-only).
74pub type WireReader = NetReader<Socks5Session<net::TcpStream>>;
75/// Peer session type (write-only).
76pub type WireWriter<G> = NetWriter<NoiseState<G, Sha256>, Socks5Session<net::TcpStream>>;
77
78/// Reactor action.
79type Action<G> = reactor::Action<NetAccept<WireSession<G>>, NetTransport<WireSession<G>>>;
80
81/// A worker stream.
82struct Stream {
83    /// Channels.
84    channels: worker::Channels,
85    /// Data sent.
86    sent_bytes: usize,
87    /// Data received.
88    received_bytes: usize,
89}
90
91impl Stream {
92    fn new(channels: worker::Channels) -> Self {
93        Self {
94            channels,
95            sent_bytes: 0,
96            received_bytes: 0,
97        }
98    }
99}
100
101/// Streams associated with a connected peer.
102struct Streams {
103    /// Active streams and their associated worker channels.
104    /// Note that the gossip and control streams are not included here as they are always
105    /// implied to exist.
106    streams: RandomMap<StreamId, Stream>,
107    /// Connection direction.
108    link: Link,
109    /// Sequence number used to compute the next stream id.
110    seq: u64,
111}
112
113impl Streams {
114    /// Create a new [`Streams`] object, passing the connection link.
115    fn new(link: Link) -> Self {
116        Self {
117            streams: RandomMap::default(),
118            link,
119            seq: 0,
120        }
121    }
122
123    /// Get a known stream.
124    fn get(&self, stream: &StreamId) -> Option<&Stream> {
125        self.streams.get(stream)
126    }
127
128    /// Get a known stream, mutably.
129    fn get_mut(&mut self, stream: &StreamId) -> Option<&mut Stream> {
130        self.streams.get_mut(stream)
131    }
132
133    /// Open a new stream.
134    fn open(&mut self, config: ChannelsConfig) -> (StreamId, worker::Channels) {
135        self.seq += 1;
136
137        let id = StreamId::git(self.link)
138            .nth(self.seq)
139            .expect("Streams::open: too many streams");
140        let channels = self
141            .register(id, config)
142            .expect("Streams::open: stream was already open");
143
144        (id, channels)
145    }
146
147    /// Register an open stream.
148    fn register(&mut self, stream: StreamId, config: ChannelsConfig) -> Option<worker::Channels> {
149        let (wire, worker) = worker::Channels::pair(config)
150            .expect("Streams::register: fatal: unable to create channels");
151
152        match self.streams.entry(stream) {
153            Entry::Vacant(e) => {
154                e.insert(Stream::new(worker));
155                Some(wire)
156            }
157            Entry::Occupied(_) => None,
158        }
159    }
160
161    /// Unregister an open stream.
162    fn unregister(&mut self, stream: &StreamId) -> Option<Stream> {
163        self.streams.remove(stream)
164    }
165
166    /// Close all streams.
167    fn shutdown(&mut self) {
168        for (sid, stream) in self.streams.drain() {
169            log::debug!(target: "wire", "Closing worker stream {sid}");
170            stream.channels.close().ok();
171        }
172    }
173}
174
175/// The initial state of an outbound peer before handshake is completed.
176#[derive(Debug)]
177struct Outbound {
178    /// Resource ID, if registered.
179    id: Option<ResourceId>,
180    /// Remote address.
181    addr: NetAddr<HostName>,
182    /// Remote Node ID.
183    nid: NodeId,
184}
185
186/// The initial state of an inbound peer before handshake is completed.
187#[derive(Debug)]
188struct Inbound {
189    /// Resource ID, if registered.
190    id: Option<ResourceId>,
191    /// Remote address.
192    addr: NetAddr<HostName>,
193}
194
195/// Peer connection state machine.
196enum Peer {
197    /// The state after handshake is completed.
198    /// Peers in this state are handled by the underlying service.
199    Connected {
200        #[allow(dead_code)]
201        addr: NetAddr<HostName>,
202        link: Link,
203        nid: NodeId,
204        inbox: Deserializer<MAX_INBOX_SIZE, Frame>,
205        streams: Streams,
206    },
207    /// The peer was scheduled for disconnection. Once the transport is handed over
208    /// by the reactor, we can consider it disconnected.
209    Disconnecting {
210        link: Link,
211        nid: Option<NodeId>,
212        reason: DisconnectReason,
213    },
214}
215
216impl std::fmt::Debug for Peer {
217    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
218        match self {
219            Self::Connected { link, nid, .. } => write!(f, "Connected({link:?}, {nid})"),
220            Self::Disconnecting { .. } => write!(f, "Disconnecting"),
221        }
222    }
223}
224
225impl Peer {
226    /// Return the peer's id, if any.
227    fn id(&self) -> Option<&NodeId> {
228        match self {
229            Peer::Connected { nid, .. } | Peer::Disconnecting { nid: Some(nid), .. } => Some(nid),
230            Peer::Disconnecting { nid: None, .. } => None,
231        }
232    }
233
234    fn link(&self) -> Link {
235        match self {
236            Peer::Connected { link, .. } => *link,
237            Peer::Disconnecting { link, .. } => *link,
238        }
239    }
240
241    /// Connected peer.
242    fn connected(nid: NodeId, addr: NetAddr<HostName>, link: Link) -> Self {
243        Self::Connected {
244            link,
245            addr,
246            nid,
247            inbox: Deserializer::default(),
248            streams: Streams::new(link),
249        }
250    }
251}
252
253/// Holds connected peers.
254struct Peers(RandomMap<ResourceId, Peer>);
255
256impl Peers {
257    fn get_mut(&mut self, id: &ResourceId) -> Option<&mut Peer> {
258        self.0.get_mut(id)
259    }
260
261    fn entry(&mut self, id: ResourceId) -> Entry<ResourceId, Peer> {
262        self.0.entry(id)
263    }
264
265    fn insert(&mut self, id: ResourceId, peer: Peer) {
266        if self.0.insert(id, peer).is_some() {
267            log::warn!(target: "wire", "Replacing existing peer id={id}");
268        }
269    }
270
271    fn remove(&mut self, id: &ResourceId) -> Option<Peer> {
272        self.0.remove(id)
273    }
274
275    fn lookup(&self, node_id: &NodeId) -> Option<(ResourceId, &Peer)> {
276        self.0
277            .iter()
278            .find(|(_, peer)| peer.id() == Some(node_id))
279            .map(|(fd, peer)| (*fd, peer))
280    }
281
282    fn lookup_mut(&mut self, node_id: &NodeId) -> Option<(ResourceId, &mut Peer)> {
283        self.0
284            .iter_mut()
285            .find(|(_, peer)| peer.id() == Some(node_id))
286            .map(|(fd, peer)| (*fd, peer))
287    }
288
289    fn active(&self) -> impl Iterator<Item = (ResourceId, &NodeId, Link)> {
290        self.0.iter().filter_map(|(id, peer)| match peer {
291            Peer::Connected { nid, link, .. } => Some((*id, nid, *link)),
292            Peer::Disconnecting { .. } => None,
293        })
294    }
295
296    fn connected(&self) -> impl Iterator<Item = (ResourceId, &NodeId)> {
297        self.0.iter().filter_map(|(id, peer)| {
298            if let Peer::Connected { nid, .. } = peer {
299                Some((*id, nid))
300            } else {
301                None
302            }
303        })
304    }
305
306    fn iter(&self) -> impl Iterator<Item = &Peer> {
307        self.0.values()
308    }
309}
310
311/// Wire protocol implementation for a set of peers.
312pub struct Wire<D, S, G: crypto::signature::Signer<crypto::Signature> + Ecdh> {
313    /// Backing service instance.
314    service: Service<D, S, G>,
315    /// Worker pool interface.
316    worker: chan::Sender<Task>,
317    /// Used for authentication.
318    signer: Device<G>,
319    /// Node metrics.
320    metrics: service::Metrics,
321    /// Internal queue of actions to send to the reactor.
322    actions: VecDeque<Action<G>>,
323    /// Outbound attempted peers without a session.
324    outbound: RandomMap<RawFd, Outbound>,
325    /// Inbound peers without a session.
326    inbound: RandomMap<RawFd, Inbound>,
327    /// Listening addresses that are not yet registered.
328    listening: RandomMap<RawFd, net::SocketAddr>,
329    /// Peer (established) sessions.
330    peers: Peers,
331}
332
333impl<D, S, G> Wire<D, S, G>
334where
335    D: service::Store,
336    S: WriteStorage + 'static,
337    G: crypto::signature::Signer<crypto::Signature> + Ecdh<Pk = NodeId>,
338{
339    pub fn new(service: Service<D, S, G>, worker: chan::Sender<Task>, signer: Device<G>) -> Self {
340        assert!(service.started().is_some(), "Service must be initialized");
341
342        Self {
343            service,
344            worker,
345            signer,
346            metrics: Metrics::default(),
347            actions: VecDeque::new(),
348            inbound: RandomMap::default(),
349            outbound: RandomMap::default(),
350            listening: RandomMap::default(),
351            peers: Peers(RandomMap::default()),
352        }
353    }
354
355    pub fn listen(&mut self, socket: NetAccept<WireSession<G>>) {
356        self.listening
357            .insert(socket.as_raw_fd(), socket.local_addr());
358        self.actions.push_back(Action::RegisterListener(socket));
359    }
360
361    fn disconnect(&mut self, id: ResourceId, reason: DisconnectReason) -> Option<(NodeId, Link)> {
362        match self.peers.entry(id) {
363            Entry::Vacant(_) => {
364                // Connecting peer with no session.
365                log::debug!(target: "wire", "Disconnecting pending peer with id={id}: {reason}");
366                self.actions.push_back(Action::UnregisterTransport(id));
367
368                // Check for attempted outbound connections. Unestablished inbound connections don't
369                // have an NID yet.
370                self.outbound
371                    .values()
372                    .find(|o| o.id == Some(id))
373                    .map(|o| (o.nid, Link::Outbound))
374            }
375            Entry::Occupied(mut e) => match e.get_mut() {
376                Peer::Disconnecting { nid, link, .. } => {
377                    log::error!(target: "wire", "Peer with id={id} is already disconnecting");
378
379                    nid.map(|n| (n, *link))
380                }
381                Peer::Connected {
382                    nid, streams, link, ..
383                } => {
384                    log::debug!(target: "wire", "Disconnecting peer with id={id}: {reason}");
385                    let nid = *nid;
386                    let link = *link;
387
388                    streams.shutdown();
389                    e.insert(Peer::Disconnecting {
390                        nid: Some(nid),
391                        link,
392                        reason,
393                    });
394                    self.actions.push_back(Action::UnregisterTransport(id));
395
396                    Some((nid, link))
397                }
398            },
399        }
400    }
401
402    fn worker_result(&mut self, task: TaskResult) {
403        log::debug!(
404            target: "wire",
405            "Received fetch result from worker for stream {}, remote {}: {:?}",
406            task.stream, task.remote, task.result
407        );
408
409        let nid = task.remote;
410        let Some((fd, peer)) = self.peers.lookup_mut(&nid) else {
411            log::warn!(target: "wire", "Peer {nid} not found; ignoring fetch result");
412            return;
413        };
414
415        if let Peer::Connected { link, streams, .. } = peer {
416            // Nb. It's possible that the stream would already be unregistered if we received an
417            // early "close" from the remote. Otherwise, we unregister it here and send the "close"
418            // ourselves.
419            if let Some(s) = streams.unregister(&task.stream) {
420                log::debug!(
421                    target: "wire", "Stream {} of {} closing with {} byte(s) sent and {} byte(s) received",
422                    task.stream, task.remote, s.sent_bytes, s.received_bytes
423                );
424                let frame = Frame::<service::Message>::control(
425                    *link,
426                    frame::Control::Close {
427                        stream: task.stream,
428                    },
429                );
430                self.actions
431                    .push_back(Action::Send(fd, frame.encode_to_vec()));
432            }
433        } else {
434            // If the peer disconnected, we'll get here, but we still want to let the service know
435            // about the fetch result, so we don't return here.
436            log::warn!(target: "wire", "Peer {nid} is not connected; ignoring fetch result");
437            return;
438        };
439
440        // Only call into the service if we initiated this fetch.
441        match task.result {
442            FetchResult::Initiator { rid, result } => {
443                self.service.fetched(rid, nid, result);
444            }
445            FetchResult::Responder { rid, result } => {
446                if let Some(rid) = rid {
447                    if let Some(err) = result.err() {
448                        log::info!(target: "wire", "Peer {nid} failed to fetch {rid} from us: {err}");
449                    } else {
450                        log::info!(target: "wire", "Peer {nid} fetched {rid} from us successfully");
451                    }
452                }
453            }
454        }
455    }
456
457    fn flush(&mut self, remote: NodeId, stream: StreamId) {
458        let Some((fd, peer)) = self.peers.lookup_mut(&remote) else {
459            log::warn!(target: "wire", "Peer {remote} is not known; ignoring flush");
460            return;
461        };
462        let Peer::Connected { streams, link, .. } = peer else {
463            log::warn!(target: "wire", "Peer {remote} is not connected; ignoring flush");
464            return;
465        };
466        let Some(s) = streams.get_mut(&stream) else {
467            log::debug!(target: "wire", "Stream {stream} cannot be found; ignoring flush");
468            return;
469        };
470        let metrics = self.metrics.peer(remote);
471
472        for data in s.channels.try_iter() {
473            let frame = match data {
474                ChannelEvent::Data(data) => {
475                    metrics.sent_git_bytes += data.len();
476                    metrics.sent_bytes += data.len();
477                    Frame::<service::Message>::git(stream, data)
478                }
479                ChannelEvent::Close => Frame::control(*link, frame::Control::Close { stream }),
480                ChannelEvent::Eof => Frame::control(*link, frame::Control::Eof { stream }),
481            };
482            self.actions
483                .push_back(reactor::Action::Send(fd, frame.encode_to_vec()));
484        }
485    }
486
487    fn cleanup(&mut self, id: ResourceId, fd: RawFd) {
488        if self.inbound.remove(&fd).is_some() {
489            log::debug!(target: "wire", "Cleaning up inbound peer state with id={id} (fd={fd})");
490        } else if let Some(outbound) = self.outbound.remove(&fd) {
491            log::debug!(target: "wire", "Cleaning up outbound peer state with id={id} (fd={fd})");
492            self.service.disconnected(
493                outbound.nid,
494                Link::Outbound,
495                &DisconnectReason::connection(),
496            );
497        } else {
498            log::debug!(target: "wire", "Tried to cleanup unknown peer with id={id} (fd={fd})");
499        }
500    }
501}
502
503impl<D, S, G> reactor::Handler for Wire<D, S, G>
504where
505    D: service::Store + Send,
506    S: WriteStorage + Send + 'static,
507    G: crypto::signature::Signer<crypto::Signature> + Ecdh<Pk = NodeId> + Clone + Send,
508{
509    type Listener = NetAccept<WireSession<G>>;
510    type Transport = NetTransport<WireSession<G>>;
511    type Command = Control;
512
513    fn tick(&mut self, time: Timestamp) {
514        self.metrics.open_channels = self
515            .peers
516            .iter()
517            .filter_map(|p| {
518                if let Peer::Connected { streams, .. } = p {
519                    Some(streams.streams.len())
520                } else {
521                    None
522                }
523            })
524            .sum();
525        self.metrics.worker_queue_size = self.worker.len();
526        self.service.tick(
527            LocalTime::from_millis(time.as_millis() as u128),
528            &self.metrics,
529        );
530    }
531
532    fn handle_timer(&mut self) {
533        self.service.wake();
534    }
535
536    fn handle_listener_event(
537        &mut self,
538        _: ResourceId, // Nb. This is the ID of the listener socket.
539        event: ListenerEvent<WireSession<G>>,
540        _: Timestamp,
541    ) {
542        match event {
543            ListenerEvent::Accepted(connection) => {
544                let Ok(remote) = connection.remote_addr() else {
545                    log::warn!(target: "wire", "Accepted connection doesn't have remote address; dropping..");
546                    drop(connection);
547
548                    return;
549                };
550                let InetHost::Ip(ip) = remote.host else {
551                    log::error!(target: "wire", "Unexpected host type for inbound connection {remote}; dropping..");
552                    drop(connection);
553
554                    return;
555                };
556                let fd = connection.as_raw_fd();
557                log::debug!(target: "wire", "Inbound connection from {remote} (fd={fd})..");
558
559                // If the service doesn't want to accept this connection,
560                // we drop the connection here, which disconnects the socket.
561                if !self.service.accepted(ip) {
562                    log::debug!(target: "wire", "Rejecting inbound connection from {ip} (fd={fd})..");
563                    drop(connection);
564
565                    return;
566                }
567
568                let session = match accept::<G>(
569                    remote.clone().into(),
570                    connection,
571                    self.signer.clone().into_inner(),
572                ) {
573                    Ok(s) => s,
574                    Err(e) => {
575                        log::error!(target: "wire", "Error creating session for {ip}: {e}");
576                        return;
577                    }
578                };
579                let transport = match NetTransport::with_session(
580                    session,
581                    netservices::Direction::Inbound,
582                ) {
583                    Ok(transport) => transport,
584                    Err(err) => {
585                        log::error!(target: "wire", "Failed to create transport for accepted connection: {err}");
586                        return;
587                    }
588                };
589                log::debug!(target: "wire", "Accepted inbound connection from {remote} (fd={fd})..");
590
591                self.inbound.insert(
592                    fd,
593                    Inbound {
594                        id: None,
595                        addr: remote.into(),
596                    },
597                );
598                self.actions
599                    .push_back(reactor::Action::RegisterTransport(transport))
600            }
601            ListenerEvent::Failure(err) => {
602                log::error!(target: "wire", "Error listening for inbound connections: {err}");
603            }
604        }
605    }
606
607    fn handle_registered(&mut self, fd: RawFd, id: ResourceId, typ: ResourceType) {
608        match typ {
609            ResourceType::Listener => {
610                if let Some(local_addr) = self.listening.remove(&fd) {
611                    self.service.listening(local_addr);
612                }
613            }
614            ResourceType::Transport => {
615                if let Some(outbound) = self.outbound.get_mut(&fd) {
616                    log::debug!(target: "wire", "Outbound peer resource registered for {} with id={id} (fd={fd})", outbound.nid);
617                    outbound.id = Some(id);
618                } else if let Some(inbound) = self.inbound.get_mut(&fd) {
619                    log::debug!(target: "wire", "Inbound peer resource registered with id={id} (fd={fd})");
620                    inbound.id = Some(id);
621                } else {
622                    log::warn!(target: "wire", "Unknown peer registered with fd={fd} and id={id}");
623                }
624            }
625        }
626    }
627
628    fn handle_transport_event(
629        &mut self,
630        id: ResourceId,
631        event: SessionEvent<WireSession<G>>,
632        _: Timestamp,
633    ) {
634        match event {
635            SessionEvent::Established(fd, ProtocolArtifact { state, .. }) => {
636                // SAFETY: With the NoiseXK protocol, there is always a remote static key.
637                let nid: NodeId = state.remote_static_key.unwrap();
638                // Make sure we don't try to connect to ourselves by mistake.
639                if &nid == self.signer.public_key() {
640                    log::error!(target: "wire", "Self-connection detected, disconnecting..");
641                    self.disconnect(id, DisconnectReason::SelfConnection);
642
643                    return;
644                }
645                let (addr, link) = if let Some(peer) = self.inbound.remove(&fd) {
646                    self.metrics.peer(nid).inbound_connection_attempts += 1;
647                    (peer.addr, Link::Inbound)
648                } else if let Some(peer) = self.outbound.remove(&fd) {
649                    assert_eq!(nid, peer.nid);
650                    (peer.addr, Link::Outbound)
651                } else {
652                    log::error!(target: "wire", "Session for {nid} (id={id}) not found");
653                    return;
654                };
655                log::debug!(
656                    target: "wire",
657                    "Session established with {nid} (id={id}) (fd={fd}) ({})",
658                    if link.is_inbound() { "inbound" } else { "outbound" }
659                );
660
661                // Connections to close.
662                let mut disconnect = Vec::new();
663
664                // Handle conflicting connections.
665                // This is typical when nodes have mutually configured their nodes to connect to
666                // each other on startup. We handle this by deterministically choosing one node
667                // whos outbound connection is the one that is kept. The other connections are
668                // dropped.
669                {
670                    // Whether we have precedence in case of conflicting connections.
671                    // Having precedence means that our outbound connection will win over
672                    // the other node's outbound connection.
673                    let precedence = *self.signer.public_key() > nid;
674
675                    // Pre-existing connections that conflict with this newly established session.
676                    // Note that we can't know whether a connection is conflicting before we get the
677                    // remote static key.
678                    let mut conflicting = Vec::new();
679
680                    // Active sessions with the same NID but a different Resource ID are conflicting.
681                    conflicting.extend(
682                        self.peers
683                            .active()
684                            .filter(|(c_id, d, _)| **d == nid && *c_id != id)
685                            .map(|(c_id, _, link)| (c_id, link)),
686                    );
687
688                    // Outbound connection attempts with the same remote key but a different file
689                    // descriptor are conflicting.
690                    conflicting.extend(self.outbound.iter().filter_map(|(c_fd, other)| {
691                        if other.nid == nid && *c_fd != fd {
692                            other.id.map(|c_id| (c_id, Link::Outbound))
693                        } else {
694                            None
695                        }
696                    }));
697
698                    for (c_id, c_link) in conflicting {
699                        // If we have precedence, the inbound connection is closed.
700                        // In the case where both connections are inbound or outbound,
701                        // we close the newer connection, ie. the one with the higher
702                        // resource id.
703                        let close = match (link, c_link) {
704                            (Link::Inbound, Link::Outbound) => {
705                                if precedence {
706                                    id
707                                } else {
708                                    c_id
709                                }
710                            }
711                            (Link::Outbound, Link::Inbound) => {
712                                if precedence {
713                                    c_id
714                                } else {
715                                    id
716                                }
717                            }
718                            (Link::Inbound, Link::Inbound) => id.max(c_id),
719                            (Link::Outbound, Link::Outbound) => id.max(c_id),
720                        };
721
722                        log::warn!(
723                            target: "wire", "Established session (id={id}) conflicts with existing session for {nid} (id={c_id})"
724                        );
725                        disconnect.push(close);
726                    }
727                }
728                for id in &disconnect {
729                    log::warn!(
730                        target: "wire", "Closing conflicting session (id={id}) with {nid}.."
731                    );
732                    // Disconnect and return the associated NID of the peer, if available.
733                    if let Some((nid, link)) = self.disconnect(*id, DisconnectReason::Conflict) {
734                        // We disconnect the session eagerly because otherwise we will get the new
735                        // `connected` event before the `disconnect`, resulting in a duplicate
736                        // connection.
737                        self.service
738                            .disconnected(nid, link, &DisconnectReason::Conflict);
739                    }
740                }
741                if !disconnect.contains(&id) {
742                    self.peers
743                        .insert(id, Peer::connected(nid, addr.clone(), link));
744                    self.service.connected(nid, addr.into(), link);
745                }
746            }
747            SessionEvent::Data(data) => {
748                if let Some(Peer::Connected {
749                    nid,
750                    inbox,
751                    streams,
752                    ..
753                }) = self.peers.get_mut(&id)
754                {
755                    let metrics = self.metrics.peer(*nid);
756                    metrics.received_bytes += data.len();
757
758                    if inbox.input(&data).is_err() {
759                        log::error!(target: "wire", "Maximum inbox size ({MAX_INBOX_SIZE}) reached for peer {nid}");
760                        log::error!(target: "wire", "Unable to process messages fast enough for peer {nid}; disconnecting..");
761                        self.disconnect(id, DisconnectReason::Session(session::Error::Misbehavior));
762
763                        return;
764                    }
765
766                    loop {
767                        match inbox.deserialize_next() {
768                            Ok(Some(Frame {
769                                data: FrameData::Control(frame::Control::Open { stream }),
770                                ..
771                            })) => {
772                                log::debug!(target: "wire", "Received `open` command for stream {stream} from {nid}");
773                                metrics.streams_opened += 1;
774                                metrics.received_fetch_requests += 1;
775                                let reader_limit = self.service.config().limits.fetch_pack_receive;
776                                let Some(channels) = streams.register(
777                                    stream,
778                                    ChannelsConfig::new(FETCH_TIMEOUT)
779                                        .with_reader_limit(reader_limit),
780                                ) else {
781                                    log::warn!(target: "wire", "Peer attempted to open already-open stream stream {stream}");
782                                    continue;
783                                };
784
785                                let task = Task {
786                                    fetch: FetchRequest::Responder {
787                                        remote: *nid,
788                                        emitter: self.service.emitter(),
789                                    },
790                                    stream,
791                                    channels,
792                                };
793                                if let Err(e) = self.worker.try_send(task) {
794                                    log::error!(
795                                        target: "wire",
796                                        "Worker pool failed to accept incoming fetch request: {e}"
797                                    );
798                                }
799                            }
800                            Ok(Some(Frame {
801                                data: FrameData::Control(frame::Control::Eof { stream }),
802                                ..
803                            })) => {
804                                if let Some(s) = streams.get(&stream) {
805                                    log::debug!(target: "wire", "Received `end-of-file` on stream {stream} from {nid}");
806
807                                    if s.channels.send(ChannelEvent::Eof).is_err() {
808                                        log::error!(target: "wire", "Worker is disconnected; cannot send `EOF`");
809                                    }
810                                } else {
811                                    log::debug!(target: "wire", "Ignoring frame on closed or unknown stream {stream}");
812                                }
813                            }
814                            Ok(Some(Frame {
815                                data: FrameData::Control(frame::Control::Close { stream }),
816                                ..
817                            })) => {
818                                log::debug!(target: "wire", "Received `close` command for stream {stream} from {nid}");
819
820                                if let Some(s) = streams.unregister(&stream) {
821                                    log::debug!(
822                                        target: "wire",
823                                        "Stream {stream} of {nid} closed with {} byte(s) sent and {} byte(s) received",
824                                        s.sent_bytes, s.received_bytes
825                                    );
826                                    s.channels.close().ok();
827                                }
828                            }
829                            Ok(Some(Frame {
830                                data: FrameData::Gossip(msg),
831                                ..
832                            })) => {
833                                metrics.received_gossip_messages += 1;
834                                self.service.received_message(*nid, msg);
835                            }
836                            Ok(Some(Frame {
837                                stream,
838                                data: FrameData::Git(data),
839                                ..
840                            })) => {
841                                if let Some(s) = streams.get_mut(&stream) {
842                                    metrics.received_git_bytes += data.len();
843
844                                    if s.channels.send(ChannelEvent::Data(data)).is_err() {
845                                        log::error!(target: "wire", "Worker is disconnected; cannot send data");
846                                    }
847                                } else {
848                                    log::debug!(target: "wire", "Ignoring frame on closed or unknown stream {stream}");
849                                }
850                            }
851                            Ok(None) => {
852                                // Buffer is empty, or message isn't complete.
853                                break;
854                            }
855                            Err(e) => {
856                                log::error!(target: "wire", "Invalid gossip message from {nid}: {e}");
857
858                                if !inbox.is_empty() {
859                                    log::debug!(target: "wire", "Dropping read buffer for {nid} with {} bytes", inbox.len());
860                                }
861                                self.disconnect(
862                                    id,
863                                    DisconnectReason::Session(session::Error::Misbehavior),
864                                );
865                                break;
866                            }
867                        }
868                    }
869                } else {
870                    log::warn!(target: "wire", "Dropping message from unconnected peer (id={id})");
871                }
872            }
873            SessionEvent::Terminated(err) => {
874                self.disconnect(id, DisconnectReason::Connection(Arc::new(err)));
875            }
876        }
877    }
878
879    fn handle_command(&mut self, cmd: Self::Command) {
880        match cmd {
881            Control::User(cmd) => self.service.command(cmd),
882            Control::Worker(result) => self.worker_result(result),
883            Control::Flush { remote, stream } => self.flush(remote, stream),
884        }
885    }
886
887    fn handle_error(
888        &mut self,
889        err: reactor::Error<NetAccept<WireSession<G>>, NetTransport<WireSession<G>>>,
890    ) {
891        match err {
892            reactor::Error::Poll(err) => {
893                // TODO: This should be a fatal error, there's nothing we can do here.
894                log::error!(target: "wire", "Can't poll connections: {err}");
895            }
896            reactor::Error::ListenerDisconnect(id, _) => {
897                // TODO: This should be a fatal error, there's nothing we can do here.
898                log::error!(target: "wire", "Listener {id} disconnected");
899            }
900            reactor::Error::TransportDisconnect(id, transport) => {
901                let fd = transport.as_raw_fd();
902                log::error!(target: "wire", "Peer id={id} (fd={fd}) disconnected");
903
904                // We're dropping the TCP connection here.
905                drop(transport);
906
907                // The peer transport is already disconnected and removed from the reactor;
908                // therefore there is no need to initiate a disconnection. We simply remove
909                // the peer from the map.
910                match self.peers.remove(&id) {
911                    Some(mut peer) => {
912                        if let Peer::Connected { streams, .. } = &mut peer {
913                            streams.shutdown();
914                        }
915
916                        if let Some(id) = peer.id() {
917                            self.service.disconnected(
918                                *id,
919                                peer.link(),
920                                &DisconnectReason::connection(),
921                            );
922                        } else {
923                            log::debug!(target: "wire", "Inbound disconnection before handshake; ignoring..")
924                        }
925                    }
926                    None => self.cleanup(id, fd),
927                }
928            }
929        }
930    }
931
932    fn handover_listener(&mut self, id: ResourceId, _listener: Self::Listener) {
933        log::error!(target: "wire", "Listener handover is not supported (id={id})");
934    }
935
936    fn handover_transport(&mut self, id: ResourceId, transport: Self::Transport) {
937        let fd = transport.as_raw_fd();
938
939        match self.peers.entry(id) {
940            Entry::Occupied(e) => {
941                match e.get() {
942                    Peer::Disconnecting {
943                        nid, reason, link, ..
944                    } => {
945                        log::debug!(target: "wire", "Transport handover for disconnecting peer with id={id} (fd={fd})");
946
947                        // Disconnect TCP stream.
948                        drop(transport);
949
950                        // If there is no NID, the service is not aware of the peer.
951                        if let Some(nid) = nid {
952                            // In the case of a conflicting connection, there will be two resources
953                            // for the peer. However, at the service level, there is only one, and
954                            // it is identified by NID.
955                            //
956                            // Therefore, we specify which of the connections we're closing by
957                            // passing the `link`.
958                            self.service.disconnected(*nid, *link, reason);
959                        }
960                        e.remove();
961                    }
962                    Peer::Connected { nid, .. } => {
963                        panic!("Wire::handover_transport: Unexpected handover of connected peer {nid} with id={id} (fd={fd})");
964                    }
965                }
966            }
967            Entry::Vacant(_) => self.cleanup(id, fd),
968        }
969    }
970}
971
972impl<D, S, G> Iterator for Wire<D, S, G>
973where
974    D: service::Store,
975    S: WriteStorage + 'static,
976    G: crypto::signature::Signer<crypto::Signature> + Ecdh<Pk = NodeId> + Clone,
977{
978    type Item = Action<G>;
979
980    fn next(&mut self) -> Option<Self::Item> {
981        while let Some(ev) = self.service.next() {
982            match ev {
983                Io::Write(node_id, msgs) => {
984                    let (fd, link) = match self.peers.lookup(&node_id) {
985                        Some((fd, Peer::Connected { link, .. })) => (fd, *link),
986                        Some((_, peer)) => {
987                            // If the peer is disconnected by the wire protocol, the service may
988                            // not be aware of this yet, and may continue to write messages to it.
989                            log::debug!(target: "wire", "Dropping {} message(s) to {node_id} ({peer:?})", msgs.len());
990                            continue;
991                        }
992                        None => {
993                            log::error!(target: "wire", "Dropping {} message(s) to {node_id}: unknown peer", msgs.len());
994                            continue;
995                        }
996                    };
997                    log::trace!(
998                        target: "wire", "Writing {} message(s) to {}", msgs.len(), node_id
999                    );
1000                    let mut data = Vec::new();
1001                    let metrics = self.metrics.peer(node_id);
1002                    metrics.sent_gossip_messages += msgs.len();
1003
1004                    for msg in msgs {
1005                        Frame::gossip(link, msg).encode(&mut data);
1006                    }
1007                    metrics.sent_bytes += data.len();
1008
1009                    self.actions.push_back(reactor::Action::Send(fd, data));
1010                }
1011                Io::Connect(node_id, addr) => {
1012                    if self.peers.connected().any(|(_, id)| id == &node_id) {
1013                        log::error!(
1014                            target: "wire",
1015                            "Attempt to connect to already connected peer {node_id}"
1016                        );
1017                        // FIXME: The problem here is the session will stay in "initial" state,
1018                        // because it can't transition to attempted.
1019                        continue;
1020                    }
1021                    self.service.attempted(node_id, addr.clone());
1022                    self.metrics.peer(node_id).outbound_connection_attempts += 1;
1023
1024                    match dial::<G>(
1025                        addr.to_inner(),
1026                        node_id,
1027                        self.signer.clone().into_inner(),
1028                        self.service.config(),
1029                    )
1030                    .and_then(|session| {
1031                        NetTransport::<WireSession<G>>::with_session(
1032                            session,
1033                            netservices::Direction::Outbound,
1034                        )
1035                    }) {
1036                        Ok(transport) => {
1037                            self.outbound.insert(
1038                                transport.as_raw_fd(),
1039                                Outbound {
1040                                    id: None,
1041                                    nid: node_id,
1042                                    addr: addr.to_inner(),
1043                                },
1044                            );
1045                            log::debug!(
1046                                target: "wire",
1047                                "Registering outbound transport for {node_id} (fd={})..",
1048                                transport.as_raw_fd()
1049                            );
1050                            self.actions
1051                                .push_back(reactor::Action::RegisterTransport(transport));
1052                        }
1053                        Err(err) => {
1054                            log::error!(target: "wire", "Error establishing connection to {addr}: {err}");
1055
1056                            self.service.disconnected(
1057                                node_id,
1058                                Link::Outbound,
1059                                &DisconnectReason::Dial(Arc::new(err)),
1060                            );
1061                        }
1062                    }
1063                }
1064                Io::Disconnect(nid, reason) => {
1065                    if let Some((id, Peer::Connected { .. })) = self.peers.lookup(&nid) {
1066                        if let Some((nid, _)) = self.disconnect(id, reason) {
1067                            self.metrics.peer(nid).disconnects += 1;
1068                        }
1069                    } else {
1070                        log::warn!(target: "wire", "Peer {nid} is not connected: ignoring disconnect");
1071                    }
1072                }
1073                Io::Wakeup(d) => {
1074                    self.actions.push_back(reactor::Action::SetTimer(d.into()));
1075                }
1076                Io::Fetch {
1077                    rid,
1078                    remote,
1079                    timeout,
1080                    reader_limit,
1081                    refs_at,
1082                } => {
1083                    log::trace!(target: "wire", "Processing fetch for {rid} from {remote}..");
1084
1085                    let Some((fd, Peer::Connected { link, streams, .. })) =
1086                        self.peers.lookup_mut(&remote)
1087                    else {
1088                        // Nb. It's possible that a peer is disconnected while an `Io::Fetch`
1089                        // is in the service's i/o buffer. Since the service may not purge the
1090                        // buffer on disconnect, we should just ignore i/o actions that don't
1091                        // have a connected peer.
1092                        log::error!(target: "wire", "Peer {remote} is not connected: dropping fetch");
1093                        continue;
1094                    };
1095                    let (stream, channels) =
1096                        streams.open(ChannelsConfig::new(timeout).with_reader_limit(reader_limit));
1097
1098                    log::debug!(target: "wire", "Opened new stream with id {stream} for {rid} and remote {remote}");
1099
1100                    let link = *link;
1101                    let task = Task {
1102                        fetch: FetchRequest::Initiator {
1103                            rid,
1104                            remote,
1105                            refs_at,
1106                        },
1107                        stream,
1108                        channels,
1109                    };
1110
1111                    if !self.worker.is_empty() {
1112                        log::warn!(
1113                            target: "wire",
1114                            "Worker pool is busy: {} tasks pending, fetch requests may be delayed", self.worker.len()
1115                        );
1116                    }
1117                    if let Err(e) = self.worker.try_send(task) {
1118                        log::error!(
1119                            target: "wire",
1120                            "Worker pool failed to accept outgoing fetch request: {e}"
1121                        );
1122                    }
1123                    let metrics = self.metrics.peer(remote);
1124                    metrics.streams_opened += 1;
1125                    metrics.sent_fetch_requests += 1;
1126
1127                    self.actions.push_back(Action::Send(
1128                        fd,
1129                        Frame::<service::Message>::control(link, frame::Control::Open { stream })
1130                            .encode_to_vec(),
1131                    ));
1132                }
1133            }
1134        }
1135        self.actions.pop_front()
1136    }
1137}
1138
1139/// Establish a new outgoing connection.
1140pub fn dial<G: Ecdh<Pk = NodeId>>(
1141    remote_addr: NetAddr<HostName>,
1142    remote_id: <G as EcSk>::Pk,
1143    signer: G,
1144    config: &radicle::node::Config,
1145) -> io::Result<WireSession<G>> {
1146    // Determine what address to establish a TCP connection with, given the remote peer
1147    // address and our node configuration.
1148    let inet_addr: NetAddr<InetHost> = match (&remote_addr.host, config.proxy) {
1149        // For IP and DNS addresses, use the global proxy if set, otherwise use the address as-is.
1150        (HostName::Ip(_), Some(proxy)) => proxy.into(),
1151        (HostName::Ip(ip), None) => NetAddr::new(InetHost::Ip(*ip), remote_addr.port),
1152        (HostName::Dns(_), Some(proxy)) => proxy.into(),
1153        (HostName::Dns(dns), None) => NetAddr::new(InetHost::Dns(dns.clone()), remote_addr.port),
1154        // For onion addresses, handle with care.
1155        (HostName::Tor(onion), proxy) => match config.onion {
1156            // In onion proxy mode, simply use the configured proxy address.
1157            // This takes precedence over any global proxy.
1158            Some(AddressConfig::Proxy { address }) => address.into(),
1159            // In "forward" mode, if a global proxy is set, we use that, otherwise
1160            // we treat `.onion` addresses as regular DNS names.
1161            Some(AddressConfig::Forward) => {
1162                if let Some(proxy) = proxy {
1163                    proxy.into()
1164                } else {
1165                    NetAddr::new(InetHost::Dns(onion.to_string()), remote_addr.port)
1166                }
1167            }
1168            // If onion address support isn't configured, refuse to connect.
1169            None => {
1170                return Err(io::Error::new(
1171                    io::ErrorKind::Unsupported,
1172                    "no configuration found for .onion addresses",
1173                ));
1174            }
1175        },
1176        _ => {
1177            return Err(io::Error::new(
1178                io::ErrorKind::Unsupported,
1179                "unsupported remote address type",
1180            ));
1181        }
1182    };
1183    // Nb. This timeout is currently not used by the underlying library due to the
1184    // `socket2` library not supporting non-blocking connect with timeout.
1185    let connection = net::TcpStream::connect_nonblocking(inet_addr, DEFAULT_DIAL_TIMEOUT)?;
1186    // Whether to tunnel regular connections through the proxy.
1187    let force_proxy = config.proxy.is_some();
1188
1189    session::<G>(
1190        remote_addr,
1191        Some(remote_id),
1192        connection,
1193        force_proxy,
1194        signer,
1195    )
1196}
1197
1198/// Accept a new connection.
1199pub fn accept<G: Ecdh<Pk = NodeId>>(
1200    remote_addr: NetAddr<HostName>,
1201    connection: net::TcpStream,
1202    signer: G,
1203) -> io::Result<WireSession<G>> {
1204    session::<G>(remote_addr, None, connection, false, signer)
1205}
1206
1207/// Create a new [`WireSession`].
1208fn session<G: Ecdh<Pk = NodeId>>(
1209    remote_addr: NetAddr<HostName>,
1210    remote_id: Option<NodeId>,
1211    connection: net::TcpStream,
1212    force_proxy: bool,
1213    signer: G,
1214) -> io::Result<WireSession<G>> {
1215    // There are issues with setting TCP_NODELAY on WSL. Not a big deal.
1216    if let Err(e) = connection.set_nodelay(true) {
1217        log::warn!(target: "wire", "Unable to set TCP_NODELAY on fd {}: {e}", connection.as_raw_fd());
1218    }
1219    connection.set_read_timeout(Some(DEFAULT_CONNECTION_TIMEOUT))?;
1220    connection.set_write_timeout(Some(DEFAULT_CONNECTION_TIMEOUT))?;
1221
1222    let sock = socket2::Socket::from(connection);
1223    let ka = socket2::TcpKeepalive::new()
1224        .with_time(time::Duration::from_secs(30))
1225        .with_interval(time::Duration::from_secs(10))
1226        .with_retries(3);
1227    if let Err(e) = sock.set_tcp_keepalive(&ka) {
1228        log::warn!(target: "wire", "Unable to set TCP_KEEPALIVE on fd {}: {e}", sock.as_raw_fd());
1229    }
1230
1231    let socks5 = socks5::Socks5::with(remote_addr, force_proxy);
1232    let proxy = Socks5Session::with(sock.into(), socks5);
1233    let pair = G::generate_keypair();
1234    let keyset = Keyset {
1235        e: pair.0,
1236        s: Some(signer),
1237        re: None,
1238        rs: remote_id,
1239    };
1240    let noise = NoiseState::initialize::<{ Sha256::OUTPUT_LEN }>(
1241        NOISE_XK,
1242        remote_id.is_some(),
1243        &[],
1244        keyset,
1245    );
1246    Ok(WireSession::with(proxy, noise))
1247}
1248
1249#[cfg(test)]
1250mod test {
1251    use super::*;
1252    use crate::service::{Message, ZeroBytes};
1253    use crate::wire;
1254    use crate::wire::varint;
1255
1256    #[test]
1257    fn test_pong_message_with_extension() {
1258        use radicle_protocol::deserializer;
1259
1260        let mut stream = Vec::new();
1261        let pong = Message::Pong {
1262            zeroes: ZeroBytes::new(42),
1263        };
1264        frame::PROTOCOL_VERSION_STRING.encode(&mut stream);
1265        frame::StreamId::gossip(Link::Outbound).encode(&mut stream);
1266
1267        // Serialize gossip message with some extension fields.
1268        let mut gossip = pong.encode_to_vec();
1269        String::from("extra").encode(&mut gossip);
1270        48u8.encode(&mut gossip);
1271
1272        // Encode gossip message using the varint-prefix format into the stream.
1273        varint::payload::encode(&gossip, &mut stream);
1274
1275        let mut de = deserializer::Deserializer::<1024, Frame>::new(1024);
1276        de.input(&stream).unwrap();
1277
1278        // The "pong" message decodes successfully, even though there is trailing data.
1279        assert_eq!(
1280            de.deserialize_next().unwrap().unwrap(),
1281            Frame::gossip(Link::Outbound, pong)
1282        );
1283        assert!(de.deserialize_next().unwrap().is_none());
1284        assert!(de.is_empty());
1285    }
1286
1287    #[test]
1288    fn test_inventory_ann_with_extension() {
1289        use radicle_protocol::deserializer;
1290
1291        #[derive(Debug)]
1292        struct MessageWithExt {
1293            msg: Message,
1294            ext: String,
1295        }
1296
1297        impl wire::Encode for MessageWithExt {
1298            fn encode(&self, writer: &mut impl bytes::BufMut) {
1299                self.msg.encode(writer);
1300                self.ext.encode(writer);
1301            }
1302        }
1303
1304        impl wire::Decode for MessageWithExt {
1305            fn decode(reader: &mut impl bytes::Buf) -> Result<Self, wire::Error> {
1306                let msg = Message::decode(reader)?;
1307                let ext = String::decode(reader).unwrap_or_default();
1308
1309                Ok(MessageWithExt { msg, ext })
1310            }
1311        }
1312
1313        let rid = radicle::test::arbitrary::gen(1);
1314        let pk = radicle::test::arbitrary::gen(1);
1315        let sig: [u8; 64] = radicle::test::arbitrary::gen(1);
1316
1317        // Message with extension.
1318        let mut stream = Vec::new();
1319        let ann = Message::announcement(
1320            pk,
1321            service::gossip::inventory(radicle::node::Timestamp::MAX, [rid]),
1322            radicle::crypto::Signature::from(sig),
1323        );
1324        let pong = Message::Pong {
1325            zeroes: ZeroBytes::new(42),
1326        };
1327        // Framed message with extension.
1328        frame::Frame::gossip(
1329            Link::Outbound,
1330            MessageWithExt {
1331                msg: ann.clone(),
1332                ext: String::from("extra"),
1333            },
1334        )
1335        .encode(&mut stream);
1336        // Pong message that comes after, without extension.
1337        frame::Frame::gossip(Link::Outbound, pong.clone()).encode(&mut stream);
1338
1339        // First test deserializing using the message with extension type.
1340        {
1341            let mut de = deserializer::Deserializer::<1024, Frame<MessageWithExt>>::new(1024);
1342            de.input(&stream).unwrap();
1343
1344            radicle::assert_matches!(
1345                de.deserialize_next().unwrap().unwrap().data,
1346                FrameData::Gossip(MessageWithExt {
1347                    msg,
1348                    ext,
1349                }) if msg == ann && ext == *"extra"
1350            );
1351            radicle::assert_matches!(
1352                de.deserialize_next().unwrap().unwrap().data,
1353                FrameData::Gossip(MessageWithExt {
1354                    msg,
1355                    ext,
1356                }) if msg == pong && ext.is_empty()
1357            );
1358            assert!(de.deserialize_next().unwrap().is_none());
1359            assert!(de.is_empty());
1360        }
1361
1362        // Then test deserializing using the current message type without the extension.
1363        {
1364            let mut de = deserializer::Deserializer::<1024, Frame<Message>>::new(1024);
1365            de.input(&stream).unwrap();
1366
1367            radicle::assert_matches!(
1368                de.deserialize_next().unwrap().unwrap().data,
1369                FrameData::Gossip(msg)
1370                if msg == ann
1371            );
1372            radicle::assert_matches!(
1373                de.deserialize_next().unwrap().unwrap().data,
1374                FrameData::Gossip(msg)
1375                if msg == pong
1376            );
1377            assert!(de.deserialize_next().unwrap().is_none());
1378            assert!(de.is_empty());
1379        }
1380    }
1381}