Skip to main content

radicle_protocol/service/
io.rs

1use std::collections::VecDeque;
2
3use localtime::LocalDuration;
4use log::*;
5use radicle::identity::RepoId;
6use radicle::node::Address;
7use radicle::node::NodeId;
8use radicle::node::config::FetchPackSizeLimit;
9use radicle::storage::refs::RefsAt;
10
11use crate::fetcher;
12use crate::service::DisconnectReason;
13use crate::service::Link;
14use crate::service::message::Message;
15use crate::service::session::Session;
16
17use super::gossip;
18use super::message::{Announcement, AnnouncementMessage};
19
20/// I/O operation to execute at the network/wire level.
21#[derive(Debug)]
22pub enum Io {
23    /// There are some messages ready to be sent to a peer.
24    Write(NodeId, Vec<Message>),
25    /// Connect to a peer.
26    Connect(NodeId, Address),
27    /// Disconnect from a peer.
28    Disconnect(NodeId, DisconnectReason),
29    /// Fetch repository data from a peer.
30    Fetch {
31        /// Repo being fetched.
32        rid: RepoId,
33        /// Remote node being fetched from.
34        remote: NodeId,
35        /// If the node is fetching specific `rad/sigrefs`.
36        refs_at: Option<Vec<RefsAt>>,
37        /// Limit the number of bytes fetched.
38        reader_limit: FetchPackSizeLimit,
39        /// Options for configuring the fetch worker, such as timeout, and
40        /// internal fetch protocol options.
41        config: fetcher::FetchConfig,
42    },
43    /// Ask for a wakeup in a specified amount of time.
44    Wakeup(LocalDuration),
45}
46
47/// Interface to the network.
48#[derive(Debug, Default)]
49pub struct Outbox {
50    /// Outgoing I/O queue.
51    io: VecDeque<Io>,
52}
53
54impl Outbox {
55    /// Connect to a peer.
56    pub fn connect(&mut self, id: NodeId, addr: Address) {
57        self.io.push_back(Io::Connect(id, addr));
58    }
59
60    /// Disconnect a peer.
61    pub fn disconnect(&mut self, id: NodeId, reason: DisconnectReason) {
62        self.io.push_back(Io::Disconnect(id, reason));
63    }
64
65    pub fn write(&mut self, remote: &Session, msg: Message) {
66        let level = match &msg {
67            Message::Ping(_) | Message::Pong { .. } => log::Level::Trace,
68            _ => log::Level::Debug,
69        };
70        msg.log(level, &remote.id, Link::Outbound);
71        trace!(target: "service", "Write {:?} to {}", &msg, remote);
72
73        self.io.push_back(Io::Write(remote.id, vec![msg]));
74    }
75
76    /// Announce something to a peer. This is meant for our own announcement messages.
77    pub fn announce<'a>(
78        &mut self,
79        ann: Announcement,
80        peers: impl Iterator<Item = &'a Session>,
81        gossip: &mut impl gossip::Store,
82    ) {
83        // Store our announcement so that it can be retrieved from us later, just like
84        // announcements we receive from peers.
85        if let Err(e) = gossip.announced(&ann.node, &ann) {
86            warn!(target: "service", "Failed to update gossip store with announced message: {e}");
87        }
88
89        for peer in peers {
90            if let AnnouncementMessage::Refs(refs) = &ann.message {
91                if let Some(subscribe) = &peer.subscribe {
92                    if subscribe.filter.contains(&refs.rid) {
93                        self.write(peer, ann.clone().into());
94                    } else {
95                        debug!(
96                            target: "service",
97                            "Skipping refs announcement relay to {peer}: peer isn't subscribed to {}",
98                            refs.rid
99                        );
100                    }
101                } else {
102                    debug!(
103                        target: "service",
104                        "Skipping refs announcement relay to {peer}: peer didn't send a subscription filter"
105                    );
106                }
107            } else {
108                self.write(peer, ann.clone().into());
109            }
110        }
111    }
112
113    pub fn write_all(&mut self, remote: &Session, msgs: impl IntoIterator<Item = Message>) {
114        let msgs = msgs.into_iter().collect::<Vec<_>>();
115
116        for (ix, msg) in msgs.iter().enumerate() {
117            trace!(
118                target: "service",
119                "Write {:?} to {} ({}/{})",
120                msg,
121                remote,
122                ix + 1,
123                msgs.len()
124            );
125            msg.log(log::Level::Trace, &remote.id, Link::Outbound);
126        }
127        self.io.push_back(Io::Write(remote.id, msgs));
128    }
129
130    pub fn wakeup(&mut self, after: LocalDuration) {
131        self.io.push_back(Io::Wakeup(after));
132    }
133
134    pub fn fetch(
135        &mut self,
136        peer: &mut Session,
137        rid: RepoId,
138        refs_at: Vec<RefsAt>,
139        reader_limit: FetchPackSizeLimit,
140        config: fetcher::FetchConfig,
141    ) {
142        let refs_at = (!refs_at.is_empty()).then_some(refs_at);
143
144        if let Some(refs_at) = &refs_at {
145            debug!(
146                target: "service",
147                "Fetch initiated for {rid} with {peer} ({} remote(s))..", refs_at.len()
148            );
149        } else {
150            debug!(target: "service", "Fetch initiated for {rid} with {peer} (all remotes)..");
151        }
152
153        self.io.push_back(Io::Fetch {
154            rid,
155            refs_at,
156            remote: peer.id,
157            reader_limit,
158            config,
159        });
160    }
161
162    /// Broadcast a message to a list of peers.
163    pub fn broadcast<'a>(
164        &mut self,
165        msg: impl Into<Message>,
166        peers: impl IntoIterator<Item = &'a Session>,
167    ) {
168        let msg = msg.into();
169        for peer in peers {
170            self.write(peer, msg.clone());
171        }
172    }
173
174    /// Relay a message to interested peers.
175    pub fn relay<'a>(&mut self, ann: Announcement, peers: impl IntoIterator<Item = &'a Session>) {
176        if let AnnouncementMessage::Refs(msg) = &ann.message {
177            let id = msg.rid;
178            let peers = peers.into_iter().filter(|p| {
179                if let Some(subscribe) = &p.subscribe {
180                    subscribe.filter.contains(&id)
181                } else {
182                    // If the peer did not send us a `subscribe` message, we don't
183                    // relay any messages to them.
184                    false
185                }
186            });
187            self.broadcast(ann, peers);
188        } else {
189            self.broadcast(ann, peers);
190        }
191    }
192
193    /// Number of items in outbox.
194    #[allow(clippy::len_without_is_empty)]
195    pub fn len(&self) -> usize {
196        self.io.len()
197    }
198
199    #[cfg(any(test, feature = "test"))]
200    pub fn queue(&mut self) -> &mut VecDeque<Io> {
201        &mut self.io
202    }
203}
204
205impl Iterator for Outbox {
206    type Item = Io;
207
208    fn next(&mut self) -> Option<Self::Item> {
209        self.io.pop_front()
210    }
211}