1pub mod handle;
2pub mod thread;
3
4use std::fmt::Debug;
5use std::path::PathBuf;
6use std::{fs, io, net};
7
8#[cfg(unix)]
9use std::os::unix::net::UnixListener;
10#[cfg(windows)]
11use uds_windows::UnixListener;
12
13use crossbeam_channel as chan;
14use cyphernet::Ecdh;
15use radicle::cob::migrate;
16use radicle::crypto;
17use radicle::node::device::Device;
18use radicle_signals::Signal;
19use thiserror::Error;
20
21use radicle::node;
22use radicle::node::address;
23use radicle::node::address::Store as _;
24use radicle::node::notifications;
25use radicle::node::policy::config as policy;
26use radicle::node::Event;
27use radicle::node::UserAgent;
28use radicle::profile::Home;
29use radicle::{cob, git, storage, Storage};
30
31use crate::control;
32use crate::node::{routing, NodeId};
33use crate::reactor;
34use crate::reactor::Reactor;
35use crate::service::gossip;
36use crate::wire::Wire;
37use crate::worker;
38use crate::{service, LocalTime};
39
40pub use handle::Error as HandleError;
41pub use handle::Handle;
42pub use node::events::Emitter;
43
/// Capacity of the bounded channel through which tasks are handed to the worker pool.
pub const MAX_PENDING_TASKS: usize = 1 << 10;
46
47#[derive(Error, Debug)]
49pub enum Error {
50 #[error("routing database error: {0}")]
52 Routing(#[from] routing::Error),
53 #[error("cobs cache database error: {0}")]
55 CobsCache(#[from] cob::cache::Error),
56 #[error("node database error: {0}")]
58 Database(#[from] node::db::Error),
59 #[error("storage error: {0}")]
61 Storage(#[from] storage::Error),
62 #[error("policies database error: {0}")]
64 Policy(#[from] policy::Error),
65 #[error("notifications database error: {0}")]
67 Notifications(#[from] notifications::Error),
68 #[error("gossip database error: {0}")]
70 Gossip(#[from] gossip::Error),
71 #[error("address database error: {0}")]
73 Address(#[from] address::Error),
74 #[error("service error: {0}")]
76 Service(Box<service::Error>),
77 #[error("i/o error: {0}")]
79 Io(#[from] io::Error),
80 #[error("control socket error: {0}")]
82 Control(#[from] control::Error),
83 #[error(
85 "another node appears to be running; \
86 if this isn't the case, delete the socket file at '{0}' \
87 and restart the node"
88 )]
89 AlreadyRunning(PathBuf),
90 #[error("git version error: {0}")]
92 GitVersion(#[from] git::VersionError),
93}
94
95impl From<service::Error> for Error {
96 fn from(e: service::Error) -> Self {
97 Self::Service(Box::new(e))
98 }
99}
100
/// A control socket listener, tagged with where it came from.
///
/// The origin decides whether the runtime is responsible for deleting the socket file on
/// shutdown: only a socket this process bound itself carries a path to remove.
#[derive(Debug)]
pub enum ControlSocket {
    /// The listener was created by binding to the given path.
    Bound(UnixListener, PathBuf),
    /// The listener was handed to this process already bound; no path is tracked for it.
    Received(UnixListener),
}
108
/// Everything needed to run a node, as assembled by [`Runtime::init`] and consumed by
/// [`Runtime::run`].
pub struct Runtime {
    /// Public key of the signer the runtime was initialized with.
    pub id: NodeId,
    /// Node home; storage, databases and the control socket path are derived from it.
    pub home: Home,
    /// Listener for the control socket, together with its origin.
    pub control: ControlSocket,
    /// Handle built on top of the reactor's controller; used to request a shutdown.
    pub handle: Handle,
    /// Storage opened from the home's storage location.
    pub storage: Storage,
    /// Reactor driving the wire protocol and the service.
    pub reactor: Reactor,
    /// Worker pool receiving tasks from the wire through a bounded channel.
    pub pool: worker::Pool,
    /// Local addresses of the listeners bound for the requested listen addresses.
    pub local_addrs: Vec<net::SocketAddr>,
    /// Channel on which process signals are received while running.
    pub signals: chan::Receiver<Signal>,
}
121
122impl Runtime {
123 pub fn init<G>(
127 home: Home,
128 config: radicle::node::Config,
129 listen: Vec<net::SocketAddr>,
130 signals: chan::Receiver<Signal>,
131 signer: Device<G>,
132 ) -> Result<Runtime, Error>
133 where
134 G: crypto::signature::Signer<crypto::Signature>
135 + Ecdh<Pk = NodeId>
136 + Clone
137 + Debug
138 + 'static,
139 {
140 let id = *signer.public_key();
141 let alias = config.alias.clone();
142 let network = config.network;
143 let rng = fastrand::Rng::new();
144 let clock = LocalTime::now();
145 let timestamp = clock.into();
146 let storage = Storage::open(home.storage(), git::UserInfo { alias, key: id })?;
147 let policy = config.seeding_policy.into();
148
149 for (key, _) in &config.extra {
150 log::warn!(target: "node", "Unused or deprecated configuration attribute {key:?}");
151 }
152
153 log::info!(target: "node", "Opening policy database..");
154 let policies = home.policies_mut()?;
155 let policies = policy::Config::new(policy, policies);
156 let notifications = home.notifications_mut()?;
157 let mut cobs_cache = cob::cache::Store::open(home.cobs().join(cob::cache::COBS_DB_FILE))?;
158
159 match cobs_cache.check_version() {
160 Ok(()) => {}
161 Err(cob::cache::Error::OutOfDate) => {
162 log::info!(target: "node", "Migrating COBs cache..");
163 let version = cobs_cache.migrate(migrate::log)?;
164 log::info!(target: "node", "Migration of COBs cache complete (version={version})..");
165 }
166 Err(e) => return Err(e.into()),
167 }
168
169 log::info!(target: "node", "Default seeding policy set to '{}'", &policy);
170 log::info!(target: "node", "Initializing service ({network:?})..");
171
172 let announcement = service::gossip::node(&config, timestamp)
173 .solve(Default::default())
174 .expect("Runtime::init: unable to solve proof-of-work puzzle");
175
176 log::info!(target: "node", "Opening node database..");
177 let db = home.database_mut(config.database)?.init(
178 &id,
179 announcement.features,
180 &announcement.alias,
181 &announcement.agent,
182 announcement.timestamp,
183 announcement.addresses.iter(),
184 )?;
185 let mut stores: service::Stores<_> = db.clone().into();
186
187 if config.connect.is_empty() && stores.addresses().is_empty()? {
188 log::info!(target: "node", "Address book is empty. Adding bootstrap nodes..");
189
190 for (alias, version, addrs) in config.network.bootstrap() {
191 for addr in addrs {
192 let (id, addr) = addr.into();
193
194 stores.addresses_mut().insert(
195 &id,
196 version,
197 radicle::node::Features::SEED,
198 &alias,
199 0,
200 &UserAgent::default(),
201 clock.into(),
202 [node::KnownAddress::new(addr, address::Source::Bootstrap)],
203 )?;
204 }
205 }
206 log::info!(target: "node", "{} nodes added to address book", stores.addresses().len()?);
207 }
208
209 let emitter: Emitter<Event> = Default::default();
210 let mut service = service::Service::new(
211 config.clone(),
212 stores,
213 storage.clone(),
214 policies,
215 signer.clone(),
216 rng,
217 announcement,
218 emitter.clone(),
219 );
220 service.initialize(clock)?;
221
222 let (worker_send, worker_recv) = chan::bounded::<worker::Task>(MAX_PENDING_TASKS);
223 let mut wire = Wire::new(service, worker_send, signer.clone());
224 let mut local_addrs = Vec::new();
225
226 for addr in listen {
227 let listener = reactor::Listener::bind(addr)?;
228 let local_addr = listener.local_addr();
229
230 local_addrs.push(local_addr);
231 wire.listen(listener);
232 }
233 let reactor = Reactor::new(wire, thread::name(&id, "service"))?;
234 let handle = Handle::new(home.clone(), reactor.controller(), emitter);
235
236 let nid = *signer.public_key();
237 let fetch = worker::FetchConfig {
238 local: nid,
239 expiry: worker::garbage::Expiry::default(),
240 };
241 let pool = worker::Pool::with(
242 worker_recv,
243 nid,
244 handle.clone(),
245 notifications,
246 cobs_cache,
247 db,
248 worker::Config {
249 capacity: config.workers.into(),
250 storage: storage.clone(),
251 fetch,
252 policy,
253 policies_db: home.node().join(node::POLICIES_DB_FILE),
254 },
255 )?;
256 let control = Self::bind(home.socket())?;
257
258 Ok(Runtime {
259 id,
260 home,
261 control,
262 storage,
263 reactor,
264 handle,
265 pool,
266 signals,
267 local_addrs,
268 })
269 }
270
271 pub fn run(self) -> Result<(), Error> {
272 let home = self.home;
273 let (listener, remove) = match self.control {
274 ControlSocket::Bound(listener, path) => (listener, Some(path)),
275 ControlSocket::Received(listener) => (listener, None),
276 };
277
278 log::info!(target: "node", "Running node {} in {}..", self.id, home.path().display());
279
280 thread::spawn(&self.id, "control", {
281 let handle = self.handle.clone();
282 || control::listen(listener, handle)
283 });
284
285 let _signals = thread::spawn(&self.id, "signals", move || loop {
286 use radicle::node::Handle as _;
287
288 match self.signals.recv() {
289 Ok(Signal::Terminate | Signal::Interrupt) => {
290 log::info!(target: "node", "Termination signal received; shutting down..");
291 self.handle.shutdown().ok();
292 break;
293 }
294 Ok(Signal::Hangup) => {
295 log::debug!(target: "node", "Hangup signal (SIGHUP) received; ignoring..");
296 }
297 Ok(Signal::WindowChanged) => {}
298 Err(e) => {
299 log::warn!(target: "node", "Signal notifications channel error: {e}");
300 break;
301 }
302 }
303 });
304
305 self.pool.run().unwrap();
306 self.reactor.join().unwrap();
307
308 remove.map(|path| fs::remove_file(path).ok());
313
314 log::debug!(target: "node", "Node shutdown completed for {}", self.id);
315
316 Ok(())
317 }
318
319 #[cfg(all(feature = "systemd", target_os = "linux"))]
320 fn receive_listener() -> Option<UnixListener> {
321 let fd = match radicle_systemd::listen::fd("control") {
322 Ok(Some(fd)) => fd,
323 Ok(None) => return None,
324 Err(err) => {
325 log::error!(target: "node", "Error receiving listener from systemd: {err}");
326 return None;
327 }
328 };
329
330 let socket: socket2::Socket = unsafe { std::os::fd::FromRawFd::from_raw_fd(fd) };
331
332 let domain = match socket.domain() {
333 Ok(domain) => domain,
334 Err(err) => {
335 log::error!(target: "node", "Error receiving listener from systemd when inspecting domain of socket: {err}");
336 return None;
337 }
338 };
339
340 if domain != socket2::Domain::UNIX {
341 log::error!(target: "node", "Dropping listener received from systemd: Domain is not AF_UNIX.");
342 return None;
343 }
344
345 Some(UnixListener::from(socket))
346 }
347
348 fn bind(path: PathBuf) -> Result<ControlSocket, Error> {
349 #[cfg(all(feature = "systemd", target_os = "linux"))]
350 {
351 if let Some(listener) = Self::receive_listener() {
352 log::info!(target: "node", "Received control socket.");
353 return Ok(ControlSocket::Received(listener));
354 }
355 }
356
357 log::info!(target: "node", "Binding control socket {}..", &path.display());
358 match UnixListener::bind(&path) {
359 Ok(sock) => Ok(ControlSocket::Bound(sock, path)),
360 Err(err) if err.kind() == io::ErrorKind::AddrInUse => Err(Error::AlreadyRunning(path)),
361 Err(err) => Err(err.into()),
362 }
363 }
364}