dig_service/lib.rs
//! # dig-service
//!
//! The generic orchestration scaffold every DIG Network binary shares.
//! Provides the minimal set of primitives needed to stand up a service:
//! the [`Service<N, A, R>`] composition, a [`TaskRegistry`] for tracked
//! `tokio::spawn`, a [`ShutdownToken`] broadcast, and three service
//! traits ([`NodeLifecycle`], [`PeerApi`], [`RpcApi`]).
//!
//! ## Design principle
//!
//! **Smallest possible shared foundation.** This crate does NOT pull in:
//!
//! - TOML / YAML config parsing — each binary defines its own config struct
//!   via `serde` and feeds the parsed instance to its node before `Service::new`.
//! - `clap` / CLI framework — binaries build their own CLI.
//! - `tracing_subscriber` subscriber installation — each binary installs its
//!   subscriber in `main.rs` before `Service::start`.
//! - Prometheus / metrics exporters — wired in `main.rs`.
//! - `dig-rpc` server — plugged in through [`RpcApi`].
//! - Peer transport — plugged in through [`PeerApi`].
//!
//! Keeping this crate minimal means every downstream binary picks only the
//! pieces it needs. Introducers and relays use `Service<N, A, ()>` (peer-only);
//! daemons use `Service<N, (), R>` (RPC-only); fullnodes fill all three slots.
//!
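//! A typical `main.rs` therefore does that wiring itself. A minimal sketch,
//! assuming hypothetical `MyConfig` / `MyNode` / `MyPeers` types and the
//! `toml` + `tracing-subscriber` crates (which this crate deliberately does
//! not depend on):
//!
//! ```ignore
//! #[derive(serde::Deserialize)]
//! struct MyConfig {
//!     listen_port: u16,
//! }
//!
//! #[tokio::main]
//! async fn main() -> anyhow::Result<()> {
//!     // Each binary installs its own subscriber before Service::start.
//!     tracing_subscriber::fmt().init();
//!
//!     // Each binary parses its own config and feeds it to its node.
//!     let cfg: MyConfig = toml::from_str(&std::fs::read_to_string("node.toml")?)?;
//!
//!     // Peer-only composition: the RPC slot stays ().
//!     let svc = dig_service::Service::<MyNode, MyPeers, ()>::new(
//!         MyNode::new(cfg),
//!         MyPeers::default(),
//!         (),
//!     );
//!     let _exit = svc.start().await?;
//!     Ok(())
//! }
//! ```
//!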
//! ## At a glance
//!
//! ```text
//! apps/fullnode   apps/validator   apps/introducer   apps/relay   apps/daemon
//!       │                │                │              │             │
//!       └────────────────┴───────┬────────┴──────────────┴─────────────┘
//!                                ▼
//!                           dig-service        ← this crate
//!                                │
//!                                ▼
//!                      tokio + tokio-util
//! ```
//!
//! ## Minimal example
//!
//! ```no_run
//! use async_trait::async_trait;
//! use dig_service::{
//!     NodeLifecycle, Service, StartContext, RunContext, StopContext,
//! };
//!
//! struct Node;
//!
//! #[async_trait]
//! impl NodeLifecycle for Node {
//!     const NAME: Option<&'static str> = Some("example");
//!
//!     async fn pre_start(&self, _ctx: &StartContext<'_>) -> anyhow::Result<()> { Ok(()) }
//!     async fn on_start(&self, _ctx: &StartContext<'_>) -> anyhow::Result<()> { Ok(()) }
//!     async fn run(&self, ctx: RunContext) -> anyhow::Result<()> {
//!         // Block until shutdown is requested.
//!         ctx.shutdown.cancelled().await;
//!         Ok(())
//!     }
//!     async fn on_stop(&self, _ctx: &StopContext<'_>) -> anyhow::Result<()> { Ok(()) }
//!     async fn post_stop(&self, _ctx: &StopContext<'_>) -> anyhow::Result<()> { Ok(()) }
//! }
//!
//! # #[tokio::main]
//! # async fn main() {
//! let svc = Service::<Node, (), ()>::new(Node, (), ());
//! let _exit = svc.start().await.unwrap();
//! # }
//! ```
//!
//! ## Lifecycle diagram
//!
//! ```text
//! Service::start
//!        │
//!        ▼
//! node.pre_start         ◂── open stores, replay journal
//!        │
//!        ▼
//! node.on_start          ◂── bind ports, warm caches
//!        │
//!        ▼
//! ┌──── node.run ─────┐  ◂── main event loop; returns on shutdown
//! │                   │
//! │   tasks.spawn…    │
//! └─────────┬─────────┘
//!           ▼
//! tasks.join_all         ◂── graceful wait w/ deadline
//!           ▼
//! node.on_stop           ◂── flush stores
//!           ▼
//! node.post_stop         ◂── close stores, release locks
//!           ▼
//! ExitStatus
//! ```
//!
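//! The `tasks.spawn…` step above is where long-lived background work should be
//! started, so that `tasks.join_all` (driven by `Service::start` with a
//! 30-second deadline) can account for it. A sketch of a `run` hook doing so,
//! assuming a `TaskRegistry::spawn(kind, future)`-shaped method and a
//! hypothetical `TaskKind::Background` variant (check [`TaskRegistry`] and
//! [`TaskKind`] for the real API):
//!
//! ```ignore
//! async fn run(&self, ctx: RunContext) -> anyhow::Result<()> {
//!     // Give the background loop its own shutdown handle so it can stop
//!     // inside the join_all deadline.
//!     let shutdown = ctx.shutdown.clone();
//!     ctx.tasks.spawn(TaskKind::Background, async move {
//!         loop {
//!             tokio::select! {
//!                 _ = shutdown.cancelled() => break,
//!                 _ = tokio::time::sleep(std::time::Duration::from_secs(60)) => {
//!                     // periodic housekeeping goes here
//!                 }
//!             }
//!         }
//!     });
//!
//!     // The run hook itself just parks until shutdown is requested.
//!     ctx.shutdown.cancelled().await;
//!     Ok(())
//! }
//! ```
//!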
//! ## Feature flags
//!
//! | Flag | Default | Effect |
//! |---|---|---|
//! | `testing` | off | Ships `TestService` + deterministic helpers |
//! | `tokio-console` | off | Wires `console_subscriber` for live introspection |
//! | `structured-panic` | off | Routes panics through `tracing::error!` and triggers `Fatal` shutdown |

#![deny(unsafe_code)]
#![warn(missing_docs)]

mod error;
mod handle;
mod shutdown;
mod tasks;
mod traits;

#[cfg(feature = "testing")]
pub mod testing;

// Re-export tokio primitives that every consumer will want.
pub use tokio::task::JoinHandle;
pub use tokio_util::sync::CancellationToken;

// Public surface.
pub use error::{Result, ServiceError};
pub use handle::{ServiceHandle, TaskSummary};
pub use shutdown::{ExitReason, ExitStatus, ShutdownReason, ShutdownToken};
pub use tasks::{TaskKind, TaskRegistry};
pub use traits::{
    DisconnectReason, InboundMessage, NodeLifecycle, PeerApi, PeerId, PeerInfo, RpcApi, RunContext,
    StartContext, StopContext,
};

use std::sync::Arc;

use parking_lot::RwLock;

/// The top-level orchestration handle.
///
/// See the [crate-level documentation](crate) for the architecture and
/// lifecycle diagrams. `Service` owns the business-logic `node` and the
/// peer / RPC API adapters; calling `start()` drives the full lifecycle.
///
/// # Type parameters
///
/// - `N` — the [`NodeLifecycle`] implementation (the business core).
/// - `A` — the [`PeerApi`] implementation. Use `()` for binaries that don't
///   serve a peer surface (daemon, wallet).
/// - `R` — the [`RpcApi`] implementation. Use `()` for binaries that don't
///   serve RPC (introducer, relay).
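///
/// For example (illustrative aliases; the node / API types are hypothetical):
///
/// ```ignore
/// // Daemon: RPC surface only, no peer surface.
/// type DaemonService = Service<DaemonNode, (), DaemonRpc>;
/// // Relay: peer surface only, no RPC.
/// type RelayService = Service<RelayNode, RelayPeers, ()>;
/// ```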
pub struct Service<N, A, R>
where
    N: NodeLifecycle,
    A: PeerApi,
    R: RpcApi,
{
    node: Arc<N>,
    peer_api: Arc<A>,
    rpc_api: Arc<R>,
    shutdown: ShutdownToken,
    tasks: TaskRegistry,
    // Shared state kept alive so ServiceHandle clones stay valid after Service is dropped.
    handle_state: Arc<RwLock<handle::HandleState>>,
    started: Arc<std::sync::atomic::AtomicBool>,
}

impl<N, A, R> Service<N, A, R>
where
    N: NodeLifecycle,
    A: PeerApi,
    R: RpcApi,
{
    /// Construct a new service from its three components.
    ///
    /// No runtime work happens here — ports are not bound, stores are not
    /// opened. All of that runs inside `start()`.
    pub fn new(node: N, peer_api: A, rpc_api: R) -> Self {
        let shutdown = ShutdownToken::new();
        let tasks = TaskRegistry::new(shutdown.clone());
        let handle_state = Arc::new(RwLock::new(handle::HandleState::new(N::NAME.unwrap_or(""))));
        Self {
            node: Arc::new(node),
            peer_api: Arc::new(peer_api),
            rpc_api: Arc::new(rpc_api),
            shutdown,
            tasks,
            handle_state,
            started: Arc::new(std::sync::atomic::AtomicBool::new(false)),
        }
    }

    /// Return a cloneable handle for callers that outlive the `Service`
    /// itself (signal handlers, RPC admin methods).
    ///
    /// The returned handle remains valid after `Service` is dropped — its
    /// operations simply become no-ops.
    pub fn handle(&self) -> ServiceHandle {
        ServiceHandle::new(
            self.shutdown.clone(),
            self.tasks.clone(),
            self.handle_state.clone(),
        )
    }

    /// Borrow the node. Mostly useful in tests; production callers should
    /// share state through handles (channels, `Arc`s) they set up before
    /// moving the node into `Service::new`.
    pub fn node(&self) -> &Arc<N> {
        &self.node
    }

    /// Borrow the peer API.
    pub fn peer_api(&self) -> &Arc<A> {
        &self.peer_api
    }

    /// Borrow the RPC API.
    pub fn rpc_api(&self) -> &Arc<R> {
        &self.rpc_api
    }

    /// Borrow the shutdown token. Use this to give background tasks a way
    /// to notice shutdown has been requested.
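    ///
    /// A minimal sketch of wiring Ctrl-C to a graceful shutdown (the
    /// `ShutdownReason::Signal` variant here is a hypothetical name; see
    /// [`ShutdownReason`] for the real variants):
    ///
    /// ```ignore
    /// let token = svc.shutdown_token().clone();
    /// tokio::spawn(async move {
    ///     // Cancelling is idempotent: only the first reason is recorded.
    ///     if tokio::signal::ctrl_c().await.is_ok() {
    ///         token.cancel(ShutdownReason::Signal);
    ///     }
    /// });
    /// let _exit = svc.start().await?;
    /// ```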
    pub fn shutdown_token(&self) -> &ShutdownToken {
        &self.shutdown
    }

    /// Borrow the task registry.
    pub fn tasks(&self) -> &TaskRegistry {
        &self.tasks
    }

    /// Request a graceful shutdown. Idempotent — only the first reason is
    /// recorded.
    pub fn request_shutdown(&self, reason: ShutdownReason) {
        self.shutdown.cancel(reason);
    }

    /// Drive the full lifecycle: `pre_start → on_start → run → on_stop →
    /// post_stop`. Returns once `run` exits, whether it completed normally,
    /// failed, or returned in response to a requested shutdown.
    ///
    /// # Error handling
    ///
    /// - A failure in `pre_start` short-circuits without calling any later
    ///   hook.
    /// - A failure in `on_start` calls `post_stop` but skips `run` and
    ///   `on_stop`.
    /// - A failure in `run` still calls `on_stop` + `post_stop`.
    /// - A failure in `on_stop` is recorded but does not block `post_stop`.
    /// - Panics inside `run` are caught via `scopeguard`-style drop logic
    ///   and reported as `ExitReason::RunError`.
    ///
    /// Calling `start` twice on the same `Service` returns
    /// [`ServiceError::AlreadyRunning`].
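    ///
    /// A sketch of handling the result in `main.rs` (error variants as
    /// re-exported from this crate):
    ///
    /// ```ignore
    /// match svc.start().await {
    ///     Ok(status) => { /* clean exit; status.reason records why */ }
    ///     Err(ServiceError::RunFailed(err)) => { /* run failed; teardown hooks still ran */ }
    ///     Err(err) => { /* a startup or teardown hook failed */ }
    /// }
    /// ```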
    pub async fn start(self) -> Result<ExitStatus> {
        use std::sync::atomic::Ordering;

        if self.started.swap(true, Ordering::SeqCst) {
            return Err(ServiceError::AlreadyRunning);
        }

        let node = self.node.clone();
        let start_ctx = StartContext {
            name: N::NAME.unwrap_or(""),
            shutdown: self.shutdown.clone(),
            tasks: &self.tasks,
        };

        // pre_start
        if let Err(e) = node.pre_start(&start_ctx).await {
            return Err(ServiceError::PreStartFailed(Arc::new(e)));
        }

        // on_start
        if let Err(e) = node.on_start(&start_ctx).await {
            // on_start failed — call post_stop so any resources opened in
            // pre_start have a chance to close.
            let stop_ctx = StopContext {
                shutdown: self.shutdown.clone(),
                tasks: &self.tasks,
                exit_reason: ExitReason::RunError(Arc::new(anyhow::anyhow!(
                    "on_start failed: {e}"
                ))),
            };
            let _ = node.post_stop(&stop_ctx).await;
            return Err(ServiceError::OnStartFailed(Arc::new(e)));
        }

        // run
        let run_ctx = RunContext {
            shutdown: self.shutdown.clone(),
            tasks: self.tasks.clone(),
        };
        let run_result = node.run(run_ctx).await;

        // Determine exit reason.
        let exit_reason = match &run_result {
            Ok(()) => self
                .shutdown
                .reason()
                .map(ExitReason::RequestedShutdown)
                .unwrap_or(ExitReason::RunCompleted),
            Err(e) => ExitReason::RunError(Arc::new(anyhow::anyhow!("{e}"))),
        };

        // Signal tasks to wind down even if run returned normally.
        if !self.shutdown.is_cancelled() {
            self.shutdown.cancel(ShutdownReason::RequestedByRun);
        }

        // Wait for background tasks with deadline.
        let join_result = self
            .tasks
            .join_all(std::time::Duration::from_secs(30))
            .await;

        // on_stop
        let stop_ctx = StopContext {
            shutdown: self.shutdown.clone(),
            tasks: &self.tasks,
            exit_reason: exit_reason.clone(),
        };
        let on_stop_result = node.on_stop(&stop_ctx).await;

        // post_stop (always called)
        let post_stop_result = node.post_stop(&stop_ctx).await;

        // Decide final error priority: run > on_stop > post_stop > join.
        if let Err(e) = run_result {
            return Err(ServiceError::RunFailed(Arc::new(e)));
        }
        if let Err(e) = on_stop_result {
            return Err(ServiceError::OnStopFailed(Arc::new(e)));
        }
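        // NOTE: post_stop failures are reported under OnStopFailed as well.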
        if let Err(e) = post_stop_result {
            return Err(ServiceError::OnStopFailed(Arc::new(e)));
        }
        join_result?;

        Ok(ExitStatus {
            reason: exit_reason,
        })
    }
}
339}