Skip to main content

radicle_protocol/service/
io.rs

1use std::collections::VecDeque;
2use std::time;
3
4use localtime::LocalDuration;
5use log::*;
6use radicle::identity::RepoId;
7use radicle::node::config::FetchPackSizeLimit;
8use radicle::node::Address;
9use radicle::node::NodeId;
10use radicle::storage::refs::RefsAt;
11
12use crate::service::message::Message;
13use crate::service::session::Session;
14use crate::service::DisconnectReason;
15use crate::service::Link;
16
17use super::gossip;
18use super::message::{Announcement, AnnouncementMessage};
19
/// I/O operation to execute at the network/wire level.
///
/// Values of this type are queued by [`Outbox`] and handed out again, oldest
/// first, through its [`Iterator`] implementation.
#[derive(Debug)]
pub enum Io {
    /// There are some messages ready to be sent to a peer.
    ///
    /// Carries the id of the receiving node and the batch of messages to write.
    Write(NodeId, Vec<Message>),
    /// Connect to a peer.
    ///
    /// Carries the id of the node to connect to and the address to reach it at.
    Connect(NodeId, Address),
    /// Disconnect from a peer.
    ///
    /// Carries the id of the node to disconnect and the reason for doing so.
    Disconnect(NodeId, DisconnectReason),
    /// Fetch repository data from a peer.
    Fetch {
        /// Repo being fetched.
        rid: RepoId,
        /// Remote node being fetched from.
        remote: NodeId,
        /// If the node is fetching specific `rad/sigrefs`.
        ///
        /// [`Outbox::fetch`] stores `None` here rather than an empty list.
        refs_at: Option<Vec<RefsAt>>,
        /// Fetch timeout.
        timeout: time::Duration,
        /// Limit the number of bytes fetched.
        reader_limit: FetchPackSizeLimit,
    },
    /// Ask for a wakeup in a specified amount of time.
    Wakeup(LocalDuration),
}
45
/// Interface to the network.
///
/// The service pushes [`Io`] operations onto the back of an internal queue;
/// iterating over the outbox pops them off the front, so operations are
/// handed out in the order they were queued.
#[derive(Debug, Default)]
pub struct Outbox {
    /// Outgoing I/O queue.
    io: VecDeque<Io>,
}
52
53impl Outbox {
54    /// Connect to a peer.
55    pub fn connect(&mut self, id: NodeId, addr: Address) {
56        self.io.push_back(Io::Connect(id, addr));
57    }
58
59    /// Disconnect a peer.
60    pub fn disconnect(&mut self, id: NodeId, reason: DisconnectReason) {
61        self.io.push_back(Io::Disconnect(id, reason));
62    }
63
64    pub fn write(&mut self, remote: &Session, msg: Message) {
65        let level = match &msg {
66            Message::Ping(_) | Message::Pong { .. } => log::Level::Trace,
67            _ => log::Level::Debug,
68        };
69        msg.log(level, &remote.id, Link::Outbound);
70        trace!(target: "service", "Write {:?} to {}", &msg, remote);
71
72        self.io.push_back(Io::Write(remote.id, vec![msg]));
73    }
74
75    /// Announce something to a peer. This is meant for our own announcement messages.
76    pub fn announce<'a>(
77        &mut self,
78        ann: Announcement,
79        peers: impl Iterator<Item = &'a Session>,
80        gossip: &mut impl gossip::Store,
81    ) {
82        // Store our announcement so that it can be retrieved from us later, just like
83        // announcements we receive from peers.
84        if let Err(e) = gossip.announced(&ann.node, &ann) {
85            warn!(target: "service", "Failed to update gossip store with announced message: {e}");
86        }
87
88        for peer in peers {
89            if let AnnouncementMessage::Refs(refs) = &ann.message {
90                if let Some(subscribe) = &peer.subscribe {
91                    if subscribe.filter.contains(&refs.rid) {
92                        self.write(peer, ann.clone().into());
93                    } else {
94                        debug!(
95                            target: "service",
96                            "Skipping refs announcement relay to {peer}: peer isn't subscribed to {}",
97                            refs.rid
98                        );
99                    }
100                } else {
101                    debug!(
102                        target: "service",
103                        "Skipping refs announcement relay to {peer}: peer didn't send a subscription filter"
104                    );
105                }
106            } else {
107                self.write(peer, ann.clone().into());
108            }
109        }
110    }
111
112    pub fn write_all(&mut self, remote: &Session, msgs: impl IntoIterator<Item = Message>) {
113        let msgs = msgs.into_iter().collect::<Vec<_>>();
114
115        for (ix, msg) in msgs.iter().enumerate() {
116            trace!(
117                target: "service",
118                "Write {:?} to {} ({}/{})",
119                msg,
120                remote,
121                ix + 1,
122                msgs.len()
123            );
124            msg.log(log::Level::Trace, &remote.id, Link::Outbound);
125        }
126        self.io.push_back(Io::Write(remote.id, msgs));
127    }
128
129    pub fn wakeup(&mut self, after: LocalDuration) {
130        self.io.push_back(Io::Wakeup(after));
131    }
132
133    pub fn fetch(
134        &mut self,
135        peer: &mut Session,
136        rid: RepoId,
137        refs_at: Vec<RefsAt>,
138        timeout: time::Duration,
139        reader_limit: FetchPackSizeLimit,
140    ) {
141        let refs_at = (!refs_at.is_empty()).then_some(refs_at);
142
143        if let Some(refs_at) = &refs_at {
144            debug!(
145                target: "service",
146                "Fetch initiated for {rid} with {peer} ({} remote(s))..", refs_at.len()
147            );
148        } else {
149            debug!(target: "service", "Fetch initiated for {rid} with {peer} (all remotes)..");
150        }
151
152        self.io.push_back(Io::Fetch {
153            rid,
154            refs_at,
155            remote: peer.id,
156            timeout,
157            reader_limit,
158        });
159    }
160
161    /// Broadcast a message to a list of peers.
162    pub fn broadcast<'a>(
163        &mut self,
164        msg: impl Into<Message>,
165        peers: impl IntoIterator<Item = &'a Session>,
166    ) {
167        let msg = msg.into();
168        for peer in peers {
169            self.write(peer, msg.clone());
170        }
171    }
172
173    /// Relay a message to interested peers.
174    pub fn relay<'a>(&mut self, ann: Announcement, peers: impl IntoIterator<Item = &'a Session>) {
175        if let AnnouncementMessage::Refs(msg) = &ann.message {
176            let id = msg.rid;
177            let peers = peers.into_iter().filter(|p| {
178                if let Some(subscribe) = &p.subscribe {
179                    subscribe.filter.contains(&id)
180                } else {
181                    // If the peer did not send us a `subscribe` message, we don't
182                    // relay any messages to them.
183                    false
184                }
185            });
186            self.broadcast(ann, peers);
187        } else {
188            self.broadcast(ann, peers);
189        }
190    }
191
192    /// Number of items in outbox.
193    #[allow(clippy::len_without_is_empty)]
194    pub fn len(&self) -> usize {
195        self.io.len()
196    }
197
198    #[cfg(any(test, feature = "test"))]
199    pub fn queue(&mut self) -> &mut VecDeque<Io> {
200        &mut self.io
201    }
202}
203
204impl Iterator for Outbox {
205    type Item = Io;
206
207    fn next(&mut self) -> Option<Self::Item> {
208        self.io.pop_front()
209    }
210}