Skip to main content

radicle_node/
runtime.rs

1pub mod handle;
2pub mod thread;
3
4use std::fmt::Debug;
5use std::path::PathBuf;
6use std::str::FromStr as _;
7use std::{fs, io, net};
8
9#[cfg(unix)]
10use std::os::unix::net::UnixListener;
11#[cfg(windows)]
12use uds_windows::UnixListener;
13
14use crossbeam_channel as chan;
15use cyphernet::Ecdh;
16use radicle::cob::migrate;
17use radicle::crypto;
18use radicle::node::device::Device;
19use radicle_signals::Signal;
20use thiserror::Error;
21
22use radicle::node;
23use radicle::node::Event;
24use radicle::node::UserAgent;
25use radicle::node::address;
26use radicle::node::address::Store as _;
27use radicle::node::notifications;
28use radicle::node::policy::config as policy;
29use radicle::profile::Home;
30use radicle::{Storage, cob, git, storage};
31
32use crate::control;
33use crate::node::{NodeId, routing};
34use crate::reactor;
35use crate::reactor::Reactor;
36use crate::service::gossip;
37use crate::wire::Wire;
38use crate::worker;
39use crate::{LocalTime, service};
40
41pub use handle::Error as HandleError;
42pub use handle::Handle;
43pub use node::events::Emitter;
44
45/// Maximum pending worker tasks allowed.
46pub const MAX_PENDING_TASKS: usize = 1024;
47
48/// A client error.
49#[derive(Error, Debug)]
50pub enum Error {
51    /// A routing database error.
52    #[error("routing database error: {0}")]
53    Routing(#[from] routing::Error),
54    /// A cobs cache database error.
55    #[error("cobs cache database error: {0}")]
56    CobsCache(#[from] cob::cache::Error),
57    /// A node database error.
58    #[error("node database error: {0}")]
59    Database(#[from] node::db::Error),
60    /// A storage error.
61    #[error("storage error: {0}")]
62    Storage(#[from] storage::Error),
63    /// A policies database error.
64    #[error("policies database error: {0}")]
65    Policy(#[from] policy::Error),
66    /// A notifications database error.
67    #[error("notifications database error: {0}")]
68    Notifications(#[from] notifications::Error),
69    /// A gossip database error.
70    #[error("gossip database error: {0}")]
71    Gossip(#[from] gossip::Error),
72    /// An address database error.
73    #[error("address database error: {0}")]
74    Address(#[from] address::Error),
75    /// A service error.
76    #[error("service error: {0}")]
77    Service(Box<service::Error>),
78    /// An I/O error.
79    #[error("i/o error: {0}")]
80    Io(#[from] io::Error),
81    /// A control socket error.
82    #[error("control socket error: {0}")]
83    Control(#[from] control::Error),
84    /// Another node is already running.
85    #[error(
86        "another node appears to be running; \
87        if this isn't the case, delete the socket file at '{0}' \
88        and restart the node"
89    )]
90    AlreadyRunning(PathBuf),
91    /// A git version error.
92    #[error("git version error: {0}")]
93    GitVersion(#[from] git::VersionError),
94}
95
96impl From<service::Error> for Error {
97    fn from(e: service::Error) -> Self {
98        Self::Service(Box::new(e))
99    }
100}
101
102/// Wraps a [`UnixListener`] but tracks its origin.
103pub enum ControlSocket {
104    /// The listener was created by binding to it.
105    Bound(UnixListener, PathBuf),
106    /// The listener was received via socket activation.
107    Received(UnixListener),
108}
109
110/// Holds join handles to the client threads, as well as a client handle.
111pub struct Runtime {
112    pub id: NodeId,
113    pub home: Home,
114    pub control: ControlSocket,
115    pub handle: Handle,
116    pub storage: Storage,
117    pub reactor: Reactor,
118    pub pool: worker::Pool,
119    pub local_addrs: Vec<net::SocketAddr>,
120    pub signals: chan::Receiver<Signal>,
121}
122
123impl Runtime {
124    /// Initialize the runtime.
125    ///
126    /// This function spawns threads.
127    pub fn init<G>(
128        home: Home,
129        config: radicle::node::Config,
130        socket: PathBuf,
131        listen: Vec<net::SocketAddr>,
132        signals: chan::Receiver<Signal>,
133        signer: Device<G>,
134    ) -> Result<Runtime, Error>
135    where
136        G: crypto::signature::Signer<crypto::Signature>
137            + Ecdh<Pk = NodeId>
138            + Clone
139            + Debug
140            + 'static,
141    {
142        let id = *signer.public_key();
143        let alias = config.alias.clone();
144        let network = config.network;
145        let rng = fastrand::Rng::new();
146        let clock = LocalTime::now();
147        let timestamp = clock.into();
148        let storage = Storage::open(home.storage(), git::UserInfo { alias, key: id })?;
149        let policy = config.seeding_policy.into();
150
151        for (key, _) in &config.extra {
152            log::warn!(target: "node", "Unused or deprecated configuration attribute {key:?}");
153        }
154
155        log::info!(target: "node", "Opening policy database..");
156        let policies = home.policies_mut()?;
157        let policies = policy::Config::new(policy, policies);
158        let notifications = home.notifications_mut()?;
159        let mut cobs_cache = cob::cache::Store::open(home.cobs().join(cob::cache::COBS_DB_FILE))?;
160
161        match cobs_cache.check_version() {
162            Ok(()) => {}
163            Err(cob::cache::Error::OutOfDate) => {
164                log::info!(target: "node", "Migrating COBs cache..");
165                let version = cobs_cache.migrate(migrate::log)?;
166                log::info!(target: "node", "Migration of COBs cache complete (version={version})..");
167            }
168            Err(e) => return Err(e.into()),
169        }
170
171        log::info!(target: "node", "Default seeding policy set to '{}'", &policy);
172        log::info!(target: "node", "Initializing service ({network:?})..");
173
174        let announcement = service::gossip::node(&config, timestamp)
175            .solve(Default::default())
176            .expect("Runtime::init: unable to solve proof-of-work puzzle");
177
178        log::info!(target: "node", "Opening node database..");
179        let db = home.database_mut(config.database)?.init(
180            &id,
181            announcement.features,
182            &announcement.alias,
183            &announcement.agent,
184            announcement.timestamp,
185            announcement.addresses.iter(),
186        )?;
187        let mut stores: service::Stores<_> = db.clone().into();
188
189        if config.connect.is_empty() && stores.addresses().is_empty()? {
190            log::info!(target: "node", "Address book is empty. Adding bootstrap nodes..");
191
192            for (alias, version, addrs) in config.network.bootstrap() {
193                for addr in addrs {
194                    let (id, addr) = addr.into();
195
196                    stores.addresses_mut().insert(
197                        &id,
198                        version,
199                        radicle::node::Features::SEED,
200                        &alias,
201                        0,
202                        &UserAgent::from_str("/radicle/runtime/bootstrap/")
203                            .expect("valid user agent"),
204                        clock.into(),
205                        [node::KnownAddress::new(addr, address::Source::Bootstrap)],
206                    )?;
207                }
208            }
209            log::info!(target: "node", "{} nodes added to address book", stores.addresses().len()?);
210        }
211
212        let emitter: Emitter<Event> = Default::default();
213        let mut service = service::Service::new(
214            config.clone(),
215            stores,
216            storage.clone(),
217            policies,
218            signer.clone(),
219            rng,
220            announcement,
221            emitter.clone(),
222        );
223        service.initialize(clock)?;
224
225        let (worker_send, worker_recv) = chan::bounded::<worker::Task>(MAX_PENDING_TASKS);
226        let mut wire = Wire::new(service, worker_send, signer.clone());
227        let mut local_addrs = Vec::new();
228
229        for addr in listen {
230            let listener = reactor::Listener::bind(addr)?;
231            let local_addr = listener.local_addr();
232
233            local_addrs.push(local_addr);
234            wire.listen(listener);
235        }
236        let reactor = Reactor::new(wire, thread::name(&id, "service"))?;
237        let handle = Handle::new(home.clone(), socket.clone(), reactor.controller(), emitter);
238
239        let nid = *signer.public_key();
240        let fetch = worker::FetchConfig {
241            local: nid,
242            expiry: worker::garbage::Expiry::default(),
243        };
244        let pool = worker::Pool::with(
245            worker_recv,
246            nid,
247            handle.clone(),
248            notifications,
249            cobs_cache,
250            db,
251            worker::Config {
252                capacity: config.workers.into(),
253                storage: storage.clone(),
254                fetch,
255                policy,
256                policies_db: home.node().join(node::POLICIES_DB_FILE),
257            },
258        )?;
259        let control = Self::bind(socket)?;
260
261        Ok(Runtime {
262            id,
263            home,
264            control,
265            storage,
266            reactor,
267            handle,
268            pool,
269            signals,
270            local_addrs,
271        })
272    }
273
274    pub fn run(self) -> Result<(), Error> {
275        let home = self.home;
276        let (listener, remove) = match self.control {
277            ControlSocket::Bound(listener, path) => (listener, Some(path)),
278            ControlSocket::Received(listener) => (listener, None),
279        };
280
281        log::info!(target: "node", "Running node {} in {}..", self.id, home.path().display());
282
283        thread::spawn(&self.id, "control", {
284            let handle = self.handle.clone();
285            || control::listen(listener, handle)
286        });
287
288        let _signals = thread::spawn(&self.id, "signals", move || {
289            loop {
290                use radicle::node::Handle as _;
291
292                match self.signals.recv() {
293                    Ok(Signal::Terminate | Signal::Interrupt) => {
294                        log::info!(target: "node", "Termination signal received; shutting down..");
295                        self.handle.shutdown().ok();
296                        break;
297                    }
298                    Ok(Signal::Hangup) => {
299                        log::debug!(target: "node", "Hangup signal (SIGHUP) received; ignoring..");
300                    }
301                    Ok(Signal::WindowChanged) => {}
302                    Err(e) => {
303                        log::warn!(target: "node", "Signal notifications channel error: {e}");
304                        break;
305                    }
306                }
307            }
308        });
309
310        self.pool.run().unwrap();
311        self.reactor.join().unwrap();
312
313        // Nb. We don't join the control thread here, as we have no way of notifying it that the
314        // node is shutting down.
315
316        // Remove control socket file, but don't freak out if it's not there anymore.
317        remove.map(|path| fs::remove_file(path).ok());
318
319        log::debug!(target: "node", "Node shutdown completed for {}", self.id);
320
321        Ok(())
322    }
323
324    #[cfg(all(feature = "socket2", feature = "systemd", target_os = "linux"))]
325    fn receive_listener() -> Option<UnixListener> {
326        // SAFETY: When `fd` is called, no other threads are spawned (yet).
327        let fd = match unsafe { radicle_systemd::listen::fd("control") } {
328            Ok(Some(fd)) => fd,
329            Ok(None) => return None,
330            Err(err) => {
331                log::error!(target: "node", "Error receiving listener from systemd: {err}");
332                return None;
333            }
334        };
335
336        let socket: socket2::Socket = unsafe { std::os::fd::FromRawFd::from_raw_fd(fd) };
337
338        let domain = match socket.domain() {
339            Ok(domain) => domain,
340            Err(err) => {
341                log::error!(target: "node", "Error receiving listener from systemd when inspecting domain of socket: {err}");
342                return None;
343            }
344        };
345
346        if domain != socket2::Domain::UNIX {
347            log::error!(target: "node", "Dropping listener received from systemd: Domain is not AF_UNIX.");
348            return None;
349        }
350
351        Some(UnixListener::from(socket))
352    }
353
354    fn bind(path: PathBuf) -> Result<ControlSocket, Error> {
355        #[cfg(all(feature = "socket2", feature = "systemd", target_os = "linux"))]
356        {
357            if let Some(listener) = Self::receive_listener() {
358                log::info!(target: "node", "Received control socket.");
359                return Ok(ControlSocket::Received(listener));
360            }
361        }
362
363        log::info!(target: "node", "Binding control socket {}..", &path.display());
364        match UnixListener::bind(&path) {
365            Ok(sock) => Ok(ControlSocket::Bound(sock, path)),
366            Err(err) if err.kind() == io::ErrorKind::AddrInUse => Err(Error::AlreadyRunning(path)),
367            Err(err) => Err(err.into()),
368        }
369    }
370}