Skip to main content

commonware_glue/stateful/actor/core/
mod.rs

//! Stateful application that manages the pending-tip DAG of merkleized batches on behalf of an [`Application`].
//!
//! The [`Stateful`] actor is split into two control loops:
//! - [`Syncing`] manages the state sync process.
//! - [`Processing`] manages the pending-tip DAG and drives the inner application.
6
7use crate::stateful::{
8    actor::{
9        core::{mailbox::Message, processing::Processing, syncing::Syncing},
10        processor::{Processor, ProcessorMetrics},
11        syncer::{self, SyncPlan, SyncResult},
12    },
13    db::{
14        assert_rewind_window_safety, AttachableResolverSet, DatabaseSet, StateSyncSet,
15        SyncEngineConfig,
16    },
17    Application,
18};
19use commonware_actor::mailbox::{self as actor_mailbox};
20use commonware_consensus::{
21    marshal::{
22        ancestry::BlockProvider,
23        core::{Mailbox as MarshalMailbox, Variant},
24    },
25    simplex::types::Finalization,
26};
27use commonware_cryptography::{certificate::Scheme, Digestible};
28use commonware_runtime::{spawn_cell, Clock, ContextCell, Handle, Metrics, Spawner, Storage};
29use commonware_utils::{channel::oneshot, sync::AsyncMutex};
30use futures::join;
31use rand::Rng;
32use std::{num::NonZeroUsize, sync::Arc};
33
34mod mailbox;
35pub use mailbox::Mailbox;
36
37mod processing;
38mod syncing;
39
40type BlockDigest<A, E> = <<A as Application<E>>::Block as Digestible>::Digest;
41
42/// Configuration for constructing a [`Stateful`] application.
43pub struct Config<E, A, S, V, R>
44where
45    E: Rng + Spawner + Metrics + Clock + Storage,
46    A: Application<E>,
47    S: Scheme,
48    V: Variant<ApplicationBlock = A::Block>,
49{
50    /// The inner application that drives state transitions.
51    pub application: A,
52
53    /// Configuration used to construct the database set.
54    pub db_config: <A::Databases as DatabaseSet<E>>::Config,
55
56    /// Source of input (e.g. transactions) passed to the application on propose.
57    pub input_provider: A::InputProvider,
58
59    /// Marshal mailbox used for startup anchoring and lazy recovery.
60    pub marshal: MarshalMailbox<S, V>,
61
62    /// Marshal ack window used by the provided marshal mailbox.
63    ///
64    /// This must match the marshal config used to construct [`Self::marshal`].
65    pub max_pending_acks: NonZeroUsize,
66
67    /// Capacity of the stateful actor mailbox channel.
68    pub mailbox_size: NonZeroUsize,
69
70    /// Startup plan loaded via [`SyncPlan::init`], optionally augmented with
71    /// a finalized floor via [`SyncPlan::with_floor`]. Carries the durable
72    /// metadata handle and the startup decision shared with marshal.
73    pub plan: SyncPlan<E, S, V>,
74
75    /// Resolver(s) for state sync fetches and post-bootstrap serving.
76    pub resolvers: R,
77
78    /// Sync engine tuning knobs.
79    pub sync_config: SyncEngineConfig,
80}
81
82/// Stateful application that manages the pending-tip DAG of merkleized
83/// batches on behalf of an [`Application`], implementing the consensus
84/// application and verifying traits.
85pub struct Stateful<E, A, S, V, R>
86where
87    E: Rng + Spawner + Metrics + Clock + Storage,
88    A: Application<E>,
89    S: Scheme,
90    V: Variant<ApplicationBlock = A::Block>,
91{
92    /// Runtime context providing RNG, task spawning, metrics, and clock.
93    context: ContextCell<E>,
94
95    /// The receiver for messages.
96    mailbox: actor_mailbox::Receiver<Message<E, A>>,
97
98    /// The inner application that drives state transitions.
99    application: A,
100
101    /// Source of input (e.g. transactions) passed to the application on propose.
102    input_provider: A::InputProvider,
103
104    /// Marshal mailbox used for startup anchoring and lazy recovery.
105    marshal: MarshalMailbox<S, V>,
106
107    /// Configuration used to initialize the database set at startup.
108    db_config: <A::Databases as DatabaseSet<E>>::Config,
109
110    /// Startup plan carrying the metadata handle and floor decision.
111    plan: SyncPlan<E, S, V>,
112
113    /// Resolver(s) for state sync fetches and post-bootstrap serving.
114    resolvers: R,
115
116    /// Sync engine tuning knobs.
117    sync_config: SyncEngineConfig,
118}
119
120impl<E, A, S, V, R> Stateful<E, A, S, V, R>
121where
122    E: Rng + Spawner + Metrics + Clock + Storage,
123    A: Application<E>,
124    A::Databases: StateSyncSet<E, R, BlockDigest<A, E>>,
125    S: Scheme,
126    V: Variant<ApplicationBlock = A::Block>,
127    R: AttachableResolverSet<A::Databases>,
128    MarshalMailbox<S, V>: BlockProvider<Block = A::Block>,
129{
130    /// Construct a [`Stateful`] actor and its [`Mailbox`].
131    ///
132    /// This only wires dependencies and allocates the mailbox. The actor does
133    /// not process messages until [`Stateful::start`] is called.
134    pub fn init(context: E, config: Config<E, A, S, V, R>) -> (Self, Mailbox<E, A>) {
135        assert_rewind_window_safety::<E, A::Databases>(config.max_pending_acks);
136
137        let (sender, mailbox) = actor_mailbox::new(context.child("mailbox"), config.mailbox_size);
138        (
139            Self {
140                context: ContextCell::new(context),
141                mailbox,
142                application: config.application,
143                input_provider: config.input_provider,
144                marshal: config.marshal,
145                db_config: config.db_config,
146                plan: config.plan,
147                resolvers: config.resolvers,
148                sync_config: config.sync_config,
149            },
150            Mailbox::new(sender),
151        )
152    }
153
154    pub fn start(mut self) -> Handle<()> {
155        spawn_cell!(self.context, self.run())
156    }
157
158    async fn run(self) {
159        if let Some(floor) = self.plan.floor().cloned() {
160            self.start_state_sync(floor).await;
161        } else if self.plan.requires_state_sync_floor() {
162            panic!("interrupted state sync must resume from a newly selected floor");
163        } else {
164            self.start_from_marshal().await;
165        }
166    }
167
168    /// Starts the application in [`Syncing`] mode, kicking off a state sync process
169    /// towards the finalized floor specified in the [`SyncPlan`].
170    async fn start_state_sync(self, floor: Finalization<S, V::Commitment>) {
171        let sync_metadata = Arc::new(AsyncMutex::new(self.plan.into_sync_metadata()));
172        let (sync_complete, sync_completed) = oneshot::channel();
173        let (syncer, syncer_mailbox) = syncer::Syncer::new(syncer::Config {
174            context: self.context.child("syncer"),
175            db_config: self.db_config,
176            sync_config: self.sync_config,
177            resolvers: self.resolvers.clone(),
178            sync_metadata: sync_metadata.clone(),
179            finalization: floor,
180            marshal: self.marshal.clone(),
181            sync_complete,
182        });
183        let syncing = Syncing {
184            context: self.context,
185            mailbox: self.mailbox,
186            application: self.application,
187            input_provider: self.input_provider,
188            marshal: self.marshal,
189            sync_metadata,
190            syncer: syncer_mailbox,
191            held_verify_requests: Vec::new(),
192            database_subscribers: Vec::new(),
193            artifact: None,
194            resolvers: self.resolvers,
195            sync_completed,
196        };
197        let _ = join!(syncer.start(), syncing.start());
198    }
199
200    /// Starts the application by initializing the database set at marshal's current floor.
201    async fn start_from_marshal(self) {
202        let syncer::StartupResult {
203            sync: SyncResult { databases, anchor },
204            skip_finalized_until,
205        } = syncer::init_databases_from_marshal::<E, A, S, V>(
206            self.context.as_present(),
207            &self.marshal,
208            self.db_config,
209            self.plan.into_sync_metadata(),
210        )
211        .await;
212
213        // Attach the resolvers to the initialized databases before starting the processor,
214        // so that this instance can serve peers database operations and proofs.
215        self.resolvers.attach_databases(databases.clone()).await;
216
217        let processor_metrics = ProcessorMetrics::new(self.context.child("processor"));
218        let processor = Processor::new(self.application, databases, anchor, processor_metrics);
219        Processing {
220            context: self.context,
221            mailbox: self.mailbox,
222            input_provider: self.input_provider,
223            marshal: self.marshal,
224            resolvers: self.resolvers,
225            processor,
226            skip_finalized_until,
227        }
228        .start()
229        .await
230    }
231}
232
#[cfg(test)]
mod tests {
    use super::{Config, Stateful};
    use crate::stateful::{
        actor::syncer::SyncPlan,
        db::{AttachableResolver, StateSyncDb, SyncEngineConfig},
        tests::mocks::{TestApp, TestBlock, TestDb, TestScheme, TestVariant},
    };
    use commonware_consensus::{
        marshal::{self, ancestry, core::Actor as MarshalActor},
        simplex::{
            mocks::scheme as scheme_mocks,
            types::{Finalization, Finalize, Proposal},
        },
        types::{Epoch, FixedEpocher, Round, View, ViewDelta},
        Application as _, CertifiableBlock as _,
    };
    use commonware_cryptography::{
        certificate::{mocks::Fixture, ConstantProvider},
        sha256::Digest as Sha256Digest,
    };
    use commonware_macros::select;
    use commonware_parallel::Sequential;
    use commonware_runtime::{
        buffer::paged::CacheRef, deterministic, Clock as _, Runner as _, Supervisor as _,
    };
    use commonware_storage::archive::immutable;
    use commonware_utils::{channel::mpsc, sync::AsyncRwLock, NZUsize, NZU16, NZU64};
    use std::{convert::Infallible, sync::Arc, time::Duration};

    /// Resolver stand-in whose `attach_database` does nothing.
    #[derive(Clone)]
    struct NoopResolver;

    impl AttachableResolver<TestDb> for NoopResolver {
        async fn attach_database(&self, _db: Arc<AsyncRwLock<TestDb>>) {}
    }

    /// Mock state sync: returns a fresh `TestDb` immediately, ignoring every input.
    impl StateSyncDb<deterministic::Context, NoopResolver> for TestDb {
        type SyncError = Infallible;

        async fn sync_db(
            _context: deterministic::Context,
            _config: Self::Config,
            _resolver: NoopResolver,
            _target: Self::SyncTarget,
            _tip_updates: mpsc::Receiver<Self::SyncTarget>,
            _finish: Option<mpsc::Receiver<()>>,
            _reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
            _sync_config: SyncEngineConfig,
        ) -> Result<Self, Self::SyncError> {
            Ok(Self)
        }
    }

    /// Immutable-archive configuration with every partition name prefixed by
    /// `partition` and small fixed sizes throughout.
    fn archive_config(page_cache: CacheRef, partition: &str) -> immutable::Config<()> {
        let buffer_size = NZUsize!(64);
        immutable::Config {
            metadata_partition: format!("{partition}-metadata"),
            freezer_table_partition: format!("{partition}-table"),
            freezer_table_initial_size: 4,
            freezer_table_resize_frequency: 2,
            freezer_table_resize_chunk_size: 2,
            freezer_key_partition: format!("{partition}-key"),
            freezer_key_page_cache: page_cache,
            freezer_value_partition: format!("{partition}-value"),
            freezer_value_target_size: 128,
            freezer_value_compression: None,
            ordinal_partition: format!("{partition}-ordinal"),
            items_per_section: NZU64!(4),
            codec_config: (),
            replay_buffer: buffer_size,
            freezer_key_write_buffer: buffer_size,
            freezer_value_write_buffer: buffer_size,
            ordinal_write_buffer: buffer_size,
        }
    }

    /// Builds a finalization certificate for `payload` at epoch 0 / view 1
    /// (parent view 0), signed by every scheme in `fixture`.
    fn build_finalization(
        fixture: &Fixture<TestScheme>,
        payload: Sha256Digest,
    ) -> Finalization<TestScheme, Sha256Digest> {
        let round = Round::new(Epoch::zero(), View::new(1));
        let proposal = Proposal::new(round, View::zero(), payload);

        let mut votes = Vec::with_capacity(fixture.schemes.len());
        for scheme in fixture.schemes.iter() {
            votes.push(Finalize::sign(scheme, proposal.clone()).unwrap());
        }

        Finalization::from_finalizes(&fixture.verifier, &votes, &Sequential)
            .expect("finalization quorum")
    }

    #[test]
    fn mailbox_rejects_propose_while_floor_resolution_waits() {
        deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
            // Signing material and the finalized floor the actor will sync towards.
            let mut signing_context = context.child("signing");
            let fixture = scheme_mocks::fixture(&mut signing_context, b"pending-floor", 1);
            let provider = ConstantProvider::new(fixture.schemes[0].clone());
            let finalization = build_finalization(&fixture, Sha256Digest::from([7; 32]));

            // Storage for a marshal instance. The marshal actor itself is never
            // started here (presumably so floor resolution cannot complete).
            let page_cache = CacheRef::from_pooler(&context, NZU16!(1024), NZUsize!(8));
            let finalizations_archive = immutable::Archive::init(
                context.child("finalizations_by_height"),
                archive_config(page_cache.clone(), "pending-floor-finalizations"),
            )
            .await
            .expect("failed to initialize finalizations archive");
            let blocks_archive = immutable::Archive::init(
                context.child("finalized_blocks"),
                archive_config(page_cache.clone(), "pending-floor-blocks"),
            )
            .await
            .expect("failed to initialize blocks archive");

            let marshal_context = context.child("marshal");
            let marshal_config = marshal::Config {
                provider,
                epocher: FixedEpocher::new(NZU64!(u64::MAX)),
                start: marshal::Start::Genesis(TestBlock::new(0, 0)),
                partition_prefix: "pending-floor-marshal".to_string(),
                mailbox_size: NZUsize!(8),
                view_retention_timeout: ViewDelta::new(1),
                prunable_items_per_section: NZU64!(4),
                page_cache,
                replay_buffer: NZUsize!(64),
                key_write_buffer: NZUsize!(64),
                value_write_buffer: NZUsize!(64),
                block_codec_config: (),
                max_repair: NZUsize!(1),
                max_pending_acks: NZUsize!(1),
                strategy: Sequential,
            };
            let (_marshal_actor, marshal_mailbox, _height) =
                MarshalActor::<_, TestVariant, _, _, _, _, _>::init(
                    marshal_context,
                    finalizations_archive,
                    blocks_archive,
                    marshal_config,
                )
                .await;

            let plan = SyncPlan::init(&context, "pending-floor-stateful".to_string()).await;
            let sync_config = SyncEngineConfig {
                fetch_batch_size: NZU64!(1),
                apply_batch_size: 1,
                max_outstanding_requests: 1,
                update_channel_size: NZUsize!(1),
                max_retained_roots: 1,
            };
            let (stateful, mut mailbox) = Stateful::init(
                context.child("stateful"),
                Config {
                    application: TestApp,
                    db_config: (),
                    input_provider: (),
                    marshal: marshal_mailbox,
                    max_pending_acks: NZUsize!(1),
                    mailbox_size: NZUsize!(8),
                    plan: plan.with_floor(finalization),
                    resolvers: NoopResolver,
                    sync_config,
                },
            );
            let handle = stateful.start();

            // The proposal must be answered with `None` before the 100ms timer
            // fires, even though the state sync floor is still being resolved.
            select! {
                result = mailbox.propose(
                    (context.child("proposal"), TestBlock::new(1, 1).context()),
                    ancestry::from_iter([]),
                ) => {
                    assert!(result.is_none());
                },
                _ = context.sleep(Duration::from_millis(100)) => {
                    panic!("stateful mailbox stalled while resolving state sync floor");
                },
            }

            handle.abort();
        });
    }
}