radicle_protocol/service/
io.rs1use std::collections::VecDeque;
2use std::time;
3
4use localtime::LocalDuration;
5use log::*;
6use radicle::identity::RepoId;
7use radicle::node::config::FetchPackSizeLimit;
8use radicle::node::Address;
9use radicle::node::NodeId;
10use radicle::storage::refs::RefsAt;
11
12use crate::service::message::Message;
13use crate::service::session::Session;
14use crate::service::DisconnectReason;
15use crate::service::Link;
16
17use super::gossip;
18use super::message::{Announcement, AnnouncementMessage};
19
20#[derive(Debug)]
22pub enum Io {
23 Write(NodeId, Vec<Message>),
25 Connect(NodeId, Address),
27 Disconnect(NodeId, DisconnectReason),
29 Fetch {
31 rid: RepoId,
33 remote: NodeId,
35 refs_at: Option<Vec<RefsAt>>,
37 timeout: time::Duration,
39 reader_limit: FetchPackSizeLimit,
41 },
42 Wakeup(LocalDuration),
44}
45
46#[derive(Debug, Default)]
48pub struct Outbox {
49 io: VecDeque<Io>,
51}
52
53impl Outbox {
54 pub fn connect(&mut self, id: NodeId, addr: Address) {
56 self.io.push_back(Io::Connect(id, addr));
57 }
58
59 pub fn disconnect(&mut self, id: NodeId, reason: DisconnectReason) {
61 self.io.push_back(Io::Disconnect(id, reason));
62 }
63
64 pub fn write(&mut self, remote: &Session, msg: Message) {
65 msg.log(log::Level::Debug, &remote.id, Link::Outbound);
66 trace!(target: "service", "Write {:?} to {}", &msg, remote);
67
68 self.io.push_back(Io::Write(remote.id, vec![msg]));
69 }
70
71 pub fn announce<'a>(
73 &mut self,
74 ann: Announcement,
75 peers: impl Iterator<Item = &'a Session>,
76 gossip: &mut impl gossip::Store,
77 ) {
78 if let Err(e) = gossip.announced(&ann.node, &ann) {
81 error!(target: "service", "Error updating our gossip store with announced message: {e}");
82 }
83
84 for peer in peers {
85 if let AnnouncementMessage::Refs(refs) = &ann.message {
86 if let Some(subscribe) = &peer.subscribe {
87 if subscribe.filter.contains(&refs.rid) {
88 self.write(peer, ann.clone().into());
89 } else {
90 debug!(
91 target: "service",
92 "Skipping refs announcement relay to {peer}: peer isn't subscribed to {}",
93 refs.rid
94 );
95 }
96 } else {
97 debug!(
98 target: "service",
99 "Skipping refs announcement relay to {peer}: peer didn't send a subscription filter"
100 );
101 }
102 } else {
103 self.write(peer, ann.clone().into());
104 }
105 }
106 }
107
108 pub fn write_all(&mut self, remote: &Session, msgs: impl IntoIterator<Item = Message>) {
109 let msgs = msgs.into_iter().collect::<Vec<_>>();
110
111 for (ix, msg) in msgs.iter().enumerate() {
112 trace!(
113 target: "service",
114 "Write {:?} to {} ({}/{})",
115 msg,
116 remote,
117 ix + 1,
118 msgs.len()
119 );
120 msg.log(log::Level::Trace, &remote.id, Link::Outbound);
121 }
122 self.io.push_back(Io::Write(remote.id, msgs));
123 }
124
125 pub fn wakeup(&mut self, after: LocalDuration) {
126 self.io.push_back(Io::Wakeup(after));
127 }
128
129 pub fn fetch(
130 &mut self,
131 peer: &mut Session,
132 rid: RepoId,
133 refs_at: Vec<RefsAt>,
134 timeout: time::Duration,
135 reader_limit: FetchPackSizeLimit,
136 ) {
137 peer.fetching(rid);
138
139 let refs_at = (!refs_at.is_empty()).then_some(refs_at);
140
141 if let Some(refs_at) = &refs_at {
142 debug!(
143 target: "service",
144 "Fetch initiated for {rid} with {peer} ({} remote(s))..", refs_at.len()
145 );
146 } else {
147 debug!(target: "service", "Fetch initiated for {rid} with {peer} (all remotes)..");
148 }
149
150 self.io.push_back(Io::Fetch {
151 rid,
152 refs_at,
153 remote: peer.id,
154 timeout,
155 reader_limit,
156 });
157 }
158
159 pub fn broadcast<'a>(
161 &mut self,
162 msg: impl Into<Message>,
163 peers: impl IntoIterator<Item = &'a Session>,
164 ) {
165 let msg = msg.into();
166 for peer in peers {
167 self.write(peer, msg.clone());
168 }
169 }
170
171 pub fn relay<'a>(&mut self, ann: Announcement, peers: impl IntoIterator<Item = &'a Session>) {
173 if let AnnouncementMessage::Refs(msg) = &ann.message {
174 let id = msg.rid;
175 let peers = peers.into_iter().filter(|p| {
176 if let Some(subscribe) = &p.subscribe {
177 subscribe.filter.contains(&id)
178 } else {
179 false
182 }
183 });
184 self.broadcast(ann, peers);
185 } else {
186 self.broadcast(ann, peers);
187 }
188 }
189
190 #[allow(clippy::len_without_is_empty)]
192 pub fn len(&self) -> usize {
193 self.io.len()
194 }
195
196 #[cfg(any(test, feature = "test"))]
197 pub fn queue(&mut self) -> &mut VecDeque<Io> {
198 &mut self.io
199 }
200}
201
202impl Iterator for Outbox {
203 type Item = Io;
204
205 fn next(&mut self) -> Option<Self::Item> {
206 self.io.pop_front()
207 }
208}