Skip to main content

radicle_node/
runtime.rs

1pub mod handle;
2pub mod thread;
3
4use std::fmt::Debug;
5use std::path::PathBuf;
6use std::{fs, io, net};
7
8#[cfg(unix)]
9use std::os::unix::net::UnixListener;
10#[cfg(windows)]
11use uds_windows::UnixListener;
12
13use crossbeam_channel as chan;
14use cyphernet::Ecdh;
15use radicle::cob::migrate;
16use radicle::crypto;
17use radicle::node::device::Device;
18use radicle_signals::Signal;
19use thiserror::Error;
20
21use radicle::node;
22use radicle::node::address;
23use radicle::node::address::Store as _;
24use radicle::node::notifications;
25use radicle::node::policy::config as policy;
26use radicle::node::Event;
27use radicle::node::UserAgent;
28use radicle::profile::Home;
29use radicle::{cob, git, storage, Storage};
30
31use crate::control;
32use crate::node::{routing, NodeId};
33use crate::reactor;
34use crate::reactor::Reactor;
35use crate::service::gossip;
36use crate::wire::Wire;
37use crate::worker;
38use crate::{service, LocalTime};
39
40pub use handle::Error as HandleError;
41pub use handle::Handle;
42pub use node::events::Emitter;
43
/// Maximum pending worker tasks allowed.
///
/// Used as the capacity of the bounded channel over which [`worker::Task`]s are handed
/// from the wire protocol to the worker pool in [`Runtime::init`].
pub const MAX_PENDING_TASKS: usize = 1024;
46
/// A client error.
///
/// This is the error type returned by [`Runtime::init`] and [`Runtime::run`]. Most variants
/// wrap the error of a lower-level component and are created through their `#[from]`
/// conversion, which lets them be propagated with `?`.
#[derive(Error, Debug)]
pub enum Error {
    /// A routing database error.
    #[error("routing database error: {0}")]
    Routing(#[from] routing::Error),
    /// A cobs cache database error.
    #[error("cobs cache database error: {0}")]
    CobsCache(#[from] cob::cache::Error),
    /// A node database error.
    #[error("node database error: {0}")]
    Database(#[from] node::db::Error),
    /// A storage error.
    #[error("storage error: {0}")]
    Storage(#[from] storage::Error),
    /// A policies database error.
    #[error("policies database error: {0}")]
    Policy(#[from] policy::Error),
    /// A notifications database error.
    #[error("notifications database error: {0}")]
    Notifications(#[from] notifications::Error),
    /// A gossip database error.
    #[error("gossip database error: {0}")]
    Gossip(#[from] gossip::Error),
    /// An address database error.
    #[error("address database error: {0}")]
    Address(#[from] address::Error),
    /// A service error.
    ///
    /// The inner error is boxed, so there is no `#[from]` here; the conversion from
    /// [`service::Error`] is implemented by hand further down in this file.
    // NOTE(review): presumably boxed to keep this enum small -- confirm.
    #[error("service error: {0}")]
    Service(Box<service::Error>),
    /// An I/O error.
    #[error("i/o error: {0}")]
    Io(#[from] io::Error),
    /// A control socket error.
    #[error("control socket error: {0}")]
    Control(#[from] control::Error),
    /// Another node is already running.
    ///
    /// Carries the path of the control socket file that could not be bound because the
    /// address was already in use.
    #[error(
        "another node appears to be running; \
        if this isn't the case, delete the socket file at '{0}' \
        and restart the node"
    )]
    AlreadyRunning(PathBuf),
    /// A git version error.
    #[error("git version error: {0}")]
    GitVersion(#[from] git::VersionError),
}
94
95impl From<service::Error> for Error {
96    fn from(e: service::Error) -> Self {
97        Self::Service(Box::new(e))
98    }
99}
100
/// Wraps a [`UnixListener`] but tracks its origin.
///
/// The origin decides who cleans up: [`Runtime::run`] removes the socket file on shutdown
/// only for the [`ControlSocket::Bound`] variant.
pub enum ControlSocket {
    /// The listener was created by binding to it.
    ///
    /// The second field is the path the listener was bound to.
    Bound(UnixListener, PathBuf),
    /// The listener was received via socket activation.
    Received(UnixListener),
}
108
/// Holds join handles to the client threads, as well as a client handle.
///
/// Created with [`Runtime::init`] and consumed by [`Runtime::run`].
pub struct Runtime {
    /// The node identifier, i.e. the public key of the signer passed to [`Runtime::init`].
    pub id: NodeId,
    /// The home directory the node operates in.
    pub home: Home,
    /// The control socket listener, together with its origin.
    pub control: ControlSocket,
    /// Handle used to control the running node, e.g. to request a shutdown.
    pub handle: Handle,
    /// The storage opened from the home directory.
    pub storage: Storage,
    /// The reactor driving the wire protocol and the service.
    pub reactor: Reactor,
    /// The worker pool consuming tasks sent by the wire protocol.
    pub pool: worker::Pool,
    /// Local addresses of the listeners that were bound during initialization.
    pub local_addrs: Vec<net::SocketAddr>,
    /// Receiver of process signals, consumed by the "signals" thread in [`Runtime::run`].
    pub signals: chan::Receiver<Signal>,
}
121
122impl Runtime {
    /// Initialize the runtime.
    ///
    /// In order, this opens the storage, the policy and notifications databases and the COBs
    /// cache (migrating the cache if it is out of date), creates the node announcement, opens
    /// the node database, seeds an empty address book with the network's bootstrap nodes,
    /// builds and initializes the service, binds a listener for every address in `listen`,
    /// creates the reactor, the handle and the worker pool, and finally obtains the control
    /// socket.
    ///
    /// This function spawns threads.
    // NOTE(review): the spawning itself is not visible in this function; presumably it
    // happens inside `Reactor::new` and `worker::Pool::with` -- confirm.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] if any of the steps above fails. In particular,
    /// [`Error::AlreadyRunning`] is returned if the control socket address is already in use.
    ///
    /// # Panics
    ///
    /// Panics if the proof-of-work puzzle of the node announcement cannot be solved.
    pub fn init<G>(
        home: Home,
        config: radicle::node::Config,
        listen: Vec<net::SocketAddr>,
        signals: chan::Receiver<Signal>,
        signer: Device<G>,
    ) -> Result<Runtime, Error>
    where
        G: crypto::signature::Signer<crypto::Signature>
            + Ecdh<Pk = NodeId>
            + Clone
            + Debug
            + 'static,
    {
        // The node is identified by the public key of its signer.
        let id = *signer.public_key();
        let alias = config.alias.clone();
        let network = config.network;
        let rng = fastrand::Rng::new();
        let clock = LocalTime::now();
        let timestamp = clock.into();
        let storage = Storage::open(home.storage(), git::UserInfo { alias, key: id })?;
        let policy = config.seeding_policy.into();

        // Configuration keys that ended up in `extra` are only reported, never acted upon.
        for (key, _) in &config.extra {
            log::warn!(target: "node", "Unused or deprecated configuration attribute {key:?}");
        }

        log::info!(target: "node", "Opening policy database..");
        let policies = home.policies_mut()?;
        let policies = policy::Config::new(policy, policies);
        let notifications = home.notifications_mut()?;
        let mut cobs_cache = cob::cache::Store::open(home.cobs().join(cob::cache::COBS_DB_FILE))?;

        // An out-of-date cache is migrated in place; any other version check failure is fatal.
        match cobs_cache.check_version() {
            Ok(()) => {}
            Err(cob::cache::Error::OutOfDate) => {
                log::info!(target: "node", "Migrating COBs cache..");
                let version = cobs_cache.migrate(migrate::log)?;
                log::info!(target: "node", "Migration of COBs cache complete (version={version})..");
            }
            Err(e) => return Err(e.into()),
        }

        log::info!(target: "node", "Default seeding policy set to '{}'", &policy);
        log::info!(target: "node", "Initializing service ({network:?})..");

        // The announcement is solved against the default target; failure is treated as a bug.
        let announcement = service::gossip::node(&config, timestamp)
            .solve(Default::default())
            .expect("Runtime::init: unable to solve proof-of-work puzzle");

        log::info!(target: "node", "Opening node database..");
        // The node database is initialized with the fields of our own announcement.
        let db = home.database_mut(config.database)?.init(
            &id,
            announcement.features,
            &announcement.alias,
            &announcement.agent,
            announcement.timestamp,
            announcement.addresses.iter(),
        )?;
        let mut stores: service::Stores<_> = db.clone().into();

        // Bootstrap nodes are only added when there are no configured peers to connect to
        // and the address book holds no entries at all.
        if config.connect.is_empty() && stores.addresses().is_empty()? {
            log::info!(target: "node", "Address book is empty. Adding bootstrap nodes..");

            for (alias, version, addrs) in config.network.bootstrap() {
                for addr in addrs {
                    let (id, addr) = addr.into();

                    stores.addresses_mut().insert(
                        &id,
                        version,
                        radicle::node::Features::SEED,
                        &alias,
                        0,
                        &UserAgent::default(),
                        clock.into(),
                        [node::KnownAddress::new(addr, address::Source::Bootstrap)],
                    )?;
                }
            }
            log::info!(target: "node", "{} nodes added to address book", stores.addresses().len()?);
        }

        // The emitter is shared between the service and the handle.
        let emitter: Emitter<Event> = Default::default();
        let mut service = service::Service::new(
            config.clone(),
            stores,
            storage.clone(),
            policies,
            signer.clone(),
            rng,
            announcement,
            emitter.clone(),
        );
        service.initialize(clock)?;

        // Tasks flow from the wire protocol to the worker pool over a bounded channel.
        let (worker_send, worker_recv) = chan::bounded::<worker::Task>(MAX_PENDING_TASKS);
        let mut wire = Wire::new(service, worker_send, signer.clone());
        let mut local_addrs = Vec::new();

        // Record the address each listener reports after binding, not the requested one.
        for addr in listen {
            let listener = reactor::Listener::bind(addr)?;
            let local_addr = listener.local_addr();

            local_addrs.push(local_addr);
            wire.listen(listener);
        }
        let reactor = Reactor::new(wire, thread::name(&id, "service"))?;
        let handle = Handle::new(home.clone(), reactor.controller(), emitter);

        // Same key as `id` above.
        let nid = *signer.public_key();
        let fetch = worker::FetchConfig {
            local: nid,
            expiry: worker::garbage::Expiry::default(),
        };
        let pool = worker::Pool::with(
            worker_recv,
            nid,
            handle.clone(),
            notifications,
            cobs_cache,
            db,
            worker::Config {
                capacity: config.workers.into(),
                storage: storage.clone(),
                fetch,
                policy,
                policies_db: home.node().join(node::POLICIES_DB_FILE),
            },
        )?;
        // The control socket is obtained last, once everything else has been set up.
        let control = Self::bind(home.socket())?;

        Ok(Runtime {
            id,
            home,
            control,
            storage,
            reactor,
            handle,
            pool,
            signals,
            local_addrs,
        })
    }
270
271    pub fn run(self) -> Result<(), Error> {
272        let home = self.home;
273        let (listener, remove) = match self.control {
274            ControlSocket::Bound(listener, path) => (listener, Some(path)),
275            ControlSocket::Received(listener) => (listener, None),
276        };
277
278        log::info!(target: "node", "Running node {} in {}..", self.id, home.path().display());
279
280        thread::spawn(&self.id, "control", {
281            let handle = self.handle.clone();
282            || control::listen(listener, handle)
283        });
284
285        let _signals = thread::spawn(&self.id, "signals", move || loop {
286            use radicle::node::Handle as _;
287
288            match self.signals.recv() {
289                Ok(Signal::Terminate | Signal::Interrupt) => {
290                    log::info!(target: "node", "Termination signal received; shutting down..");
291                    self.handle.shutdown().ok();
292                    break;
293                }
294                Ok(Signal::Hangup) => {
295                    log::debug!(target: "node", "Hangup signal (SIGHUP) received; ignoring..");
296                }
297                Ok(Signal::WindowChanged) => {}
298                Err(e) => {
299                    log::warn!(target: "node", "Signal notifications channel error: {e}");
300                    break;
301                }
302            }
303        });
304
305        self.pool.run().unwrap();
306        self.reactor.join().unwrap();
307
308        // Nb. We don't join the control thread here, as we have no way of notifying it that the
309        // node is shutting down.
310
311        // Remove control socket file, but don't freak out if it's not there anymore.
312        remove.map(|path| fs::remove_file(path).ok());
313
314        log::debug!(target: "node", "Node shutdown completed for {}", self.id);
315
316        Ok(())
317    }
318
319    #[cfg(all(feature = "systemd", target_os = "linux"))]
320    fn receive_listener() -> Option<UnixListener> {
321        let fd = match radicle_systemd::listen::fd("control") {
322            Ok(Some(fd)) => fd,
323            Ok(None) => return None,
324            Err(err) => {
325                log::error!(target: "node", "Error receiving listener from systemd: {err}");
326                return None;
327            }
328        };
329
330        let socket: socket2::Socket = unsafe { std::os::fd::FromRawFd::from_raw_fd(fd) };
331
332        let domain = match socket.domain() {
333            Ok(domain) => domain,
334            Err(err) => {
335                log::error!(target: "node", "Error receiving listener from systemd when inspecting domain of socket: {err}");
336                return None;
337            }
338        };
339
340        if domain != socket2::Domain::UNIX {
341            log::error!(target: "node", "Dropping listener received from systemd: Domain is not AF_UNIX.");
342            return None;
343        }
344
345        Some(UnixListener::from(socket))
346    }
347
348    fn bind(path: PathBuf) -> Result<ControlSocket, Error> {
349        #[cfg(all(feature = "systemd", target_os = "linux"))]
350        {
351            if let Some(listener) = Self::receive_listener() {
352                log::info!(target: "node", "Received control socket.");
353                return Ok(ControlSocket::Received(listener));
354            }
355        }
356
357        log::info!(target: "node", "Binding control socket {}..", &path.display());
358        match UnixListener::bind(&path) {
359            Ok(sock) => Ok(ControlSocket::Bound(sock, path)),
360            Err(err) if err.kind() == io::ErrorKind::AddrInUse => Err(Error::AlreadyRunning(path)),
361            Err(err) => Err(err.into()),
362        }
363    }
364}