radicle_node/
runtime.rs

1pub mod handle;
2pub mod thread;
3
4use std::path::PathBuf;
5use std::{fs, io, net};
6
7#[cfg(unix)]
8use std::os::unix::net::UnixListener as Listener;
9#[cfg(windows)]
10use winpipe::WinListener as Listener;
11
12use crossbeam_channel as chan;
13use cyphernet::Ecdh;
14use netservices::resource::NetAccept;
15use radicle::cob::migrate;
16use radicle::crypto;
17use radicle::node::device::Device;
18use radicle_fetch::FetchLimit;
19use radicle_signals::Signal;
20use reactor::poller::popol;
21use reactor::Reactor;
22use thiserror::Error;
23
24use radicle::node;
25use radicle::node::address;
26use radicle::node::address::Store as _;
27use radicle::node::notifications;
28use radicle::node::policy::config as policy;
29use radicle::node::Event;
30use radicle::node::UserAgent;
31use radicle::profile::Home;
32use radicle::{cob, git, storage, Storage};
33
34use crate::control;
35use crate::node::{routing, NodeId};
36use crate::service::gossip;
37use crate::wire;
38use crate::wire::Wire;
39use crate::worker;
40use crate::{service, LocalTime};
41
42pub use handle::Error as HandleError;
43pub use handle::Handle;
44pub use node::events::Emitter;
45
46/// Maximum pending worker tasks allowed.
47pub const MAX_PENDING_TASKS: usize = 1024;
48
49/// A client error.
50#[derive(Error, Debug)]
51pub enum Error {
52    /// A routing database error.
53    #[error("routing database error: {0}")]
54    Routing(#[from] routing::Error),
55    /// A cobs cache database error.
56    #[error("cobs cache database error: {0}")]
57    CobsCache(#[from] cob::cache::Error),
58    /// A node database error.
59    #[error("node database error: {0}")]
60    Database(#[from] node::db::Error),
61    /// A storage error.
62    #[error("storage error: {0}")]
63    Storage(#[from] storage::Error),
64    /// A policies database error.
65    #[error("policies database error: {0}")]
66    Policy(#[from] policy::Error),
67    /// A notifications database error.
68    #[error("notifications database error: {0}")]
69    Notifications(#[from] notifications::Error),
70    /// A gossip database error.
71    #[error("gossip database error: {0}")]
72    Gossip(#[from] gossip::Error),
73    /// An address database error.
74    #[error("address database error: {0}")]
75    Address(#[from] address::Error),
76    /// A service error.
77    #[error("service error: {0}")]
78    Service(Box<service::Error>),
79    /// An I/O error.
80    #[error("i/o error: {0}")]
81    Io(#[from] io::Error),
82    /// A control socket error.
83    #[error("control socket error: {0}")]
84    Control(#[from] control::Error),
85    /// Another node is already running.
86    #[error(
87        "another node appears to be running; \
88        if this isn't the case, delete the socket file at '{0}' \
89        and restart the node"
90    )]
91    AlreadyRunning(PathBuf),
92    /// A git version error.
93    #[error("git version error: {0}")]
94    GitVersion(#[from] git::VersionError),
95}
96
97impl From<service::Error> for Error {
98    fn from(e: service::Error) -> Self {
99        Self::Service(Box::new(e))
100    }
101}
102
103/// Wraps a [`Listener`] but tracks its origin.
104pub enum ControlSocket {
105    /// The listener was created by binding to it.
106    Bound(Listener, PathBuf),
107    /// The listener was received via socket activation.
108    Received(Listener),
109}
110
111/// Holds join handles to the client threads, as well as a client handle.
112pub struct Runtime {
113    pub id: NodeId,
114    pub home: Home,
115    pub control: ControlSocket,
116    pub handle: Handle,
117    pub storage: Storage,
118    pub reactor: Reactor<wire::Control, popol::Poller>,
119    pub pool: worker::Pool,
120    pub local_addrs: Vec<net::SocketAddr>,
121    pub signals: chan::Receiver<Signal>,
122}
123
124impl Runtime {
125    /// Initialize the runtime.
126    ///
127    /// This function spawns threads.
128    pub fn init<G>(
129        home: Home,
130        config: radicle::node::Config,
131        listen: Vec<net::SocketAddr>,
132        signals: chan::Receiver<Signal>,
133        signer: Device<G>,
134    ) -> Result<Runtime, Error>
135    where
136        G: crypto::signature::Signer<crypto::Signature> + Ecdh<Pk = NodeId> + Clone + 'static,
137    {
138        let id = *signer.public_key();
139        let alias = config.alias.clone();
140        let network = config.network;
141        let rng = fastrand::Rng::new();
142        let clock = LocalTime::now();
143        let timestamp = clock.into();
144        let storage = Storage::open(home.storage(), git::UserInfo { alias, key: id })?;
145        let policy = config.seeding_policy.into();
146
147        for (key, _) in &config.extra {
148            log::warn!(target: "node", "Unused or deprecated configuration attribute {key:?}");
149        }
150
151        log::info!(target: "node", "Opening policy database..");
152        let policies = home.policies_mut()?;
153        let policies = policy::Config::new(policy, policies);
154        let notifications = home.notifications_mut()?;
155        let mut cobs_cache = cob::cache::Store::open(home.cobs().join(cob::cache::COBS_DB_FILE))?;
156
157        match cobs_cache.check_version() {
158            Ok(()) => {}
159            Err(cob::cache::Error::OutOfDate) => {
160                log::info!(target: "node", "Migrating COBs cache..");
161                let version = cobs_cache.migrate(migrate::log)?;
162                log::info!(target: "node", "Migration of COBs cache complete (version={version})..");
163            }
164            Err(e) => return Err(e.into()),
165        }
166
167        log::info!(target: "node", "Default seeding policy set to '{}'", &policy);
168        log::info!(target: "node", "Initializing service ({network:?})..");
169
170        let announcement = service::gossip::node(&config, timestamp)
171            .solve(Default::default())
172            .expect("Runtime::init: unable to solve proof-of-work puzzle");
173
174        log::info!(target: "node", "Opening node database..");
175        let db = home
176            .database_mut()?
177            .journal_mode(node::db::JournalMode::default())?
178            .init(
179                &id,
180                announcement.features,
181                &announcement.alias,
182                &announcement.agent,
183                announcement.timestamp,
184                announcement.addresses.iter(),
185            )?;
186        let mut stores: service::Stores<_> = db.clone().into();
187
188        if config.connect.is_empty() && stores.addresses().is_empty()? {
189            log::info!(target: "node", "Address book is empty. Adding bootstrap nodes..");
190
191            for (alias, version, addrs) in config.network.bootstrap() {
192                for addr in addrs {
193                    let (id, addr) = addr.into();
194
195                    stores.addresses_mut().insert(
196                        &id,
197                        version,
198                        radicle::node::Features::SEED,
199                        &alias,
200                        0,
201                        &UserAgent::default(),
202                        clock.into(),
203                        [node::KnownAddress::new(addr, address::Source::Bootstrap)],
204                    )?;
205                }
206            }
207            log::info!(target: "node", "{} nodes added to address book", stores.addresses().len()?);
208        }
209
210        let emitter: Emitter<Event> = Default::default();
211        let mut service = service::Service::new(
212            config.clone(),
213            stores,
214            storage.clone(),
215            policies,
216            signer.clone(),
217            rng,
218            announcement,
219            emitter.clone(),
220        );
221        service.initialize(clock)?;
222
223        let (worker_send, worker_recv) = chan::bounded::<worker::Task>(MAX_PENDING_TASKS);
224        let mut wire = Wire::new(service, worker_send, signer.clone());
225        let mut local_addrs = Vec::new();
226
227        for addr in listen {
228            let listener = NetAccept::bind(&addr)?;
229            let local_addr = listener.local_addr();
230
231            local_addrs.push(local_addr);
232            wire.listen(listener);
233        }
234        let reactor = Reactor::named(wire, popol::Poller::new(), thread::name(&id, "service"))?;
235        let handle = Handle::new(home.clone(), reactor.controller(), emitter);
236
237        let nid = *signer.public_key();
238        let fetch = worker::FetchConfig {
239            limit: FetchLimit::default(),
240            local: nid,
241            expiry: worker::garbage::Expiry::default(),
242        };
243        let pool = worker::Pool::with(
244            worker_recv,
245            nid,
246            handle.clone(),
247            notifications,
248            cobs_cache,
249            db,
250            worker::Config {
251                capacity: config.workers.into(),
252                storage: storage.clone(),
253                fetch,
254                policy,
255                policies_db: home.node().join(node::POLICIES_DB_FILE),
256            },
257        )?;
258        let control = Self::bind(home.socket())?;
259
260        Ok(Runtime {
261            id,
262            home,
263            control,
264            storage,
265            reactor,
266            handle,
267            pool,
268            signals,
269            local_addrs,
270        })
271    }
272
273    pub fn run(self) -> Result<(), Error> {
274        let home = self.home;
275        let (listener, remove) = match self.control {
276            ControlSocket::Bound(listener, path) => (listener, Some(path)),
277            ControlSocket::Received(listener) => (listener, None),
278        };
279
280        log::info!(target: "node", "Running node {} in {}..", self.id, home.path().display());
281
282        thread::spawn(&self.id, "control", {
283            let handle = self.handle.clone();
284            || control::listen(listener, handle)
285        });
286
287        #[cfg(unix)]
288        let _signals = thread::spawn(&self.id, "signals", move || loop {
289            use radicle::node::Handle as _;
290
291            match self.signals.recv() {
292                Ok(Signal::Terminate | Signal::Interrupt) => {
293                    log::info!(target: "node", "Termination signal received; shutting down..");
294                    self.handle.shutdown().ok();
295                    break;
296                }
297                Ok(Signal::Hangup) => {
298                    log::debug!(target: "node", "Hangup signal (SIGHUP) received; ignoring..");
299                }
300                Ok(Signal::WindowChanged) => {}
301                Err(e) => {
302                    log::warn!(target: "node", "Signal notifications channel error: {e}");
303                    break;
304                }
305            }
306        });
307
308        self.pool.run().unwrap();
309        self.reactor.join().unwrap();
310
311        // Nb. We don't join the control thread here, as we have no way of notifying it that the
312        // node is shutting down.
313
314        // Remove control socket file, but don't freak out if it's not there anymore.
315        remove.map(|path| fs::remove_file(path).ok());
316
317        log::debug!(target: "node", "Node shutdown completed for {}", self.id);
318
319        Ok(())
320    }
321
322    #[cfg(all(feature = "systemd", target_os = "linux"))]
323    fn receive_listener() -> Option<Listener> {
324        let fd = match radicle_systemd::listen::fd("control") {
325            Ok(Some(fd)) => fd,
326            Ok(None) => return None,
327            Err(err) => {
328                log::error!(target: "node", "Error receiving listener from systemd: {err}");
329                return None;
330            }
331        };
332
333        let socket: socket2::Socket = unsafe { std::os::fd::FromRawFd::from_raw_fd(fd) };
334
335        let domain = match socket.domain() {
336            Ok(domain) => domain,
337            Err(err) => {
338                log::error!(target: "node", "Error receiving listener from systemd when inspecting domain of socket: {err}");
339                return None;
340            }
341        };
342
343        if domain != socket2::Domain::UNIX {
344            log::error!(target: "node", "Dropping listener received from systemd: Domain is not AF_UNIX.");
345            return None;
346        }
347
348        Some(Listener::from(socket))
349    }
350
351    fn bind(path: PathBuf) -> Result<ControlSocket, Error> {
352        #[cfg(all(feature = "systemd", target_os = "linux"))]
353        {
354            if let Some(listener) = Self::receive_listener() {
355                log::info!(target: "node", "Received control socket.");
356                return Ok(ControlSocket::Received(listener));
357            }
358        }
359
360        log::info!(target: "node", "Binding control socket {}..", &path.display());
361        match Listener::bind(&path) {
362            Ok(sock) => Ok(ControlSocket::Bound(sock, path)),
363            Err(err) if err.kind() == io::ErrorKind::AddrInUse => Err(Error::AlreadyRunning(path)),
364            Err(err) => Err(err.into()),
365        }
366    }
367}