radicle_node/service/
io.rs

1use std::collections::VecDeque;
2use std::time;
3
4use log::*;
5use radicle::node::config::FetchPackSizeLimit;
6use radicle::storage::refs::RefsAt;
7
8use crate::prelude::*;
9use crate::service::session::Session;
10use crate::service::Link;
11
12use super::gossip;
13use super::message::{Announcement, AnnouncementMessage};
14
15/// I/O operation to execute at the network/wire level.
16#[derive(Debug)]
17pub enum Io {
18    /// There are some messages ready to be sent to a peer.
19    Write(NodeId, Vec<Message>),
20    /// Connect to a peer.
21    Connect(NodeId, Address),
22    /// Disconnect from a peer.
23    Disconnect(NodeId, DisconnectReason),
24    /// Fetch repository data from a peer.
25    Fetch {
26        /// Repo being fetched.
27        rid: RepoId,
28        /// Remote node being fetched from.
29        remote: NodeId,
30        /// If the node is fetching specific `rad/sigrefs`.
31        refs_at: Option<Vec<RefsAt>>,
32        /// Fetch timeout.
33        timeout: time::Duration,
34        /// Limit the number of bytes fetched.
35        reader_limit: FetchPackSizeLimit,
36    },
37    /// Ask for a wakeup in a specified amount of time.
38    Wakeup(LocalDuration),
39}
40
41/// Interface to the network.
42#[derive(Debug, Default)]
43pub struct Outbox {
44    /// Outgoing I/O queue.
45    io: VecDeque<Io>,
46}
47
48impl Outbox {
49    /// Connect to a peer.
50    pub fn connect(&mut self, id: NodeId, addr: Address) {
51        self.io.push_back(Io::Connect(id, addr));
52    }
53
54    /// Disconnect a peer.
55    pub fn disconnect(&mut self, id: NodeId, reason: DisconnectReason) {
56        self.io.push_back(Io::Disconnect(id, reason));
57    }
58
59    pub fn write(&mut self, remote: &Session, msg: Message) {
60        msg.log(log::Level::Debug, &remote.id, Link::Outbound);
61        trace!(target: "service", "Write {:?} to {}", &msg, remote);
62
63        self.io.push_back(Io::Write(remote.id, vec![msg]));
64    }
65
66    /// Announce something to a peer. This is meant for our own announcement messages.
67    pub fn announce<'a>(
68        &mut self,
69        ann: Announcement,
70        peers: impl Iterator<Item = &'a Session>,
71        gossip: &mut impl gossip::Store,
72    ) {
73        // Store our announcement so that it can be retrieved from us later, just like
74        // announcements we receive from peers.
75        if let Err(e) = gossip.announced(&ann.node, &ann) {
76            error!(target: "service", "Error updating our gossip store with announced message: {e}");
77        }
78
79        for peer in peers {
80            if let AnnouncementMessage::Refs(refs) = &ann.message {
81                if let Some(subscribe) = &peer.subscribe {
82                    if subscribe.filter.contains(&refs.rid) {
83                        self.write(peer, ann.clone().into());
84                    } else {
85                        debug!(
86                            target: "service",
87                            "Skipping refs announcement relay to {peer}: peer isn't subscribed to {}",
88                            refs.rid
89                        );
90                    }
91                } else {
92                    debug!(
93                        target: "service",
94                        "Skipping refs announcement relay to {peer}: peer didn't send a subscription filter"
95                    );
96                }
97            } else {
98                self.write(peer, ann.clone().into());
99            }
100        }
101    }
102
103    pub fn write_all(&mut self, remote: &Session, msgs: impl IntoIterator<Item = Message>) {
104        let msgs = msgs.into_iter().collect::<Vec<_>>();
105
106        for (ix, msg) in msgs.iter().enumerate() {
107            trace!(
108                target: "service",
109                "Write {:?} to {} ({}/{})",
110                msg,
111                remote,
112                ix + 1,
113                msgs.len()
114            );
115            msg.log(log::Level::Trace, &remote.id, Link::Outbound);
116        }
117        self.io.push_back(Io::Write(remote.id, msgs));
118    }
119
120    pub fn wakeup(&mut self, after: LocalDuration) {
121        self.io.push_back(Io::Wakeup(after));
122    }
123
124    pub fn fetch(
125        &mut self,
126        peer: &mut Session,
127        rid: RepoId,
128        refs_at: Vec<RefsAt>,
129        timeout: time::Duration,
130        reader_limit: FetchPackSizeLimit,
131    ) {
132        peer.fetching(rid);
133
134        let refs_at = (!refs_at.is_empty()).then_some(refs_at);
135
136        if let Some(refs_at) = &refs_at {
137            debug!(
138                target: "service",
139                "Fetch initiated for {rid} with {peer} ({} remote(s))..", refs_at.len()
140            );
141        } else {
142            debug!(target: "service", "Fetch initiated for {rid} with {peer} (all remotes)..");
143        }
144
145        self.io.push_back(Io::Fetch {
146            rid,
147            refs_at,
148            remote: peer.id,
149            timeout,
150            reader_limit,
151        });
152    }
153
154    /// Broadcast a message to a list of peers.
155    pub fn broadcast<'a>(
156        &mut self,
157        msg: impl Into<Message>,
158        peers: impl IntoIterator<Item = &'a Session>,
159    ) {
160        let msg = msg.into();
161        for peer in peers {
162            self.write(peer, msg.clone());
163        }
164    }
165
166    /// Relay a message to interested peers.
167    pub fn relay<'a>(&mut self, ann: Announcement, peers: impl IntoIterator<Item = &'a Session>) {
168        if let AnnouncementMessage::Refs(msg) = &ann.message {
169            let id = msg.rid;
170            let peers = peers.into_iter().filter(|p| {
171                if let Some(subscribe) = &p.subscribe {
172                    subscribe.filter.contains(&id)
173                } else {
174                    // If the peer did not send us a `subscribe` message, we don't
175                    // relay any messages to them.
176                    false
177                }
178            });
179            self.broadcast(ann, peers);
180        } else {
181            self.broadcast(ann, peers);
182        }
183    }
184
185    /// Number of items in outbox.
186    #[allow(clippy::len_without_is_empty)]
187    pub fn len(&self) -> usize {
188        self.io.len()
189    }
190
191    #[cfg(any(test, feature = "test"))]
192    pub(crate) fn queue(&mut self) -> &mut VecDeque<Io> {
193        &mut self.io
194    }
195}
196
197impl Iterator for Outbox {
198    type Item = Io;
199
200    fn next(&mut self) -> Option<Self::Item> {
201        self.io.pop_front()
202    }
203}