radicle_protocol/service/
io.rs1use std::collections::VecDeque;
2
3use localtime::LocalDuration;
4use log::*;
5use radicle::identity::RepoId;
6use radicle::node::Address;
7use radicle::node::NodeId;
8use radicle::node::config::FetchPackSizeLimit;
9use radicle::storage::refs::RefsAt;
10
11use crate::fetcher;
12use crate::service::DisconnectReason;
13use crate::service::Link;
14use crate::service::message::Message;
15use crate::service::session::Session;
16
17use super::gossip;
18use super::message::{Announcement, AnnouncementMessage};
19
20#[derive(Debug)]
22pub enum Io {
23 Write(NodeId, Vec<Message>),
25 Connect(NodeId, Address),
27 Disconnect(NodeId, DisconnectReason),
29 Fetch {
31 rid: RepoId,
33 remote: NodeId,
35 refs_at: Option<Vec<RefsAt>>,
37 reader_limit: FetchPackSizeLimit,
39 config: fetcher::FetchConfig,
42 },
43 Wakeup(LocalDuration),
45}
46
47#[derive(Debug, Default)]
49pub struct Outbox {
50 io: VecDeque<Io>,
52}
53
54impl Outbox {
55 pub fn connect(&mut self, id: NodeId, addr: Address) {
57 self.io.push_back(Io::Connect(id, addr));
58 }
59
60 pub fn disconnect(&mut self, id: NodeId, reason: DisconnectReason) {
62 self.io.push_back(Io::Disconnect(id, reason));
63 }
64
65 pub fn write(&mut self, remote: &Session, msg: Message) {
66 let level = match &msg {
67 Message::Ping(_) | Message::Pong { .. } => log::Level::Trace,
68 _ => log::Level::Debug,
69 };
70 msg.log(level, &remote.id, Link::Outbound);
71 trace!(target: "service", "Write {:?} to {}", &msg, remote);
72
73 self.io.push_back(Io::Write(remote.id, vec![msg]));
74 }
75
76 pub fn announce<'a>(
78 &mut self,
79 ann: Announcement,
80 peers: impl Iterator<Item = &'a Session>,
81 gossip: &mut impl gossip::Store,
82 ) {
83 if let Err(e) = gossip.announced(&ann.node, &ann) {
86 warn!(target: "service", "Failed to update gossip store with announced message: {e}");
87 }
88
89 for peer in peers {
90 if let AnnouncementMessage::Refs(refs) = &ann.message {
91 if let Some(subscribe) = &peer.subscribe {
92 if subscribe.filter.contains(&refs.rid) {
93 self.write(peer, ann.clone().into());
94 } else {
95 debug!(
96 target: "service",
97 "Skipping refs announcement relay to {peer}: peer isn't subscribed to {}",
98 refs.rid
99 );
100 }
101 } else {
102 debug!(
103 target: "service",
104 "Skipping refs announcement relay to {peer}: peer didn't send a subscription filter"
105 );
106 }
107 } else {
108 self.write(peer, ann.clone().into());
109 }
110 }
111 }
112
113 pub fn write_all(&mut self, remote: &Session, msgs: impl IntoIterator<Item = Message>) {
114 let msgs = msgs.into_iter().collect::<Vec<_>>();
115
116 for (ix, msg) in msgs.iter().enumerate() {
117 trace!(
118 target: "service",
119 "Write {:?} to {} ({}/{})",
120 msg,
121 remote,
122 ix + 1,
123 msgs.len()
124 );
125 msg.log(log::Level::Trace, &remote.id, Link::Outbound);
126 }
127 self.io.push_back(Io::Write(remote.id, msgs));
128 }
129
130 pub fn wakeup(&mut self, after: LocalDuration) {
131 self.io.push_back(Io::Wakeup(after));
132 }
133
134 pub fn fetch(
135 &mut self,
136 peer: &mut Session,
137 rid: RepoId,
138 refs_at: Vec<RefsAt>,
139 reader_limit: FetchPackSizeLimit,
140 config: fetcher::FetchConfig,
141 ) {
142 let refs_at = (!refs_at.is_empty()).then_some(refs_at);
143
144 if let Some(refs_at) = &refs_at {
145 debug!(
146 target: "service",
147 "Fetch initiated for {rid} with {peer} ({} remote(s))..", refs_at.len()
148 );
149 } else {
150 debug!(target: "service", "Fetch initiated for {rid} with {peer} (all remotes)..");
151 }
152
153 self.io.push_back(Io::Fetch {
154 rid,
155 refs_at,
156 remote: peer.id,
157 reader_limit,
158 config,
159 });
160 }
161
162 pub fn broadcast<'a>(
164 &mut self,
165 msg: impl Into<Message>,
166 peers: impl IntoIterator<Item = &'a Session>,
167 ) {
168 let msg = msg.into();
169 for peer in peers {
170 self.write(peer, msg.clone());
171 }
172 }
173
174 pub fn relay<'a>(&mut self, ann: Announcement, peers: impl IntoIterator<Item = &'a Session>) {
176 if let AnnouncementMessage::Refs(msg) = &ann.message {
177 let id = msg.rid;
178 let peers = peers.into_iter().filter(|p| {
179 if let Some(subscribe) = &p.subscribe {
180 subscribe.filter.contains(&id)
181 } else {
182 false
185 }
186 });
187 self.broadcast(ann, peers);
188 } else {
189 self.broadcast(ann, peers);
190 }
191 }
192
193 #[allow(clippy::len_without_is_empty)]
195 pub fn len(&self) -> usize {
196 self.io.len()
197 }
198
199 #[cfg(any(test, feature = "test"))]
200 pub fn queue(&mut self) -> &mut VecDeque<Io> {
201 &mut self.io
202 }
203}
204
205impl Iterator for Outbox {
206 type Item = Io;
207
208 fn next(&mut self) -> Option<Self::Item> {
209 self.io.pop_front()
210 }
211}