radicle_node/service/
io.rs1use std::collections::VecDeque;
2use std::time;
3
4use log::*;
5use radicle::node::config::FetchPackSizeLimit;
6use radicle::storage::refs::RefsAt;
7
8use crate::prelude::*;
9use crate::service::session::Session;
10use crate::service::Link;
11
12use super::gossip;
13use super::message::{Announcement, AnnouncementMessage};
14
15#[derive(Debug)]
17pub enum Io {
18 Write(NodeId, Vec<Message>),
20 Connect(NodeId, Address),
22 Disconnect(NodeId, DisconnectReason),
24 Fetch {
26 rid: RepoId,
28 remote: NodeId,
30 refs_at: Option<Vec<RefsAt>>,
32 timeout: time::Duration,
34 reader_limit: FetchPackSizeLimit,
36 },
37 Wakeup(LocalDuration),
39}
40
41#[derive(Debug, Default)]
43pub struct Outbox {
44 io: VecDeque<Io>,
46}
47
48impl Outbox {
49 pub fn connect(&mut self, id: NodeId, addr: Address) {
51 self.io.push_back(Io::Connect(id, addr));
52 }
53
54 pub fn disconnect(&mut self, id: NodeId, reason: DisconnectReason) {
56 self.io.push_back(Io::Disconnect(id, reason));
57 }
58
59 pub fn write(&mut self, remote: &Session, msg: Message) {
60 msg.log(log::Level::Debug, &remote.id, Link::Outbound);
61 trace!(target: "service", "Write {:?} to {}", &msg, remote);
62
63 self.io.push_back(Io::Write(remote.id, vec![msg]));
64 }
65
66 pub fn announce<'a>(
68 &mut self,
69 ann: Announcement,
70 peers: impl Iterator<Item = &'a Session>,
71 gossip: &mut impl gossip::Store,
72 ) {
73 if let Err(e) = gossip.announced(&ann.node, &ann) {
76 error!(target: "service", "Error updating our gossip store with announced message: {e}");
77 }
78
79 for peer in peers {
80 if let AnnouncementMessage::Refs(refs) = &ann.message {
81 if let Some(subscribe) = &peer.subscribe {
82 if subscribe.filter.contains(&refs.rid) {
83 self.write(peer, ann.clone().into());
84 } else {
85 debug!(
86 target: "service",
87 "Skipping refs announcement relay to {peer}: peer isn't subscribed to {}",
88 refs.rid
89 );
90 }
91 } else {
92 debug!(
93 target: "service",
94 "Skipping refs announcement relay to {peer}: peer didn't send a subscription filter"
95 );
96 }
97 } else {
98 self.write(peer, ann.clone().into());
99 }
100 }
101 }
102
103 pub fn write_all(&mut self, remote: &Session, msgs: impl IntoIterator<Item = Message>) {
104 let msgs = msgs.into_iter().collect::<Vec<_>>();
105
106 for (ix, msg) in msgs.iter().enumerate() {
107 trace!(
108 target: "service",
109 "Write {:?} to {} ({}/{})",
110 msg,
111 remote,
112 ix + 1,
113 msgs.len()
114 );
115 msg.log(log::Level::Trace, &remote.id, Link::Outbound);
116 }
117 self.io.push_back(Io::Write(remote.id, msgs));
118 }
119
120 pub fn wakeup(&mut self, after: LocalDuration) {
121 self.io.push_back(Io::Wakeup(after));
122 }
123
124 pub fn fetch(
125 &mut self,
126 peer: &mut Session,
127 rid: RepoId,
128 refs_at: Vec<RefsAt>,
129 timeout: time::Duration,
130 reader_limit: FetchPackSizeLimit,
131 ) {
132 peer.fetching(rid);
133
134 let refs_at = (!refs_at.is_empty()).then_some(refs_at);
135
136 if let Some(refs_at) = &refs_at {
137 debug!(
138 target: "service",
139 "Fetch initiated for {rid} with {peer} ({} remote(s))..", refs_at.len()
140 );
141 } else {
142 debug!(target: "service", "Fetch initiated for {rid} with {peer} (all remotes)..");
143 }
144
145 self.io.push_back(Io::Fetch {
146 rid,
147 refs_at,
148 remote: peer.id,
149 timeout,
150 reader_limit,
151 });
152 }
153
154 pub fn broadcast<'a>(
156 &mut self,
157 msg: impl Into<Message>,
158 peers: impl IntoIterator<Item = &'a Session>,
159 ) {
160 let msg = msg.into();
161 for peer in peers {
162 self.write(peer, msg.clone());
163 }
164 }
165
166 pub fn relay<'a>(&mut self, ann: Announcement, peers: impl IntoIterator<Item = &'a Session>) {
168 if let AnnouncementMessage::Refs(msg) = &ann.message {
169 let id = msg.rid;
170 let peers = peers.into_iter().filter(|p| {
171 if let Some(subscribe) = &p.subscribe {
172 subscribe.filter.contains(&id)
173 } else {
174 false
177 }
178 });
179 self.broadcast(ann, peers);
180 } else {
181 self.broadcast(ann, peers);
182 }
183 }
184
185 #[allow(clippy::len_without_is_empty)]
187 pub fn len(&self) -> usize {
188 self.io.len()
189 }
190
191 #[cfg(any(test, feature = "test"))]
192 pub(crate) fn queue(&mut self) -> &mut VecDeque<Io> {
193 &mut self.io
194 }
195}
196
197impl Iterator for Outbox {
198 type Item = Io;
199
200 fn next(&mut self) -> Option<Self::Item> {
201 self.io.pop_front()
202 }
203}