radicle_node/runtime/
handle.rs

1use std::net;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::Arc;
4use std::{fmt, io, time};
5
6#[cfg(unix)]
7use std::os::unix::net::UnixStream as Stream;
8#[cfg(windows)]
9use winpipe::WinStream as Stream;
10
11use crossbeam_channel as chan;
12use radicle::node::events::{Event, Events};
13use radicle::node::policy;
14use radicle::node::{Config, NodeId};
15use radicle::node::{ConnectOptions, ConnectResult, Seeds};
16use reactor::poller::popol::PopolWaker;
17use serde_json::json;
18use thiserror::Error;
19
20use crate::identity::RepoId;
21use crate::node::{Alias, Command, FetchResult};
22use crate::profile::Home;
23use crate::runtime::Emitter;
24use crate::service;
25use crate::service::{CommandError, QueryState};
26use crate::storage::refs::RefsAt;
27use crate::wire;
28use crate::wire::StreamId;
29use crate::worker::TaskResult;
30
31/// An error resulting from a handle method.
32#[derive(Error, Debug)]
33pub enum Error {
34    /// The command channel is no longer connected.
35    #[error("command channel is not connected")]
36    ChannelDisconnected,
37    /// The command returned an error.
38    #[error("command failed: {0}")]
39    Command(#[from] CommandError),
40    /// The operation timed out.
41    #[error("the operation timed out")]
42    Timeout,
43    /// An I/O error occured.
44    #[error(transparent)]
45    Io(#[from] std::io::Error),
46}
47
48impl From<chan::RecvError> for Error {
49    fn from(_: chan::RecvError) -> Self {
50        Self::ChannelDisconnected
51    }
52}
53
54impl From<chan::RecvTimeoutError> for Error {
55    fn from(err: chan::RecvTimeoutError) -> Self {
56        match err {
57            chan::RecvTimeoutError::Timeout => Self::Timeout,
58            chan::RecvTimeoutError::Disconnected => Self::ChannelDisconnected,
59        }
60    }
61}
62
63impl<T> From<chan::SendError<T>> for Error {
64    fn from(_: chan::SendError<T>) -> Self {
65        Self::ChannelDisconnected
66    }
67}
68
69pub struct Handle {
70    pub(crate) home: Home,
71    pub(crate) controller: reactor::Controller<wire::Control, PopolWaker>,
72
73    /// Whether a shutdown was initiated or not. Prevents attempting to shutdown twice.
74    shutdown: Arc<AtomicBool>,
75    /// Publishes events to subscribers.
76    emitter: Emitter<Event>,
77}
78
79impl Handle {
80    /// Subscribe to events stream.
81    pub fn events(&self) -> Events {
82        Events::from(self.emitter.subscribe())
83    }
84}
85
86impl fmt::Debug for Handle {
87    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88        f.debug_struct("Handle").field("home", &self.home).finish()
89    }
90}
91
92impl Clone for Handle {
93    fn clone(&self) -> Self {
94        Self {
95            home: self.home.clone(),
96            controller: self.controller.clone(),
97            shutdown: self.shutdown.clone(),
98            emitter: self.emitter.clone(),
99        }
100    }
101}
102
103impl Handle {
104    pub fn new(
105        home: Home,
106        controller: reactor::Controller<wire::Control, PopolWaker>,
107        emitter: Emitter<Event>,
108    ) -> Self {
109        Self {
110            home,
111            controller,
112            shutdown: Arc::default(),
113            emitter,
114        }
115    }
116
117    pub fn worker_result(&mut self, result: TaskResult) -> Result<(), io::Error> {
118        self.controller.cmd(wire::Control::Worker(result))
119    }
120
121    pub fn flush(&mut self, remote: NodeId, stream: StreamId) -> Result<(), io::Error> {
122        self.controller.cmd(wire::Control::Flush { remote, stream })
123    }
124
125    pub(crate) fn command(&self, cmd: service::Command) -> Result<(), io::Error> {
126        self.controller.cmd(wire::Control::User(cmd))
127    }
128}
129
130impl radicle::node::Handle for Handle {
131    type Sessions = Vec<radicle::node::Session>;
132    type Events = Events;
133    type Event = Event;
134    type Error = Error;
135
136    fn nid(&self) -> Result<NodeId, Self::Error> {
137        let (sender, receiver) = chan::bounded(1);
138        let query: Arc<QueryState> = Arc::new(move |state| {
139            sender.send(*state.nid()).ok();
140            Ok(())
141        });
142        let (err_sender, err_receiver) = chan::bounded(1);
143        self.command(service::Command::QueryState(query, err_sender))?;
144        err_receiver.recv()??;
145
146        let nid = receiver.recv()?;
147
148        Ok(nid)
149    }
150
151    fn is_running(&self) -> bool {
152        true
153    }
154
155    fn connect(
156        &mut self,
157        node: NodeId,
158        addr: radicle::node::Address,
159        opts: ConnectOptions,
160    ) -> Result<ConnectResult, Error> {
161        let events = self.events();
162        let timeout = opts.timeout;
163        let sessions = self.sessions()?;
164        let session = sessions.iter().find(|s| s.nid == node);
165
166        if let Some(s) = session {
167            if s.state.is_connected() {
168                return Ok(ConnectResult::Connected);
169            }
170        }
171        self.command(service::Command::Connect(node, addr, opts))?;
172
173        events
174            .wait(
175                |e| match e {
176                    Event::PeerConnected { nid } if nid == &node => Some(ConnectResult::Connected),
177                    Event::PeerDisconnected { nid, reason } if nid == &node => {
178                        Some(ConnectResult::Disconnected {
179                            reason: reason.clone(),
180                        })
181                    }
182                    _ => None,
183                },
184                timeout,
185            )
186            .map_err(Error::from)
187    }
188
189    fn disconnect(&mut self, node: NodeId) -> Result<(), Self::Error> {
190        let events = self.events();
191        self.command(service::Command::Disconnect(node))?;
192        events
193            .wait(
194                |e| match e {
195                    Event::PeerDisconnected { nid, .. } if nid == &node => Some(()),
196                    _ => None,
197                },
198                time::Duration::MAX,
199            )
200            .map_err(Error::from)
201    }
202
203    fn seeds(&mut self, id: RepoId) -> Result<Seeds, Self::Error> {
204        let (sender, receiver) = chan::bounded(1);
205        self.command(service::Command::Seeds(id, sender))?;
206        receiver.recv().map_err(Error::from)
207    }
208
209    fn config(&self) -> Result<Config, Self::Error> {
210        let (sender, receiver) = chan::bounded(1);
211        self.command(service::Command::Config(sender))?;
212        receiver.recv().map_err(Error::from)
213    }
214
215    fn listen_addrs(&self) -> Result<Vec<net::SocketAddr>, Self::Error> {
216        let (sender, receiver) = chan::bounded(1);
217        self.command(service::Command::ListenAddrs(sender))?;
218        receiver.recv().map_err(Error::from)
219    }
220
221    fn fetch(
222        &mut self,
223        id: RepoId,
224        from: NodeId,
225        timeout: time::Duration,
226    ) -> Result<FetchResult, Error> {
227        let (sender, receiver) = chan::bounded(1);
228        self.command(service::Command::Fetch(id, from, timeout, sender))?;
229        receiver.recv().map_err(Error::from)
230    }
231
232    fn follow(&mut self, id: NodeId, alias: Option<Alias>) -> Result<bool, Error> {
233        let (sender, receiver) = chan::bounded(1);
234        self.command(service::Command::Follow(id, alias, sender))?;
235        receiver.recv().map_err(Error::from)
236    }
237
238    fn unfollow(&mut self, id: NodeId) -> Result<bool, Error> {
239        let (sender, receiver) = chan::bounded(1);
240        self.command(service::Command::Unfollow(id, sender))?;
241        receiver.recv().map_err(Error::from)
242    }
243
244    fn seed(&mut self, id: RepoId, scope: policy::Scope) -> Result<bool, Error> {
245        let (sender, receiver) = chan::bounded(1);
246        self.command(service::Command::Seed(id, scope, sender))?;
247        receiver.recv().map_err(Error::from)
248    }
249
250    fn unseed(&mut self, id: RepoId) -> Result<bool, Error> {
251        let (sender, receiver) = chan::bounded(1);
252        self.command(service::Command::Unseed(id, sender))?;
253        receiver.recv().map_err(Error::from)
254    }
255
256    fn announce_refs(&mut self, id: RepoId) -> Result<RefsAt, Error> {
257        let (sender, receiver) = chan::bounded(1);
258        self.command(service::Command::AnnounceRefs(id, sender))?;
259        receiver.recv().map_err(Error::from)
260    }
261
262    fn announce_inventory(&mut self) -> Result<(), Error> {
263        self.command(service::Command::AnnounceInventory)
264            .map_err(Error::from)
265    }
266
267    fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
268        let (sender, receiver) = chan::bounded(1);
269        self.command(service::Command::AddInventory(rid, sender))?;
270        receiver.recv().map_err(Error::from)
271    }
272
273    fn subscribe(&self, _timeout: time::Duration) -> Result<Self::Events, Self::Error> {
274        Ok(self.events())
275    }
276
277    fn sessions(&self) -> Result<Self::Sessions, Error> {
278        let (sender, receiver) = chan::unbounded();
279        let query: Arc<QueryState> = Arc::new(move |state| {
280            let sessions = state
281                .sessions()
282                .values()
283                .map(radicle::node::Session::from)
284                .collect();
285            sender.send(sessions).ok();
286
287            Ok(())
288        });
289        let (err_sender, err_receiver) = chan::bounded(1);
290        self.command(service::Command::QueryState(query, err_sender))?;
291        err_receiver.recv()??;
292
293        let sessions = receiver.recv()?;
294
295        Ok(sessions)
296    }
297
298    fn session(&self, nid: NodeId) -> Result<Option<radicle::node::Session>, Self::Error> {
299        let (sender, receiver) = chan::bounded(1);
300        let query: Arc<QueryState> = Arc::new(move |state| {
301            let session = state.sessions().get(&nid).map(radicle::node::Session::from);
302            sender.send(session).ok();
303
304            Ok(())
305        });
306        let (err_sender, err_receiver) = chan::bounded(1);
307        self.command(service::Command::QueryState(query, err_sender))?;
308        err_receiver.recv()??;
309
310        let sessions = receiver.recv()?;
311
312        Ok(sessions)
313    }
314
315    fn shutdown(self) -> Result<(), Error> {
316        // If the current value is `false`, set it to `true`, otherwise error.
317        if self
318            .shutdown
319            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
320            .is_err()
321        {
322            return Ok(());
323        }
324        // Send a shutdown request to our own control socket. This is the only way to kill the
325        // control thread gracefully. Since the control thread may have called this function,
326        // the control socket may already be disconnected. Ignore errors.
327        Stream::connect(self.home.socket())
328            .and_then(|sock| Command::Shutdown.to_writer(sock))
329            .ok();
330
331        self.controller
332            .shutdown()
333            .map_err(|_| Error::ChannelDisconnected)
334    }
335
336    fn debug(&self) -> Result<serde_json::Value, Self::Error> {
337        let (sender, receiver) = chan::bounded(1);
338        let query: Arc<QueryState> = Arc::new(move |state| {
339            let debug = serde_json::json!({
340                "outboxSize": state.outbox().len(),
341                "fetching": state.fetching().iter().map(|(rid, state)| {
342                    json!({
343                        "rid": rid,
344                        "from": state.from,
345                        "refsAt": state.refs_at,
346                        "subscribers": state.subscribers.len(),
347                    })
348                }).collect::<Vec<_>>(),
349                "queue": state.sessions().values().map(|sess| {
350                    json!({
351                        "nid": sess.id,
352                        "queue": sess.queue.iter().map(|fetch| {
353                            json!({
354                                "rid": fetch.rid,
355                                "from": fetch.from,
356                                "refsAt": fetch.refs_at,
357                            })
358                        }).collect::<Vec<_>>()
359                    })
360                }).collect::<Vec<_>>(),
361                "rateLimiter": state.limiter().buckets.iter().map(|(host, bucket)| {
362                    json!({
363                        "host": host.to_string(),
364                        "bucket": bucket
365                    })
366                }).collect::<Vec<_>>(),
367                "events": json!({
368                    "subscribers": state.emitter().subscriptions(),
369                    "pending": state.emitter().pending(),
370                }),
371                "metrics": state.metrics(),
372            });
373            sender.send(debug).ok();
374
375            Ok(())
376        });
377        let (err_sender, err_receiver) = chan::bounded(1);
378        self.command(service::Command::QueryState(query, err_sender))?;
379        err_receiver.recv()??;
380
381        let debug = receiver.recv()?;
382
383        Ok(debug)
384    }
385}