1pub mod handle;
2pub mod thread;
3
4use std::fmt::Debug;
5use std::path::PathBuf;
6use std::str::FromStr as _;
7use std::{fs, io, net};
8
9#[cfg(unix)]
10use std::os::unix::net::UnixListener;
11#[cfg(windows)]
12use uds_windows::UnixListener;
13
14use crossbeam_channel as chan;
15use cyphernet::Ecdh;
16use radicle::cob::migrate;
17use radicle::crypto;
18use radicle::node::device::Device;
19use radicle_signals::Signal;
20use thiserror::Error;
21
22use radicle::node;
23use radicle::node::Event;
24use radicle::node::UserAgent;
25use radicle::node::address;
26use radicle::node::address::Store as _;
27use radicle::node::notifications;
28use radicle::node::policy::config as policy;
29use radicle::profile::Home;
30use radicle::{Storage, cob, git, storage};
31
32use crate::control;
33use crate::node::{NodeId, routing};
34use crate::reactor;
35use crate::reactor::Reactor;
36use crate::service::gossip;
37use crate::wire::Wire;
38use crate::worker;
39use crate::{LocalTime, service};
40
41pub use handle::Error as HandleError;
42pub use handle::Handle;
43pub use node::events::Emitter;
44
/// Capacity of the bounded channel through which tasks are handed to the
/// worker pool (see [`Runtime::init`]).
pub const MAX_PENDING_TASKS: usize = 1 << 10;
47
/// Errors that can occur while initializing or running the node [`Runtime`].
///
/// Most variants wrap the error type of one of the subsystems opened during
/// initialization and are produced through `?` via their `#[from]` conversion.
#[derive(Error, Debug)]
pub enum Error {
    /// Error from the routing database.
    #[error("routing database error: {0}")]
    Routing(#[from] routing::Error),
    /// Error from the COBs cache database (opening, version check or migration).
    #[error("cobs cache database error: {0}")]
    CobsCache(#[from] cob::cache::Error),
    /// Error from the node database.
    #[error("node database error: {0}")]
    Database(#[from] node::db::Error),
    /// Error from the repository storage.
    #[error("storage error: {0}")]
    Storage(#[from] storage::Error),
    /// Error from the policies database.
    #[error("policies database error: {0}")]
    Policy(#[from] policy::Error),
    /// Error from the notifications database.
    #[error("notifications database error: {0}")]
    Notifications(#[from] notifications::Error),
    /// Error from the gossip database.
    #[error("gossip database error: {0}")]
    Gossip(#[from] gossip::Error),
    /// Error from the address database.
    #[error("address database error: {0}")]
    Address(#[from] address::Error),
    /// Error from the node service.
    ///
    /// The payload is boxed; the conversion from [`service::Error`] is provided
    /// by a hand-written `From` impl rather than `#[from]`.
    #[error("service error: {0}")]
    Service(Box<service::Error>),
    /// An I/O error, e.g. from binding a listener or the control socket.
    #[error("i/o error: {0}")]
    Io(#[from] io::Error),
    /// Error from the control socket subsystem.
    #[error("control socket error: {0}")]
    Control(#[from] control::Error),
    /// The control socket address is already in use.
    ///
    /// Carries the path of the socket file, so that the user can delete it if
    /// it is stale.
    #[error(
        "another node appears to be running; \
        if this isn't the case, delete the socket file at '{0}' \
        and restart the node"
    )]
    AlreadyRunning(PathBuf),
    /// Error related to the Git version.
    #[error("git version error: {0}")]
    GitVersion(#[from] git::VersionError),
}
95
96impl From<service::Error> for Error {
97 fn from(e: service::Error) -> Self {
98 Self::Service(Box::new(e))
99 }
100}
101
/// The control socket listener, together with a record of where it came from.
///
/// The origin matters on shutdown: only a socket file that this process
/// created itself is removed again (see [`Runtime::run`]).
pub enum ControlSocket {
    /// The listener was created by binding to the given path.
    Bound(UnixListener, PathBuf),
    /// The listener was received from the outside (systemd) rather than bound
    /// by this process; there is no path to clean up.
    Received(UnixListener),
}
109
/// Everything needed to run a node: built by [`Runtime::init`] and consumed
/// by [`Runtime::run`].
pub struct Runtime {
    /// Identifier of this node; the public key of the signer passed to `init`.
    pub id: NodeId,
    /// Home directory the node operates in.
    pub home: Home,
    /// Listener for the control socket, and its origin.
    pub control: ControlSocket,
    /// Handle to the node; cloned into the control thread and the worker pool,
    /// and used to request shutdown when a termination signal arrives.
    pub handle: Handle,
    /// Repository storage, opened from the home directory.
    pub storage: Storage,
    /// Reactor driving the wire protocol and its listeners.
    pub reactor: Reactor,
    /// Worker pool receiving tasks over a bounded channel.
    pub pool: worker::Pool,
    /// Local addresses of the listeners that were bound in `init`.
    pub local_addrs: Vec<net::SocketAddr>,
    /// Channel on which process signals are received.
    pub signals: chan::Receiver<Signal>,
}
122
123impl Runtime {
124 pub fn init<G>(
128 home: Home,
129 config: radicle::node::Config,
130 socket: PathBuf,
131 listen: Vec<net::SocketAddr>,
132 signals: chan::Receiver<Signal>,
133 signer: Device<G>,
134 ) -> Result<Runtime, Error>
135 where
136 G: crypto::signature::Signer<crypto::Signature>
137 + Ecdh<Pk = NodeId>
138 + Clone
139 + Debug
140 + 'static,
141 {
142 let id = *signer.public_key();
143 let alias = config.alias.clone();
144 let network = config.network;
145 let rng = fastrand::Rng::new();
146 let clock = LocalTime::now();
147 let timestamp = clock.into();
148 let storage = Storage::open(home.storage(), git::UserInfo { alias, key: id })?;
149 let policy = config.seeding_policy.into();
150
151 for (key, _) in &config.extra {
152 log::warn!(target: "node", "Unused or deprecated configuration attribute {key:?}");
153 }
154
155 log::info!(target: "node", "Opening policy database..");
156 let policies = home.policies_mut()?;
157 let policies = policy::Config::new(policy, policies);
158 let notifications = home.notifications_mut()?;
159 let mut cobs_cache = cob::cache::Store::open(home.cobs().join(cob::cache::COBS_DB_FILE))?;
160
161 match cobs_cache.check_version() {
162 Ok(()) => {}
163 Err(cob::cache::Error::OutOfDate) => {
164 log::info!(target: "node", "Migrating COBs cache..");
165 let version = cobs_cache.migrate(migrate::log)?;
166 log::info!(target: "node", "Migration of COBs cache complete (version={version})..");
167 }
168 Err(e) => return Err(e.into()),
169 }
170
171 log::info!(target: "node", "Default seeding policy set to '{}'", &policy);
172 log::info!(target: "node", "Initializing service ({network:?})..");
173
174 let announcement = service::gossip::node(&config, timestamp)
175 .solve(Default::default())
176 .expect("Runtime::init: unable to solve proof-of-work puzzle");
177
178 log::info!(target: "node", "Opening node database..");
179 let db = home.database_mut(config.database)?.init(
180 &id,
181 announcement.features,
182 &announcement.alias,
183 &announcement.agent,
184 announcement.timestamp,
185 announcement.addresses.iter(),
186 )?;
187 let mut stores: service::Stores<_> = db.clone().into();
188
189 if config.connect.is_empty() && stores.addresses().is_empty()? {
190 log::info!(target: "node", "Address book is empty. Adding bootstrap nodes..");
191
192 for (alias, version, addrs) in config.network.bootstrap() {
193 for addr in addrs {
194 let (id, addr) = addr.into();
195
196 stores.addresses_mut().insert(
197 &id,
198 version,
199 radicle::node::Features::SEED,
200 &alias,
201 0,
202 &UserAgent::from_str("/radicle/runtime/bootstrap/")
203 .expect("valid user agent"),
204 clock.into(),
205 [node::KnownAddress::new(addr, address::Source::Bootstrap)],
206 )?;
207 }
208 }
209 log::info!(target: "node", "{} nodes added to address book", stores.addresses().len()?);
210 }
211
212 let emitter: Emitter<Event> = Default::default();
213 let mut service = service::Service::new(
214 config.clone(),
215 stores,
216 storage.clone(),
217 policies,
218 signer.clone(),
219 rng,
220 announcement,
221 emitter.clone(),
222 );
223 service.initialize(clock)?;
224
225 let (worker_send, worker_recv) = chan::bounded::<worker::Task>(MAX_PENDING_TASKS);
226 let mut wire = Wire::new(service, worker_send, signer.clone());
227 let mut local_addrs = Vec::new();
228
229 for addr in listen {
230 let listener = reactor::Listener::bind(addr)?;
231 let local_addr = listener.local_addr();
232
233 local_addrs.push(local_addr);
234 wire.listen(listener);
235 }
236 let reactor = Reactor::new(wire, thread::name(&id, "service"))?;
237 let handle = Handle::new(home.clone(), socket.clone(), reactor.controller(), emitter);
238
239 let nid = *signer.public_key();
240 let fetch = worker::FetchConfig {
241 local: nid,
242 expiry: worker::garbage::Expiry::default(),
243 };
244 let pool = worker::Pool::with(
245 worker_recv,
246 nid,
247 handle.clone(),
248 notifications,
249 cobs_cache,
250 db,
251 worker::Config {
252 capacity: config.workers.into(),
253 storage: storage.clone(),
254 fetch,
255 policy,
256 policies_db: home.node().join(node::POLICIES_DB_FILE),
257 },
258 )?;
259 let control = Self::bind(socket)?;
260
261 Ok(Runtime {
262 id,
263 home,
264 control,
265 storage,
266 reactor,
267 handle,
268 pool,
269 signals,
270 local_addrs,
271 })
272 }
273
274 pub fn run(self) -> Result<(), Error> {
275 let home = self.home;
276 let (listener, remove) = match self.control {
277 ControlSocket::Bound(listener, path) => (listener, Some(path)),
278 ControlSocket::Received(listener) => (listener, None),
279 };
280
281 log::info!(target: "node", "Running node {} in {}..", self.id, home.path().display());
282
283 thread::spawn(&self.id, "control", {
284 let handle = self.handle.clone();
285 || control::listen(listener, handle)
286 });
287
288 let _signals = thread::spawn(&self.id, "signals", move || {
289 loop {
290 use radicle::node::Handle as _;
291
292 match self.signals.recv() {
293 Ok(Signal::Terminate | Signal::Interrupt) => {
294 log::info!(target: "node", "Termination signal received; shutting down..");
295 self.handle.shutdown().ok();
296 break;
297 }
298 Ok(Signal::Hangup) => {
299 log::debug!(target: "node", "Hangup signal (SIGHUP) received; ignoring..");
300 }
301 Ok(Signal::WindowChanged) => {}
302 Err(e) => {
303 log::warn!(target: "node", "Signal notifications channel error: {e}");
304 break;
305 }
306 }
307 }
308 });
309
310 self.pool.run().unwrap();
311 self.reactor.join().unwrap();
312
313 remove.map(|path| fs::remove_file(path).ok());
318
319 log::debug!(target: "node", "Node shutdown completed for {}", self.id);
320
321 Ok(())
322 }
323
324 #[cfg(all(feature = "socket2", feature = "systemd", target_os = "linux"))]
325 fn receive_listener() -> Option<UnixListener> {
326 let fd = match unsafe { radicle_systemd::listen::fd("control") } {
328 Ok(Some(fd)) => fd,
329 Ok(None) => return None,
330 Err(err) => {
331 log::error!(target: "node", "Error receiving listener from systemd: {err}");
332 return None;
333 }
334 };
335
336 let socket: socket2::Socket = unsafe { std::os::fd::FromRawFd::from_raw_fd(fd) };
337
338 let domain = match socket.domain() {
339 Ok(domain) => domain,
340 Err(err) => {
341 log::error!(target: "node", "Error receiving listener from systemd when inspecting domain of socket: {err}");
342 return None;
343 }
344 };
345
346 if domain != socket2::Domain::UNIX {
347 log::error!(target: "node", "Dropping listener received from systemd: Domain is not AF_UNIX.");
348 return None;
349 }
350
351 Some(UnixListener::from(socket))
352 }
353
354 fn bind(path: PathBuf) -> Result<ControlSocket, Error> {
355 #[cfg(all(feature = "socket2", feature = "systemd", target_os = "linux"))]
356 {
357 if let Some(listener) = Self::receive_listener() {
358 log::info!(target: "node", "Received control socket.");
359 return Ok(ControlSocket::Received(listener));
360 }
361 }
362
363 log::info!(target: "node", "Binding control socket {}..", &path.display());
364 match UnixListener::bind(&path) {
365 Ok(sock) => Ok(ControlSocket::Bound(sock, path)),
366 Err(err) if err.kind() == io::ErrorKind::AddrInUse => Err(Error::AlreadyRunning(path)),
367 Err(err) => Err(err.into()),
368 }
369 }
370}