Skip to main content

radicle_node/runtime/
handle.rs

1use std::collections::HashSet;
2use std::net;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::{fmt, io, time};
6
7#[cfg(unix)]
8use std::os::unix::net::UnixStream;
9#[cfg(windows)]
10use uds_windows::UnixStream;
11
12use crossbeam_channel as chan;
13use radicle::crypto::PublicKey;
14use radicle::node::events::{Event, Events};
15use radicle::node::policy;
16use radicle::node::{Config, NodeId};
17use radicle::node::{ConnectOptions, ConnectResult, Seeds};
18use serde_json::json;
19use thiserror::Error;
20
21use crate::identity::RepoId;
22use crate::node::{Alias, Command, FetchResult};
23use crate::profile::Home;
24use crate::reactor;
25use crate::runtime::Emitter;
26use crate::service;
27use crate::service::QueryState;
28use crate::storage::refs::RefsAt;
29use crate::wire;
30use crate::wire::StreamId;
31use crate::worker::TaskResult;
32
33/// An error resulting from a handle method.
34#[derive(Error, Debug)]
35pub enum Error {
36    /// The command channel is no longer connected.
37    #[error("command channel is not connected")]
38    ChannelDisconnected,
39    /// The command returned an error.
40    #[error("command failed: {0}")]
41    Command(#[from] service::command::Error),
42    /// The operation timed out.
43    #[error("the operation timed out")]
44    Timeout,
45    /// An I/O error occurred.
46    #[error(transparent)]
47    Io(#[from] std::io::Error),
48}
49
50impl From<chan::RecvError> for Error {
51    fn from(_: chan::RecvError) -> Self {
52        Self::ChannelDisconnected
53    }
54}
55
56impl From<chan::RecvTimeoutError> for Error {
57    fn from(err: chan::RecvTimeoutError) -> Self {
58        match err {
59            chan::RecvTimeoutError::Timeout => Self::Timeout,
60            chan::RecvTimeoutError::Disconnected => Self::ChannelDisconnected,
61        }
62    }
63}
64
65impl<T> From<chan::SendError<T>> for Error {
66    fn from(_: chan::SendError<T>) -> Self {
67        Self::ChannelDisconnected
68    }
69}
70
71pub struct Handle {
72    pub(crate) home: Home,
73    pub(crate) controller: reactor::Controller,
74
75    /// Whether a shutdown was initiated or not. Prevents attempting to shutdown twice.
76    shutdown: Arc<AtomicBool>,
77    /// Publishes events to subscribers.
78    emitter: Emitter<Event>,
79}
80
81impl Handle {
82    /// Subscribe to events stream.
83    pub fn events(&self) -> Events {
84        Events::from(self.emitter.subscribe())
85    }
86}
87
88impl fmt::Debug for Handle {
89    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
90        f.debug_struct("Handle").field("home", &self.home).finish()
91    }
92}
93
94impl Clone for Handle {
95    fn clone(&self) -> Self {
96        Self {
97            home: self.home.clone(),
98            controller: self.controller.clone(),
99            shutdown: self.shutdown.clone(),
100            emitter: self.emitter.clone(),
101        }
102    }
103}
104
105impl Handle {
106    pub fn new(home: Home, controller: reactor::Controller, emitter: Emitter<Event>) -> Self {
107        Self {
108            home,
109            controller,
110            shutdown: Arc::default(),
111            emitter,
112        }
113    }
114
115    pub fn worker_result(&mut self, result: TaskResult) -> Result<(), io::Error> {
116        self.controller.cmd(wire::Control::Worker(result))
117    }
118
119    pub fn flush(&mut self, remote: NodeId, stream: StreamId) -> Result<(), io::Error> {
120        self.controller.cmd(wire::Control::Flush { remote, stream })
121    }
122
123    pub(crate) fn command(&self, cmd: service::Command) -> Result<(), io::Error> {
124        self.controller.cmd(wire::Control::User(cmd))
125    }
126}
127
128impl radicle::node::Handle for Handle {
129    type Sessions = Vec<radicle::node::Session>;
130    type Events = Events;
131    type Event = Event;
132    type Error = Error;
133
134    fn nid(&self) -> Result<NodeId, Self::Error> {
135        let (sender, receiver) = chan::bounded(1);
136        let query: Arc<QueryState> = Arc::new(move |state| {
137            sender.send(*state.nid()).ok();
138            Ok(())
139        });
140        let (err_sender, err_receiver) = chan::bounded(1);
141        self.command(service::Command::QueryState(query, err_sender))?;
142        err_receiver.recv()??;
143
144        let nid = receiver.recv()?;
145
146        Ok(nid)
147    }
148
149    fn is_running(&self) -> bool {
150        true
151    }
152
153    fn connect(
154        &mut self,
155        node: NodeId,
156        addr: radicle::node::Address,
157        opts: ConnectOptions,
158    ) -> Result<ConnectResult, Error> {
159        let events = self.events();
160        let timeout = opts.timeout;
161        let sessions = self.sessions()?;
162        let session = sessions.iter().find(|s| s.nid == node);
163
164        if let Some(s) = session {
165            if s.state.is_connected() {
166                return Ok(ConnectResult::Connected);
167            }
168        }
169        self.command(service::Command::Connect(node, addr, opts))?;
170
171        events
172            .wait(
173                |e| match e {
174                    Event::PeerConnected { nid } if nid == &node => Some(ConnectResult::Connected),
175                    Event::PeerDisconnected { nid, reason } if nid == &node => {
176                        Some(ConnectResult::Disconnected {
177                            reason: reason.clone(),
178                        })
179                    }
180                    _ => None,
181                },
182                timeout,
183            )
184            .map_err(Error::from)
185    }
186
187    fn disconnect(&mut self, node: NodeId) -> Result<(), Self::Error> {
188        let events = self.events();
189        self.command(service::Command::Disconnect(node))?;
190        events
191            .wait(
192                |e| match e {
193                    Event::PeerDisconnected { nid, .. } if nid == &node => Some(()),
194                    _ => None,
195                },
196                time::Duration::MAX,
197            )
198            .map_err(Error::from)
199    }
200
201    fn seeds_for(
202        &mut self,
203        id: RepoId,
204        namespaces: impl IntoIterator<Item = PublicKey>,
205    ) -> Result<Seeds, Self::Error> {
206        let (responder, receiver) = service::command::Responder::oneshot();
207        self.command(service::Command::Seeds(
208            id,
209            HashSet::from_iter(namespaces),
210            responder,
211        ))?;
212        Ok(receiver.recv()??)
213    }
214
215    fn config(&self) -> Result<Config, Self::Error> {
216        let (responder, receiver) = service::command::Responder::oneshot();
217        self.command(service::Command::Config(responder))?;
218        Ok(receiver.recv()??)
219    }
220
221    fn listen_addrs(&self) -> Result<Vec<net::SocketAddr>, Self::Error> {
222        let (responder, receiver) = service::command::Responder::oneshot();
223        self.command(service::Command::ListenAddrs(responder))?;
224        Ok(receiver.recv()??)
225    }
226
227    fn fetch(
228        &mut self,
229        id: RepoId,
230        from: NodeId,
231        timeout: time::Duration,
232    ) -> Result<FetchResult, Error> {
233        let (responder, receiver) = service::command::Responder::oneshot();
234        self.command(service::Command::Fetch(id, from, timeout, responder))?;
235        Ok(receiver.recv()??)
236    }
237
238    fn follow(&mut self, id: NodeId, alias: Option<Alias>) -> Result<bool, Error> {
239        let (responder, receiver) = service::command::Responder::oneshot();
240        self.command(service::Command::Follow(id, alias, responder))?;
241        Ok(receiver.recv()??)
242    }
243
244    fn unfollow(&mut self, id: NodeId) -> Result<bool, Error> {
245        let (responder, receiver) = service::command::Responder::oneshot();
246        self.command(service::Command::Unfollow(id, responder))?;
247        Ok(receiver.recv()??)
248    }
249
250    fn block(&mut self, id: NodeId) -> Result<bool, Self::Error> {
251        let (sender, receiver) = chan::bounded(1);
252        self.command(service::Command::Block(id, sender))?;
253        receiver.recv().map_err(Error::from)
254    }
255
256    fn seed(&mut self, id: RepoId, scope: policy::Scope) -> Result<bool, Error> {
257        let (responder, receiver) = service::command::Responder::oneshot();
258        self.command(service::Command::Seed(id, scope, responder))?;
259        Ok(receiver.recv()??)
260    }
261
262    fn unseed(&mut self, id: RepoId) -> Result<bool, Error> {
263        let (responder, receiver) = service::command::Responder::oneshot();
264        self.command(service::Command::Unseed(id, responder))?;
265        Ok(receiver.recv()??)
266    }
267
268    fn announce_refs_for(
269        &mut self,
270        id: RepoId,
271        namespaces: impl IntoIterator<Item = PublicKey>,
272    ) -> Result<RefsAt, Error> {
273        let (responder, receiver) = service::command::Responder::oneshot();
274        self.command(service::Command::AnnounceRefs(
275            id,
276            HashSet::from_iter(namespaces),
277            responder,
278        ))?;
279        Ok(receiver.recv()??)
280    }
281
282    fn announce_inventory(&mut self) -> Result<(), Error> {
283        self.command(service::Command::AnnounceInventory)
284            .map_err(Error::from)
285    }
286
287    fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
288        let (responder, receiver) = service::command::Responder::oneshot();
289        self.command(service::Command::AddInventory(rid, responder))?;
290        Ok(receiver.recv()??)
291    }
292
293    fn subscribe(&self, _timeout: time::Duration) -> Result<Self::Events, Self::Error> {
294        Ok(self.events())
295    }
296
297    fn sessions(&self) -> Result<Self::Sessions, Error> {
298        let (sender, receiver) = chan::unbounded();
299        let query: Arc<QueryState> = Arc::new(move |state| {
300            let sessions = state
301                .sessions()
302                .values()
303                .map(radicle::node::Session::from)
304                .collect();
305            sender.send(sessions).ok();
306
307            Ok(())
308        });
309        let (err_sender, err_receiver) = chan::bounded(1);
310        self.command(service::Command::QueryState(query, err_sender))?;
311        err_receiver.recv()??;
312
313        let sessions = receiver.recv()?;
314
315        Ok(sessions)
316    }
317
318    fn session(&self, nid: NodeId) -> Result<Option<radicle::node::Session>, Self::Error> {
319        let (sender, receiver) = chan::bounded(1);
320        let query: Arc<QueryState> = Arc::new(move |state| {
321            let session = state.sessions().get(&nid).map(radicle::node::Session::from);
322            sender.send(session).ok();
323
324            Ok(())
325        });
326        let (err_sender, err_receiver) = chan::bounded(1);
327        self.command(service::Command::QueryState(query, err_sender))?;
328        err_receiver.recv()??;
329
330        let sessions = receiver.recv()?;
331
332        Ok(sessions)
333    }
334
335    fn shutdown(self) -> Result<(), Error> {
336        // If the current value is `false`, set it to `true`, otherwise error.
337        if self
338            .shutdown
339            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
340            .is_err()
341        {
342            return Ok(());
343        }
344        // Send a shutdown request to our own control socket. This is the only way to kill the
345        // control thread gracefully. Since the control thread may have called this function,
346        // the control socket may already be disconnected. Ignore errors.
347        UnixStream::connect(self.home.socket())
348            .and_then(|sock| Command::Shutdown.to_writer(sock))
349            .ok();
350
351        self.controller
352            .shutdown()
353            .map_err(|_| Error::ChannelDisconnected)
354    }
355
356    fn debug(&self) -> Result<serde_json::Value, Self::Error> {
357        let (sender, receiver) = chan::bounded(1);
358        let query: Arc<QueryState> = Arc::new(move |state| {
359            let debug = serde_json::json!({
360                "outboxSize": state.outbox().len(),
361                "fetching": state.fetching(),
362                "rateLimiter": state.limiter(),
363                "events": json!({
364                    "subscribers": state.emitter().subscriptions(),
365                    "pending": state.emitter().pending(),
366                }),
367                "metrics": state.metrics(),
368            });
369            sender.send(debug).ok();
370
371            Ok(())
372        });
373        let (err_sender, err_receiver) = chan::bounded(1);
374        self.command(service::Command::QueryState(query, err_sender))?;
375        err_receiver.recv()??;
376
377        let debug = receiver.recv()?;
378
379        Ok(debug)
380    }
381}