Skip to main content

radicle_node/runtime/
handle.rs

1use std::collections::HashSet;
2use std::net;
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::{fmt, io, time};
7
8#[cfg(unix)]
9use std::os::unix::net::UnixStream;
10#[cfg(windows)]
11use uds_windows::UnixStream;
12
13use crossbeam_channel as chan;
14use radicle::crypto::PublicKey;
15use radicle::node::events::{Event, Events};
16use radicle::node::policy;
17use radicle::node::{Config, NodeId};
18use radicle::node::{ConnectOptions, ConnectResult, Seeds};
19use radicle::storage::refs;
20use serde_json::json;
21use thiserror::Error;
22
23use crate::identity::RepoId;
24use crate::node::{Alias, Command, FetchResult};
25use crate::profile::Home;
26use crate::reactor;
27use crate::runtime::Emitter;
28use crate::service;
29use crate::service::QueryState;
30use crate::storage::refs::RefsAt;
31use crate::wire;
32use crate::wire::StreamId;
33use crate::worker::TaskResult;
34
35/// An error resulting from a handle method.
36#[derive(Error, Debug)]
37pub enum Error {
38    /// The command channel is no longer connected.
39    #[error("command channel is not connected")]
40    ChannelDisconnected,
41    /// The command returned an error.
42    #[error("command failed: {0}")]
43    Command(#[from] service::command::Error),
44    /// The operation timed out.
45    #[error("the operation timed out")]
46    Timeout,
47    /// An I/O error occurred.
48    #[error(transparent)]
49    Io(#[from] std::io::Error),
50}
51
52impl From<chan::RecvError> for Error {
53    fn from(_: chan::RecvError) -> Self {
54        Self::ChannelDisconnected
55    }
56}
57
58impl From<chan::RecvTimeoutError> for Error {
59    fn from(err: chan::RecvTimeoutError) -> Self {
60        match err {
61            chan::RecvTimeoutError::Timeout => Self::Timeout,
62            chan::RecvTimeoutError::Disconnected => Self::ChannelDisconnected,
63        }
64    }
65}
66
67impl<T> From<chan::SendError<T>> for Error {
68    fn from(_: chan::SendError<T>) -> Self {
69        Self::ChannelDisconnected
70    }
71}
72
73pub struct Handle {
74    pub(crate) home: Home,
75
76    /// Path to the control socket in use. Required for shutdown.
77    pub(crate) socket: PathBuf,
78
79    pub(crate) controller: reactor::Controller,
80
81    /// Whether or not a shutdown was initiated. Prevents attempting to shutdown twice.
82    shutdown: Arc<AtomicBool>,
83    /// Publishes events to subscribers.
84    emitter: Emitter<Event>,
85}
86
87impl Handle {
88    /// Subscribe to events stream.
89    pub fn events(&self) -> Events {
90        Events::from(self.emitter.subscribe())
91    }
92}
93
94impl fmt::Debug for Handle {
95    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96        f.debug_struct("Handle").field("home", &self.home).finish()
97    }
98}
99
100impl Clone for Handle {
101    fn clone(&self) -> Self {
102        Self {
103            home: self.home.clone(),
104            socket: self.socket.clone(),
105            controller: self.controller.clone(),
106            shutdown: self.shutdown.clone(),
107            emitter: self.emitter.clone(),
108        }
109    }
110}
111
112impl Handle {
113    pub fn new(
114        home: Home,
115        socket: PathBuf,
116        controller: reactor::Controller,
117        emitter: Emitter<Event>,
118    ) -> Self {
119        Self {
120            home,
121            socket,
122            controller,
123            shutdown: Arc::default(),
124            emitter,
125        }
126    }
127
128    pub fn worker_result(&mut self, result: TaskResult) -> Result<(), io::Error> {
129        self.controller.cmd(wire::Control::Worker(result))
130    }
131
132    pub fn flush(&mut self, remote: NodeId, stream: StreamId) -> Result<(), io::Error> {
133        self.controller.cmd(wire::Control::Flush { remote, stream })
134    }
135
136    pub(crate) fn command(&self, cmd: service::Command) -> Result<(), io::Error> {
137        self.controller.cmd(wire::Control::User(cmd))
138    }
139}
140
141impl radicle::node::Handle for Handle {
142    type Sessions = Vec<radicle::node::Session>;
143    type Events = Events;
144    type Event = Event;
145    type Error = Error;
146
147    fn nid(&self) -> Result<NodeId, Self::Error> {
148        let (sender, receiver) = chan::bounded(1);
149        let query: Arc<QueryState> = Arc::new(move |state| {
150            sender.send(*state.nid()).ok();
151            Ok(())
152        });
153        let (err_sender, err_receiver) = chan::bounded(1);
154        self.command(service::Command::QueryState(query, err_sender))?;
155        err_receiver.recv()??;
156
157        let nid = receiver.recv()?;
158
159        Ok(nid)
160    }
161
162    fn is_running(&self) -> bool {
163        true
164    }
165
166    fn connect(
167        &mut self,
168        node: NodeId,
169        addr: radicle::node::Address,
170        opts: ConnectOptions,
171    ) -> Result<ConnectResult, Error> {
172        let events = self.events();
173        let timeout = opts.timeout;
174        let sessions = self.sessions()?;
175        let session = sessions.iter().find(|s| s.nid == node);
176
177        if let Some(s) = session {
178            if s.state.is_connected() {
179                return Ok(ConnectResult::Connected);
180            }
181        }
182        self.command(service::Command::Connect(node, addr, opts))?;
183
184        events
185            .wait(
186                |e| match e {
187                    Event::PeerConnected { nid } if nid == &node => Some(ConnectResult::Connected),
188                    Event::PeerDisconnected { nid, reason } if nid == &node => {
189                        Some(ConnectResult::Disconnected {
190                            reason: reason.clone(),
191                        })
192                    }
193                    _ => None,
194                },
195                timeout,
196            )
197            .map_err(Error::from)
198    }
199
200    fn disconnect(&mut self, node: NodeId) -> Result<(), Self::Error> {
201        let events = self.events();
202        self.command(service::Command::Disconnect(node))?;
203        events
204            .wait(
205                |e| match e {
206                    Event::PeerDisconnected { nid, .. } if nid == &node => Some(()),
207                    _ => None,
208                },
209                time::Duration::MAX,
210            )
211            .map_err(Error::from)
212    }
213
214    fn seeds_for(
215        &mut self,
216        id: RepoId,
217        namespaces: impl IntoIterator<Item = PublicKey>,
218    ) -> Result<Seeds, Self::Error> {
219        let (responder, receiver) = service::command::Responder::oneshot();
220        self.command(service::Command::Seeds(
221            id,
222            HashSet::from_iter(namespaces),
223            responder,
224        ))?;
225        Ok(receiver.recv()??)
226    }
227
228    fn config(&self) -> Result<Config, Self::Error> {
229        let (responder, receiver) = service::command::Responder::oneshot();
230        self.command(service::Command::Config(responder))?;
231        Ok(receiver.recv()??)
232    }
233
234    fn listen_addrs(&self) -> Result<Vec<net::SocketAddr>, Self::Error> {
235        let (responder, receiver) = service::command::Responder::oneshot();
236        self.command(service::Command::ListenAddrs(responder))?;
237        Ok(receiver.recv()??)
238    }
239
240    fn fetch(
241        &mut self,
242        id: RepoId,
243        from: NodeId,
244        timeout: time::Duration,
245        signed_references_minimum_feature_level: Option<refs::FeatureLevel>,
246    ) -> Result<FetchResult, Error> {
247        let (responder, receiver) = service::command::Responder::oneshot();
248        self.command(service::Command::Fetch(
249            id,
250            from,
251            timeout,
252            signed_references_minimum_feature_level,
253            responder,
254        ))?;
255        Ok(receiver.recv()??)
256    }
257
258    fn follow(&mut self, id: NodeId, alias: Option<Alias>) -> Result<bool, Error> {
259        let (responder, receiver) = service::command::Responder::oneshot();
260        self.command(service::Command::Follow(id, alias, responder))?;
261        Ok(receiver.recv()??)
262    }
263
264    fn unfollow(&mut self, id: NodeId) -> Result<bool, Error> {
265        let (responder, receiver) = service::command::Responder::oneshot();
266        self.command(service::Command::Unfollow(id, responder))?;
267        Ok(receiver.recv()??)
268    }
269
270    fn block(&mut self, id: NodeId) -> Result<bool, Self::Error> {
271        let (sender, receiver) = chan::bounded(1);
272        self.command(service::Command::Block(id, sender))?;
273        receiver.recv().map_err(Error::from)
274    }
275
276    fn seed(&mut self, id: RepoId, scope: policy::Scope) -> Result<bool, Error> {
277        let (responder, receiver) = service::command::Responder::oneshot();
278        self.command(service::Command::Seed(id, scope, responder))?;
279        Ok(receiver.recv()??)
280    }
281
282    fn unseed(&mut self, id: RepoId) -> Result<bool, Error> {
283        let (responder, receiver) = service::command::Responder::oneshot();
284        self.command(service::Command::Unseed(id, responder))?;
285        Ok(receiver.recv()??)
286    }
287
288    fn announce_refs_for(
289        &mut self,
290        id: RepoId,
291        namespaces: impl IntoIterator<Item = PublicKey>,
292    ) -> Result<RefsAt, Error> {
293        let (responder, receiver) = service::command::Responder::oneshot();
294        self.command(service::Command::AnnounceRefs(
295            id,
296            HashSet::from_iter(namespaces),
297            responder,
298        ))?;
299        Ok(receiver.recv()??)
300    }
301
302    fn announce_inventory(&mut self) -> Result<(), Error> {
303        self.command(service::Command::AnnounceInventory)
304            .map_err(Error::from)
305    }
306
307    fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
308        let (responder, receiver) = service::command::Responder::oneshot();
309        self.command(service::Command::AddInventory(rid, responder))?;
310        Ok(receiver.recv()??)
311    }
312
313    fn subscribe(&self, _timeout: time::Duration) -> Result<Self::Events, Self::Error> {
314        Ok(self.events())
315    }
316
317    fn sessions(&self) -> Result<Self::Sessions, Error> {
318        let (sender, receiver) = chan::unbounded();
319        let query: Arc<QueryState> = Arc::new(move |state| {
320            let sessions = state
321                .sessions()
322                .values()
323                .map(radicle::node::Session::from)
324                .collect();
325            sender.send(sessions).ok();
326
327            Ok(())
328        });
329        let (err_sender, err_receiver) = chan::bounded(1);
330        self.command(service::Command::QueryState(query, err_sender))?;
331        err_receiver.recv()??;
332
333        let sessions = receiver.recv()?;
334
335        Ok(sessions)
336    }
337
338    fn session(&self, nid: NodeId) -> Result<Option<radicle::node::Session>, Self::Error> {
339        let (sender, receiver) = chan::bounded(1);
340        let query: Arc<QueryState> = Arc::new(move |state| {
341            let session = state.sessions().get(&nid).map(radicle::node::Session::from);
342            sender.send(session).ok();
343
344            Ok(())
345        });
346        let (err_sender, err_receiver) = chan::bounded(1);
347        self.command(service::Command::QueryState(query, err_sender))?;
348        err_receiver.recv()??;
349
350        let sessions = receiver.recv()?;
351
352        Ok(sessions)
353    }
354
355    fn shutdown(self) -> Result<(), Error> {
356        // If the current value is `false`, set it to `true`; otherwise, error.
357        if self
358            .shutdown
359            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
360            .is_err()
361        {
362            return Ok(());
363        }
364        // Send a shutdown request to our own control socket. This is the only way to kill the
365        // control thread gracefully. Since the control thread may have called this function,
366        // the control socket may already be disconnected. Ignore errors.
367        UnixStream::connect(self.socket)
368            .and_then(|sock| Command::Shutdown.to_writer(sock))
369            .ok();
370
371        self.controller
372            .shutdown()
373            .map_err(|_| Error::ChannelDisconnected)
374    }
375
376    fn debug(&self) -> Result<serde_json::Value, Self::Error> {
377        let (sender, receiver) = chan::bounded(1);
378        let query: Arc<QueryState> = Arc::new(move |state| {
379            let fetching = debug::Fetching::new(state.fetching());
380            let debug = serde_json::json!({
381                "outboxSize": state.outbox().len(),
382                "fetching": fetching,
383                "rateLimiter": state.limiter().buckets.iter().map(|(host, bucket)| {
384                    json!({
385                        "host": host.to_string(),
386                        "bucket": bucket
387                    })
388                }).collect::<Vec<_>>(),
389                "events": json!({
390                    "subscribers": state.emitter().subscriptions(),
391                    "pending": state.emitter().pending(),
392                }),
393                "metrics": state.metrics(),
394            });
395            sender.send(debug).ok();
396
397            Ok(())
398        });
399        let (err_sender, err_receiver) = chan::bounded(1);
400        self.command(service::Command::QueryState(query, err_sender))?;
401        err_receiver.recv()??;
402
403        let debug = receiver.recv()?;
404
405        Ok(debug)
406    }
407}
408
409mod debug {
410    //! Serialization formats for the output of [`Handle::debug`] output.
411
412    use radicle_protocol::fetcher;
413    use radicle_protocol::fetcher::FetcherState;
414    use serde::Serialize;
415
416    use super::{NodeId, RefsAt, RepoId};
417
418    #[derive(Serialize)]
419    #[serde(rename_all = "camelCase")]
420    pub struct Fetching {
421        active: Vec<ActiveFetch>,
422        queued: Vec<QueuedFetch>,
423    }
424
425    impl Fetching {
426        pub fn new(state: &FetcherState) -> Self {
427            let active = state
428                .active_fetches()
429                .iter()
430                .map(|(rid, fetch)| ActiveFetch::new(*rid, fetch.clone()))
431                .collect();
432            let queued = state
433                .queued_fetches()
434                .iter()
435                .flat_map(|(node, queue)| {
436                    queue
437                        .iter()
438                        .map(|fetch| QueuedFetch::new(*node, fetch.clone()))
439                })
440                .collect();
441            Self { active, queued }
442        }
443    }
444
445    #[derive(Serialize)]
446    #[serde(rename_all = "camelCase")]
447    pub struct ActiveFetch {
448        rid: RepoId,
449        from: NodeId,
450        refs_at: Vec<RefsAt>,
451    }
452
453    impl ActiveFetch {
454        pub fn new(rid: RepoId, fetch: fetcher::ActiveFetch) -> Self {
455            Self {
456                rid,
457                from: fetch.from,
458                refs_at: fetch.refs.into(),
459            }
460        }
461    }
462
463    #[derive(Serialize)]
464    #[serde(rename_all = "camelCase")]
465    pub struct QueuedFetch {
466        nid: NodeId,
467        rid: RepoId,
468        refs_at: Vec<RefsAt>,
469    }
470
471    impl QueuedFetch {
472        pub fn new(node: NodeId, fetch: fetcher::QueuedFetch) -> Self {
473            Self {
474                nid: node,
475                rid: fetch.rid,
476                refs_at: fetch.refs.into(),
477            }
478        }
479    }
480}