radicle_protocol/service/
io.rs

1use std::collections::VecDeque;
2use std::time;
3
4use localtime::LocalDuration;
5use log::*;
6use radicle::identity::RepoId;
7use radicle::node::config::FetchPackSizeLimit;
8use radicle::node::Address;
9use radicle::node::NodeId;
10use radicle::storage::refs::RefsAt;
11
12use crate::service::message::Message;
13use crate::service::session::Session;
14use crate::service::DisconnectReason;
15use crate::service::Link;
16
17use super::gossip;
18use super::message::{Announcement, AnnouncementMessage};
19
20/// I/O operation to execute at the network/wire level.
21#[derive(Debug)]
22pub enum Io {
23    /// There are some messages ready to be sent to a peer.
24    Write(NodeId, Vec<Message>),
25    /// Connect to a peer.
26    Connect(NodeId, Address),
27    /// Disconnect from a peer.
28    Disconnect(NodeId, DisconnectReason),
29    /// Fetch repository data from a peer.
30    Fetch {
31        /// Repo being fetched.
32        rid: RepoId,
33        /// Remote node being fetched from.
34        remote: NodeId,
35        /// If the node is fetching specific `rad/sigrefs`.
36        refs_at: Option<Vec<RefsAt>>,
37        /// Fetch timeout.
38        timeout: time::Duration,
39        /// Limit the number of bytes fetched.
40        reader_limit: FetchPackSizeLimit,
41    },
42    /// Ask for a wakeup in a specified amount of time.
43    Wakeup(LocalDuration),
44}
45
46/// Interface to the network.
47#[derive(Debug, Default)]
48pub struct Outbox {
49    /// Outgoing I/O queue.
50    io: VecDeque<Io>,
51}
52
53impl Outbox {
54    /// Connect to a peer.
55    pub fn connect(&mut self, id: NodeId, addr: Address) {
56        self.io.push_back(Io::Connect(id, addr));
57    }
58
59    /// Disconnect a peer.
60    pub fn disconnect(&mut self, id: NodeId, reason: DisconnectReason) {
61        self.io.push_back(Io::Disconnect(id, reason));
62    }
63
64    pub fn write(&mut self, remote: &Session, msg: Message) {
65        let level = match &msg {
66            Message::Ping(_) | Message::Pong { .. } => log::Level::Trace,
67            _ => log::Level::Debug,
68        };
69        msg.log(level, &remote.id, Link::Outbound);
70        trace!(target: "service", "Write {:?} to {}", &msg, remote);
71
72        self.io.push_back(Io::Write(remote.id, vec![msg]));
73    }
74
75    /// Announce something to a peer. This is meant for our own announcement messages.
76    pub fn announce<'a>(
77        &mut self,
78        ann: Announcement,
79        peers: impl Iterator<Item = &'a Session>,
80        gossip: &mut impl gossip::Store,
81    ) {
82        // Store our announcement so that it can be retrieved from us later, just like
83        // announcements we receive from peers.
84        if let Err(e) = gossip.announced(&ann.node, &ann) {
85            error!(target: "service", "Error updating our gossip store with announced message: {e}");
86        }
87
88        for peer in peers {
89            if let AnnouncementMessage::Refs(refs) = &ann.message {
90                if let Some(subscribe) = &peer.subscribe {
91                    if subscribe.filter.contains(&refs.rid) {
92                        self.write(peer, ann.clone().into());
93                    } else {
94                        debug!(
95                            target: "service",
96                            "Skipping refs announcement relay to {peer}: peer isn't subscribed to {}",
97                            refs.rid
98                        );
99                    }
100                } else {
101                    debug!(
102                        target: "service",
103                        "Skipping refs announcement relay to {peer}: peer didn't send a subscription filter"
104                    );
105                }
106            } else {
107                self.write(peer, ann.clone().into());
108            }
109        }
110    }
111
112    pub fn write_all(&mut self, remote: &Session, msgs: impl IntoIterator<Item = Message>) {
113        let msgs = msgs.into_iter().collect::<Vec<_>>();
114
115        for (ix, msg) in msgs.iter().enumerate() {
116            trace!(
117                target: "service",
118                "Write {:?} to {} ({}/{})",
119                msg,
120                remote,
121                ix + 1,
122                msgs.len()
123            );
124            msg.log(log::Level::Trace, &remote.id, Link::Outbound);
125        }
126        self.io.push_back(Io::Write(remote.id, msgs));
127    }
128
129    pub fn wakeup(&mut self, after: LocalDuration) {
130        self.io.push_back(Io::Wakeup(after));
131    }
132
133    pub fn fetch(
134        &mut self,
135        peer: &mut Session,
136        rid: RepoId,
137        refs_at: Vec<RefsAt>,
138        timeout: time::Duration,
139        reader_limit: FetchPackSizeLimit,
140    ) {
141        peer.fetching(rid);
142
143        let refs_at = (!refs_at.is_empty()).then_some(refs_at);
144
145        if let Some(refs_at) = &refs_at {
146            debug!(
147                target: "service",
148                "Fetch initiated for {rid} with {peer} ({} remote(s))..", refs_at.len()
149            );
150        } else {
151            debug!(target: "service", "Fetch initiated for {rid} with {peer} (all remotes)..");
152        }
153
154        self.io.push_back(Io::Fetch {
155            rid,
156            refs_at,
157            remote: peer.id,
158            timeout,
159            reader_limit,
160        });
161    }
162
163    /// Broadcast a message to a list of peers.
164    pub fn broadcast<'a>(
165        &mut self,
166        msg: impl Into<Message>,
167        peers: impl IntoIterator<Item = &'a Session>,
168    ) {
169        let msg = msg.into();
170        for peer in peers {
171            self.write(peer, msg.clone());
172        }
173    }
174
175    /// Relay a message to interested peers.
176    pub fn relay<'a>(&mut self, ann: Announcement, peers: impl IntoIterator<Item = &'a Session>) {
177        if let AnnouncementMessage::Refs(msg) = &ann.message {
178            let id = msg.rid;
179            let peers = peers.into_iter().filter(|p| {
180                if let Some(subscribe) = &p.subscribe {
181                    subscribe.filter.contains(&id)
182                } else {
183                    // If the peer did not send us a `subscribe` message, we don't
184                    // relay any messages to them.
185                    false
186                }
187            });
188            self.broadcast(ann, peers);
189        } else {
190            self.broadcast(ann, peers);
191        }
192    }
193
194    /// Number of items in outbox.
195    #[allow(clippy::len_without_is_empty)]
196    pub fn len(&self) -> usize {
197        self.io.len()
198    }
199
200    #[cfg(any(test, feature = "test"))]
201    pub fn queue(&mut self) -> &mut VecDeque<Io> {
202        &mut self.io
203    }
204}
205
206impl Iterator for Outbox {
207    type Item = Io;
208
209    fn next(&mut self) -> Option<Self::Item> {
210        self.io.pop_front()
211    }
212}