// radicle_node/runtime.rs
1pub mod handle;
2pub mod thread;
3
4use std::os::unix::net::UnixListener;
5use std::path::PathBuf;
6use std::{fs, io, net};
7
8use crossbeam_channel as chan;
9use cyphernet::Ecdh;
10use netservices::resource::NetAccept;
11use radicle_fetch::FetchLimit;
12use radicle_signals::Signal;
13use reactor::poller::popol;
14use reactor::Reactor;
15use thiserror::Error;
16
17use radicle::node;
18use radicle::node::address;
19use radicle::node::address::Store as _;
20use radicle::node::notifications;
21use radicle::node::Handle as _;
22use radicle::node::UserAgent;
23use radicle::profile::Home;
24use radicle::{cob, git, storage, Storage};
25
26use crate::control;
27use crate::crypto::Signer;
28use crate::node::{routing, NodeId};
29use crate::service::message::NodeAnnouncement;
30use crate::service::{gossip, policy, Event, INITIAL_SUBSCRIBE_BACKLOG_DELTA};
31use crate::wire;
32use crate::wire::{Decode, Wire};
33use crate::worker;
34use crate::{service, LocalTime};
35
36pub use handle::Error as HandleError;
37pub use handle::Handle;
38pub use node::events::Emitter;
39
/// Maximum pending worker tasks allowed.
///
/// This bounds the channel between the wire protocol and the worker pool
/// (see `chan::bounded` in [`Runtime::init`]); once full, task submission
/// blocks until a worker drains the queue.
pub const MAX_PENDING_TASKS: usize = 1024;
42
/// A client error.
///
/// Aggregates every failure mode of [`Runtime::init`] and [`Runtime::run`]:
/// the various node databases, storage, service, I/O and the control socket.
#[derive(Error, Debug)]
pub enum Error {
    /// A routing database error.
    #[error("routing database error: {0}")]
    Routing(#[from] routing::Error),
    /// A cobs cache database error.
    #[error("cobs cache database error: {0}")]
    CobsCache(#[from] cob::cache::Error),
    /// A node database error.
    #[error("node database error: {0}")]
    Database(#[from] node::db::Error),
    /// A storage error.
    #[error("storage error: {0}")]
    Storage(#[from] storage::Error),
    /// A policies database error.
    #[error("policies database error: {0}")]
    Policy(#[from] policy::Error),
    /// A notifications database error.
    #[error("notifications database error: {0}")]
    Notifications(#[from] notifications::Error),
    /// A gossip database error.
    #[error("gossip database error: {0}")]
    Gossip(#[from] gossip::Error),
    /// An address database error.
    #[error("address database error: {0}")]
    Address(#[from] address::Error),
    /// A service error.
    #[error("service error: {0}")]
    Service(#[from] service::Error),
    /// An I/O error.
    #[error("i/o error: {0}")]
    Io(#[from] io::Error),
    /// A control socket error.
    #[error("control socket error: {0}")]
    Control(#[from] control::Error),
    /// Another node is already running.
    ///
    /// Produced when binding the control socket fails with `AddrInUse`;
    /// the path points at the conflicting socket file.
    #[error(
        "another node appears to be running; \
        if this isn't the case, delete the socket file at '{0}' \
        and restart the node"
    )]
    AlreadyRunning(PathBuf),
    /// A git version error.
    #[error("git version error: {0}")]
    GitVersion(#[from] git::VersionError),
}
90
/// Wraps a [`UnixListener`] but tracks its origin.
///
/// The origin determines cleanup at shutdown: a socket we bound ourselves
/// has its file removed from the filesystem when the node stops, while a
/// listener received via socket activation is left alone (see
/// [`Runtime::run`]).
pub enum ControlSocket {
    /// The listener was created by binding to it. Carries the socket path
    /// so the file can be removed on shutdown.
    Bound(UnixListener, PathBuf),
    /// The listener was received via socket activation.
    Received(UnixListener),
}
98
/// Holds join handles to the client threads, as well as a client handle.
pub struct Runtime {
    /// The node ID (the signer's public key).
    pub id: NodeId,
    /// The node's home, rooting storage, databases and the control socket path.
    pub home: Home,
    /// Control socket listener, tagged with how it was obtained.
    pub control: ControlSocket,
    /// Handle for communicating with the running service.
    pub handle: Handle,
    /// Repository storage.
    pub storage: Storage,
    /// Reactor driving the wire protocol over the listen sockets.
    pub reactor: Reactor<wire::Control, popol::Poller>,
    /// Worker pool servicing queued worker tasks.
    pub pool: worker::Pool,
    /// Addresses actually bound for listening (resolved via `local_addr`).
    pub local_addrs: Vec<net::SocketAddr>,
    /// Receiver of OS signals (terminate, interrupt, hangup, ..).
    pub signals: chan::Receiver<Signal>,
}
111
112impl Runtime {
113    /// Initialize the runtime.
114    ///
115    /// This function spawns threads.
116    pub fn init<G>(
117        home: Home,
118        config: service::Config,
119        listen: Vec<net::SocketAddr>,
120        signals: chan::Receiver<Signal>,
121        signer: G,
122    ) -> Result<Runtime, Error>
123    where
124        G: Signer + Ecdh<Pk = NodeId> + Clone + 'static,
125    {
126        let id = *signer.public_key();
127        let alias = config.alias.clone();
128        let node_dir = home.node();
129        let network = config.network;
130        let rng = fastrand::Rng::new();
131        let clock = LocalTime::now();
132        let timestamp = clock.into();
133        let storage = Storage::open(home.storage(), git::UserInfo { alias, key: id })?;
134        let policy = config.seeding_policy.into();
135
136        for (key, _) in &config.extra {
137            log::warn!(target: "node", "Unused or deprecated configuration attribute {:?}", key);
138        }
139
140        log::info!(target: "node", "Opening policy database..");
141        let policies = home.policies_mut()?;
142        let policies = policy::Config::new(policy, policies);
143        let notifications = home.notifications_mut()?;
144        let cobs_cache = cob::cache::Store::open(home.cobs().join(cob::cache::COBS_DB_FILE))?;
145
146        log::info!(target: "node", "Default seeding policy set to '{}'", &policy);
147        log::info!(target: "node", "Initializing service ({:?})..", network);
148
149        let announcement = if let Some(ann) = fs::read(node_dir.join(node::NODE_ANNOUNCEMENT_FILE))
150            .ok()
151            .and_then(|ann| NodeAnnouncement::decode(&mut ann.as_slice()).ok())
152            .and_then(|ann| {
153                // If our announcement was made some time ago, the timestamp on it will be old,
154                // and it might not get gossiped to new nodes since it will be purged from caches.
155                // Therefore, we make sure it's never too old.
156                if clock - ann.timestamp.to_local_time() <= INITIAL_SUBSCRIBE_BACKLOG_DELTA {
157                    Some(ann)
158                } else {
159                    None
160                }
161            })
162            .and_then(|ann| {
163                if config.features() == ann.features
164                    && config.alias == ann.alias
165                    && config.external_addresses == ann.addresses.as_ref()
166                {
167                    Some(ann)
168                } else {
169                    None
170                }
171            }) {
172            log::info!(
173                target: "node",
174                "Loaded existing node announcement from file (timestamp={}, work={})",
175                ann.timestamp,
176                ann.work(),
177            );
178            ann
179        } else {
180            service::gossip::node(&config, timestamp)
181                .solve(Default::default())
182                .expect("Runtime::init: unable to solve proof-of-work puzzle")
183        };
184
185        log::info!(target: "node", "Opening node database..");
186        let db = home
187            .database_mut()?
188            .journal_mode(node::db::JournalMode::default())?
189            .init(
190                &id,
191                announcement.features,
192                &announcement.alias,
193                &announcement.agent,
194                announcement.timestamp,
195                announcement.addresses.iter(),
196            )?;
197        let mut stores: service::Stores<_> = db.clone().into();
198
199        if config.connect.is_empty() && stores.addresses().is_empty()? {
200            log::info!(target: "node", "Address book is empty. Adding bootstrap nodes..");
201
202            for (alias, version, addr) in config.network.bootstrap() {
203                let (id, addr) = addr.into();
204
205                stores.addresses_mut().insert(
206                    &id,
207                    version,
208                    radicle::node::Features::SEED,
209                    &alias,
210                    0,
211                    &UserAgent::default(),
212                    clock.into(),
213                    [node::KnownAddress::new(addr, address::Source::Bootstrap)],
214                )?;
215            }
216            log::info!(target: "node", "{} nodes added to address book", stores.addresses().len()?);
217        }
218
219        let emitter: Emitter<Event> = Default::default();
220        let mut service = service::Service::new(
221            config.clone(),
222            stores,
223            storage.clone(),
224            policies,
225            signer.clone(),
226            rng,
227            announcement,
228            emitter.clone(),
229        );
230        service.initialize(clock)?;
231
232        let (worker_send, worker_recv) = chan::bounded::<worker::Task>(MAX_PENDING_TASKS);
233        let mut wire = Wire::new(service, worker_send, signer.clone());
234        let mut local_addrs = Vec::new();
235
236        for addr in listen {
237            let listener = NetAccept::bind(&addr)?;
238            let local_addr = listener.local_addr();
239
240            local_addrs.push(local_addr);
241            wire.listen(listener);
242        }
243        let reactor = Reactor::named(wire, popol::Poller::new(), thread::name(&id, "service"))?;
244        let handle = Handle::new(home.clone(), reactor.controller(), emitter);
245
246        let nid = *signer.public_key();
247        let fetch = worker::FetchConfig {
248            limit: FetchLimit::default(),
249            local: nid,
250            expiry: worker::garbage::Expiry::default(),
251        };
252        let pool = worker::Pool::with(
253            worker_recv,
254            nid,
255            handle.clone(),
256            notifications,
257            cobs_cache,
258            db,
259            worker::Config {
260                capacity: config.workers,
261                storage: storage.clone(),
262                fetch,
263                policy,
264                policies_db: home.node().join(node::POLICIES_DB_FILE),
265            },
266        )?;
267        let control = Self::bind(home.socket())?;
268
269        Ok(Runtime {
270            id,
271            home,
272            control,
273            storage,
274            reactor,
275            handle,
276            pool,
277            signals,
278            local_addrs,
279        })
280    }
281
282    pub fn run(self) -> Result<(), Error> {
283        let home = self.home;
284        let (listener, remove) = match self.control {
285            ControlSocket::Bound(listener, path) => (listener, Some(path)),
286            ControlSocket::Received(listener) => (listener, None),
287        };
288
289        log::info!(target: "node", "Running node {} in {}..", self.id, home.path().display());
290
291        thread::spawn(&self.id, "control", {
292            let handle = self.handle.clone();
293            || control::listen(listener, handle)
294        });
295        let _signals = thread::spawn(&self.id, "signals", move || loop {
296            match self.signals.recv() {
297                Ok(Signal::Terminate | Signal::Interrupt) => {
298                    log::info!(target: "node", "Termination signal received; shutting down..");
299                    self.handle.shutdown().ok();
300                    break;
301                }
302                Ok(Signal::Hangup) => {
303                    log::debug!(target: "node", "Hangup signal (SIGHUP) received; ignoring..");
304                }
305                Ok(Signal::WindowChanged) => {}
306                Err(e) => {
307                    log::warn!(target: "node", "Signal notifications channel error: {e}");
308                    break;
309                }
310            }
311        });
312
313        self.pool.run().unwrap();
314        self.reactor.join().unwrap();
315
316        // Nb. We don't join the control thread here, as we have no way of notifying it that the
317        // node is shutting down.
318
319        // Remove control socket file, but don't freak out if it's not there anymore.
320        remove.map(|path| fs::remove_file(path).ok());
321
322        log::debug!(target: "node", "Node shutdown completed for {}", self.id);
323
324        Ok(())
325    }
326
    /// Try to receive a control-socket listener via systemd socket
    /// activation, looking for a passed file descriptor named "control".
    ///
    /// Returns `None` when no such descriptor was passed, or when querying
    /// systemd failed (the error is only logged at trace level, since the
    /// caller will fall back to binding a socket itself).
    #[cfg(all(feature = "systemd", target_family = "unix"))]
    fn receive_listener() -> Option<UnixListener> {
        use std::os::fd::FromRawFd;
        match radicle_systemd::listen_fd("control") {
            Ok(Some(fd)) => {
                // NOTE: Here, we should make a call to [`fstat(2)`](man:fstat(2))
                // and make sure that the file descriptor we received actually
                // is `AF_UNIX`. However, this requires fiddling with
                // `libc` types or another dependency like `nix`, see
                // <https://github.com/lucab/libsystemd-rs/blob/b43fa5e3b5eca3e6aa16a6c2fad87220dc0ad7a0/src/activation.rs#L192-L196>
                // systemd also implements such a check, see
                // <https://github.com/systemd/systemd/blob/v254/src/libsystemd/sd-daemon/sd-daemon.c#L357-L398>
                Some(unsafe {
                    // SAFETY: We take ownership of this FD from systemd,
                    // which guarantees that it is open.
                    UnixListener::from_raw_fd(fd)
                })
            }
            Ok(None) => None,
            Err(err) => {
                log::trace!(target: "node", "Error receiving file descriptors from systemd: {err}");
                None
            }
        }
    }
352
    /// Obtain the node's control socket.
    ///
    /// When built with the `systemd` feature on unix, a listener received
    /// through socket activation takes precedence; otherwise a fresh unix
    /// socket is bound at `path`.
    ///
    /// # Errors
    ///
    /// Returns [`Error::AlreadyRunning`] if the address is already in use
    /// (most likely another node instance holding the socket), or
    /// [`Error::Io`] for any other bind failure.
    fn bind(path: PathBuf) -> Result<ControlSocket, Error> {
        #[cfg(all(feature = "systemd", target_family = "unix"))]
        {
            if let Some(listener) = Self::receive_listener() {
                log::info!(target: "node", "Received control socket.");
                return Ok(ControlSocket::Received(listener));
            }
        }

        log::info!(target: "node", "Binding control socket {}..", &path.display());
        match UnixListener::bind(&path) {
            Ok(sock) => Ok(ControlSocket::Bound(sock, path)),
            Err(err) if err.kind() == io::ErrorKind::AddrInUse => Err(Error::AlreadyRunning(path)),
            Err(err) => Err(err.into()),
        }
    }
369}