1pub mod handle;
2pub mod thread;
3
4use std::os::unix::net::UnixListener;
5use std::path::PathBuf;
6use std::{fs, io, net};
7
8use crossbeam_channel as chan;
9use cyphernet::Ecdh;
10use netservices::resource::NetAccept;
11use radicle::cob::migrate;
12use radicle::crypto;
13use radicle::node::device::Device;
14use radicle_fetch::FetchLimit;
15use radicle_signals::Signal;
16use reactor::poller::popol;
17use reactor::Reactor;
18use thiserror::Error;
19
20use radicle::node;
21use radicle::node::address;
22use radicle::node::address::Store as _;
23use radicle::node::notifications;
24use radicle::node::Handle as _;
25use radicle::node::UserAgent;
26use radicle::profile::Home;
27use radicle::{cob, git, storage, Storage};
28
29use crate::control;
30use crate::node::{routing, NodeId};
31use crate::service::message::NodeAnnouncement;
32use crate::service::{gossip, policy, Event, INITIAL_SUBSCRIBE_BACKLOG_DELTA};
33use crate::wire;
34use crate::wire::{Decode, Wire};
35use crate::worker;
36use crate::{service, LocalTime};
37
38pub use handle::Error as HandleError;
39pub use handle::Handle;
40pub use node::events::Emitter;
41
/// Capacity of the bounded channel that carries [`worker::Task`]s from the wire
/// protocol to the worker pool (see [`Runtime::init`]).
pub const MAX_PENDING_TASKS: usize = 1024;
44
/// Errors that can be returned while initializing or running the node [`Runtime`].
///
/// Most variants simply wrap the error type of the subsystem that failed, and are
/// created through the `From` conversions generated by `#[from]`.
#[derive(Error, Debug)]
pub enum Error {
    /// The routing database returned an error.
    #[error("routing database error: {0}")]
    Routing(#[from] routing::Error),
    /// The COBs cache database returned an error (e.g. while opening or migrating it).
    #[error("cobs cache database error: {0}")]
    CobsCache(#[from] cob::cache::Error),
    /// The node database returned an error.
    #[error("node database error: {0}")]
    Database(#[from] node::db::Error),
    /// The storage returned an error.
    #[error("storage error: {0}")]
    Storage(#[from] storage::Error),
    /// The policies database returned an error.
    #[error("policies database error: {0}")]
    Policy(#[from] policy::Error),
    /// The notifications database returned an error.
    #[error("notifications database error: {0}")]
    Notifications(#[from] notifications::Error),
    /// The gossip database returned an error.
    #[error("gossip database error: {0}")]
    Gossip(#[from] gossip::Error),
    /// The address database returned an error.
    #[error("address database error: {0}")]
    Address(#[from] address::Error),
    /// The service returned an error.
    #[error("service error: {0}")]
    Service(#[from] service::Error),
    /// An I/O operation failed.
    #[error("i/o error: {0}")]
    Io(#[from] io::Error),
    /// The control socket returned an error.
    #[error("control socket error: {0}")]
    Control(#[from] control::Error),
    /// Binding the control socket failed because the address is already in use.
    /// Carries the path of the socket file that could not be bound.
    #[error(
        "another node appears to be running; \
        if this isn't the case, delete the socket file at '{0}' \
        and restart the node"
    )]
    AlreadyRunning(PathBuf),
    /// The git version check returned an error.
    #[error("git version error: {0}")]
    GitVersion(#[from] git::VersionError),
}
92
/// The control socket listener, together with a record of how it was obtained.
///
/// The distinction matters on shutdown: only a socket that this process bound
/// itself has a known path whose file can be removed afterwards.
#[derive(Debug)]
pub enum ControlSocket {
    /// The listener was created by this process by binding to the given path.
    Bound(UnixListener, PathBuf),
    /// The listener was handed to this process from the outside (e.g. by systemd),
    /// so no path is associated with it.
    Received(UnixListener),
}
100
/// Everything needed to run a node: created by [`Runtime::init`] and consumed by
/// [`Runtime::run`].
pub struct Runtime {
    /// Identifier of this node, taken from the signer's public key.
    pub id: NodeId,
    /// The home the node was initialized with.
    pub home: Home,
    /// Listener for the control socket, and how it was obtained.
    pub control: ControlSocket,
    /// Handle to the node, built on top of the reactor's controller.
    pub handle: Handle,
    /// Storage opened from the home's storage location.
    pub storage: Storage,
    /// Reactor wrapping the wire protocol and the service.
    pub reactor: Reactor<wire::Control, popol::Poller>,
    /// Worker pool that receives tasks from the wire protocol.
    pub pool: worker::Pool,
    /// Local addresses of the listeners that were bound, one per requested
    /// listen address.
    pub local_addrs: Vec<net::SocketAddr>,
    /// Channel on which process signals are delivered to the runtime.
    pub signals: chan::Receiver<Signal>,
}
113
114impl Runtime {
115 pub fn init<G>(
119 home: Home,
120 config: service::Config,
121 listen: Vec<net::SocketAddr>,
122 signals: chan::Receiver<Signal>,
123 signer: Device<G>,
124 ) -> Result<Runtime, Error>
125 where
126 G: crypto::signature::Signer<crypto::Signature> + Ecdh<Pk = NodeId> + Clone + 'static,
127 {
128 let id = *signer.public_key();
129 let alias = config.alias.clone();
130 let node_dir = home.node();
131 let network = config.network;
132 let rng = fastrand::Rng::new();
133 let clock = LocalTime::now();
134 let timestamp = clock.into();
135 let storage = Storage::open(home.storage(), git::UserInfo { alias, key: id })?;
136 let policy = config.seeding_policy.into();
137
138 for (key, _) in &config.extra {
139 log::warn!(target: "node", "Unused or deprecated configuration attribute {:?}", key);
140 }
141
142 log::info!(target: "node", "Opening policy database..");
143 let policies = home.policies_mut()?;
144 let policies = policy::Config::new(policy, policies);
145 let notifications = home.notifications_mut()?;
146 let mut cobs_cache = cob::cache::Store::open(home.cobs().join(cob::cache::COBS_DB_FILE))?;
147
148 match cobs_cache.check_version() {
149 Ok(()) => {}
150 Err(cob::cache::Error::OutOfDate) => {
151 log::info!(target: "node", "Migrating COBs cache..");
152 let version = cobs_cache.migrate(migrate::log)?;
153 log::info!(target: "node", "Migration of COBs cache complete (version={version})..");
154 }
155 Err(e) => return Err(e.into()),
156 }
157
158 log::info!(target: "node", "Default seeding policy set to '{}'", &policy);
159 log::info!(target: "node", "Initializing service ({:?})..", network);
160
161 let announcement = if let Some(ann) = fs::read(node_dir.join(node::NODE_ANNOUNCEMENT_FILE))
162 .ok()
163 .and_then(|ann| NodeAnnouncement::decode(&mut ann.as_slice()).ok())
164 .and_then(|ann| {
165 if clock - ann.timestamp.to_local_time() <= INITIAL_SUBSCRIBE_BACKLOG_DELTA {
169 Some(ann)
170 } else {
171 None
172 }
173 })
174 .and_then(|ann| {
175 if config.features() == ann.features
176 && config.alias == ann.alias
177 && config.external_addresses == ann.addresses.as_ref()
178 {
179 Some(ann)
180 } else {
181 None
182 }
183 }) {
184 log::info!(
185 target: "node",
186 "Loaded existing node announcement from file (timestamp={}, work={})",
187 ann.timestamp,
188 ann.work(),
189 );
190 ann
191 } else {
192 service::gossip::node(&config, timestamp)
193 .solve(Default::default())
194 .expect("Runtime::init: unable to solve proof-of-work puzzle")
195 };
196
197 log::info!(target: "node", "Opening node database..");
198 let db = home
199 .database_mut()?
200 .journal_mode(node::db::JournalMode::default())?
201 .init(
202 &id,
203 announcement.features,
204 &announcement.alias,
205 &announcement.agent,
206 announcement.timestamp,
207 announcement.addresses.iter(),
208 )?;
209 let mut stores: service::Stores<_> = db.clone().into();
210
211 if config.connect.is_empty() && stores.addresses().is_empty()? {
212 log::info!(target: "node", "Address book is empty. Adding bootstrap nodes..");
213
214 for (alias, version, addr) in config.network.bootstrap() {
215 let (id, addr) = addr.into();
216
217 stores.addresses_mut().insert(
218 &id,
219 version,
220 radicle::node::Features::SEED,
221 &alias,
222 0,
223 &UserAgent::default(),
224 clock.into(),
225 [node::KnownAddress::new(addr, address::Source::Bootstrap)],
226 )?;
227 }
228 log::info!(target: "node", "{} nodes added to address book", stores.addresses().len()?);
229 }
230
231 let emitter: Emitter<Event> = Default::default();
232 let mut service = service::Service::new(
233 config.clone(),
234 stores,
235 storage.clone(),
236 policies,
237 signer.clone(),
238 rng,
239 announcement,
240 emitter.clone(),
241 );
242 service.initialize(clock)?;
243
244 let (worker_send, worker_recv) = chan::bounded::<worker::Task>(MAX_PENDING_TASKS);
245 let mut wire = Wire::new(service, worker_send, signer.clone());
246 let mut local_addrs = Vec::new();
247
248 for addr in listen {
249 let listener = NetAccept::bind(&addr)?;
250 let local_addr = listener.local_addr();
251
252 local_addrs.push(local_addr);
253 wire.listen(listener);
254 }
255 let reactor = Reactor::named(wire, popol::Poller::new(), thread::name(&id, "service"))?;
256 let handle = Handle::new(home.clone(), reactor.controller(), emitter);
257
258 let nid = *signer.public_key();
259 let fetch = worker::FetchConfig {
260 limit: FetchLimit::default(),
261 local: nid,
262 expiry: worker::garbage::Expiry::default(),
263 };
264 let pool = worker::Pool::with(
265 worker_recv,
266 nid,
267 handle.clone(),
268 notifications,
269 cobs_cache,
270 db,
271 worker::Config {
272 capacity: config.workers,
273 storage: storage.clone(),
274 fetch,
275 policy,
276 policies_db: home.node().join(node::POLICIES_DB_FILE),
277 },
278 )?;
279 let control = Self::bind(home.socket())?;
280
281 Ok(Runtime {
282 id,
283 home,
284 control,
285 storage,
286 reactor,
287 handle,
288 pool,
289 signals,
290 local_addrs,
291 })
292 }
293
294 pub fn run(self) -> Result<(), Error> {
295 let home = self.home;
296 let (listener, remove) = match self.control {
297 ControlSocket::Bound(listener, path) => (listener, Some(path)),
298 ControlSocket::Received(listener) => (listener, None),
299 };
300
301 log::info!(target: "node", "Running node {} in {}..", self.id, home.path().display());
302
303 thread::spawn(&self.id, "control", {
304 let handle = self.handle.clone();
305 || control::listen(listener, handle)
306 });
307 let _signals = thread::spawn(&self.id, "signals", move || loop {
308 match self.signals.recv() {
309 Ok(Signal::Terminate | Signal::Interrupt) => {
310 log::info!(target: "node", "Termination signal received; shutting down..");
311 self.handle.shutdown().ok();
312 break;
313 }
314 Ok(Signal::Hangup) => {
315 log::debug!(target: "node", "Hangup signal (SIGHUP) received; ignoring..");
316 }
317 Ok(Signal::WindowChanged) => {}
318 Err(e) => {
319 log::warn!(target: "node", "Signal notifications channel error: {e}");
320 break;
321 }
322 }
323 });
324
325 self.pool.run().unwrap();
326 self.reactor.join().unwrap();
327
328 remove.map(|path| fs::remove_file(path).ok());
333
334 log::debug!(target: "node", "Node shutdown completed for {}", self.id);
335
336 Ok(())
337 }
338
339 #[cfg(all(feature = "systemd", target_family = "unix"))]
340 fn receive_listener() -> Option<UnixListener> {
341 use std::os::fd::FromRawFd;
342 match radicle_systemd::listen_fd("control") {
343 Ok(Some(fd)) => {
344 Some(unsafe {
352 UnixListener::from_raw_fd(fd)
355 })
356 }
357 Ok(None) => None,
358 Err(err) => {
359 log::trace!(target: "node", "Error receiving file descriptors from systemd: {err}");
360 None
361 }
362 }
363 }
364
365 fn bind(path: PathBuf) -> Result<ControlSocket, Error> {
366 #[cfg(all(feature = "systemd", target_family = "unix"))]
367 {
368 if let Some(listener) = Self::receive_listener() {
369 log::info!(target: "node", "Received control socket.");
370 return Ok(ControlSocket::Received(listener));
371 }
372 }
373
374 log::info!(target: "node", "Binding control socket {}..", &path.display());
375 match UnixListener::bind(&path) {
376 Ok(sock) => Ok(ControlSocket::Bound(sock, path)),
377 Err(err) if err.kind() == io::ErrorKind::AddrInUse => Err(Error::AlreadyRunning(path)),
378 Err(err) => Err(err.into()),
379 }
380 }
381}