commonware_glue/stateful/mod.rs
//! Manage QMDB database instances on behalf of a stateful application.
//!
//! A stateful application built on consensus must maintain speculative state for
//! every pending chain built on top of the finalized tip. This module provides
//! the [`Application`] trait and a [`Stateful`] actor that automates that
//! bookkeeping:
//!
//! 1. Before each `propose` or `verify`, the actor forks unmerkleized batches
//!    from the parent block's pending state (or from committed database state
//!    if the parent has been finalized).
//! 2. The application executes against those batches and returns merkleized
//!    results, which the actor stores as a new pending tip keyed by the
//!    block's digest.
//! 3. On finalization, the actor applies the winning tip's changesets to the
//!    underlying databases and prunes pending entries from dead forks.
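//!
//! As a rough, non-normative sketch of the three steps above: the `Pending`
//! type, the `u64` digests, and the `HashMap` state below are hypothetical
//! stand-ins, not this crate's types. The actor prunes pending state by round,
//! whereas this sketch prunes by ancestry to stay short.
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! // Hypothetical stand-ins for a block digest and a database state.
//! type Digest = u64;
//! type State = HashMap<String, u64>;
//!
//! struct Pending {
//!     // State already applied to the (simulated) databases.
//!     committed: State,
//!     // Block digest -> (parent digest, speculative state after that block).
//!     tips: HashMap<Digest, (Digest, State)>,
//! }
//!
//! // Returns true if `cur` has `root` somewhere in its pending ancestry.
//! fn descends_from(tips: &HashMap<Digest, (Digest, State)>, mut cur: Digest, root: Digest) -> bool {
//!     loop {
//!         match tips.get(&cur) {
//!             Some((parent, _)) if *parent == root => return true,
//!             Some((parent, _)) => cur = *parent,
//!             None => return false,
//!         }
//!     }
//! }
//!
//! impl Pending {
//!     // Step 1: fork from the parent's pending state, else from committed state.
//!     fn fork(&self, parent: Digest) -> State {
//!         self.tips
//!             .get(&parent)
//!             .map(|(_, state)| state.clone())
//!             .unwrap_or_else(|| self.committed.clone())
//!     }
//!
//!     // Step 2: store the executed result as a pending tip keyed by digest.
//!     fn insert(&mut self, digest: Digest, parent: Digest, state: State) {
//!         self.tips.insert(digest, (parent, state));
//!     }
//!
//!     // Step 3: commit the finalized tip and drop entries from dead forks.
//!     fn finalize(&mut self, digest: Digest) {
//!         if let Some((_, state)) = self.tips.remove(&digest) {
//!             self.committed = state;
//!         }
//!         let keep: Vec<Digest> = self
//!             .tips
//!             .keys()
//!             .copied()
//!             .filter(|d| descends_from(&self.tips, *d, digest))
//!             .collect();
//!         self.tips.retain(|d, _| keep.contains(d));
//!     }
//! }
//! ```
//!
//! In this sketch, finalizing one of two sibling blocks keeps that block's
//! descendants in `tips` and discards the other sibling.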
//!
//! # Database Layer
//!
//! The [`db`] module defines batch lifecycle traits ([`db::Unmerkleized`],
//! [`db::Merkleized`], [`db::ManagedDb`]) and a [`db::DatabaseSet`] trait that
//! groups one or more databases into a single unit.
//!
//! The [`db::p2p`] submodule provides P2P resolver actors (a
//! [`db::p2p::standard`] resolver implementing
//! [`commonware_storage::qmdb::sync::resolver::Resolver`] and a
//! [`db::p2p::compact`] resolver implementing
//! [`commonware_storage::qmdb::sync::compact::Resolver`]) over
//! [`commonware-resolver`](commonware_resolver), enabling databases to fetch
//! and serve sync operations from peers.
//!
//! # Syncing
//!
//! Applications load a [`SyncPlan`] before constructing marshal and [`Stateful`].
//! The plan reads the durable state sync metadata and keeps that metadata handle
//! until [`Stateful`] consumes it, avoiding multiple opens of the same metadata
//! partition during startup. Callers gate floor selection on
//! [`SyncPlan::may_state_sync`] and, if state sync is desired or
//! [`SyncPlan::requires_state_sync_floor`] is true, attach a finalized floor via
//! [`SyncPlan::with_floor`]. The same plan then drives marshal (via
//! [`SyncPlan::marshal_start`]) and stateful (via [`Config::plan`]), so both
//! actors are guaranteed to agree on the startup decision. Once the durable
//! complete height is set, the node never performs peer state sync again and
//! must recover from the later of the stored height and marshal's processed
//! height on future startups.
//!
//! The actor supports two sync paths:
//!
//! - **Marshal sync** (no floor attached): [`Stateful::start`] prepares the
//!   databases before the actor is spawned. New nodes initialize from
//!   genesis; restarted nodes reconcile the database set against the later of
//!   marshal's processed anchor and the stored state sync height, rewinding if
//!   needed. If marshal is behind that stored height, the actor acknowledges old
//!   finalized blocks without applying them again until marshal catches up. The
//!   actor then starts directly in normal processing mode while marshal continues
//!   backfilling blocks from the network.
//!
//! - **State sync** (floor attached): Run a one-time QMDB state sync from
//!   marshal's configured floor block, populating each database via
//!   [`db::StateSyncSet::sync`]. For each finalized block while state sync
//!   is live, the actor synchronously asks bootstrap to observe that block's
//!   sync targets. If the live session accepts the block, the actor
//!   acknowledges it immediately. Once bootstrap freezes databases at
//!   `database_anchor`, the actor enters normal processing. If a finalized block
//!   above `database_anchor` arrives first, the actor processes it during handoff.
//!   Durable metadata is marked in-progress before any database mutation and is
//!   marked complete at the converged anchor before handoff acknowledgement. A
//!   crash before completion restarts through the state-sync path, reopening
//!   the existing sync journals. Subsequent restarts after completion take the
//!   marshal sync path to ensure a contiguous stream.
//!
//! # Lazy Recovery
//!
//! Pending state is kept entirely in memory to avoid disk writes on the
//! consensus hot path. After a restart the map is empty, but the actor
//! recovers lazily: when `propose` or `verify` encounters a parent whose
//! state is missing, the actor walks back through the block DAG (via a
//! [`BlockProvider`](commonware_consensus::marshal::ancestry::BlockProvider))
//! to the nearest known ancestor or the finalized tip,
//! then replays forward via [`Application::apply`] to fill the gap. Each
//! replayed block is inserted into the pending map immediately so that
//! partial progress survives timeouts.
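//!
//! The walk-back-then-replay idea can be sketched as follows. This is an
//! illustration under simplifying assumptions, not the actor's code: `Block`,
//! `recover`, the `u64` digests, and the integer "state" are hypothetical, a
//! `HashMap` stands in for the block provider, and adding `delta` stands in
//! for [`Application::apply`].
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! // Hypothetical block: its digest, its parent's digest, and a state delta.
//! #[derive(Clone)]
//! struct Block {
//!     digest: u64,
//!     parent: u64,
//!     delta: i64,
//! }
//!
//! // Rebuild the state after `target`, filling `pending` along the way.
//! // `finalized` is the (digest, state) pair of the finalized tip.
//! fn recover(
//!     blocks: &HashMap<u64, Block>,
//!     pending: &mut HashMap<u64, i64>,
//!     finalized: (u64, i64),
//!     target: u64,
//! ) -> Option<i64> {
//!     // Walk back until we reach a block with known state.
//!     let mut gap = Vec::new();
//!     let mut cur = target;
//!     let mut state = loop {
//!         if let Some(known) = pending.get(&cur) {
//!             break *known;
//!         }
//!         if cur == finalized.0 {
//!             break finalized.1;
//!         }
//!         let block = blocks.get(&cur)?; // unknown block: give up
//!         gap.push(block.clone());
//!         cur = block.parent;
//!     };
//!
//!     // Replay forward, oldest first, recording each block as soon as it
//!     // is replayed so an interrupted recovery keeps its partial progress.
//!     for block in gap.into_iter().rev() {
//!         state += block.delta;
//!         pending.insert(block.digest, state);
//!     }
//!     Some(state)
//! }
//! ```
//!
//! A second call for a descendant of an already recovered block stops its
//! walk at that block instead of returning to the finalized tip.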
//!
//! # Compatibility
//!
//! The [`Stateful`] application may be used with [`Deferred`] and [`coding::Marshaled`],
//! but not with [`Inline`]. This is because [`Inline`] does not verify the correctness
//! of the embedded context within the [`CertifiableBlock`].
//!
//! [`Deferred`]: commonware_consensus::marshal::standard::Deferred
//! [`Inline`]: commonware_consensus::marshal::standard::Inline
//! [`coding::Marshaled`]: commonware_consensus::marshal::coding::Marshaled

use commonware_consensus::{CertifiableBlock, Epochable, Viewable};
use commonware_cryptography::certificate::Scheme;
use commonware_runtime::{Clock, Metrics, Spawner};
use db::DatabaseSet;
use futures::Stream;
use rand::Rng;
use std::future::Future;

mod actor;
pub use actor::{Config, Mailbox, Stateful, SyncPlan};

pub mod db;

#[cfg(test)]
mod tests;

/// The output of a successful [`Application::propose`] call.
pub struct Proposed<A: Application<E>, E: Rng + Spawner + Metrics + Clock> {
    /// The block built by the application.
    pub block: A::Block,

    /// The merkleized database batches produced during execution.
    pub merkleized: <A::Databases as DatabaseSet<E>>::Merkleized,
}

/// A stateful application whose storage is managed by a [`DatabaseSet`].
///
/// Implementors receive [`DatabaseSet::Unmerkleized`] batches and
/// return [`DatabaseSet::Merkleized`] batches after execution. The surrounding
/// wrapper handles persistence: storing merkleized batches as pending tips on
/// the block tree and applying changesets to the underlying databases on
/// finalization.
pub trait Application<E>: Clone + Send + 'static
where
    E: Rng + Spawner + Metrics + Clock,
{
    /// The signing scheme used by the application.
    type SigningScheme: Scheme;

    /// Metadata provided by the consensus engine for a given block.
    ///
    /// This often includes things like the proposer, view number, height, or
    /// epoch. Must be [`Epochable`] and [`Viewable`] so the wrapper can
    /// construct a [`Round`](commonware_consensus::types::Round) for
    /// pending-state pruning.
    type Context: Epochable + Viewable + Send;

    /// The block type produced by the application.
    ///
    /// Must implement [`CertifiableBlock`] so the wrapper can extract
    /// the consensus context during lazy recovery (see
    /// [`apply`](Self::apply)).
    type Block: CertifiableBlock<Context = Self::Context>;

    /// The set of databases managed on behalf of this application.
    type Databases: DatabaseSet<E>;

    /// A provider of input to the application.
    ///
    /// This may be a mempool that serves transactions, a stream of
    /// certificates, or any other source of input that drives state
    /// transitions.
    type InputProvider: Send;

    /// Extract per-database sync targets from a finalized block.
    ///
    /// Called by the wrapper for finalized blocks received during state sync.
    ///
    /// The returned targets are handed to the state sync coordinator so the
    /// sync engines can track the latest finalized state root and range.
    fn sync_targets(block: &Self::Block) -> <Self::Databases as DatabaseSet<E>>::SyncTargets;

    /// Block used to initialize the consensus engine in the first epoch.
    fn genesis(&mut self) -> impl Future<Output = Self::Block> + Send;

    /// Build a new block on top of the provided parent ancestry.
    ///
    /// Returns [`None`] if the build fails.
    ///
    /// The wrapper checks that the returned merkleized state matches
    /// [`sync_targets`](Self::sync_targets) for the returned block before the
    /// result is cached as pending state. If the implementor produces a
    /// block with mismatched targets, the wrapper will panic.
    ///
    /// Applications using [`qmdb::current`](commonware_storage::qmdb::current)
    /// must still ensure the proposed block commits to the merkleized batch's
    /// canonical root. The wrapper's sync-target check only verifies the ops
    /// root and operation range used by replay sync.
    ///
    /// This future may be cancelled by consensus if the caller drops its
    /// response receiver. Implementations should be cancellation-safe: dropping
    /// and retrying must not violate invariants or lose durable progress.
    fn propose(
        &mut self,
        context: (E, Self::Context),
        ancestry: impl Stream<Item = Self::Block> + Send,
        batches: <Self::Databases as DatabaseSet<E>>::Unmerkleized,
        input: &mut Self::InputProvider,
    ) -> impl Future<Output = Option<Proposed<Self, E>>> + Send;

    /// Verify a block received from a peer, relative to its ancestry.
    ///
    /// Called before voting. The implementation should execute the block
    /// against the provided batches and merkleize them.
    ///
    /// This future should not resolve until the implementation can produce a
    /// stable verdict. Return [`None`] only when the block is permanently
    /// invalid for the supplied context, ancestry, and batches. If validity may
    /// still change as additional information becomes available, continue
    /// waiting instead of returning [`None`].
    ///
    /// In other words, to abstain from voting, do not resolve this future yet.
    /// Keep it pending until the implementation can either prove the block
    /// valid, prove it invalid, or the consensus engine cancels the request.
    /// Abstaining is not represented by a special return value.
    ///
    /// Verification must reject any block whose execution result does not
    /// match the block's committed state (for example, a state root mismatch).
    /// Implementations do not need to re-check [`sync_targets`](Self::sync_targets)
    /// against the produced batches themselves: the wrapper enforces
    /// this by checking that any returned merkleized state matches the block
    /// before it is cached as pending state.
    ///
    /// Applications using [`qmdb::current`](commonware_storage::qmdb::current)
    /// must still reject blocks whose committed canonical root differs from the
    /// merkleized batch root. The wrapper's sync-target check only verifies the
    /// ops root and operation range used by replay sync.
    ///
    /// This future may be cancelled by consensus if the caller drops its
    /// response receiver. Implementations should be cancellation-safe: dropping
    /// and retrying must not violate invariants or lose durable progress.
    fn verify(
        &mut self,
        context: (E, Self::Context),
        ancestry: impl Stream<Item = Self::Block> + Send,
        batches: <Self::Databases as DatabaseSet<E>>::Unmerkleized,
    ) -> impl Future<Output = Option<<Self::Databases as DatabaseSet<E>>::Merkleized>> + Send;

    /// Apply a previously certified block to reconstruct its merkleized state.
    ///
    /// Called by the wrapper during lazy recovery when pending state for
    /// an ancestor block is missing (e.g. after a restart). The block is
    /// known-good (it was previously certified), so the implementation
    /// should unconditionally execute the block's state transitions.
    ///
    /// The returned merkleized state must match what
    /// [`verify`](Self::verify) accepted for `block`. The wrapper commits this
    /// replay result during finalization and cannot re-check block-specific
    /// commitments generically.
    ///
    /// This future may be cancelled if the originating propose/verify request
    /// is dropped. Implementations should be cancellation-safe: dropping and
    /// retrying must not violate invariants or lose durable progress.
    ///
    /// # Panics
    ///
    /// Implementations should panic if execution fails, as this indicates
    /// data corruption or non-determinism.
    fn apply(
        &mut self,
        context: (E, Self::Context),
        block: &Self::Block,
        batches: <Self::Databases as DatabaseSet<E>>::Unmerkleized,
    ) -> impl Future<Output = <Self::Databases as DatabaseSet<E>>::Merkleized> + Send;

    /// Observe a block after its database batches have been durably finalized.
    ///
    /// Called only after [`DatabaseSet::finalize`] succeeds. Implementations
    /// may use this to run post-finalization maintenance such as pruning.
    ///
    /// # Panics
    ///
    /// Implementations should panic if post-finalization maintenance fails.
    fn finalized(
        &mut self,
        _context: (E, Self::Context),
        _block: &Self::Block,
        _databases: &Self::Databases,
    ) -> impl Future<Output = ()> + Send {
        async {}
    }
}