radicle_node/runtime/
handle.rs

1use std::net;
2use std::os::unix::net::UnixStream;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::{fmt, io, time};
6
7use crossbeam_channel as chan;
8use radicle::node::{ConnectOptions, ConnectResult, Link, Seeds};
9use radicle::storage::refs::RefsAt;
10use reactor::poller::popol::PopolWaker;
11use serde_json::json;
12use thiserror::Error;
13
14use crate::identity::RepoId;
15use crate::node::{Alias, Command, FetchResult};
16use crate::profile::Home;
17use crate::runtime::Emitter;
18use crate::service;
19use crate::service::policy;
20use crate::service::NodeId;
21use crate::service::{CommandError, Config, QueryState};
22use crate::service::{Event, Events};
23use crate::wire;
24use crate::wire::StreamId;
25use crate::worker::TaskResult;
26
27/// An error resulting from a handle method.
28#[derive(Error, Debug)]
29pub enum Error {
30    /// The command channel is no longer connected.
31    #[error("command channel is not connected")]
32    ChannelDisconnected,
33    /// The command returned an error.
34    #[error("command failed: {0}")]
35    Command(#[from] CommandError),
36    /// The operation timed out.
37    #[error("the operation timed out")]
38    Timeout,
39    /// An I/O error occured.
40    #[error(transparent)]
41    Io(#[from] std::io::Error),
42}
43
44impl From<chan::RecvError> for Error {
45    fn from(_: chan::RecvError) -> Self {
46        Self::ChannelDisconnected
47    }
48}
49
50impl From<chan::RecvTimeoutError> for Error {
51    fn from(err: chan::RecvTimeoutError) -> Self {
52        match err {
53            chan::RecvTimeoutError::Timeout => Self::Timeout,
54            chan::RecvTimeoutError::Disconnected => Self::ChannelDisconnected,
55        }
56    }
57}
58
59impl<T> From<chan::SendError<T>> for Error {
60    fn from(_: chan::SendError<T>) -> Self {
61        Self::ChannelDisconnected
62    }
63}
64
/// Handle used to send commands to the running node and to subscribe to its events.
///
/// Cloning a handle shares the same `shutdown` flag (it lives behind an [`Arc`]).
pub struct Handle {
    /// Node home; `home.socket()` is the control socket contacted on shutdown.
    pub(crate) home: Home,
    /// Reactor controller through which [`wire::Control`] messages are submitted.
    pub(crate) controller: reactor::Controller<wire::Control, PopolWaker>,

    /// Whether a shutdown was initiated or not. Prevents attempting to shutdown twice.
    shutdown: Arc<AtomicBool>,
    /// Publishes events to subscribers.
    emitter: Emitter<Event>,
}
74
75impl Handle {
76    /// Subscribe to events stream.
77    pub fn events(&self) -> Events {
78        Events::from(self.emitter.subscribe())
79    }
80}
81
82impl fmt::Debug for Handle {
83    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84        f.debug_struct("Handle").field("home", &self.home).finish()
85    }
86}
87
88impl Clone for Handle {
89    fn clone(&self) -> Self {
90        Self {
91            home: self.home.clone(),
92            controller: self.controller.clone(),
93            shutdown: self.shutdown.clone(),
94            emitter: self.emitter.clone(),
95        }
96    }
97}
98
99impl Handle {
100    pub fn new(
101        home: Home,
102        controller: reactor::Controller<wire::Control, PopolWaker>,
103        emitter: Emitter<Event>,
104    ) -> Self {
105        Self {
106            home,
107            controller,
108            shutdown: Arc::default(),
109            emitter,
110        }
111    }
112
113    pub fn worker_result(&mut self, result: TaskResult) -> Result<(), io::Error> {
114        self.controller.cmd(wire::Control::Worker(result))
115    }
116
117    pub fn flush(&mut self, remote: NodeId, stream: StreamId) -> Result<(), io::Error> {
118        self.controller.cmd(wire::Control::Flush { remote, stream })
119    }
120
121    pub(crate) fn command(&self, cmd: service::Command) -> Result<(), io::Error> {
122        self.controller.cmd(wire::Control::User(cmd))
123    }
124}
125
126impl radicle::node::Handle for Handle {
127    type Sessions = Vec<radicle::node::Session>;
128    type Events = Events;
129    type Event = Event;
130    type Error = Error;
131
132    fn nid(&self) -> Result<NodeId, Self::Error> {
133        let (sender, receiver) = chan::bounded(1);
134        let query: Arc<QueryState> = Arc::new(move |state| {
135            sender.send(*state.nid()).ok();
136            Ok(())
137        });
138        let (err_sender, err_receiver) = chan::bounded(1);
139        self.command(service::Command::QueryState(query, err_sender))?;
140        err_receiver.recv()??;
141
142        let nid = receiver.recv()?;
143
144        Ok(nid)
145    }
146
147    fn is_running(&self) -> bool {
148        true
149    }
150
151    fn connect(
152        &mut self,
153        node: NodeId,
154        addr: radicle::node::Address,
155        opts: ConnectOptions,
156    ) -> Result<ConnectResult, Error> {
157        let events = self.events();
158        let timeout = opts.timeout;
159        let sessions = self.sessions()?;
160        let session = sessions.iter().find(|s| s.nid == node);
161
162        if let Some(s) = session {
163            if s.state.is_connected() {
164                return Ok(ConnectResult::Connected);
165            }
166        }
167        self.command(service::Command::Connect(node, addr, opts))?;
168
169        events
170            .wait(
171                |e| match e {
172                    Event::PeerConnected { nid } if nid == &node => Some(ConnectResult::Connected),
173                    Event::PeerDisconnected { nid, reason } if nid == &node => {
174                        Some(ConnectResult::Disconnected {
175                            reason: reason.clone(),
176                        })
177                    }
178                    _ => None,
179                },
180                timeout,
181            )
182            .map_err(Error::from)
183    }
184
185    fn disconnect(&mut self, node: NodeId) -> Result<(), Self::Error> {
186        let events = self.events();
187        self.command(service::Command::Disconnect(node))?;
188        events
189            .wait(
190                |e| match e {
191                    Event::PeerDisconnected { nid, .. } if nid == &node => Some(()),
192                    _ => None,
193                },
194                time::Duration::MAX,
195            )
196            .map_err(Error::from)
197    }
198
199    fn seeds(&mut self, id: RepoId) -> Result<Seeds, Self::Error> {
200        let (sender, receiver) = chan::bounded(1);
201        self.command(service::Command::Seeds(id, sender))?;
202        receiver.recv().map_err(Error::from)
203    }
204
205    fn config(&self) -> Result<Config, Self::Error> {
206        let (sender, receiver) = chan::bounded(1);
207        self.command(service::Command::Config(sender))?;
208        receiver.recv().map_err(Error::from)
209    }
210
211    fn listen_addrs(&self) -> Result<Vec<net::SocketAddr>, Self::Error> {
212        let (sender, receiver) = chan::bounded(1);
213        self.command(service::Command::ListenAddrs(sender))?;
214        receiver.recv().map_err(Error::from)
215    }
216
217    fn fetch(
218        &mut self,
219        id: RepoId,
220        from: NodeId,
221        timeout: time::Duration,
222    ) -> Result<FetchResult, Error> {
223        let (sender, receiver) = chan::bounded(1);
224        self.command(service::Command::Fetch(id, from, timeout, sender))?;
225        receiver.recv().map_err(Error::from)
226    }
227
228    fn follow(&mut self, id: NodeId, alias: Option<Alias>) -> Result<bool, Error> {
229        let (sender, receiver) = chan::bounded(1);
230        self.command(service::Command::Follow(id, alias, sender))?;
231        receiver.recv().map_err(Error::from)
232    }
233
234    fn unfollow(&mut self, id: NodeId) -> Result<bool, Error> {
235        let (sender, receiver) = chan::bounded(1);
236        self.command(service::Command::Unfollow(id, sender))?;
237        receiver.recv().map_err(Error::from)
238    }
239
240    fn seed(&mut self, id: RepoId, scope: policy::Scope) -> Result<bool, Error> {
241        let (sender, receiver) = chan::bounded(1);
242        self.command(service::Command::Seed(id, scope, sender))?;
243        receiver.recv().map_err(Error::from)
244    }
245
246    fn unseed(&mut self, id: RepoId) -> Result<bool, Error> {
247        let (sender, receiver) = chan::bounded(1);
248        self.command(service::Command::Unseed(id, sender))?;
249        receiver.recv().map_err(Error::from)
250    }
251
252    fn announce_refs(&mut self, id: RepoId) -> Result<RefsAt, Error> {
253        let (sender, receiver) = chan::bounded(1);
254        self.command(service::Command::AnnounceRefs(id, sender))?;
255        receiver.recv().map_err(Error::from)
256    }
257
258    fn announce_inventory(&mut self) -> Result<(), Error> {
259        self.command(service::Command::AnnounceInventory)
260            .map_err(Error::from)
261    }
262
263    fn add_inventory(&mut self, rid: RepoId) -> Result<bool, Error> {
264        let (sender, receiver) = chan::bounded(1);
265        self.command(service::Command::AddInventory(rid, sender))?;
266        receiver.recv().map_err(Error::from)
267    }
268
269    fn subscribe(&self, _timeout: time::Duration) -> Result<Self::Events, Self::Error> {
270        Ok(self.events())
271    }
272
273    fn sessions(&self) -> Result<Self::Sessions, Error> {
274        let (sender, receiver) = chan::unbounded();
275        let query: Arc<QueryState> = Arc::new(move |state| {
276            let sessions = state
277                .sessions()
278                .iter()
279                .map(|(nid, s)| radicle::node::Session {
280                    nid: *nid,
281                    link: if s.link.is_inbound() {
282                        Link::Inbound
283                    } else {
284                        Link::Outbound
285                    },
286                    addr: s.addr.clone(),
287                    state: s.state.clone(),
288                })
289                .collect();
290            sender.send(sessions).ok();
291
292            Ok(())
293        });
294        let (err_sender, err_receiver) = chan::bounded(1);
295        self.command(service::Command::QueryState(query, err_sender))?;
296        err_receiver.recv()??;
297
298        let sessions = receiver.recv()?;
299
300        Ok(sessions)
301    }
302
303    fn shutdown(self) -> Result<(), Error> {
304        // If the current value is `false`, set it to `true`, otherwise error.
305        if self
306            .shutdown
307            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
308            .is_err()
309        {
310            return Ok(());
311        }
312        // Send a shutdown request to our own control socket. This is the only way to kill the
313        // control thread gracefully. Since the control thread may have called this function,
314        // the control socket may already be disconnected. Ignore errors.
315        UnixStream::connect(self.home.socket())
316            .and_then(|sock| Command::Shutdown.to_writer(sock))
317            .ok();
318
319        self.controller
320            .shutdown()
321            .map_err(|_| Error::ChannelDisconnected)
322    }
323
324    fn debug(&self) -> Result<serde_json::Value, Self::Error> {
325        let (sender, receiver) = chan::bounded(1);
326        let query: Arc<QueryState> = Arc::new(move |state| {
327            let debug = serde_json::json!({
328                "outboxSize": state.outbox().len(),
329                "fetching": state.fetching().iter().map(|(rid, state)| {
330                    json!({
331                        "rid": rid,
332                        "from": state.from,
333                        "refsAt": state.refs_at,
334                        "subscribers": state.subscribers.len(),
335                    })
336                }).collect::<Vec<_>>(),
337                "queue": state.sessions().values().map(|sess| {
338                    json!({
339                        "nid": sess.id,
340                        "queue": sess.queue.iter().map(|fetch| {
341                            json!({
342                                "rid": fetch.rid,
343                                "from": fetch.from,
344                                "refsAt": fetch.refs_at,
345                            })
346                        }).collect::<Vec<_>>()
347                    })
348                }).collect::<Vec<_>>(),
349                "rateLimiter": state.limiter().buckets.iter().map(|(host, bucket)| {
350                    json!({
351                        "host": host.to_string(),
352                        "bucket": bucket
353                    })
354                }).collect::<Vec<_>>(),
355                "events": json!({
356                    "subscribers": state.emitter().subscriptions(),
357                    "pending": state.emitter().pending(),
358                }),
359                "metrics": state.metrics(),
360            });
361            sender.send(debug).ok();
362
363            Ok(())
364        });
365        let (err_sender, err_receiver) = chan::bounded(1);
366        self.command(service::Command::QueryState(query, err_sender))?;
367        err_receiver.recv()??;
368
369        let debug = receiver.recv()?;
370
371        Ok(debug)
372    }
373}