Skip to main content

radicle/node/
events.rs

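//! Events emitted by the node service, and the types used to publish and
//! consume them. Events for `upload-pack` processes live in [`upload_pack`].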
2pub mod upload_pack;
3pub use upload_pack::UploadPack;
4
5use std::ops::Deref;
6use std::sync::Arc;
7use std::sync::Mutex;
8use std::time;
9
10use crossbeam_channel as chan;
11
12use crate::git::fmt::Qualified;
13use crate::git::Oid;
14use crate::node;
15use crate::prelude::*;
16use crate::storage::{refs, RefUpdate};
17
/// Maximum unconsumed events allowed per subscription.
///
/// This is the capacity of the bounded channel created for each subscriber.
pub const MAX_PENDING_EVENTS: usize = 8 * 1024;
20
21/// A service event.
22///
23/// The node emits events of this type to its control socket for other
24/// programs to consume.
25#[non_exhaustive]
26#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
27#[serde(rename_all = "camelCase", tag = "type")]
28pub enum Event {
29    /// The node has received changes to Git references in a
30    /// repository stored on the node, from another node.
31    RefsFetched {
32        /// The node identifier of the other node.
33        remote: NodeId,
34        /// The identifier of the repository in question.
35        rid: RepoId,
36        /// The list of Git references that were updated.
37        updated: Vec<RefUpdate>,
38    },
39    /// The node has sent its list of Git references to another node
40    /// and the other has fetched the updated references.
41    RefsSynced {
42        /// The node identifier of the other node.
43        remote: NodeId,
44        /// The identifier of the repository in question.
45        rid: RepoId,
46        /// The `rad/sigrefs` reference that was fetched.
47        at: Oid,
48    },
49    /// The node has discovered a repository on new node on the
50    /// Radicle network.
51    SeedDiscovered {
52        /// The identifier of the repository in question.
53        rid: RepoId,
54        /// The node identifier of the other node.
55        nid: NodeId,
56    },
57    /// The node has dropped a repository on a node from its list of
58    /// known repositories and nodes.
59    SeedDropped {
60        /// The identifier of the repository in question.
61        rid: RepoId,
62        /// The node identifier of the other node.
63        nid: NodeId,
64    },
65    /// The node has connected directly to another node.
66    PeerConnected {
67        /// The node identifier of the other node.
68        nid: NodeId,
69    },
70    /// The node has terminated its direct connection to another node.
71    PeerDisconnected {
72        /// The node identifier of the other node.
73        nid: NodeId,
74        /// The reason why the connection was terminated.
75        reason: String,
76    },
77    /// The local node has received changes to Git references from its
78    /// local user. In other words, the local user has pushed to the
79    /// node, updated COBs, or otherwise updated refs in their local node.
80    LocalRefsAnnounced {
81        /// The identifier of the repository in question.
82        rid: RepoId,
83        /// List of changed Git references for the repository.
84        refs: refs::RefsAt,
85        /// When were the new references received? In other words,
86        /// when did the user run `git push`?
87        timestamp: Timestamp,
88    },
89    /// The node has received a message with a list of repositories on
90    /// another node on the network.
91    InventoryAnnounced {
92        /// The node identifier of the other node.
93        nid: NodeId,
94        /// List of repositories sent.
95        inventory: Vec<RepoId>,
96        /// When was the list sent?
97        timestamp: Timestamp,
98    },
99    /// The node has received a message about new signed Git
100    /// references ("sigrefs") for a repository on another node on the
101    /// network.
102    RefsAnnounced {
103        /// The node identifier of the other node.
104        nid: NodeId,
105        /// The identifier of the repository in question.
106        rid: RepoId,
107        /// List of Git references for the repository.
108        refs: Vec<refs::RefsAt>,
109        /// When was the list sent?
110        timestamp: Timestamp,
111    },
112    /// The node received a message about a new node on the network.
113    NodeAnnounced {
114        /// The node identifier of the other node.
115        nid: NodeId,
116        /// Alias for the other node.
117        alias: Alias,
118        /// When was the announcement sent?
119        timestamp: Timestamp,
120        /// What features did the node advertise to the other node.
121        features: node::Features,
122        /// What of its addresses did the node tell the other node about?
123        addresses: Vec<node::Address>,
124    },
125    /// The node has uploaded a Git pack file to another node.
126    UploadPack(upload_pack::UploadPack),
127    /// A canonical reference was updated after a fetch.
128    CanonicalRefUpdated {
129        /// The repository the canonical reference was updated for.
130        rid: RepoId,
131        /// The reference name of the canonical reference update.
132        refname: Qualified<'static>,
133        /// The new target of the reference, after the update.
134        target: Oid,
135    },
136}
137
138impl From<upload_pack::UploadPack> for Event {
139    fn from(value: upload_pack::UploadPack) -> Self {
140        Self::UploadPack(value)
141    }
142}
143
144/// Events feed.
145pub struct Events(chan::Receiver<Event>);
146
147impl IntoIterator for Events {
148    type Item = Event;
149    type IntoIter = chan::IntoIter<Event>;
150
151    fn into_iter(self) -> Self::IntoIter {
152        self.0.into_iter()
153    }
154}
155
156impl From<chan::Receiver<Event>> for Events {
157    fn from(value: chan::Receiver<Event>) -> Self {
158        Self(value)
159    }
160}
161
162impl Deref for Events {
163    type Target = chan::Receiver<Event>;
164
165    fn deref(&self) -> &Self::Target {
166        &self.0
167    }
168}
169
170impl Events {
171    /// Listen for events, and wait for the given predicate to return something,
172    /// or timeout if the specified amount of time has elapsed.
173    pub fn wait<F, T>(&self, mut f: F, timeout: time::Duration) -> Result<T, chan::RecvTimeoutError>
174    where
175        F: FnMut(&Event) -> Option<T>,
176    {
177        let start = time::Instant::now();
178
179        loop {
180            if let Some(timeout) = timeout.checked_sub(start.elapsed()) {
181                match self.recv_timeout(timeout) {
182                    Ok(event) => {
183                        if let Some(output) = f(&event) {
184                            return Ok(output);
185                        }
186                    }
187                    Err(err @ chan::RecvTimeoutError::Disconnected) => {
188                        return Err(err);
189                    }
190                    Err(chan::RecvTimeoutError::Timeout) => {
191                        // Keep trying until our timeout reaches zero.
192                        continue;
193                    }
194                }
195            } else {
196                return Err(chan::RecvTimeoutError::Timeout);
197            }
198        }
199    }
200}
201
202/// Publishes events to subscribers.
203#[derive(Debug, Clone)]
204pub struct Emitter<T> {
205    subscribers: Arc<Mutex<Vec<chan::Sender<T>>>>,
206}
207
208impl<T> Default for Emitter<T> {
209    fn default() -> Emitter<T> {
210        Emitter {
211            subscribers: Default::default(),
212        }
213    }
214}
215
216impl<T: Clone> Emitter<T> {
217    /// Emit event to subscribers and drop those who can't receive it.
218    /// Nb. subscribers are also dropped if their channel is full.
219    pub fn emit(&self, event: T) {
220        // SAFETY: We deliberately propagate panics from other threads holding the lock.
221        #[allow(clippy::unwrap_used)]
222        self.subscribers
223            .lock()
224            .unwrap()
225            .retain(|s| s.try_send(event.clone()).is_ok());
226    }
227
228    /// Emit a batch of events to subscribers and drop those who can't receive
229    /// them.
230    /// N.b. subscribers are also dropped if their channel is full.
231    pub fn emit_all(&self, events: impl IntoIterator<Item = T>) {
232        // SAFETY: We deliberately propagate panics from other threads holding the lock.
233        #[allow(clippy::unwrap_used)]
234        let mut subscribers = self.subscribers.lock().unwrap();
235        for event in events {
236            subscribers.retain(|s| s.try_send(event.clone()).is_ok());
237        }
238    }
239
240    /// Subscribe to events stream.
241    pub fn subscribe(&self) -> chan::Receiver<T> {
242        let (sender, receiver) = chan::bounded(MAX_PENDING_EVENTS);
243        // SAFETY: We deliberately propagate panics from other threads holding the lock.
244        #[allow(clippy::unwrap_used)]
245        let mut subs = self.subscribers.lock().unwrap();
246        subs.push(sender);
247
248        receiver
249    }
250
251    /// Number of subscribers.
252    pub fn subscriptions(&self) -> usize {
253        // SAFETY: We deliberately propagate panics from other threads holding the lock.
254        #[allow(clippy::unwrap_used)]
255        self.subscribers.lock().unwrap().len()
256    }
257
258    /// Number of messages that have not yet been received.
259    pub fn pending(&self) -> usize {
260        // SAFETY: We deliberately propagate panics from other threads holding the lock.
261        #[allow(clippy::unwrap_used)]
262        self.subscribers
263            .lock()
264            .unwrap()
265            .iter()
266            .map(|ch| ch.len())
267            .sum()
268    }
269}