1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
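//! Service events emitted by the node, and the emitter that publishes them
//! to subscribers. Events for `upload-pack` processes live in [`upload_pack`].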
pub mod upload_pack;
pub use upload_pack::UploadPack;
use std::ops::Deref;
use std::sync::Arc;
use std::sync::Mutex;
use std::time;
use crossbeam_channel as chan;
use crate::git::fmt::Qualified;
use crate::git::Oid;
use crate::node;
use crate::prelude::*;
use crate::storage::{refs, RefUpdate};
/// Maximum unconsumed events allowed per subscription.
///
/// Used as the capacity of the bounded channel created for each subscriber
/// in [`Emitter::subscribe`].
pub const MAX_PENDING_EVENTS: usize = 8192;
/// A service event.
///
/// The node emits events of this type to its control socket for other
/// programs to consume.
///
/// Serialized with serde as an internally tagged value: the variant name,
/// converted to camelCase, is stored under the `type` key.
///
/// The enum is `#[non_exhaustive]`, so code in other crates that matches on
/// it needs a wildcard arm.
#[non_exhaustive]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum Event {
/// The node has received changes to Git references in a
/// repository stored on the node, from another node.
RefsFetched {
/// The node identifier of the other node.
remote: NodeId,
/// The identifier of the repository in question.
rid: RepoId,
/// The list of Git references that were updated.
updated: Vec<RefUpdate>,
},
/// The node has sent its list of Git references to another node
/// and the other has fetched the updated references.
RefsSynced {
/// The node identifier of the other node.
remote: NodeId,
/// The identifier of the repository in question.
rid: RepoId,
/// The `rad/sigrefs` reference that was fetched.
at: Oid,
},
/// The node has discovered a repository on a new node on the
/// Radicle network.
SeedDiscovered {
/// The identifier of the repository in question.
rid: RepoId,
/// The node identifier of the other node.
nid: NodeId,
},
/// The node has dropped a repository on a node from its list of
/// known repositories and nodes.
SeedDropped {
/// The identifier of the repository in question.
rid: RepoId,
/// The node identifier of the other node.
nid: NodeId,
},
/// The node has connected directly to another node.
PeerConnected {
/// The node identifier of the other node.
nid: NodeId,
},
/// The node has terminated its direct connection to another node.
PeerDisconnected {
/// The node identifier of the other node.
nid: NodeId,
/// The reason why the connection was terminated.
reason: String,
},
/// The local node has received changes to Git references from its
/// local user. In other words, the local user has pushed to the
/// node, updated COBs, or otherwise updated refs in their local node.
LocalRefsAnnounced {
/// The identifier of the repository in question.
rid: RepoId,
/// List of changed Git references for the repository.
refs: refs::RefsAt,
/// When were the new references received? In other words,
/// when did the user run `git push`?
timestamp: Timestamp,
},
/// The node has received a message with a list of repositories on
/// another node on the network.
InventoryAnnounced {
/// The node identifier of the other node.
nid: NodeId,
/// List of repositories sent.
inventory: Vec<RepoId>,
/// When was the list sent?
timestamp: Timestamp,
},
/// The node has received a message about new signed Git
/// references ("sigrefs") for a repository on another node on the
/// network.
RefsAnnounced {
/// The node identifier of the other node.
nid: NodeId,
/// The identifier of the repository in question.
rid: RepoId,
/// List of Git references for the repository.
refs: Vec<refs::RefsAt>,
/// When was the list sent?
timestamp: Timestamp,
},
/// The node received a message about a new node on the network.
NodeAnnounced {
/// The node identifier of the other node.
nid: NodeId,
/// Alias for the other node.
alias: Alias,
/// When was the announcement sent?
timestamp: Timestamp,
/// What features did the node advertise to the other node?
features: node::Features,
/// Which of its addresses did the node tell the other node about?
addresses: Vec<node::Address>,
},
/// The node has uploaded a Git pack file to another node.
UploadPack(upload_pack::UploadPack),
/// A canonical reference was updated after a fetch.
CanonicalRefUpdated {
/// The repository the canonical reference was updated for.
rid: RepoId,
/// The reference name of the canonical reference update.
refname: Qualified<'static>,
/// The new target of the reference, after the update.
target: Oid,
},
}
impl From<upload_pack::UploadPack> for Event {
fn from(value: upload_pack::UploadPack) -> Self {
Self::UploadPack(value)
}
}
/// Events feed.
///
/// Newtype around a [`chan::Receiver`] of [`Event`]s. It dereferences to the
/// receiver, can be built from one with `From`, and is consumed by
/// `into_iter` to iterate over the received events.
pub struct Events(chan::Receiver<Event>);
impl IntoIterator for Events {
type Item = Event;
type IntoIter = chan::IntoIter<Event>;
fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}
impl From<chan::Receiver<Event>> for Events {
fn from(value: chan::Receiver<Event>) -> Self {
Self(value)
}
}
impl Deref for Events {
type Target = chan::Receiver<Event>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl Events {
    /// Receive events and pass each one to `f` until it returns `Some`, or
    /// until `timeout` has elapsed since this call started.
    ///
    /// Events for which `f` returns `None` are consumed and discarded.
    ///
    /// # Errors
    ///
    /// * [`chan::RecvTimeoutError::Timeout`] if `timeout` elapsed before `f`
    ///   returned `Some`.
    /// * [`chan::RecvTimeoutError::Disconnected`] if the receiver reported
    ///   that the channel is disconnected.
    pub fn wait<F, T>(&self, mut f: F, timeout: time::Duration) -> Result<T, chan::RecvTimeoutError>
    where
        F: FnMut(&Event) -> Option<T>,
    {
        let started = time::Instant::now();

        // Each pass only waits for whatever is left of the overall budget;
        // `checked_sub` yields `None` once the budget is used up.
        while let Some(remaining) = timeout.checked_sub(started.elapsed()) {
            match self.recv_timeout(remaining) {
                Ok(event) => {
                    if let Some(output) = f(&event) {
                        return Ok(output);
                    }
                }
                // Re-check the remaining budget; the loop ends once it hits zero.
                Err(chan::RecvTimeoutError::Timeout) => {}
                Err(chan::RecvTimeoutError::Disconnected) => {
                    return Err(chan::RecvTimeoutError::Disconnected);
                }
            }
        }
        Err(chan::RecvTimeoutError::Timeout)
    }
}
/// Publishes events to subscribers.
///
/// Holds one channel sender per subscriber in a list behind a shared,
/// mutex-guarded `Arc`, so clones of an emitter publish to the same
/// subscribers.
#[derive(Debug, Clone)]
pub struct Emitter<T> {
/// Sending halves of the subscriber channels.
subscribers: Arc<Mutex<Vec<chan::Sender<T>>>>,
}
impl<T> Default for Emitter<T> {
fn default() -> Emitter<T> {
Emitter {
subscribers: Default::default(),
}
}
}
impl<T: Clone> Emitter<T> {
    /// Send a clone of `event` to every subscriber, removing each subscriber
    /// whose channel rejects it.
    ///
    /// N.b. a full channel counts as a rejection, so slow subscribers are
    /// dropped as well.
    pub fn emit(&self, event: T) {
        // SAFETY: We deliberately propagate panics from other threads holding the lock.
        #[allow(clippy::unwrap_used)]
        let mut subscribers = self.subscribers.lock().unwrap();

        subscribers.retain(|subscriber| subscriber.try_send(event.clone()).is_ok());
    }

    /// Send a batch of events, in order, to every subscriber while holding
    /// the subscriber lock for the whole batch. A subscriber is removed as
    /// soon as its channel rejects one of the events.
    ///
    /// N.b. a full channel counts as a rejection, so slow subscribers are
    /// dropped as well.
    pub fn emit_all(&self, events: impl IntoIterator<Item = T>) {
        // SAFETY: We deliberately propagate panics from other threads holding the lock.
        #[allow(clippy::unwrap_used)]
        let mut subscribers = self.subscribers.lock().unwrap();

        events.into_iter().for_each(|event| {
            subscribers.retain(|subscriber| subscriber.try_send(event.clone()).is_ok());
        });
    }

    /// Register a new subscriber and return the receiving end of its
    /// channel, which buffers at most [`MAX_PENDING_EVENTS`] events.
    pub fn subscribe(&self) -> chan::Receiver<T> {
        let (tx, rx) = chan::bounded(MAX_PENDING_EVENTS);

        // SAFETY: We deliberately propagate panics from other threads holding the lock.
        #[allow(clippy::unwrap_used)]
        self.subscribers.lock().unwrap().push(tx);

        rx
    }

    /// Number of subscribers currently registered.
    pub fn subscriptions(&self) -> usize {
        // SAFETY: We deliberately propagate panics from other threads holding the lock.
        #[allow(clippy::unwrap_used)]
        let subscribers = self.subscribers.lock().unwrap();

        subscribers.len()
    }

    /// Total number of sent messages, summed over all subscriber channels,
    /// that have not yet been received.
    pub fn pending(&self) -> usize {
        // SAFETY: We deliberately propagate panics from other threads holding the lock.
        #[allow(clippy::unwrap_used)]
        let subscribers = self.subscribers.lock().unwrap();

        subscribers.iter().map(chan::Sender::len).sum()
    }
}