//! radicle-node: node runtime (`runtime.rs`) — wires the service, reactor,
//! worker pool and control socket together.

1pub mod handle;
2pub mod thread;
3
4use std::os::unix::net::UnixListener;
5use std::path::PathBuf;
6use std::{fs, io, net};
7
8use crossbeam_channel as chan;
9use cyphernet::Ecdh;
10use netservices::resource::NetAccept;
11use radicle::cob::migrate;
12use radicle::crypto;
13use radicle::node::device::Device;
14use radicle_fetch::FetchLimit;
15use radicle_signals::Signal;
16use reactor::poller::popol;
17use reactor::Reactor;
18use thiserror::Error;
19
20use radicle::node;
21use radicle::node::address;
22use radicle::node::address::Store as _;
23use radicle::node::notifications;
24use radicle::node::Handle as _;
25use radicle::node::UserAgent;
26use radicle::profile::Home;
27use radicle::{cob, git, storage, Storage};
28
29use crate::control;
30use crate::node::{routing, NodeId};
31use crate::service::message::NodeAnnouncement;
32use crate::service::{gossip, policy, Event, INITIAL_SUBSCRIBE_BACKLOG_DELTA};
33use crate::wire;
34use crate::wire::{Decode, Wire};
35use crate::worker;
36use crate::{service, LocalTime};
37
38pub use handle::Error as HandleError;
39pub use handle::Handle;
40pub use node::events::Emitter;
41
/// Maximum pending worker tasks allowed.
///
/// Used as the capacity of the bounded channel between the service and the
/// worker pool; once this many tasks are queued, the channel applies
/// back-pressure on the sender.
pub const MAX_PENDING_TASKS: usize = 1024;
44
45/// A client error.
46#[derive(Error, Debug)]
47pub enum Error {
48    /// A routing database error.
49    #[error("routing database error: {0}")]
50    Routing(#[from] routing::Error),
51    /// A cobs cache database error.
52    #[error("cobs cache database error: {0}")]
53    CobsCache(#[from] cob::cache::Error),
54    /// A node database error.
55    #[error("node database error: {0}")]
56    Database(#[from] node::db::Error),
57    /// A storage error.
58    #[error("storage error: {0}")]
59    Storage(#[from] storage::Error),
60    /// A policies database error.
61    #[error("policies database error: {0}")]
62    Policy(#[from] policy::Error),
63    /// A notifications database error.
64    #[error("notifications database error: {0}")]
65    Notifications(#[from] notifications::Error),
66    /// A gossip database error.
67    #[error("gossip database error: {0}")]
68    Gossip(#[from] gossip::Error),
69    /// An address database error.
70    #[error("address database error: {0}")]
71    Address(#[from] address::Error),
72    /// A service error.
73    #[error("service error: {0}")]
74    Service(#[from] service::Error),
75    /// An I/O error.
76    #[error("i/o error: {0}")]
77    Io(#[from] io::Error),
78    /// A control socket error.
79    #[error("control socket error: {0}")]
80    Control(#[from] control::Error),
81    /// Another node is already running.
82    #[error(
83        "another node appears to be running; \
84        if this isn't the case, delete the socket file at '{0}' \
85        and restart the node"
86    )]
87    AlreadyRunning(PathBuf),
88    /// A git version error.
89    #[error("git version error: {0}")]
90    GitVersion(#[from] git::VersionError),
91}
92
/// A control-socket [`UnixListener`], together with how it was obtained.
pub enum ControlSocket {
    /// The listener was handed to us already open (e.g. through socket
    /// activation); we did not create the socket file ourselves.
    Received(UnixListener),
    /// The listener was created by binding the given socket path; the path is
    /// kept so the file can be removed when the node shuts down.
    Bound(UnixListener, PathBuf),
}
100
/// Holds join handles to the client threads, as well as a client handle.
pub struct Runtime {
    /// This node's ID (the signer's public key).
    pub id: NodeId,
    /// The node's home, under which its state (storage, databases, socket) lives.
    pub home: Home,
    /// The control socket and how it was obtained.
    pub control: ControlSocket,
    /// Handle used to communicate with the running service.
    pub handle: Handle,
    /// Repository storage.
    pub storage: Storage,
    /// I/O reactor driving the wire protocol.
    pub reactor: Reactor<wire::Control, popol::Poller>,
    /// Worker pool servicing queued tasks.
    pub pool: worker::Pool,
    /// Addresses actually bound for listening (resolved after binding).
    pub local_addrs: Vec<net::SocketAddr>,
    /// Receiver for OS signals (terminate, interrupt, hangup, ...).
    pub signals: chan::Receiver<Signal>,
}
113
impl Runtime {
    /// Initialize the runtime.
    ///
    /// This function spawns threads.
    ///
    /// Opens (and migrates, if needed) the node's databases, loads or mints a
    /// node announcement, seeds the address book with bootstrap nodes when it
    /// is empty, wires the service to the network reactor and worker pool,
    /// and obtains the control socket.
    pub fn init<G>(
        home: Home,
        config: service::Config,
        listen: Vec<net::SocketAddr>,
        signals: chan::Receiver<Signal>,
        signer: Device<G>,
    ) -> Result<Runtime, Error>
    where
        G: crypto::signature::Signer<crypto::Signature> + Ecdh<Pk = NodeId> + Clone + 'static,
    {
        // Our node ID is the signer's public key.
        let id = *signer.public_key();
        let alias = config.alias.clone();
        let node_dir = home.node();
        let network = config.network;
        let rng = fastrand::Rng::new();
        let clock = LocalTime::now();
        let timestamp = clock.into();
        let storage = Storage::open(home.storage(), git::UserInfo { alias, key: id })?;
        let policy = config.seeding_policy.into();

        // Surface configuration keys we don't understand, instead of failing.
        for (key, _) in &config.extra {
            log::warn!(target: "node", "Unused or deprecated configuration attribute {:?}", key);
        }

        log::info!(target: "node", "Opening policy database..");
        let policies = home.policies_mut()?;
        let policies = policy::Config::new(policy, policies);
        let notifications = home.notifications_mut()?;
        let mut cobs_cache = cob::cache::Store::open(home.cobs().join(cob::cache::COBS_DB_FILE))?;

        // Migrate the COBs cache if its version is out of date; any other
        // version-check error is fatal.
        match cobs_cache.check_version() {
            Ok(()) => {}
            Err(cob::cache::Error::OutOfDate) => {
                log::info!(target: "node", "Migrating COBs cache..");
                let version = cobs_cache.migrate(migrate::log)?;
                log::info!(target: "node", "Migration of COBs cache complete (version={version})..");
            }
            Err(e) => return Err(e.into()),
        }

        log::info!(target: "node", "Default seeding policy set to '{}'", &policy);
        log::info!(target: "node", "Initializing service ({:?})..", network);

        // Re-use a previously persisted node announcement when possible: it
        // must decode, be recent enough to still be gossiped, and still match
        // the current configuration. Otherwise, build a fresh announcement
        // and solve its proof-of-work.
        let announcement = if let Some(ann) = fs::read(node_dir.join(node::NODE_ANNOUNCEMENT_FILE))
            .ok()
            .and_then(|ann| NodeAnnouncement::decode(&mut ann.as_slice()).ok())
            .and_then(|ann| {
                // If our announcement was made some time ago, the timestamp on it will be old,
                // and it might not get gossiped to new nodes since it will be purged from caches.
                // Therefore, we make sure it's never too old.
                if clock - ann.timestamp.to_local_time() <= INITIAL_SUBSCRIBE_BACKLOG_DELTA {
                    Some(ann)
                } else {
                    None
                }
            })
            .and_then(|ann| {
                // Discard the stored announcement if our features, alias or
                // external addresses have changed since it was made.
                if config.features() == ann.features
                    && config.alias == ann.alias
                    && config.external_addresses == ann.addresses.as_ref()
                {
                    Some(ann)
                } else {
                    None
                }
            }) {
            log::info!(
                target: "node",
                "Loaded existing node announcement from file (timestamp={}, work={})",
                ann.timestamp,
                ann.work(),
            );
            ann
        } else {
            service::gossip::node(&config, timestamp)
                .solve(Default::default())
                .expect("Runtime::init: unable to solve proof-of-work puzzle")
        };

        log::info!(target: "node", "Opening node database..");
        let db = home
            .database_mut()?
            .journal_mode(node::db::JournalMode::default())?
            .init(
                &id,
                announcement.features,
                &announcement.alias,
                &announcement.agent,
                announcement.timestamp,
                announcement.addresses.iter(),
            )?;
        let mut stores: service::Stores<_> = db.clone().into();

        // With no configured peers and an empty address book, fall back to
        // the network's well-known bootstrap nodes.
        if config.connect.is_empty() && stores.addresses().is_empty()? {
            log::info!(target: "node", "Address book is empty. Adding bootstrap nodes..");

            for (alias, version, addr) in config.network.bootstrap() {
                let (id, addr) = addr.into();

                stores.addresses_mut().insert(
                    &id,
                    version,
                    radicle::node::Features::SEED,
                    &alias,
                    0,
                    &UserAgent::default(),
                    clock.into(),
                    [node::KnownAddress::new(addr, address::Source::Bootstrap)],
                )?;
            }
            log::info!(target: "node", "{} nodes added to address book", stores.addresses().len()?);
        }

        let emitter: Emitter<Event> = Default::default();
        let mut service = service::Service::new(
            config.clone(),
            stores,
            storage.clone(),
            policies,
            signer.clone(),
            rng,
            announcement,
            emitter.clone(),
        );
        service.initialize(clock)?;

        // Worker tasks are queued over a bounded channel so that a saturated
        // pool applies back-pressure (capacity: `MAX_PENDING_TASKS`).
        let (worker_send, worker_recv) = chan::bounded::<worker::Task>(MAX_PENDING_TASKS);
        let mut wire = Wire::new(service, worker_send, signer.clone());
        let mut local_addrs = Vec::new();

        // Bind every requested listen address, remembering the actual local
        // address reported by the listener (relevant when binding to port 0).
        for addr in listen {
            let listener = NetAccept::bind(&addr)?;
            let local_addr = listener.local_addr();

            local_addrs.push(local_addr);
            wire.listen(listener);
        }
        let reactor = Reactor::named(wire, popol::Poller::new(), thread::name(&id, "service"))?;
        let handle = Handle::new(home.clone(), reactor.controller(), emitter);

        let nid = *signer.public_key();
        let fetch = worker::FetchConfig {
            limit: FetchLimit::default(),
            local: nid,
            expiry: worker::garbage::Expiry::default(),
        };
        let pool = worker::Pool::with(
            worker_recv,
            nid,
            handle.clone(),
            notifications,
            cobs_cache,
            db,
            worker::Config {
                capacity: config.workers,
                storage: storage.clone(),
                fetch,
                policy,
                policies_db: home.node().join(node::POLICIES_DB_FILE),
            },
        )?;
        let control = Self::bind(home.socket())?;

        Ok(Runtime {
            id,
            home,
            control,
            storage,
            reactor,
            handle,
            pool,
            signals,
            local_addrs,
        })
    }

    /// Run the node: spawn the control and signal-handling threads, then
    /// block until the worker pool and reactor have shut down.
    pub fn run(self) -> Result<(), Error> {
        let home = self.home;
        // Only a socket file we bound ourselves gets removed on shutdown; a
        // listener received from outside (socket activation) is left alone.
        let (listener, remove) = match self.control {
            ControlSocket::Bound(listener, path) => (listener, Some(path)),
            ControlSocket::Received(listener) => (listener, None),
        };

        log::info!(target: "node", "Running node {} in {}..", self.id, home.path().display());

        thread::spawn(&self.id, "control", {
            let handle = self.handle.clone();
            || control::listen(listener, handle)
        });
        // Signal handling: terminate/interrupt shuts the service down;
        // hangup and window-change are ignored.
        let _signals = thread::spawn(&self.id, "signals", move || loop {
            match self.signals.recv() {
                Ok(Signal::Terminate | Signal::Interrupt) => {
                    log::info!(target: "node", "Termination signal received; shutting down..");
                    self.handle.shutdown().ok();
                    break;
                }
                Ok(Signal::Hangup) => {
                    log::debug!(target: "node", "Hangup signal (SIGHUP) received; ignoring..");
                }
                Ok(Signal::WindowChanged) => {}
                Err(e) => {
                    log::warn!(target: "node", "Signal notifications channel error: {e}");
                    break;
                }
            }
        });

        self.pool.run().unwrap();
        self.reactor.join().unwrap();

        // Nb. We don't join the control thread here, as we have no way of notifying it that the
        // node is shutting down.

        // Remove control socket file, but don't freak out if it's not there anymore.
        remove.map(|path| fs::remove_file(path).ok());

        log::debug!(target: "node", "Node shutdown completed for {}", self.id);

        Ok(())
    }

    /// Try to receive a control-socket listener passed in by systemd socket
    /// activation. Returns `None` when no matching file descriptor was passed
    /// or when querying systemd fails.
    #[cfg(all(feature = "systemd", target_family = "unix"))]
    fn receive_listener() -> Option<UnixListener> {
        use std::os::fd::FromRawFd;
        match radicle_systemd::listen_fd("control") {
            Ok(Some(fd)) => {
                // NOTE: Here, we should make a call to [`fstat(2)`](man:fstat(2))
                // and make sure that the file descriptor we received actually
                // is `AF_UNIX`. However, this requires fiddling with
                // `libc` types or another dependency like `nix`, see
                // <https://github.com/lucab/libsystemd-rs/blob/b43fa5e3b5eca3e6aa16a6c2fad87220dc0ad7a0/src/activation.rs#L192-L196>
                // systemd also implements such a check, see
                // <https://github.com/systemd/systemd/blob/v254/src/libsystemd/sd-daemon/sd-daemon.c#L357-L398>
                Some(unsafe {
                    // SAFETY: We take ownership of this FD from systemd,
                    // which guarantees that it is open.
                    UnixListener::from_raw_fd(fd)
                })
            }
            Ok(None) => None,
            Err(err) => {
                log::trace!(target: "node", "Error receiving file descriptors from systemd: {err}");
                None
            }
        }
    }

    /// Obtain the control socket: prefer a socket-activated listener when the
    /// `systemd` feature is enabled, otherwise bind the given path ourselves.
    fn bind(path: PathBuf) -> Result<ControlSocket, Error> {
        #[cfg(all(feature = "systemd", target_family = "unix"))]
        {
            if let Some(listener) = Self::receive_listener() {
                log::info!(target: "node", "Received control socket.");
                return Ok(ControlSocket::Received(listener));
            }
        }

        log::info!(target: "node", "Binding control socket {}..", &path.display());
        match UnixListener::bind(&path) {
            Ok(sock) => Ok(ControlSocket::Bound(sock, path)),
            // `AddrInUse` means the socket file already exists and is bound --
            // most likely by another running node instance.
            Err(err) if err.kind() == io::ErrorKind::AddrInUse => Err(Error::AlreadyRunning(path)),
            Err(err) => Err(err.into()),
        }
    }
}