1use crate::stateful::{
8 actor::{
9 core::{mailbox::Message, processing::Processing, syncing::Syncing},
10 processor::{Processor, ProcessorMetrics},
11 syncer::{self, SyncPlan, SyncResult},
12 },
13 db::{
14 assert_rewind_window_safety, AttachableResolverSet, DatabaseSet, StateSyncSet,
15 SyncEngineConfig,
16 },
17 Application,
18};
19use commonware_actor::mailbox::{self as actor_mailbox};
20use commonware_consensus::{
21 marshal::{
22 ancestry::BlockProvider,
23 core::{Mailbox as MarshalMailbox, Variant},
24 },
25 simplex::types::Finalization,
26};
27use commonware_cryptography::{certificate::Scheme, Digestible};
28use commonware_runtime::{spawn_cell, Clock, ContextCell, Handle, Metrics, Spawner, Storage};
29use commonware_utils::{channel::oneshot, sync::AsyncMutex};
30use futures::join;
31use rand::Rng;
32use std::{num::NonZeroUsize, sync::Arc};
33
34mod mailbox;
35pub use mailbox::Mailbox;
36
37mod processing;
38mod syncing;
39
40type BlockDigest<A, E> = <<A as Application<E>>::Block as Digestible>::Digest;
41
42pub struct Config<E, A, S, V, R>
44where
45 E: Rng + Spawner + Metrics + Clock + Storage,
46 A: Application<E>,
47 S: Scheme,
48 V: Variant<ApplicationBlock = A::Block>,
49{
50 pub application: A,
52
53 pub db_config: <A::Databases as DatabaseSet<E>>::Config,
55
56 pub input_provider: A::InputProvider,
58
59 pub marshal: MarshalMailbox<S, V>,
61
62 pub max_pending_acks: NonZeroUsize,
66
67 pub mailbox_size: NonZeroUsize,
69
70 pub plan: SyncPlan<E, S, V>,
74
75 pub resolvers: R,
77
78 pub sync_config: SyncEngineConfig,
80}
81
82pub struct Stateful<E, A, S, V, R>
86where
87 E: Rng + Spawner + Metrics + Clock + Storage,
88 A: Application<E>,
89 S: Scheme,
90 V: Variant<ApplicationBlock = A::Block>,
91{
92 context: ContextCell<E>,
94
95 mailbox: actor_mailbox::Receiver<Message<E, A>>,
97
98 application: A,
100
101 input_provider: A::InputProvider,
103
104 marshal: MarshalMailbox<S, V>,
106
107 db_config: <A::Databases as DatabaseSet<E>>::Config,
109
110 plan: SyncPlan<E, S, V>,
112
113 resolvers: R,
115
116 sync_config: SyncEngineConfig,
118}
119
120impl<E, A, S, V, R> Stateful<E, A, S, V, R>
121where
122 E: Rng + Spawner + Metrics + Clock + Storage,
123 A: Application<E>,
124 A::Databases: StateSyncSet<E, R, BlockDigest<A, E>>,
125 S: Scheme,
126 V: Variant<ApplicationBlock = A::Block>,
127 R: AttachableResolverSet<A::Databases>,
128 MarshalMailbox<S, V>: BlockProvider<Block = A::Block>,
129{
130 pub fn init(context: E, config: Config<E, A, S, V, R>) -> (Self, Mailbox<E, A>) {
135 assert_rewind_window_safety::<E, A::Databases>(config.max_pending_acks);
136
137 let (sender, mailbox) = actor_mailbox::new(context.child("mailbox"), config.mailbox_size);
138 (
139 Self {
140 context: ContextCell::new(context),
141 mailbox,
142 application: config.application,
143 input_provider: config.input_provider,
144 marshal: config.marshal,
145 db_config: config.db_config,
146 plan: config.plan,
147 resolvers: config.resolvers,
148 sync_config: config.sync_config,
149 },
150 Mailbox::new(sender),
151 )
152 }
153
154 pub fn start(mut self) -> Handle<()> {
155 spawn_cell!(self.context, self.run())
156 }
157
158 async fn run(self) {
159 if let Some(floor) = self.plan.floor().cloned() {
160 self.start_state_sync(floor).await;
161 } else if self.plan.requires_state_sync_floor() {
162 panic!("interrupted state sync must resume from a newly selected floor");
163 } else {
164 self.start_from_marshal().await;
165 }
166 }
167
168 async fn start_state_sync(self, floor: Finalization<S, V::Commitment>) {
171 let sync_metadata = Arc::new(AsyncMutex::new(self.plan.into_sync_metadata()));
172 let (sync_complete, sync_completed) = oneshot::channel();
173 let (syncer, syncer_mailbox) = syncer::Syncer::new(syncer::Config {
174 context: self.context.child("syncer"),
175 db_config: self.db_config,
176 sync_config: self.sync_config,
177 resolvers: self.resolvers.clone(),
178 sync_metadata: sync_metadata.clone(),
179 finalization: floor,
180 marshal: self.marshal.clone(),
181 sync_complete,
182 });
183 let syncing = Syncing {
184 context: self.context,
185 mailbox: self.mailbox,
186 application: self.application,
187 input_provider: self.input_provider,
188 marshal: self.marshal,
189 sync_metadata,
190 syncer: syncer_mailbox,
191 held_verify_requests: Vec::new(),
192 database_subscribers: Vec::new(),
193 artifact: None,
194 resolvers: self.resolvers,
195 sync_completed,
196 };
197 let _ = join!(syncer.start(), syncing.start());
198 }
199
200 async fn start_from_marshal(self) {
202 let syncer::StartupResult {
203 sync: SyncResult { databases, anchor },
204 skip_finalized_until,
205 } = syncer::init_databases_from_marshal::<E, A, S, V>(
206 self.context.as_present(),
207 &self.marshal,
208 self.db_config,
209 self.plan.into_sync_metadata(),
210 )
211 .await;
212
213 self.resolvers.attach_databases(databases.clone()).await;
216
217 let processor_metrics = ProcessorMetrics::new(self.context.child("processor"));
218 let processor = Processor::new(self.application, databases, anchor, processor_metrics);
219 Processing {
220 context: self.context,
221 mailbox: self.mailbox,
222 input_provider: self.input_provider,
223 marshal: self.marshal,
224 resolvers: self.resolvers,
225 processor,
226 skip_finalized_until,
227 }
228 .start()
229 .await
230 }
231}
232
#[cfg(test)]
mod tests {
    use super::{Config, Stateful};
    use crate::stateful::{
        actor::syncer::SyncPlan,
        db::{AttachableResolver, StateSyncDb, SyncEngineConfig},
        tests::mocks::{TestApp, TestBlock, TestDb, TestScheme, TestVariant},
    };
    use commonware_consensus::{
        marshal::{self, ancestry, core::Actor as MarshalActor},
        simplex::{
            mocks::scheme as scheme_mocks,
            types::{Finalization, Finalize, Proposal},
        },
        types::{Epoch, FixedEpocher, Round, View, ViewDelta},
        Application as _, CertifiableBlock as _,
    };
    use commonware_cryptography::{
        certificate::{mocks::Fixture, ConstantProvider},
        sha256::Digest as Sha256Digest,
    };
    use commonware_macros::select;
    use commonware_parallel::Sequential;
    use commonware_runtime::{
        buffer::paged::CacheRef, deterministic, Clock as _, Runner as _, Supervisor as _,
    };
    use commonware_storage::archive::immutable;
    use commonware_utils::{channel::mpsc, sync::AsyncRwLock, NZUsize, NZU16, NZU64};
    use std::{convert::Infallible, sync::Arc, time::Duration};

    /// Resolver stub whose `attach_database` does nothing.
    #[derive(Clone)]
    struct NoopResolver;

    impl AttachableResolver<TestDb> for NoopResolver {
        async fn attach_database(&self, _db: Arc<AsyncRwLock<TestDb>>) {}
    }

    /// State-sync stub: every argument is ignored and a `TestDb` is returned
    /// immediately.
    impl StateSyncDb<deterministic::Context, NoopResolver> for TestDb {
        type SyncError = Infallible;

        async fn sync_db(
            _context: deterministic::Context,
            _config: Self::Config,
            _resolver: NoopResolver,
            _target: Self::SyncTarget,
            _tip_updates: mpsc::Receiver<Self::SyncTarget>,
            _finish: Option<mpsc::Receiver<()>>,
            _reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
            _sync_config: SyncEngineConfig,
        ) -> Result<Self, Self::SyncError> {
            Ok(Self)
        }
    }

    /// Builds a small immutable-archive config whose partition names are all
    /// derived from `partition`.
    fn archive_config(page_cache: CacheRef, partition: &str) -> immutable::Config<()> {
        let metadata_partition = format!("{partition}-metadata");
        let freezer_table_partition = format!("{partition}-table");
        let freezer_key_partition = format!("{partition}-key");
        let freezer_value_partition = format!("{partition}-value");
        let ordinal_partition = format!("{partition}-ordinal");

        immutable::Config {
            metadata_partition,
            freezer_table_partition,
            freezer_table_initial_size: 4,
            freezer_table_resize_frequency: 2,
            freezer_table_resize_chunk_size: 2,
            freezer_key_partition,
            freezer_key_page_cache: page_cache,
            freezer_value_partition,
            freezer_value_target_size: 128,
            freezer_value_compression: None,
            ordinal_partition,
            items_per_section: NZU64!(4),
            codec_config: (),
            replay_buffer: NZUsize!(64),
            freezer_key_write_buffer: NZUsize!(64),
            freezer_value_write_buffer: NZUsize!(64),
            ordinal_write_buffer: NZUsize!(64),
        }
    }

    /// Has every scheme in `fixture` sign a finalize vote for `payload`
    /// (epoch 0, view 1, parent view 0) and assembles the votes into a
    /// finalization.
    fn build_finalization(
        fixture: &Fixture<TestScheme>,
        payload: Sha256Digest,
    ) -> Finalization<TestScheme, Sha256Digest> {
        let round = Round::new(Epoch::zero(), View::new(1));
        let proposal = Proposal::new(round, View::zero(), payload);

        let mut finalizes = Vec::new();
        for scheme in fixture.schemes.iter() {
            finalizes.push(Finalize::sign(scheme, proposal.clone()).unwrap());
        }

        Finalization::from_finalizes(&fixture.verifier, &finalizes, &Sequential)
            .expect("finalization quorum")
    }

    /// Starts the actor with a plan that carries a floor (so it takes the
    /// state-sync path) and checks that `propose` resolves to `None` within
    /// 100ms instead of hanging.
    #[test]
    fn mailbox_rejects_propose_while_floor_resolution_waits() {
        deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
            // Single-participant fixture used to certify the floor.
            let mut signing_context = context.child("signing");
            let fixture = scheme_mocks::fixture(&mut signing_context, b"pending-floor", 1);
            let provider = ConstantProvider::new(fixture.schemes[0].clone());
            let floor = build_finalization(&fixture, Sha256Digest::from([7; 32]));

            // Storage backing the marshal actor.
            let cache = CacheRef::from_pooler(&context, NZU16!(1024), NZUsize!(8));
            let finalizations_archive = immutable::Archive::init(
                context.child("finalizations_by_height"),
                archive_config(cache.clone(), "pending-floor-finalizations"),
            )
            .await
            .expect("failed to initialize finalizations archive");
            let blocks_archive = immutable::Archive::init(
                context.child("finalized_blocks"),
                archive_config(cache.clone(), "pending-floor-blocks"),
            )
            .await
            .expect("failed to initialize blocks archive");

            // The marshal actor is constructed but never started in this test;
            // only its mailbox is handed to the stateful actor.
            let (_marshal_actor, marshal, _height) =
                MarshalActor::<_, TestVariant, _, _, _, _, _>::init(
                    context.child("marshal"),
                    finalizations_archive,
                    blocks_archive,
                    marshal::Config {
                        provider,
                        epocher: FixedEpocher::new(NZU64!(u64::MAX)),
                        start: marshal::Start::Genesis(TestBlock::new(0, 0)),
                        partition_prefix: "pending-floor-marshal".to_string(),
                        mailbox_size: NZUsize!(8),
                        view_retention_timeout: ViewDelta::new(1),
                        prunable_items_per_section: NZU64!(4),
                        page_cache: cache,
                        replay_buffer: NZUsize!(64),
                        key_write_buffer: NZUsize!(64),
                        value_write_buffer: NZUsize!(64),
                        block_codec_config: (),
                        max_repair: NZUsize!(1),
                        max_pending_acks: NZUsize!(1),
                        strategy: Sequential,
                    },
                )
                .await;

            let sync_config = SyncEngineConfig {
                fetch_batch_size: NZU64!(1),
                apply_batch_size: 1,
                max_outstanding_requests: 1,
                update_channel_size: NZUsize!(1),
                max_retained_roots: 1,
            };
            let plan = SyncPlan::init(&context, "pending-floor-stateful".to_string()).await;
            let (actor, mut mailbox) = Stateful::init(
                context.child("stateful"),
                Config {
                    application: TestApp,
                    db_config: (),
                    input_provider: (),
                    marshal,
                    max_pending_acks: NZUsize!(1),
                    mailbox_size: NZUsize!(8),
                    plan: plan.with_floor(floor),
                    resolvers: NoopResolver,
                    sync_config,
                },
            );
            let actor_handle = actor.start();

            // The proposal must be answered (with `None`) before the timer fires.
            select! {
                result = mailbox.propose(
                    (context.child("proposal"), TestBlock::new(1, 1).context()),
                    ancestry::from_iter([]),
                ) => {
                    assert!(result.is_none());
                },
                _ = context.sleep(Duration::from_millis(100)) => {
                    panic!("stateful mailbox stalled while resolving state sync floor");
                },
            }

            actor_handle.abort();
        });
    }
}