// dig_service/lib.rs

//! # dig-service
//!
//! The generic orchestration scaffold every DIG Network binary shares.
//! Provides the minimal set of primitives needed to stand up a service:
//! the [`Service<N, A, R>`] composition, a [`TaskRegistry`] for tracked
//! `tokio::spawn`, a [`ShutdownToken`] broadcast, and three lifecycle
//! traits ([`NodeLifecycle`], [`PeerApi`], [`RpcApi`]).
//!
//! ## Design principle
//!
//! **Smallest possible shared foundation.** This crate does NOT pull in:
//!
//! - TOML / YAML config parsing — each binary defines its own config struct
//!   via `serde` and hands a parsed instance to `Service::new`.
//! - `clap` / CLI framework — binaries build their own CLI.
//! - `tracing_subscriber` subscriber installation — each binary installs its
//!   subscriber in `main.rs` before `Service::start`.
//! - Prometheus / metrics exporters — wired in `main.rs`.
//! - `dig-rpc` server — plugged in through [`RpcApi`].
//! - Peer transport — plugged in through [`PeerApi`].
//!
//! Keeping this crate minimal means every downstream binary picks only the
//! pieces it needs. Introducers and relays get `Service<N, A, ()>` (peer-only);
//! daemons get `Service<N, (), R>` (RPC-only); fullnodes get all three slots.
//!
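//! For example, the slot combinations come out roughly like this (the node
//! and adapter type names are illustrative, not items exported by this
//! crate):
//!
//! ```ignore
//! // Peer-only binary (introducer / relay): no RPC surface.
//! type IntroducerService = Service<IntroducerNode, IntroducerPeers, ()>;
//!
//! // RPC-only binary (daemon): no peer surface.
//! type DaemonService = Service<DaemonNode, (), DaemonRpc>;
//!
//! // Fullnode: all three slots filled.
//! type FullnodeService = Service<FullnodeNode, FullnodePeers, FullnodeRpc>;
//! ```
//!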
//! ## At a glance
//!
//! ```text
//!   apps/fullnode   apps/validator   apps/introducer   apps/relay   apps/daemon
//!         │               │                 │              │             │
//!         └───────────────┴─────────┬───────┴──────────────┴─────────────┘
//!                                   ▼
//!                             dig-service              ← this crate
//!                                   │
//!                                   ▼
//!                              tokio + tokio-util
//! ```
//!
//! ## Minimal example
//!
//! ```no_run
//! use async_trait::async_trait;
//! use dig_service::{
//!     NodeLifecycle, Service, StartContext, RunContext, StopContext,
//! };
//!
//! struct Node;
//!
//! #[async_trait]
//! impl NodeLifecycle for Node {
//!     const NAME: Option<&'static str> = Some("example");
//!
//!     async fn pre_start(&self, _ctx: &StartContext<'_>) -> anyhow::Result<()> { Ok(()) }
//!     async fn on_start(&self, _ctx: &StartContext<'_>) -> anyhow::Result<()> { Ok(()) }
//!     async fn run(&self, ctx: RunContext) -> anyhow::Result<()> {
//!         // Block until shutdown is requested.
//!         ctx.shutdown.cancelled().await;
//!         Ok(())
//!     }
//!     async fn on_stop(&self, _ctx: &StopContext<'_>) -> anyhow::Result<()> { Ok(()) }
//!     async fn post_stop(&self, _ctx: &StopContext<'_>) -> anyhow::Result<()> { Ok(()) }
//! }
//!
//! # #[tokio::main]
//! # async fn main() {
//! let svc = Service::<Node, (), ()>::new(Node, (), ());
//! let _exit = svc.start().await.unwrap();
//! # }
//! ```
//!
//! ## Lifecycle diagram
//!
//! ```text
//!       Service::start
//!            │
//!            ▼
//!    node.pre_start     ◂── open stores, replay journal
//!            │
//!            ▼
//!    node.on_start      ◂── bind ports, warm caches
//!            │
//!            ▼
//!  ┌──  node.run  ──┐   ◂── main event loop; returns on shutdown
//!  │                │
//!  │  tasks.spawn…  │
//!  └────────┬───────┘
//!           ▼
//!  tasks.join_all    ◂── graceful wait w/ deadline
//!           ▼
//!    node.on_stop    ◂── flush stores
//!           ▼
//!    node.post_stop  ◂── close stores, release locks
//!           ▼
//!      ExitStatus
//! ```
//!
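//! As a rough sketch of the loop above, a `run` implementation typically
//! spawns its tracked workers and then parks on the shutdown token. The
//! `tasks.spawn` call and the [`TaskKind`] variant below are illustrative;
//! consult [`TaskRegistry`] for the real signature:
//!
//! ```ignore
//! // Inside an `impl NodeLifecycle for MyNode` block.
//! async fn run(&self, ctx: RunContext) -> anyhow::Result<()> {
//!     let shutdown = ctx.shutdown.clone();
//!     // Hypothetical spawn signature; tracked tasks are awaited by the
//!     // service during teardown (`tasks.join_all` in the diagram).
//!     ctx.tasks.spawn(TaskKind::Background, async move {
//!         while !shutdown.is_cancelled() {
//!             tokio::time::sleep(std::time::Duration::from_secs(5)).await;
//!         }
//!     });
//!
//!     // Park until a shutdown is requested, then return to begin teardown.
//!     ctx.shutdown.cancelled().await;
//!     Ok(())
//! }
//! ```
//!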
//! ## Feature flags
//!
//! | Flag | Default | Effect |
//! |---|---|---|
//! | `testing` | off | Ships `TestService` + deterministic helpers |
//! | `tokio-console` | off | Wires `console_subscriber` for live introspection |
//! | `structured-panic` | off | Routes panics through `tracing::error!` and triggers `Fatal` shutdown |

#![deny(unsafe_code)]
#![warn(missing_docs)]

mod error;
mod handle;
mod shutdown;
mod tasks;
mod traits;

#[cfg(feature = "testing")]
pub mod testing;

// Re-export tokio primitives that every consumer will want.
pub use tokio::task::JoinHandle;
pub use tokio_util::sync::CancellationToken;

// Public surface.
pub use error::{Result, ServiceError};
pub use handle::{ServiceHandle, TaskSummary};
pub use shutdown::{ExitReason, ExitStatus, ShutdownReason, ShutdownToken};
pub use tasks::{TaskKind, TaskRegistry};
pub use traits::{
    DisconnectReason, InboundMessage, NodeLifecycle, PeerApi, PeerId, PeerInfo, RpcApi, RunContext,
    StartContext, StopContext,
};

use std::sync::Arc;

use parking_lot::RwLock;

/// The top-level orchestration handle.
///
/// See the [crate-level documentation](crate) for architecture + lifecycle
/// diagram. `Service` owns the business-logic `node` and the peer / RPC
/// API adapters; driving `start()` runs the full lifecycle.
///
/// # Type parameters
///
/// - `N` — the [`NodeLifecycle`] implementation (the business core).
/// - `A` — the [`PeerApi`] implementation. Use `()` for binaries that don't
///   serve a peer surface (daemon, wallet).
/// - `R` — the [`RpcApi`] implementation. Use `()` for binaries that don't
///   serve RPC (introducer, relay).
pub struct Service<N, A, R>
where
    N: NodeLifecycle,
    A: PeerApi,
    R: RpcApi,
{
    node: Arc<N>,
    peer_api: Arc<A>,
    rpc_api: Arc<R>,
    shutdown: ShutdownToken,
    tasks: TaskRegistry,
    // Shared state backing `ServiceHandle`, so handle clones stay valid after
    // the `Service` itself is dropped.
    handle_state: Arc<RwLock<handle::HandleState>>,
    started: Arc<std::sync::atomic::AtomicBool>,
}

impl<N, A, R> Service<N, A, R>
where
    N: NodeLifecycle,
    A: PeerApi,
    R: RpcApi,
{
    /// Construct a new service from its three components.
    ///
    /// No runtime work happens here — ports are not bound, stores are not
    /// opened. All of that runs inside `start()`.
    pub fn new(node: N, peer_api: A, rpc_api: R) -> Self {
        let shutdown = ShutdownToken::new();
        let tasks = TaskRegistry::new(shutdown.clone());
        let handle_state = Arc::new(RwLock::new(handle::HandleState::new(N::NAME.unwrap_or(""))));
        Self {
            node: Arc::new(node),
            peer_api: Arc::new(peer_api),
            rpc_api: Arc::new(rpc_api),
            shutdown,
            tasks,
            handle_state,
            started: Arc::new(std::sync::atomic::AtomicBool::new(false)),
        }
    }

    /// Return a cloneable handle for issuing commands from outside the
    /// service's own lifetime (signal handlers, RPC admin methods).
    ///
    /// The returned handle remains valid after `Service` is dropped — its
    /// operations simply become no-ops.
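    ///
    /// A sketch of forwarding Ctrl-C to a shutdown request through the
    /// handle; the shutdown method and reason variant shown here are
    /// illustrative, not a confirmed [`ServiceHandle`] API:
    ///
    /// ```ignore
    /// let handle = svc.handle();
    /// tokio::spawn(async move {
    ///     let _ = tokio::signal::ctrl_c().await;
    ///     // Hypothetical method and variant name; use the real
    ///     // `ServiceHandle` shutdown surface here.
    ///     handle.request_shutdown(ShutdownReason::Signal);
    /// });
    /// ```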
    pub fn handle(&self) -> ServiceHandle {
        ServiceHandle::new(
            self.shutdown.clone(),
            self.tasks.clone(),
            self.handle_state.clone(),
        )
    }

    /// Borrow the node. Mostly useful in tests; production callers should
    /// keep their own clones of any shared state they build into the node
    /// before handing it to `Service::new`.
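    ///
    /// A sketch of that pattern (`MyNode` and `Index` are hypothetical
    /// types):
    ///
    /// ```ignore
    /// let index = Arc::new(Index::default());
    /// let node = MyNode { index: index.clone() };
    /// let svc = Service::<MyNode, (), ()>::new(node, (), ());
    /// // `index` stays usable here without going through `svc.node()`.
    /// ```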
    pub fn node(&self) -> &Arc<N> {
        &self.node
    }

    /// Borrow the peer API.
    pub fn peer_api(&self) -> &Arc<A> {
        &self.peer_api
    }

    /// Borrow the RPC API.
    pub fn rpc_api(&self) -> &Arc<R> {
        &self.rpc_api
    }

    /// Borrow the shutdown token. Use this to give background tasks a way
    /// to notice shutdown has been requested.
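    ///
    /// A minimal sketch of a background task watching the token, using only
    /// the `clone` / `cancelled` surface shown elsewhere in this crate:
    ///
    /// ```ignore
    /// let token = svc.shutdown_token().clone();
    /// tokio::spawn(async move {
    ///     loop {
    ///         tokio::select! {
    ///             _ = token.cancelled() => break,
    ///             _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {
    ///                 // Periodic work goes here.
    ///             }
    ///         }
    ///     }
    /// });
    /// ```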
    pub fn shutdown_token(&self) -> &ShutdownToken {
        &self.shutdown
    }

    /// Borrow the task registry.
    pub fn tasks(&self) -> &TaskRegistry {
        &self.tasks
    }

    /// Request a graceful shutdown. Idempotent — only the first reason is
    /// recorded.
    pub fn request_shutdown(&self, reason: ShutdownReason) {
        self.shutdown.cancel(reason);
    }

    /// Drive the full lifecycle: `pre_start → on_start → run → on_stop →
    /// post_stop`. Returns after `run` exits (normally or with an error) and
    /// the stop hooks have completed; requesting a shutdown causes `run` to
    /// return and teardown to begin.
    ///
    /// # Error handling
    ///
    /// - A failure in `pre_start` short-circuits without calling any later
    ///   hook.
    /// - A failure in `on_start` calls `post_stop` but skips `run` and
    ///   `on_stop`.
    /// - A failure in `run` still calls `on_stop` + `post_stop`.
    /// - A failure in `on_stop` is recorded but does not block `post_stop`.
    /// - Panics inside `run` are caught via `scopeguard`-style drop logic
    ///   and reported as `ExitReason::RunError`.
    ///
    /// Calling `start` twice on the same `Service` returns
    /// [`ServiceError::AlreadyRunning`].
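    ///
    /// A sketch of acting on the outcome, using the error and exit-reason
    /// variants as they appear in this module:
    ///
    /// ```ignore
    /// match svc.start().await {
    ///     Ok(exit) => match exit.reason {
    ///         ExitReason::RunCompleted => { /* `run` returned on its own */ }
    ///         ExitReason::RequestedShutdown(reason) => { /* a shutdown was requested; `reason` says why */ }
    ///         _ => {}
    ///     },
    ///     Err(ServiceError::AlreadyRunning) => { /* `start` was called twice */ }
    ///     Err(err) => { /* a lifecycle hook or the task join failed */ }
    /// }
    /// ```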
    pub async fn start(self) -> Result<ExitStatus> {
        use std::sync::atomic::Ordering;

        if self.started.swap(true, Ordering::SeqCst) {
            return Err(ServiceError::AlreadyRunning);
        }

        let node = self.node.clone();
        let start_ctx = StartContext {
            name: N::NAME.unwrap_or(""),
            shutdown: self.shutdown.clone(),
            tasks: &self.tasks,
        };

        // pre_start
        if let Err(e) = node.pre_start(&start_ctx).await {
            return Err(ServiceError::PreStartFailed(Arc::new(e)));
        }

        // on_start
        if let Err(e) = node.on_start(&start_ctx).await {
            // on_start failed — call post_stop so any resources opened in
            // pre_start have a chance to close.
            let stop_ctx = StopContext {
                shutdown: self.shutdown.clone(),
                tasks: &self.tasks,
                exit_reason: ExitReason::RunError(Arc::new(anyhow::anyhow!("on_start failed"))),
            };
            let _ = node.post_stop(&stop_ctx).await;
            return Err(ServiceError::OnStartFailed(Arc::new(e)));
        }

        // run
        let run_ctx = RunContext {
            shutdown: self.shutdown.clone(),
            tasks: self.tasks.clone(),
        };
        let run_result = node.run(run_ctx).await;

        // Determine exit reason.
        let exit_reason = match &run_result {
            Ok(()) => self
                .shutdown
                .reason()
                .map(ExitReason::RequestedShutdown)
                .unwrap_or(ExitReason::RunCompleted),
            Err(e) => ExitReason::RunError(Arc::new(anyhow::anyhow!("{e}"))),
        };

        // Signal tasks to wind down even if run returned normally.
        if !self.shutdown.is_cancelled() {
            self.shutdown.cancel(ShutdownReason::RequestedByRun);
        }

        // Wait for background tasks with deadline.
        let join_result = self
            .tasks
            .join_all(std::time::Duration::from_secs(30))
            .await;

        // on_stop
        let stop_ctx = StopContext {
            shutdown: self.shutdown.clone(),
            tasks: &self.tasks,
            exit_reason: exit_reason.clone(),
        };
        let on_stop_result = node.on_stop(&stop_ctx).await;

        // post_stop (always called)
        let post_stop_result = node.post_stop(&stop_ctx).await;

        // Decide final error priority: run > on_stop > post_stop > join.
        if let Err(e) = run_result {
            return Err(ServiceError::RunFailed(Arc::new(e)));
        }
        if let Err(e) = on_stop_result {
            return Err(ServiceError::OnStopFailed(Arc::new(e)));
        }
        if let Err(e) = post_stop_result {
            // A post_stop failure is surfaced as `OnStopFailed` as well.
            return Err(ServiceError::OnStopFailed(Arc::new(e)));
        }
        join_result?;

        Ok(ExitStatus {
            reason: exit_reason,
        })
    }
}