// commonware_glue/stateful/mod.rs

//! Manage QMDB database instances on behalf of a stateful application.
//!
//! A stateful application built on consensus must maintain speculative state for
//! every pending chain built on top of the finalized tip. This module provides
//! the [`Application`] trait and a [`Stateful`] actor that automates that
//! bookkeeping:
//!
//! 1. Before each `propose` or `verify`, the actor forks unmerkleized batches
//!    from the parent block's pending state (or from committed database state
//!    if the parent has been finalized).
//! 2. The application executes against those batches and returns merkleized
//!    results, which the actor stores as a new pending tip keyed by the
//!    block's digest.
//! 3. On finalization, the actor applies the winning tip's changesets to the
//!    underlying databases and prunes pending entries from dead forks.
//!
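/*!
The three steps above can be sketched with plain in-memory maps. This is only an
illustration under loud assumptions: `Tracker`, `Pending`, `fork`, `insert`,
`finalize`, and `run_example` are hypothetical names that do not exist in this
crate, digests are bare `u64` values, and whole-state clones stand in for QMDB
batches and changesets.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins: a digest is a u64 and "state" is a key/value map.
type Digest = u64;
type State = HashMap<String, u64>;

struct Pending {
    parent: Digest,
    state: State,
}

struct Tracker {
    committed: State,                  // state as of the finalized tip
    finalized: Digest,                 // digest of the finalized tip
    pending: HashMap<Digest, Pending>, // speculative tips keyed by digest
}

impl Tracker {
    // Step 1: fork from the parent's pending state, or from committed state
    // when the parent is the finalized tip.
    fn fork(&self, parent: Digest) -> Option<State> {
        if parent == self.finalized {
            return Some(self.committed.clone());
        }
        self.pending.get(&parent).map(|p| p.state.clone())
    }

    // Step 2: store the executed result as a new pending tip.
    fn insert(&mut self, digest: Digest, parent: Digest, state: State) {
        self.pending.insert(digest, Pending { parent, state });
    }

    // Walk parent links from `cur` until reaching `root` or leaving the map.
    fn descends_from(&self, mut cur: Digest, root: Digest) -> bool {
        loop {
            if cur == root {
                return true;
            }
            match self.pending.get(&cur) {
                Some(p) => cur = p.parent,
                None => return false,
            }
        }
    }

    // Step 3: apply the winning tip, then prune entries on dead forks.
    fn finalize(&mut self, digest: Digest) {
        let winner = self
            .pending
            .remove(&digest)
            .expect("finalized block has no pending state");
        self.committed = winner.state;
        self.finalized = digest;
        let dead: Vec<Digest> = self
            .pending
            .keys()
            .copied()
            .filter(|d| !self.descends_from(*d, digest))
            .collect();
        for d in dead {
            self.pending.remove(&d);
        }
    }
}

// Two competing children of genesis (digest 0) and one grandchild on block 1.
fn run_example() -> Tracker {
    let mut t = Tracker {
        committed: State::new(),
        finalized: 0,
        pending: HashMap::new(),
    };
    let mut a = t.fork(0).unwrap();
    a.insert("x".to_string(), 1);
    t.insert(1, 0, a);
    let mut b = t.fork(0).unwrap();
    b.insert("x".to_string(), 2);
    t.insert(2, 0, b);
    let mut c = t.fork(1).unwrap();
    c.insert("y".to_string(), 7);
    t.insert(3, 1, c);
    t.finalize(1);
    t
}
```

In this sketch, finalizing block 1 commits its state, keeps block 3 (which builds
on block 1) as a pending tip, and drops block 2 as a dead fork. The pruning rule
here follows parent links; it is a simplification and is not claimed to match how
the actor prunes.

*/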
//! # Database Layer
//!
//! The [`db`] module defines batch lifecycle traits ([`db::Unmerkleized`],
//! [`db::Merkleized`], [`db::ManagedDb`]) and a [`db::DatabaseSet`] trait that
//! groups one or more databases into a single unit.
//!
//! The [`db::p2p`] submodule provides P2P resolver actors (a
//! [`db::p2p::standard`] resolver implementing
//! [`commonware_storage::qmdb::sync::resolver::Resolver`] and a
//! [`db::p2p::compact`] resolver implementing
//! [`commonware_storage::qmdb::sync::compact::Resolver`]) over
//! [`commonware-resolver`](commonware_resolver), enabling databases to fetch
//! and serve sync operations from peers.
//!
//! # Syncing
//!
//! Applications load a [`SyncPlan`] before constructing marshal and [`Stateful`].
//! The plan reads the durable state sync state and keeps that metadata handle
//! until [`Stateful`] consumes it, avoiding multiple opens of the same metadata
//! partition during startup. Callers gate floor selection on
//! [`SyncPlan::may_state_sync`] and, if state sync is desired or
//! [`SyncPlan::requires_state_sync_floor`] is true, attach a finalized floor via
//! [`SyncPlan::with_floor`]. The same plan then drives marshal (via
//! [`SyncPlan::marshal_start`]) and stateful (via [`Config::plan`]), so both
//! actors are guaranteed to agree on the startup decision. Once the durable
//! complete height is set, the node never performs peer state sync again and
//! must recover from the later of the stored height and marshal's processed
//! height on future startups.
//!
//! The actor supports two sync paths:
//!
//! - **Marshal sync** (no floor attached): [`Stateful::start`] prepares the
//!   databases before the actor is spawned. New nodes initialize from
//!   genesis; restarted nodes reconcile the database set against the later of
//!   marshal's processed anchor and the stored state sync height, rewinding if
//!   needed. If marshal is behind that stored height, the actor acknowledges old
//!   finalized blocks without applying them again until marshal catches up. The
//!   actor then starts directly in normal processing mode while marshal continues
//!   backfilling blocks from the network.
//!
//! - **State sync** (floor attached): Run a one-time QMDB state sync from
//!   marshal's configured floor block, populating each database via
//!   [`db::StateSyncSet::sync`]. For each finalized block while state sync
//!   is live, the actor synchronously asks bootstrap to observe that block's
//!   sync targets. If the live session accepts the block, the actor
//!   acknowledges it immediately. Once bootstrap freezes databases at
//!   `database_anchor`, the actor enters normal processing. If a finalized block
//!   above `database_anchor` arrives first, the actor processes it during handoff.
//!   Durable metadata is marked in-progress before any database mutation and is
//!   marked complete at the converged anchor before handoff acknowledgement. A
//!   crash before completion restarts through the state-sync path, reopening
//!   the existing sync journals. Subsequent restarts after completion take the
//!   marshal sync path to ensure a contiguous stream.
//!
//! # Lazy Recovery
//!
//! Pending state is kept entirely in memory to avoid disk writes on the
//! consensus hot path. After a restart the map is empty, but the actor
//! recovers lazily: when `propose` or `verify` encounters a parent whose
//! state is missing, the actor walks back through the block DAG (via a
//! [`BlockProvider`](commonware_consensus::marshal::ancestry::BlockProvider))
//! to the nearest known ancestor or the finalized tip,
//! then replays forward via [`Application::apply`] to fill the gap. Each
//! replayed block is inserted into the pending map immediately so that
//! partial progress survives timeouts.
//!
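/*!
The walk-back-then-replay idea described above can be sketched as follows. All
names here (`Block`, `recover`, `run_recovery`) are hypothetical and not part of
this crate: a `HashMap` stands in for the block provider, a single `u64` counter
stands in for database state, and adding `delta` stands in for a call to
`Application::apply`.

```rust
use std::collections::HashMap;

type Digest = u64;

// Hypothetical block: a parent link plus a delta added to a running counter.
struct Block {
    digest: Digest,
    parent: Digest,
    delta: u64,
}

// Returns the state after `target`, caching the state of every replayed block.
fn recover(
    blocks: &HashMap<Digest, Block>,    // stands in for a block provider
    pending: &mut HashMap<Digest, u64>, // in-memory pending states
    finalized: (Digest, u64),           // finalized tip and its committed state
    target: Digest,
) -> Option<u64> {
    // Walk back until a block with known state (or the finalized tip) is found.
    let mut gap = Vec::new();
    let mut cur = target;
    let mut state = loop {
        if let Some(s) = pending.get(&cur) {
            break *s;
        }
        if cur == finalized.0 {
            break finalized.1;
        }
        let block = blocks.get(&cur)?; // give up if a block is unavailable
        gap.push(block);
        cur = block.parent;
    };

    // Replay forward, oldest block first, caching each result immediately so
    // that a later retry can resume from the last replayed block.
    for block in gap.into_iter().rev() {
        state += block.delta;
        pending.insert(block.digest, state);
    }
    Some(state)
}

// A chain 0 <- 1 <- 2 <- 3 where only block 0 (state 100) is finalized.
fn run_recovery() -> (Option<u64>, usize) {
    let mut blocks = HashMap::new();
    for (digest, parent, delta) in [(1, 0, 10), (2, 1, 5), (3, 2, 1)] {
        blocks.insert(digest, Block { digest, parent, delta });
    }
    let mut pending = HashMap::new(); // empty, as after a restart
    let state = recover(&blocks, &mut pending, (0, 100), 3);
    (state, pending.len())
}
```

Because every replayed block is written to `pending` as soon as it is computed,
a second call to `recover` for the same target stops at the first lookup instead
of walking the chain again.

*/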
//! # Compatibility
//!
//! The [`Stateful`] application may be used with [`Deferred`] and [`coding::Marshaled`],
//! but not with [`Inline`]. This is because [`Inline`] does not verify the correctness
//! of the embedded context within the [`CertifiableBlock`].
//!
//! [`Deferred`]: commonware_consensus::marshal::standard::Deferred
//! [`Inline`]: commonware_consensus::marshal::standard::Inline
//! [`coding::Marshaled`]: commonware_consensus::marshal::coding::Marshaled

use commonware_consensus::{CertifiableBlock, Epochable, Viewable};
use commonware_cryptography::certificate::Scheme;
use commonware_runtime::{Clock, Metrics, Spawner};
use db::DatabaseSet;
use futures::Stream;
use rand::Rng;
use std::future::Future;

mod actor;
pub use actor::{Config, Mailbox, Stateful, SyncPlan};

pub mod db;

#[cfg(test)]
mod tests;

/// The output of a successful [`Application::propose`] call.
pub struct Proposed<A: Application<E>, E: Rng + Spawner + Metrics + Clock> {
    /// The block built by the application.
    pub block: A::Block,

    /// The merkleized database batches produced during execution.
    pub merkleized: <A::Databases as DatabaseSet<E>>::Merkleized,
}

/// A stateful application whose storage is managed by a [`DatabaseSet`].
///
/// Implementors receive [`DatabaseSet::Unmerkleized`] batches and
/// return [`DatabaseSet::Merkleized`] batches after execution. The surrounding
/// wrapper handles persistence: storing merkleized batches as pending tips on
/// the block tree and applying changesets to the underlying databases on
/// finalization.
pub trait Application<E>: Clone + Send + 'static
where
    E: Rng + Spawner + Metrics + Clock,
{
    /// The signing scheme used by the application.
    type SigningScheme: Scheme;

    /// Metadata provided by the consensus engine for a given block.
    ///
    /// This often includes things like the proposer, view number, height, or
    /// epoch. Must be [`Epochable`] and [`Viewable`] so the wrapper can
    /// construct a [`Round`](commonware_consensus::types::Round) for
    /// pending-state pruning.
    type Context: Epochable + Viewable + Send;

    /// The block type produced by the application.
    ///
    /// Must implement [`CertifiableBlock`] so the wrapper can extract
    /// the consensus context during lazy recovery (see
    /// [`apply`](Self::apply)).
    type Block: CertifiableBlock<Context = Self::Context>;

    /// The set of databases managed on behalf of this application.
    type Databases: DatabaseSet<E>;

    /// A provider of input to the application.
    ///
    /// This may be a mempool that serves transactions, a stream of
    /// certificates, or any other source of input that drives state
    /// transitions.
    type InputProvider: Send;

    /// Extract per-database sync targets from a finalized block.
    ///
    /// Called by the wrapper for finalized blocks received during state sync.
    ///
    /// The returned targets are handed to the state sync coordinator so the
    /// sync engines can track the latest finalized state root and range.
    fn sync_targets(block: &Self::Block) -> <Self::Databases as DatabaseSet<E>>::SyncTargets;

    /// Block used to initialize the consensus engine in the first epoch.
    fn genesis(&mut self) -> impl Future<Output = Self::Block> + Send;

    /// Build a new block on top of the provided parent ancestry.
    ///
    /// Returns [`None`] if the build fails.
    ///
    /// The wrapper checks that the returned merkleized state matches
    /// [`sync_targets`](Self::sync_targets) for the returned block before the
    /// result is cached as pending state. If the implementor produces a
    /// block with mismatched targets, this function will panic.
    ///
    /// Applications using [`qmdb::current`](commonware_storage::qmdb::current)
    /// must still ensure the proposed block commits to the merkleized batch's
    /// canonical root. The wrapper's sync-target check only verifies the ops
    /// root and operation range used by replay sync.
    ///
    /// This future may be cancelled by consensus if the caller drops its
    /// response receiver. Implementations should be cancellation-safe: dropping
    /// and retrying must not violate invariants or lose durable progress.
    fn propose(
        &mut self,
        context: (E, Self::Context),
        ancestry: impl Stream<Item = Self::Block> + Send,
        batches: <Self::Databases as DatabaseSet<E>>::Unmerkleized,
        input: &mut Self::InputProvider,
    ) -> impl Future<Output = Option<Proposed<Self, E>>> + Send;

    /// Verify a block received from a peer, relative to its ancestry.
    ///
    /// Called before voting. The implementation should execute the block
    /// against the provided batches and merkleize them.
    ///
    /// This future should not resolve until the implementation can produce a
    /// stable verdict. Return [`None`] only when the block is permanently
    /// invalid for the supplied context, ancestry, and batches. If validity may
    /// still change as additional information becomes available, continue
    /// waiting instead of returning [`None`].
    ///
    /// In other words, to abstain from voting, do not resolve this future yet.
    /// Keep it pending until the implementation can either prove the block
    /// valid, prove it invalid, or the consensus engine cancels the request.
    /// Abstaining is not represented by a special return value.
    ///
    /// Verification must reject any block whose execution result does not
    /// match the block's committed state (for example, a state root mismatch).
    /// Implementations do not need to re-check [`sync_targets`](Self::sync_targets)
    /// against the produced batches themselves: the wrapper enforces
    /// this by checking that any returned merkleized state matches the block
    /// before it is cached as pending state.
    ///
    /// Applications using [`qmdb::current`](commonware_storage::qmdb::current)
    /// must still reject blocks whose committed canonical root differs from the
    /// merkleized batch root. The wrapper's sync-target check only verifies the
    /// ops root and operation range used by replay sync.
    ///
    /// This future may be cancelled by consensus if the caller drops its
    /// response receiver. Implementations should be cancellation-safe: dropping
    /// and retrying must not violate invariants or lose durable progress.
    fn verify(
        &mut self,
        context: (E, Self::Context),
        ancestry: impl Stream<Item = Self::Block> + Send,
        batches: <Self::Databases as DatabaseSet<E>>::Unmerkleized,
    ) -> impl Future<Output = Option<<Self::Databases as DatabaseSet<E>>::Merkleized>> + Send;

    /// Apply a previously certified block to reconstruct its merkleized state.
    ///
    /// Called by the wrapper during lazy recovery when pending state for
    /// an ancestor block is missing (e.g. after a restart). The block is
    /// known-good (it was previously certified), so the implementation
    /// should unconditionally execute the block's state transitions.
    ///
    /// The returned merkleized state must match what
    /// [`verify`](Self::verify) accepted for `block`. The wrapper commits this
    /// replay result during finalization and cannot re-check block-specific
    /// commitments generically.
    ///
    /// This future may be cancelled if the originating propose/verify request
    /// is dropped. Implementations should be cancellation-safe: dropping and
    /// retrying must not violate invariants or lose durable progress.
    ///
    /// # Panics
    ///
    /// Implementations should panic if execution fails, as this indicates
    /// data corruption or non-determinism.
    fn apply(
        &mut self,
        context: (E, Self::Context),
        block: &Self::Block,
        batches: <Self::Databases as DatabaseSet<E>>::Unmerkleized,
    ) -> impl Future<Output = <Self::Databases as DatabaseSet<E>>::Merkleized> + Send;

    /// Observe a block after its database batches have been durably finalized.
    ///
    /// Called only after [`DatabaseSet::finalize`] succeeds. Implementations
    /// may use this to run post-finalization maintenance such as pruning.
    ///
    /// # Panics
    ///
    /// Implementations should panic if post-finalization maintenance fails.
    fn finalized(
        &mut self,
        _context: (E, Self::Context),
        _block: &Self::Block,
        _databases: &Self::Databases,
    ) -> impl Future<Output = ()> + Send {
        async {}
    }
}