radicle/node/events.rs
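//! Events emitted by the node, the [`Events`] feed used to consume them, and the
//! [`Emitter`] that publishes them to subscribers.
//!
//! Events for `upload-pack` processes are defined in the [`upload_pack`] submodule.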
2pub mod upload_pack;
3pub use upload_pack::UploadPack;
4
5use std::ops::Deref;
6use std::sync::Arc;
7use std::sync::Mutex;
8use std::time;
9
10use crossbeam_channel as chan;
11
12use crate::git::fmt::Qualified;
13use crate::git::Oid;
14use crate::node;
15use crate::prelude::*;
16use crate::storage::{refs, RefUpdate};
17
/// Maximum unconsumed events allowed per subscription.
///
/// Used as the capacity of the bounded channel created by
/// [`Emitter::subscribe`]. A subscriber whose channel is full when an event
/// is emitted is dropped from the emitter.
pub const MAX_PENDING_EVENTS: usize = 8192;
20
/// A service event.
///
/// The node emits events of this type to its control socket for other
/// programs to consume.
///
/// Serialized through `serde` with the variant name, in camelCase, stored
/// under a `type` tag. The enum is `#[non_exhaustive]`: variants may be added,
/// so matches written outside this crate need a wildcard arm.
#[non_exhaustive]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum Event {
    /// The node has received changes to Git references in a
    /// repository stored on the node, from another node.
    RefsFetched {
        /// The node identifier of the other node.
        remote: NodeId,
        /// The identifier of the repository in question.
        rid: RepoId,
        /// The list of Git references that were updated.
        updated: Vec<RefUpdate>,
    },
    /// The node has sent its list of Git references to another node
    /// and the other node has fetched the updated references.
    RefsSynced {
        /// The node identifier of the other node.
        remote: NodeId,
        /// The identifier of the repository in question.
        rid: RepoId,
        /// The `rad/sigrefs` reference that was fetched.
        at: Oid,
    },
    /// The node has discovered a repository on a new node on the
    /// Radicle network.
    SeedDiscovered {
        /// The identifier of the repository in question.
        rid: RepoId,
        /// The node identifier of the other node.
        nid: NodeId,
    },
    /// The node has dropped a repository on a node from its list of
    /// known repositories and nodes.
    SeedDropped {
        /// The identifier of the repository in question.
        rid: RepoId,
        /// The node identifier of the other node.
        nid: NodeId,
    },
    /// The node has connected directly to another node.
    PeerConnected {
        /// The node identifier of the other node.
        nid: NodeId,
    },
    /// The node has terminated its direct connection to another node.
    PeerDisconnected {
        /// The node identifier of the other node.
        nid: NodeId,
        /// The reason why the connection was terminated.
        reason: String,
    },
    /// The local node has received changes to Git references from its
    /// local user. In other words, the local user has pushed to the
    /// node, updated COBs, or otherwise updated refs in their local node.
    LocalRefsAnnounced {
        /// The identifier of the repository in question.
        rid: RepoId,
        /// List of changed Git references for the repository.
        refs: refs::RefsAt,
        /// When the new references were received. In other words,
        /// when the user ran `git push`.
        timestamp: Timestamp,
    },
    /// The node has received a message with a list of repositories on
    /// another node on the network.
    InventoryAnnounced {
        /// The node identifier of the other node.
        nid: NodeId,
        /// List of repositories sent.
        inventory: Vec<RepoId>,
        /// When the list was sent.
        timestamp: Timestamp,
    },
    /// The node has received a message about new signed Git
    /// references ("sigrefs") for a repository on another node on the
    /// network.
    RefsAnnounced {
        /// The node identifier of the other node.
        nid: NodeId,
        /// The identifier of the repository in question.
        rid: RepoId,
        /// List of Git references for the repository.
        refs: Vec<refs::RefsAt>,
        /// When the list was sent.
        timestamp: Timestamp,
    },
    /// The node received a message about a new node on the network.
    NodeAnnounced {
        /// The node identifier of the other node.
        nid: NodeId,
        /// Alias for the other node.
        alias: Alias,
        /// When the announcement was sent.
        timestamp: Timestamp,
        /// The features the other node advertised in its announcement.
        features: node::Features,
        /// The addresses the other node advertised in its announcement.
        addresses: Vec<node::Address>,
    },
    /// The node has uploaded a Git pack file to another node.
    UploadPack(upload_pack::UploadPack),
    /// A canonical reference was updated after a fetch.
    CanonicalRefUpdated {
        /// The repository the canonical reference was updated for.
        rid: RepoId,
        /// The reference name of the canonical reference update.
        refname: Qualified<'static>,
        /// The new target of the reference, after the update.
        target: Oid,
    },
}
137
138impl From<upload_pack::UploadPack> for Event {
139 fn from(value: upload_pack::UploadPack) -> Self {
140 Self::UploadPack(value)
141 }
142}
143
/// Events feed.
///
/// A thin wrapper around the receiving end of a channel of [`Event`]s. It
/// dereferences to the wrapped receiver and can be iterated by value.
pub struct Events(chan::Receiver<Event>);
146
147impl IntoIterator for Events {
148 type Item = Event;
149 type IntoIter = chan::IntoIter<Event>;
150
151 fn into_iter(self) -> Self::IntoIter {
152 self.0.into_iter()
153 }
154}
155
156impl From<chan::Receiver<Event>> for Events {
157 fn from(value: chan::Receiver<Event>) -> Self {
158 Self(value)
159 }
160}
161
162impl Deref for Events {
163 type Target = chan::Receiver<Event>;
164
165 fn deref(&self) -> &Self::Target {
166 &self.0
167 }
168}
169
170impl Events {
171 /// Listen for events, and wait for the given predicate to return something,
172 /// or timeout if the specified amount of time has elapsed.
173 pub fn wait<F, T>(&self, mut f: F, timeout: time::Duration) -> Result<T, chan::RecvTimeoutError>
174 where
175 F: FnMut(&Event) -> Option<T>,
176 {
177 let start = time::Instant::now();
178
179 loop {
180 if let Some(timeout) = timeout.checked_sub(start.elapsed()) {
181 match self.recv_timeout(timeout) {
182 Ok(event) => {
183 if let Some(output) = f(&event) {
184 return Ok(output);
185 }
186 }
187 Err(err @ chan::RecvTimeoutError::Disconnected) => {
188 return Err(err);
189 }
190 Err(chan::RecvTimeoutError::Timeout) => {
191 // Keep trying until our timeout reaches zero.
192 continue;
193 }
194 }
195 } else {
196 return Err(chan::RecvTimeoutError::Timeout);
197 }
198 }
199 }
200}
201
/// Publishes events to subscribers.
///
/// The subscriber list lives behind an `Arc<Mutex<..>>`, so clones of an
/// emitter share the same set of subscribers.
#[derive(Debug, Clone)]
pub struct Emitter<T> {
    // Sending half of each subscription's channel; entries are removed when a
    // send to them fails.
    subscribers: Arc<Mutex<Vec<chan::Sender<T>>>>,
}
207
208impl<T> Default for Emitter<T> {
209 fn default() -> Emitter<T> {
210 Emitter {
211 subscribers: Default::default(),
212 }
213 }
214}
215
216impl<T: Clone> Emitter<T> {
217 /// Emit event to subscribers and drop those who can't receive it.
218 /// Nb. subscribers are also dropped if their channel is full.
219 pub fn emit(&self, event: T) {
220 // SAFETY: We deliberately propagate panics from other threads holding the lock.
221 #[allow(clippy::unwrap_used)]
222 self.subscribers
223 .lock()
224 .unwrap()
225 .retain(|s| s.try_send(event.clone()).is_ok());
226 }
227
228 /// Emit a batch of events to subscribers and drop those who can't receive
229 /// them.
230 /// N.b. subscribers are also dropped if their channel is full.
231 pub fn emit_all(&self, events: impl IntoIterator<Item = T>) {
232 // SAFETY: We deliberately propagate panics from other threads holding the lock.
233 #[allow(clippy::unwrap_used)]
234 let mut subscribers = self.subscribers.lock().unwrap();
235 for event in events {
236 subscribers.retain(|s| s.try_send(event.clone()).is_ok());
237 }
238 }
239
240 /// Subscribe to events stream.
241 pub fn subscribe(&self) -> chan::Receiver<T> {
242 let (sender, receiver) = chan::bounded(MAX_PENDING_EVENTS);
243 // SAFETY: We deliberately propagate panics from other threads holding the lock.
244 #[allow(clippy::unwrap_used)]
245 let mut subs = self.subscribers.lock().unwrap();
246 subs.push(sender);
247
248 receiver
249 }
250
251 /// Number of subscribers.
252 pub fn subscriptions(&self) -> usize {
253 // SAFETY: We deliberately propagate panics from other threads holding the lock.
254 #[allow(clippy::unwrap_used)]
255 self.subscribers.lock().unwrap().len()
256 }
257
258 /// Number of messages that have not yet been received.
259 pub fn pending(&self) -> usize {
260 // SAFETY: We deliberately propagate panics from other threads holding the lock.
261 #[allow(clippy::unwrap_used)]
262 self.subscribers
263 .lock()
264 .unwrap()
265 .iter()
266 .map(|ch| ch.len())
267 .sum()
268 }
269}