1pub mod handle;
2pub mod thread;
3
4use std::path::PathBuf;
5use std::{fs, io, net};
6
7#[cfg(unix)]
8use std::os::unix::net::UnixListener as Listener;
9#[cfg(windows)]
10use winpipe::WinListener as Listener;
11
12use crossbeam_channel as chan;
13use cyphernet::Ecdh;
14use netservices::resource::NetAccept;
15use radicle::cob::migrate;
16use radicle::crypto;
17use radicle::node::device::Device;
18use radicle_fetch::FetchLimit;
19use radicle_signals::Signal;
20use reactor::poller::popol;
21use reactor::Reactor;
22use thiserror::Error;
23
24use radicle::node;
25use radicle::node::address;
26use radicle::node::address::Store as _;
27use radicle::node::notifications;
28use radicle::node::policy::config as policy;
29use radicle::node::Event;
30use radicle::node::UserAgent;
31use radicle::profile::Home;
32use radicle::{cob, git, storage, Storage};
33
34use crate::control;
35use crate::node::{routing, NodeId};
36use crate::service::gossip;
37use crate::wire;
38use crate::wire::Wire;
39use crate::worker;
40use crate::{service, LocalTime};
41
42pub use handle::Error as HandleError;
43pub use handle::Handle;
44pub use node::events::Emitter;
45
46pub const MAX_PENDING_TASKS: usize = 1024;
48
49#[derive(Error, Debug)]
51pub enum Error {
52 #[error("routing database error: {0}")]
54 Routing(#[from] routing::Error),
55 #[error("cobs cache database error: {0}")]
57 CobsCache(#[from] cob::cache::Error),
58 #[error("node database error: {0}")]
60 Database(#[from] node::db::Error),
61 #[error("storage error: {0}")]
63 Storage(#[from] storage::Error),
64 #[error("policies database error: {0}")]
66 Policy(#[from] policy::Error),
67 #[error("notifications database error: {0}")]
69 Notifications(#[from] notifications::Error),
70 #[error("gossip database error: {0}")]
72 Gossip(#[from] gossip::Error),
73 #[error("address database error: {0}")]
75 Address(#[from] address::Error),
76 #[error("service error: {0}")]
78 Service(Box<service::Error>),
79 #[error("i/o error: {0}")]
81 Io(#[from] io::Error),
82 #[error("control socket error: {0}")]
84 Control(#[from] control::Error),
85 #[error(
87 "another node appears to be running; \
88 if this isn't the case, delete the socket file at '{0}' \
89 and restart the node"
90 )]
91 AlreadyRunning(PathBuf),
92 #[error("git version error: {0}")]
94 GitVersion(#[from] git::VersionError),
95}
96
97impl From<service::Error> for Error {
98 fn from(e: service::Error) -> Self {
99 Self::Service(Box::new(e))
100 }
101}
102
103pub enum ControlSocket {
105 Bound(Listener, PathBuf),
107 Received(Listener),
109}
110
111pub struct Runtime {
113 pub id: NodeId,
114 pub home: Home,
115 pub control: ControlSocket,
116 pub handle: Handle,
117 pub storage: Storage,
118 pub reactor: Reactor<wire::Control, popol::Poller>,
119 pub pool: worker::Pool,
120 pub local_addrs: Vec<net::SocketAddr>,
121 pub signals: chan::Receiver<Signal>,
122}
123
124impl Runtime {
125 pub fn init<G>(
129 home: Home,
130 config: radicle::node::Config,
131 listen: Vec<net::SocketAddr>,
132 signals: chan::Receiver<Signal>,
133 signer: Device<G>,
134 ) -> Result<Runtime, Error>
135 where
136 G: crypto::signature::Signer<crypto::Signature> + Ecdh<Pk = NodeId> + Clone + 'static,
137 {
138 let id = *signer.public_key();
139 let alias = config.alias.clone();
140 let network = config.network;
141 let rng = fastrand::Rng::new();
142 let clock = LocalTime::now();
143 let timestamp = clock.into();
144 let storage = Storage::open(home.storage(), git::UserInfo { alias, key: id })?;
145 let policy = config.seeding_policy.into();
146
147 for (key, _) in &config.extra {
148 log::warn!(target: "node", "Unused or deprecated configuration attribute {key:?}");
149 }
150
151 log::info!(target: "node", "Opening policy database..");
152 let policies = home.policies_mut()?;
153 let policies = policy::Config::new(policy, policies);
154 let notifications = home.notifications_mut()?;
155 let mut cobs_cache = cob::cache::Store::open(home.cobs().join(cob::cache::COBS_DB_FILE))?;
156
157 match cobs_cache.check_version() {
158 Ok(()) => {}
159 Err(cob::cache::Error::OutOfDate) => {
160 log::info!(target: "node", "Migrating COBs cache..");
161 let version = cobs_cache.migrate(migrate::log)?;
162 log::info!(target: "node", "Migration of COBs cache complete (version={version})..");
163 }
164 Err(e) => return Err(e.into()),
165 }
166
167 log::info!(target: "node", "Default seeding policy set to '{}'", &policy);
168 log::info!(target: "node", "Initializing service ({network:?})..");
169
170 let announcement = service::gossip::node(&config, timestamp)
171 .solve(Default::default())
172 .expect("Runtime::init: unable to solve proof-of-work puzzle");
173
174 log::info!(target: "node", "Opening node database..");
175 let db = home
176 .database_mut()?
177 .journal_mode(node::db::JournalMode::default())?
178 .init(
179 &id,
180 announcement.features,
181 &announcement.alias,
182 &announcement.agent,
183 announcement.timestamp,
184 announcement.addresses.iter(),
185 )?;
186 let mut stores: service::Stores<_> = db.clone().into();
187
188 if config.connect.is_empty() && stores.addresses().is_empty()? {
189 log::info!(target: "node", "Address book is empty. Adding bootstrap nodes..");
190
191 for (alias, version, addrs) in config.network.bootstrap() {
192 for addr in addrs {
193 let (id, addr) = addr.into();
194
195 stores.addresses_mut().insert(
196 &id,
197 version,
198 radicle::node::Features::SEED,
199 &alias,
200 0,
201 &UserAgent::default(),
202 clock.into(),
203 [node::KnownAddress::new(addr, address::Source::Bootstrap)],
204 )?;
205 }
206 }
207 log::info!(target: "node", "{} nodes added to address book", stores.addresses().len()?);
208 }
209
210 let emitter: Emitter<Event> = Default::default();
211 let mut service = service::Service::new(
212 config.clone(),
213 stores,
214 storage.clone(),
215 policies,
216 signer.clone(),
217 rng,
218 announcement,
219 emitter.clone(),
220 );
221 service.initialize(clock)?;
222
223 let (worker_send, worker_recv) = chan::bounded::<worker::Task>(MAX_PENDING_TASKS);
224 let mut wire = Wire::new(service, worker_send, signer.clone());
225 let mut local_addrs = Vec::new();
226
227 for addr in listen {
228 let listener = NetAccept::bind(&addr)?;
229 let local_addr = listener.local_addr();
230
231 local_addrs.push(local_addr);
232 wire.listen(listener);
233 }
234 let reactor = Reactor::named(wire, popol::Poller::new(), thread::name(&id, "service"))?;
235 let handle = Handle::new(home.clone(), reactor.controller(), emitter);
236
237 let nid = *signer.public_key();
238 let fetch = worker::FetchConfig {
239 limit: FetchLimit::default(),
240 local: nid,
241 expiry: worker::garbage::Expiry::default(),
242 };
243 let pool = worker::Pool::with(
244 worker_recv,
245 nid,
246 handle.clone(),
247 notifications,
248 cobs_cache,
249 db,
250 worker::Config {
251 capacity: config.workers.into(),
252 storage: storage.clone(),
253 fetch,
254 policy,
255 policies_db: home.node().join(node::POLICIES_DB_FILE),
256 },
257 )?;
258 let control = Self::bind(home.socket())?;
259
260 Ok(Runtime {
261 id,
262 home,
263 control,
264 storage,
265 reactor,
266 handle,
267 pool,
268 signals,
269 local_addrs,
270 })
271 }
272
273 pub fn run(self) -> Result<(), Error> {
274 let home = self.home;
275 let (listener, remove) = match self.control {
276 ControlSocket::Bound(listener, path) => (listener, Some(path)),
277 ControlSocket::Received(listener) => (listener, None),
278 };
279
280 log::info!(target: "node", "Running node {} in {}..", self.id, home.path().display());
281
282 thread::spawn(&self.id, "control", {
283 let handle = self.handle.clone();
284 || control::listen(listener, handle)
285 });
286
287 #[cfg(unix)]
288 let _signals = thread::spawn(&self.id, "signals", move || loop {
289 use radicle::node::Handle as _;
290
291 match self.signals.recv() {
292 Ok(Signal::Terminate | Signal::Interrupt) => {
293 log::info!(target: "node", "Termination signal received; shutting down..");
294 self.handle.shutdown().ok();
295 break;
296 }
297 Ok(Signal::Hangup) => {
298 log::debug!(target: "node", "Hangup signal (SIGHUP) received; ignoring..");
299 }
300 Ok(Signal::WindowChanged) => {}
301 Err(e) => {
302 log::warn!(target: "node", "Signal notifications channel error: {e}");
303 break;
304 }
305 }
306 });
307
308 self.pool.run().unwrap();
309 self.reactor.join().unwrap();
310
311 remove.map(|path| fs::remove_file(path).ok());
316
317 log::debug!(target: "node", "Node shutdown completed for {}", self.id);
318
319 Ok(())
320 }
321
322 #[cfg(all(feature = "systemd", target_os = "linux"))]
323 fn receive_listener() -> Option<Listener> {
324 let fd = match radicle_systemd::listen::fd("control") {
325 Ok(Some(fd)) => fd,
326 Ok(None) => return None,
327 Err(err) => {
328 log::error!(target: "node", "Error receiving listener from systemd: {err}");
329 return None;
330 }
331 };
332
333 let socket: socket2::Socket = unsafe { std::os::fd::FromRawFd::from_raw_fd(fd) };
334
335 let domain = match socket.domain() {
336 Ok(domain) => domain,
337 Err(err) => {
338 log::error!(target: "node", "Error receiving listener from systemd when inspecting domain of socket: {err}");
339 return None;
340 }
341 };
342
343 if domain != socket2::Domain::UNIX {
344 log::error!(target: "node", "Dropping listener received from systemd: Domain is not AF_UNIX.");
345 return None;
346 }
347
348 Some(Listener::from(socket))
349 }
350
351 fn bind(path: PathBuf) -> Result<ControlSocket, Error> {
352 #[cfg(all(feature = "systemd", target_os = "linux"))]
353 {
354 if let Some(listener) = Self::receive_listener() {
355 log::info!(target: "node", "Received control socket.");
356 return Ok(ControlSocket::Received(listener));
357 }
358 }
359
360 log::info!(target: "node", "Binding control socket {}..", &path.display());
361 match Listener::bind(&path) {
362 Ok(sock) => Ok(ControlSocket::Bound(sock, path)),
363 Err(err) if err.kind() == io::ErrorKind::AddrInUse => Err(Error::AlreadyRunning(path)),
364 Err(err) => Err(err.into()),
365 }
366 }
367}