Skip to main content

radicle_node/runtime/
handle.rs

1use std::collections::HashSet;
2use std::net;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::{fmt, io, time};
6
7#[cfg(unix)]
8use std::os::unix::net::UnixStream;
9#[cfg(windows)]
10use uds_windows::UnixStream;
11
12use crossbeam_channel as chan;
13use radicle::crypto::PublicKey;
14use radicle::node::events::{Event, Events};
15use radicle::node::policy;
16use radicle::node::{Config, NodeId};
17use radicle::node::{ConnectOptions, ConnectResult, Seeds};
18use radicle::storage::refs;
19use serde_json::json;
20use thiserror::Error;
21
22use crate::identity::RepoId;
23use crate::node::{Alias, Command, FetchResult};
24use crate::profile::Home;
25use crate::reactor;
26use crate::runtime::Emitter;
27use crate::service;
28use crate::service::QueryState;
29use crate::storage::refs::RefsAt;
30use crate::wire;
31use crate::wire::StreamId;
32use crate::worker::TaskResult;
33
34/// An error resulting from a handle method.
35#[derive(Error, Debug)]
36pub enum Error {
37    /// The command channel is no longer connected.
38    #[error("command channel is not connected")]
39    ChannelDisconnected,
40    /// The command returned an error.
41    #[error("command failed: {0}")]
42    Command(#[from] service::command::Error),
43    /// The operation timed out.
44    #[error("the operation timed out")]
45    Timeout,
46    /// An I/O error occurred.
47    #[error(transparent)]
48    Io(#[from] std::io::Error),
49}
50
51impl From<chan::RecvError> for Error {
52    fn from(_: chan::RecvError) -> Self {
53        Self::ChannelDisconnected
54    }
55}
56
57impl From<chan::RecvTimeoutError> for Error {
58    fn from(err: chan::RecvTimeoutError) -> Self {
59        match err {
60            chan::RecvTimeoutError::Timeout => Self::Timeout,
61            chan::RecvTimeoutError::Disconnected => Self::ChannelDisconnected,
62        }
63    }
64}
65
66impl<T> From<chan::SendError<T>> for Error {
67    fn from(_: chan::SendError<T>) -> Self {
68        Self::ChannelDisconnected
69    }
70}
71
72pub struct Handle {
73    pub(crate) home: Home,
74    pub(crate) controller: reactor::Controller,
75
76    /// Whether a shutdown was initiated or not. Prevents attempting to shutdown twice.
77    shutdown: Arc<AtomicBool>,
78    /// Publishes events to subscribers.
79    emitter: Emitter<Event>,
80}
81
82impl Handle {
83    /// Subscribe to events stream.
84    pub fn events(&self) -> Events {
85        Events::from(self.emitter.subscribe())
86    }
87}
88
89impl fmt::Debug for Handle {
90    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
91        f.debug_struct("Handle").field("home", &self.home).finish()
92    }
93}
94
95impl Clone for Handle {
96    fn clone(&self) -> Self {
97        Self {
98            home: self.home.clone(),
99            controller: self.controller.clone(),
100            shutdown: self.shutdown.clone(),
101            emitter: self.emitter.clone(),
102        }
103    }
104}
105
106impl Handle {
107    pub fn new(home: Home, controller: reactor::Controller, emitter: Emitter<Event>) -> Self {
108        Self {
109            home,
110            controller,
111            shutdown: Arc::default(),
112            emitter,
113        }
114    }
115
116    pub fn worker_result(&mut self, result: TaskResult) -> Result<(), io::Error> {
117        self.controller.cmd(wire::Control::Worker(result))
118    }
119
120    pub fn flush(&mut self, remote: NodeId, stream: StreamId) -> Result<(), io::Error> {
121        self.controller.cmd(wire::Control::Flush { remote, stream })
122    }
123
124    pub(crate) fn command(&self, cmd: service::Command) -> Result<(), io::Error> {
125        self.controller.cmd(wire::Control::User(cmd))
126    }
127}
128
129impl radicle::node::Handle for Handle {
130    type Sessions = Vec<radicle::node::Session>;
131    type Events = Events;
132    type Event = Event;
133    type Error = Error;
134
135    fn nid(&self) -> Result<NodeId, Self::Error> {
136        let (sender, receiver) = chan::bounded(1);
137        let query: Arc<QueryState> = Arc::new(move |state| {
138            sender.send(*state.nid()).ok();
139            Ok(())
140        });
141        let (err_sender, err_receiver) = chan::bounded(1);
142        self.command(service::Command::QueryState(query, err_sender))?;
143        err_receiver.recv()??;
144
145        let nid = receiver.recv()?;
146
147        Ok(nid)
148    }
149
150    fn is_running(&self) -> bool {
151        true
152    }
153
154    fn connect(
155        &mut self,
156        node: NodeId,
157        addr: radicle::node::Address,
158        opts: ConnectOptions,
159    ) -> Result<ConnectResult, Error> {
160        let events = self.events();
161        let timeout = opts.timeout;
162        let sessions = self.sessions()?;
163        let session = sessions.iter().find(|s| s.nid == node);
164
165        if let Some(s) = session {
166            if s.state.is_connected() {
167                return Ok(ConnectResult::Connected);
168            }
169        }
170        self.command(service::Command::Connect(node, addr, opts))?;
171
172        events
173            .wait(
174                |e| match e {
175                    Event::PeerConnected { nid } if nid == &node => Some(ConnectResult::Connected),
176                    Event::PeerDisconnected { nid, reason } if nid == &node => {
177                        Some(ConnectResult::Disconnected {
178                            reason: reason.clone(),
179                        })
180                    }
181                    _ => None,
182                },
183                timeout,
184            )
185            .map_err(Error::from)
186    }
187
188    fn disconnect(&mut self, node: NodeId) -> Result<(), Self::Error> {
189        let events = self.events();
190        self.command(service::Command::Disconnect(node))?;
191        events
192            .wait(
193                |e| match e {
194                    Event::PeerDisconnected { nid, .. } if nid == &node => Some(()),
195                    _ => None,
196                },
197                time::Duration::MAX,
198            )
199            .map_err(Error::from)
200    }
201
202    fn seeds_for(
203        &mut self,
204        id: RepoId,
205        namespaces: impl IntoIterator<Item = PublicKey>,
206    ) -> Result<Seeds, Self::Error> {
207        let (responder, receiver) = service::command::Responder::oneshot();
208        self.command(service::Command::Seeds(
209            id,
210            HashSet::from_iter(namespaces),
211            responder,
212        ))?;
213        Ok(receiver.recv()??)
214    }
215
216    fn config(&self) -> Result<Config, Self::Error> {
217        let (responder, receiver) = service::command::Responder::oneshot();
218        self.command(service::Command::Config(responder))?;
219        Ok(receiver.recv()??)
220    }
221
222    fn listen_addrs(&self) -> Result<Vec<net::SocketAddr>, Self::Error> {
223        let (responder, receiver) = service::command::Responder::oneshot();
224        self.command(service::Command::ListenAddrs(responder))?;
225        Ok(receiver.recv()??)
226    }
227
228    fn fetch(
229        &mut self,
230        id: RepoId,
231        from: NodeId,
232        timeout: time::Duration,
233        signed_references_minimum_feature_level: Option<refs::FeatureLevel>,
234    ) -> Result<FetchResult, Error> {
235        let (responder, receiver) = service::command::Responder::oneshot();
236        self.command(service::Command::Fetch(
237            id,
238            from,
239            timeout,
240            signed_references_minimum_feature_level,
241            responder,
242        ))?;
243        Ok(receiver.recv()??)
244    }
245
246    fn follow(&mut self, id: NodeId, alias: Option<Alias>) -> Result<bool, Error> {
247        let (responder, receiver) = service::command::Responder::oneshot();
248        self.command(service::Command::Follow(id, alias, responder))?;
249        Ok(receiver.recv()??)
250    }
251
252    fn unfollow(&mut self, id: NodeId) -> Result<bool, Error> {
253        let (responder, receiver) = service::command::Responder::oneshot();
254        self.command(service::Command::Unfollow(id, responder))?;
255        Ok(receiver.recv()??)
256    }
257
258    fn block(&mut self, id: NodeId) -> Result<bool, Self::Error> {
259        let (sender, receiver) = chan::bounded(1);
260        self.command(service::Command::Block(id, sender))?;
261        receiver.recv().map_err(Error::from)
262    }
263
264    fn seed(&mut self, id: RepoId, scope: policy::Scope) -> Result<bool, Error> {
265        let (responder, receiver) = service::command::Responder::oneshot();
266        self.command(service::Command::Seed(id, scope, responder))?;
267        Ok(receiver.recv()??)
268    }
269
270    fn unseed(&mut self, id: RepoId) -> Result<bool, Error> {
271        let (responder, receiver) = service::command::Responder::oneshot();
272        self.command(service::Command::Unseed(id, responder))?;
273        Ok(receiver.recv()??)
274    }
275
276    fn announce_refs_for(
277        &mut self,
278        id: RepoId,
279        namespaces: impl IntoIterator<Item = PublicKey>,
280    ) -> Result<RefsAt, Error> {
281        let (responder, receiver) = service::command::Responder::oneshot();
282        self.command(service::Command::AnnounceRefs(
283            id,
284            HashSet::from_iter(namespaces),
285            responder,
286        ))?;
287        Ok(receiver.recv()??)
288    }
289
290    fn announce_inventory(&mut self) -> Result<(), Error> {
291        self.command(service::Command::AnnounceInventory)
292            .map_err(Error::from)
293    }
294
295    fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
296        let (responder, receiver) = service::command::Responder::oneshot();
297        self.command(service::Command::AddInventory(rid, responder))?;
298        Ok(receiver.recv()??)
299    }
300
301    fn subscribe(&self, _timeout: time::Duration) -> Result<Self::Events, Self::Error> {
302        Ok(self.events())
303    }
304
305    fn sessions(&self) -> Result<Self::Sessions, Error> {
306        let (sender, receiver) = chan::unbounded();
307        let query: Arc<QueryState> = Arc::new(move |state| {
308            let sessions = state
309                .sessions()
310                .values()
311                .map(radicle::node::Session::from)
312                .collect();
313            sender.send(sessions).ok();
314
315            Ok(())
316        });
317        let (err_sender, err_receiver) = chan::bounded(1);
318        self.command(service::Command::QueryState(query, err_sender))?;
319        err_receiver.recv()??;
320
321        let sessions = receiver.recv()?;
322
323        Ok(sessions)
324    }
325
326    fn session(&self, nid: NodeId) -> Result<Option<radicle::node::Session>, Self::Error> {
327        let (sender, receiver) = chan::bounded(1);
328        let query: Arc<QueryState> = Arc::new(move |state| {
329            let session = state.sessions().get(&nid).map(radicle::node::Session::from);
330            sender.send(session).ok();
331
332            Ok(())
333        });
334        let (err_sender, err_receiver) = chan::bounded(1);
335        self.command(service::Command::QueryState(query, err_sender))?;
336        err_receiver.recv()??;
337
338        let sessions = receiver.recv()?;
339
340        Ok(sessions)
341    }
342
343    fn shutdown(self) -> Result<(), Error> {
344        // If the current value is `false`, set it to `true`, otherwise error.
345        if self
346            .shutdown
347            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
348            .is_err()
349        {
350            return Ok(());
351        }
352        // Send a shutdown request to our own control socket. This is the only way to kill the
353        // control thread gracefully. Since the control thread may have called this function,
354        // the control socket may already be disconnected. Ignore errors.
355        UnixStream::connect(self.home.socket())
356            .and_then(|sock| Command::Shutdown.to_writer(sock))
357            .ok();
358
359        self.controller
360            .shutdown()
361            .map_err(|_| Error::ChannelDisconnected)
362    }
363
364    fn debug(&self) -> Result<serde_json::Value, Self::Error> {
365        let (sender, receiver) = chan::bounded(1);
366        let query: Arc<QueryState> = Arc::new(move |state| {
367            let fetching = debug::Fetching::new(state.fetching());
368            let debug = serde_json::json!({
369                "outboxSize": state.outbox().len(),
370                "fetching": fetching,
371                "rateLimiter": state.limiter().buckets.iter().map(|(host, bucket)| {
372                    json!({
373                        "host": host.to_string(),
374                        "bucket": bucket
375                    })
376                }).collect::<Vec<_>>(),
377                "events": json!({
378                    "subscribers": state.emitter().subscriptions(),
379                    "pending": state.emitter().pending(),
380                }),
381                "metrics": state.metrics(),
382            });
383            sender.send(debug).ok();
384
385            Ok(())
386        });
387        let (err_sender, err_receiver) = chan::bounded(1);
388        self.command(service::Command::QueryState(query, err_sender))?;
389        err_receiver.recv()??;
390
391        let debug = receiver.recv()?;
392
393        Ok(debug)
394    }
395}
396
397mod debug {
398    //! Serialization formats for the output of [`Handle::debug`] output.
399
400    use radicle_protocol::fetcher;
401    use radicle_protocol::fetcher::FetcherState;
402    use serde::Serialize;
403
404    use super::{NodeId, RefsAt, RepoId};
405
406    #[derive(Serialize)]
407    #[serde(rename_all = "camelCase")]
408    pub struct Fetching {
409        active: Vec<ActiveFetch>,
410        queued: Vec<QueuedFetch>,
411    }
412
413    impl Fetching {
414        pub fn new(state: &FetcherState) -> Self {
415            let active = state
416                .active_fetches()
417                .iter()
418                .map(|(rid, fetch)| ActiveFetch::new(*rid, fetch.clone()))
419                .collect();
420            let queued = state
421                .queued_fetches()
422                .iter()
423                .flat_map(|(node, queue)| {
424                    queue
425                        .iter()
426                        .map(|fetch| QueuedFetch::new(*node, fetch.clone()))
427                })
428                .collect();
429            Self { active, queued }
430        }
431    }
432
433    #[derive(Serialize)]
434    #[serde(rename_all = "camelCase")]
435    pub struct ActiveFetch {
436        rid: RepoId,
437        from: NodeId,
438        refs_at: Vec<RefsAt>,
439    }
440
441    impl ActiveFetch {
442        pub fn new(rid: RepoId, fetch: fetcher::ActiveFetch) -> Self {
443            Self {
444                rid,
445                from: fetch.from,
446                refs_at: fetch.refs.into(),
447            }
448        }
449    }
450
451    #[derive(Serialize)]
452    #[serde(rename_all = "camelCase")]
453    pub struct QueuedFetch {
454        nid: NodeId,
455        rid: RepoId,
456        refs_at: Vec<RefsAt>,
457    }
458
459    impl QueuedFetch {
460        pub fn new(node: NodeId, fetch: fetcher::QueuedFetch) -> Self {
461            Self {
462                nid: node,
463                rid: fetch.rid,
464                refs_at: fetch.refs.into(),
465            }
466        }
467    }
468}