radicle_protocol/service/
io.rs

1use std::collections::VecDeque;
2use std::time;
3
4use localtime::LocalDuration;
5use log::*;
6use radicle::identity::RepoId;
7use radicle::node::config::FetchPackSizeLimit;
8use radicle::node::Address;
9use radicle::node::NodeId;
10use radicle::storage::refs::RefsAt;
11
12use crate::service::message::Message;
13use crate::service::session::Session;
14use crate::service::DisconnectReason;
15use crate::service::Link;
16
17use super::gossip;
18use super::message::{Announcement, AnnouncementMessage};
19
20/// I/O operation to execute at the network/wire level.
21#[derive(Debug)]
22pub enum Io {
23    /// There are some messages ready to be sent to a peer.
24    Write(NodeId, Vec<Message>),
25    /// Connect to a peer.
26    Connect(NodeId, Address),
27    /// Disconnect from a peer.
28    Disconnect(NodeId, DisconnectReason),
29    /// Fetch repository data from a peer.
30    Fetch {
31        /// Repo being fetched.
32        rid: RepoId,
33        /// Remote node being fetched from.
34        remote: NodeId,
35        /// If the node is fetching specific `rad/sigrefs`.
36        refs_at: Option<Vec<RefsAt>>,
37        /// Fetch timeout.
38        timeout: time::Duration,
39        /// Limit the number of bytes fetched.
40        reader_limit: FetchPackSizeLimit,
41    },
42    /// Ask for a wakeup in a specified amount of time.
43    Wakeup(LocalDuration),
44}
45
46/// Interface to the network.
47#[derive(Debug, Default)]
48pub struct Outbox {
49    /// Outgoing I/O queue.
50    io: VecDeque<Io>,
51}
52
53impl Outbox {
54    /// Connect to a peer.
55    pub fn connect(&mut self, id: NodeId, addr: Address) {
56        self.io.push_back(Io::Connect(id, addr));
57    }
58
59    /// Disconnect a peer.
60    pub fn disconnect(&mut self, id: NodeId, reason: DisconnectReason) {
61        self.io.push_back(Io::Disconnect(id, reason));
62    }
63
64    pub fn write(&mut self, remote: &Session, msg: Message) {
65        msg.log(log::Level::Debug, &remote.id, Link::Outbound);
66        trace!(target: "service", "Write {:?} to {}", &msg, remote);
67
68        self.io.push_back(Io::Write(remote.id, vec![msg]));
69    }
70
71    /// Announce something to a peer. This is meant for our own announcement messages.
72    pub fn announce<'a>(
73        &mut self,
74        ann: Announcement,
75        peers: impl Iterator<Item = &'a Session>,
76        gossip: &mut impl gossip::Store,
77    ) {
78        // Store our announcement so that it can be retrieved from us later, just like
79        // announcements we receive from peers.
80        if let Err(e) = gossip.announced(&ann.node, &ann) {
81            error!(target: "service", "Error updating our gossip store with announced message: {e}");
82        }
83
84        for peer in peers {
85            if let AnnouncementMessage::Refs(refs) = &ann.message {
86                if let Some(subscribe) = &peer.subscribe {
87                    if subscribe.filter.contains(&refs.rid) {
88                        self.write(peer, ann.clone().into());
89                    } else {
90                        debug!(
91                            target: "service",
92                            "Skipping refs announcement relay to {peer}: peer isn't subscribed to {}",
93                            refs.rid
94                        );
95                    }
96                } else {
97                    debug!(
98                        target: "service",
99                        "Skipping refs announcement relay to {peer}: peer didn't send a subscription filter"
100                    );
101                }
102            } else {
103                self.write(peer, ann.clone().into());
104            }
105        }
106    }
107
108    pub fn write_all(&mut self, remote: &Session, msgs: impl IntoIterator<Item = Message>) {
109        let msgs = msgs.into_iter().collect::<Vec<_>>();
110
111        for (ix, msg) in msgs.iter().enumerate() {
112            trace!(
113                target: "service",
114                "Write {:?} to {} ({}/{})",
115                msg,
116                remote,
117                ix + 1,
118                msgs.len()
119            );
120            msg.log(log::Level::Trace, &remote.id, Link::Outbound);
121        }
122        self.io.push_back(Io::Write(remote.id, msgs));
123    }
124
125    pub fn wakeup(&mut self, after: LocalDuration) {
126        self.io.push_back(Io::Wakeup(after));
127    }
128
129    pub fn fetch(
130        &mut self,
131        peer: &mut Session,
132        rid: RepoId,
133        refs_at: Vec<RefsAt>,
134        timeout: time::Duration,
135        reader_limit: FetchPackSizeLimit,
136    ) {
137        peer.fetching(rid);
138
139        let refs_at = (!refs_at.is_empty()).then_some(refs_at);
140
141        if let Some(refs_at) = &refs_at {
142            debug!(
143                target: "service",
144                "Fetch initiated for {rid} with {peer} ({} remote(s))..", refs_at.len()
145            );
146        } else {
147            debug!(target: "service", "Fetch initiated for {rid} with {peer} (all remotes)..");
148        }
149
150        self.io.push_back(Io::Fetch {
151            rid,
152            refs_at,
153            remote: peer.id,
154            timeout,
155            reader_limit,
156        });
157    }
158
159    /// Broadcast a message to a list of peers.
160    pub fn broadcast<'a>(
161        &mut self,
162        msg: impl Into<Message>,
163        peers: impl IntoIterator<Item = &'a Session>,
164    ) {
165        let msg = msg.into();
166        for peer in peers {
167            self.write(peer, msg.clone());
168        }
169    }
170
171    /// Relay a message to interested peers.
172    pub fn relay<'a>(&mut self, ann: Announcement, peers: impl IntoIterator<Item = &'a Session>) {
173        if let AnnouncementMessage::Refs(msg) = &ann.message {
174            let id = msg.rid;
175            let peers = peers.into_iter().filter(|p| {
176                if let Some(subscribe) = &p.subscribe {
177                    subscribe.filter.contains(&id)
178                } else {
179                    // If the peer did not send us a `subscribe` message, we don't
180                    // relay any messages to them.
181                    false
182                }
183            });
184            self.broadcast(ann, peers);
185        } else {
186            self.broadcast(ann, peers);
187        }
188    }
189
190    /// Number of items in outbox.
191    #[allow(clippy::len_without_is_empty)]
192    pub fn len(&self) -> usize {
193        self.io.len()
194    }
195
196    #[cfg(any(test, feature = "test"))]
197    pub fn queue(&mut self) -> &mut VecDeque<Io> {
198        &mut self.io
199    }
200}
201
202impl Iterator for Outbox {
203    type Item = Io;
204
205    fn next(&mut self) -> Option<Self::Item> {
206        self.io.pop_front()
207    }
208}