radicle_protocol/service/
io.rs1use std::collections::VecDeque;
2use std::time;
3
4use localtime::LocalDuration;
5use log::*;
6use radicle::identity::RepoId;
7use radicle::node::config::FetchPackSizeLimit;
8use radicle::node::Address;
9use radicle::node::NodeId;
10use radicle::storage::refs::RefsAt;
11
12use crate::service::message::Message;
13use crate::service::session::Session;
14use crate::service::DisconnectReason;
15use crate::service::Link;
16
17use super::gossip;
18use super::message::{Announcement, AnnouncementMessage};
19
20#[derive(Debug)]
22pub enum Io {
23 Write(NodeId, Vec<Message>),
25 Connect(NodeId, Address),
27 Disconnect(NodeId, DisconnectReason),
29 Fetch {
31 rid: RepoId,
33 remote: NodeId,
35 refs_at: Option<Vec<RefsAt>>,
37 timeout: time::Duration,
39 reader_limit: FetchPackSizeLimit,
41 },
42 Wakeup(LocalDuration),
44}
45
46#[derive(Debug, Default)]
48pub struct Outbox {
49 io: VecDeque<Io>,
51}
52
53impl Outbox {
54 pub fn connect(&mut self, id: NodeId, addr: Address) {
56 self.io.push_back(Io::Connect(id, addr));
57 }
58
59 pub fn disconnect(&mut self, id: NodeId, reason: DisconnectReason) {
61 self.io.push_back(Io::Disconnect(id, reason));
62 }
63
64 pub fn write(&mut self, remote: &Session, msg: Message) {
65 let level = match &msg {
66 Message::Ping(_) | Message::Pong { .. } => log::Level::Trace,
67 _ => log::Level::Debug,
68 };
69 msg.log(level, &remote.id, Link::Outbound);
70 trace!(target: "service", "Write {:?} to {}", &msg, remote);
71
72 self.io.push_back(Io::Write(remote.id, vec![msg]));
73 }
74
75 pub fn announce<'a>(
77 &mut self,
78 ann: Announcement,
79 peers: impl Iterator<Item = &'a Session>,
80 gossip: &mut impl gossip::Store,
81 ) {
82 if let Err(e) = gossip.announced(&ann.node, &ann) {
85 error!(target: "service", "Error updating our gossip store with announced message: {e}");
86 }
87
88 for peer in peers {
89 if let AnnouncementMessage::Refs(refs) = &ann.message {
90 if let Some(subscribe) = &peer.subscribe {
91 if subscribe.filter.contains(&refs.rid) {
92 self.write(peer, ann.clone().into());
93 } else {
94 debug!(
95 target: "service",
96 "Skipping refs announcement relay to {peer}: peer isn't subscribed to {}",
97 refs.rid
98 );
99 }
100 } else {
101 debug!(
102 target: "service",
103 "Skipping refs announcement relay to {peer}: peer didn't send a subscription filter"
104 );
105 }
106 } else {
107 self.write(peer, ann.clone().into());
108 }
109 }
110 }
111
112 pub fn write_all(&mut self, remote: &Session, msgs: impl IntoIterator<Item = Message>) {
113 let msgs = msgs.into_iter().collect::<Vec<_>>();
114
115 for (ix, msg) in msgs.iter().enumerate() {
116 trace!(
117 target: "service",
118 "Write {:?} to {} ({}/{})",
119 msg,
120 remote,
121 ix + 1,
122 msgs.len()
123 );
124 msg.log(log::Level::Trace, &remote.id, Link::Outbound);
125 }
126 self.io.push_back(Io::Write(remote.id, msgs));
127 }
128
129 pub fn wakeup(&mut self, after: LocalDuration) {
130 self.io.push_back(Io::Wakeup(after));
131 }
132
133 pub fn fetch(
134 &mut self,
135 peer: &mut Session,
136 rid: RepoId,
137 refs_at: Vec<RefsAt>,
138 timeout: time::Duration,
139 reader_limit: FetchPackSizeLimit,
140 ) {
141 peer.fetching(rid);
142
143 let refs_at = (!refs_at.is_empty()).then_some(refs_at);
144
145 if let Some(refs_at) = &refs_at {
146 debug!(
147 target: "service",
148 "Fetch initiated for {rid} with {peer} ({} remote(s))..", refs_at.len()
149 );
150 } else {
151 debug!(target: "service", "Fetch initiated for {rid} with {peer} (all remotes)..");
152 }
153
154 self.io.push_back(Io::Fetch {
155 rid,
156 refs_at,
157 remote: peer.id,
158 timeout,
159 reader_limit,
160 });
161 }
162
163 pub fn broadcast<'a>(
165 &mut self,
166 msg: impl Into<Message>,
167 peers: impl IntoIterator<Item = &'a Session>,
168 ) {
169 let msg = msg.into();
170 for peer in peers {
171 self.write(peer, msg.clone());
172 }
173 }
174
175 pub fn relay<'a>(&mut self, ann: Announcement, peers: impl IntoIterator<Item = &'a Session>) {
177 if let AnnouncementMessage::Refs(msg) = &ann.message {
178 let id = msg.rid;
179 let peers = peers.into_iter().filter(|p| {
180 if let Some(subscribe) = &p.subscribe {
181 subscribe.filter.contains(&id)
182 } else {
183 false
186 }
187 });
188 self.broadcast(ann, peers);
189 } else {
190 self.broadcast(ann, peers);
191 }
192 }
193
194 #[allow(clippy::len_without_is_empty)]
196 pub fn len(&self) -> usize {
197 self.io.len()
198 }
199
200 #[cfg(any(test, feature = "test"))]
201 pub fn queue(&mut self) -> &mut VecDeque<Io> {
202 &mut self.io
203 }
204}
205
206impl Iterator for Outbox {
207 type Item = Io;
208
209 fn next(&mut self) -> Option<Self::Item> {
210 self.io.pop_front()
211 }
212}