Skip to main content

commonware_consensus/marshal/coding/
mod.rs

1//! Ordered delivery of erasure-coded blocks.
2//!
3//! # Overview
4//!
5//! The coding marshal couples the consensus pipeline with erasure-coded block broadcast.
6//! Blocks are produced by an application, encoded into [`types::Shard`]s, fanned out to peers, and
7//! later reconstructed when a notarization or finalization proves that the data is needed.
//! Compared to [`super::standard`], this variant makes more efficient use of the network's bandwidth
9//! by spreading the load of block dissemination across all participants.
10//!
11//! # Components
12//!
13//! - [`crate::marshal::core::Actor`]: The unified marshal actor that orders finalized blocks,
14//!   handles acknowledgements from the application, and requests repairs when gaps are detected.
15//!   Used with [`Coding`] as the variant type parameter.
16//! - [`crate::marshal::core::Mailbox`]: Accepts requests from other local subsystems and forwards
17//!   them to the actor. Used with [`Coding`] as the variant type parameter.
18//! - [`shards::Engine`]: Broadcasts shards, verifies locally held fragments, and reconstructs
19//!   entire [`types::CodedBlock`]s on demand.
20//! - [`crate::marshal::resolver`]: Issues outbound fetches to remote peers when marshal is missing
21//!   a block, notarization, or finalization referenced by consensus.
22//! - [`types`]: Defines commitments, distribution shards, and helper builders used across the
23//!   module.
24//! - [`Marshaled`]: Wraps an [`crate::Application`] implementation so it automatically enforces
25//!   epoch boundaries and performs erasure encoding before a proposal leaves the application.
26//!
27//! # Data Flow
28//!
29//! 1. The application produces a block through [`Marshaled`], which encodes the payload and
30//!    obtains a [`crate::types::coding::Commitment`] describing the shard layout.
31//! 2. The block is broadcast via [`shards::Engine`]; each participant receives exactly one shard
32//!    and reshares it to everyone else once it verifies the fragment.
33//! 3. The actor ingests notarizations/finalizations from `simplex`, pulls reconstructed blocks
34//!    from the shard engine or backfills them through [`crate::marshal::resolver`], and durably
35//!    persists the ordered data.
36//! 4. The actor reports finalized blocks to the node's [`crate::Reporter`] at-least-once and
37//!    drives repair loops whenever notarizations reference yet-to-be-delivered payloads.
38//!
39//! # Storage and Repair
40//!
41//! Notarized data and certificates live in prunable archives managed internally, while finalized
42//! blocks are migrated into immutable archives. Any gaps are filled by asking peers for specific
43//! commitments through the resolver pipeline. The shard engine keeps only ephemeral, in-memory
44//! caches; once a block is finalized it is evicted from the reconstruction map, reducing memory
45//! pressure.
46//!
47//! # When to Use
48//!
49//! Choose this module when the consensus deployment wants erasure-coded dissemination with the
50//! same ordering guarantees provided by [`super::standard`]. The API is a breaking change from
51//! the standard marshal: applications must adapt to the coding-specific variant type and buffer
52//! implementation required by this module.
53
54pub mod shards;
55pub mod types;
56pub(crate) mod validation;
57
58mod variant;
59pub use variant::Coding;
60
61mod marshaled;
62pub use marshaled::{Marshaled, MarshaledConfig};
63
64#[cfg(test)]
65mod tests {
66    use crate::{
67        marshal::{
68            ancestry::BlockProvider,
69            coding::{
70                shards,
71                types::{coding_config_for_participants, hash_context, CodedBlock},
72                Coding, Marshaled, MarshaledConfig,
73            },
74            config::{Config, Start},
75            core,
76            mocks::{
77                application::Application,
78                harness::{
79                    self, default_leader, genesis_commitment, make_coding_block,
80                    setup_network_links, setup_network_with_participants, CodingB, CodingCtx,
81                    CodingHarness, EmptyProvider, TestHarness, BLOCKS_PER_EPOCH, D, K, LINK,
82                    NAMESPACE, NUM_VALIDATORS, QUORUM, S, TEST_QUOTA, UNRELIABLE_LINK, V,
83                },
84                verifying::MockVerifyingApp,
85            },
86            resolver::handler,
87        },
88        simplex::{scheme::bls12381_threshold::vrf as bls12381_threshold_vrf, types::Proposal},
89        types::{coding::Commitment, Epoch, Epocher, FixedEpocher, Height, Round, View, ViewDelta},
90        Automaton, Block, CertifiableAutomaton, CertifiableBlock,
91    };
92    use bytes::Bytes;
93    use commonware_actor::{mailbox, Feedback};
94    use commonware_codec::{Encode, FixedSize};
95    use commonware_coding::{CodecConfig, Config as CodingConfig, ReedSolomon};
96    use commonware_cryptography::{
97        certificate::{mocks::Fixture, ConstantProvider, Scheme as _},
98        sha256::Sha256,
99        Committable, Digestible, Hasher,
100    };
101    use commonware_macros::{select, test_group, test_traced};
102    use commonware_p2p::Recipients;
103    use commonware_parallel::Sequential;
104    use commonware_resolver::{Delivery, Fetch, Resolver, TargetedResolver};
105    use commonware_runtime::{
106        buffer::paged::CacheRef, deterministic, Clock, Metrics, Runner, Supervisor as _,
107    };
108    use commonware_storage::archive::immutable;
109    use commonware_utils::{
110        channel::oneshot, sync::Mutex, vec::NonEmptyVec, NZUsize, NZU16, NZU64,
111    };
112    use std::{sync::Arc, time::Duration};
113
    /// Coding marshal variant exercised by these tests: the harness block type `CodingB`,
    /// Reed-Solomon coding over SHA-256, and the harness key type `K`.
    type TestCodingVariant = Coding<CodingB, ReedSolomon<Sha256>, Sha256, K>;
    /// Coded block type using the same block, coding scheme, and hasher as [`TestCodingVariant`].
    type TestCodedBlock = CodedBlock<CodingB, ReedSolomon<Sha256>, Sha256>;
    /// Arguments captured from a single `send` call on [`RecordingCodingBuffer`].
    type CodingSendRecord = (Round, TestCodedBlock, Recipients<K>);

    // Smallest valid coding config used to build trusted genesis commitments.
    const GENESIS_CODING_CONFIG: CodingConfig = CodingConfig {
        minimum_shards: NZU16!(1),
        extra_shards: NZU16!(1),
    };
123
124    #[test]
125    fn mailbox_provides_application_blocks() {
126        fn assert_provider<P: BlockProvider<Block = CodingB>>() {}
127        assert_provider::<core::Mailbox<S, TestCodingVariant>>();
128    }
129
    /// A coding buffer that records subscriptions and sends, and never resolves the
    /// subscriptions it hands out.
    ///
    /// All state sits behind `Arc<Mutex<_>>`, so clones of the buffer share the same records.
    #[derive(Clone, Default)]
    struct RecordingCodingBuffer {
        /// Senders retained for every `subscribe_by_digest` call.
        digest_subscriptions: Arc<Mutex<Vec<oneshot::Sender<TestCodedBlock>>>>,
        /// Senders retained for every `subscribe_by_commitment` call.
        commitment_subscriptions: Arc<Mutex<Vec<oneshot::Sender<TestCodedBlock>>>>,
        /// Arguments of every `send` call, in call order.
        sends: Arc<Mutex<Vec<CodingSendRecord>>>,
    }
137
138    impl RecordingCodingBuffer {
139        fn subscription_count(&self) -> usize {
140            self.digest_subscriptions.lock().len() + self.commitment_subscriptions.lock().len()
141        }
142
143        fn commitment_subscription_count(&self) -> usize {
144            self.commitment_subscriptions.lock().len()
145        }
146    }
147
148    impl core::Buffer<TestCodingVariant> for RecordingCodingBuffer {
149        type PublicKey = K;
150
151        async fn find_by_digest(&self, _digest: D) -> Option<TestCodedBlock> {
152            None
153        }
154
155        async fn find_by_commitment(&self, _commitment: Commitment) -> Option<TestCodedBlock> {
156            None
157        }
158
159        fn subscribe_by_digest(&self, _digest: D) -> Option<oneshot::Receiver<TestCodedBlock>> {
160            let (sender, receiver) = oneshot::channel();
161            self.digest_subscriptions.lock().push(sender);
162            Some(receiver)
163        }
164
165        fn subscribe_by_commitment(
166            &self,
167            _commitment: Commitment,
168        ) -> Option<oneshot::Receiver<TestCodedBlock>> {
169            let (sender, receiver) = oneshot::channel();
170            self.commitment_subscriptions.lock().push(sender);
171            Some(receiver)
172        }
173
174        fn finalized(&self, _commitment: Commitment) {}
175
176        fn send(&self, round: Round, block: TestCodedBlock, recipients: Recipients<K>) {
177            self.sends.lock().push((round, block, recipients));
178        }
179    }
180
    /// A fetch request as recorded by [`RecordingResolver`]: key plus subscriber annotation.
    type CodingFetchRecord = Fetch<handler::Key<Commitment>, handler::Annotation>;
    /// A targeted fetch as recorded by [`RecordingResolver`]: the key and the peers it targeted.
    type CodingTargetedFetch = (handler::Key<Commitment>, NonEmptyVec<K>);

    /// A resolver that records each fetch invocation and can optionally answer the next
    /// untargeted fetch with a pre-armed payload; `retain` is a no-op.
    ///
    /// Recorded state sits behind `Arc<Mutex<_>>`, so clones share the same records.
    #[derive(Clone, Default)]
    struct RecordingResolver {
        /// Every untargeted fetch seen via `fetch` / `fetch_all`, in call order.
        fetches: Arc<Mutex<Vec<CodingFetchRecord>>>,
        /// Every targeted fetch seen via `fetch_targeted` / `fetch_all_targeted`, in call order.
        targeted: Arc<Mutex<Vec<CodingTargetedFetch>>>,
        /// Payload to deliver in response to the next untargeted fetch, if one has been armed.
        auto_delivery: Arc<Mutex<Option<Bytes>>>,
        /// Receivers for the validation result of each automatic delivery that was enqueued.
        delivery_responses: Arc<Mutex<Vec<oneshot::Receiver<bool>>>>,
        /// Handler mailbox used to enqueue automatic deliveries; `None` when built via `Default`.
        sender: Option<mailbox::Sender<handler::Message<Commitment>>>,
    }
193
194    impl RecordingResolver {
195        fn holding(metrics: impl Metrics) -> (handler::Receiver<Commitment>, Self) {
196            let (sender, receiver) = mailbox::new(metrics, NZUsize!(100));
197            (
198                handler::Receiver::new(receiver),
199                Self {
200                    fetches: Arc::new(Mutex::new(Vec::new())),
201                    targeted: Arc::new(Mutex::new(Vec::new())),
202                    auto_delivery: Arc::new(Mutex::new(None)),
203                    delivery_responses: Arc::new(Mutex::new(Vec::new())),
204                    sender: Some(sender),
205                },
206            )
207        }
208
209        fn record_fetch(&self, fetch: CodingFetchRecord) {
210            self.fetches.lock().push(fetch.clone());
211            let Some(value) = self.auto_delivery.lock().take() else {
212                return;
213            };
214            let Some(sender) = &self.sender else {
215                return;
216            };
217            let (response, response_rx) = oneshot::channel();
218            self.delivery_responses.lock().push(response_rx);
219            let _ = sender.enqueue(handler::Message::Deliver {
220                delivery: Delivery {
221                    key: fetch.key,
222                    subscribers: NonEmptyVec::new(fetch.subscriber),
223                },
224                value,
225                response,
226            });
227        }
228
229        fn respond_to_next_fetch(&self, value: Bytes) {
230            let replaced = self.auto_delivery.lock().replace(value);
231            assert!(
232                replaced.is_none(),
233                "recording resolver already has an automatic delivery"
234            );
235        }
236
237        async fn wait_for_delivery_response(&self) -> bool {
238            let response = self
239                .delivery_responses
240                .lock()
241                .pop()
242                .expect("delivery response missing");
243            response.await.expect("delivery response sender dropped")
244        }
245
246        fn fetches(&self) -> Vec<CodingFetchRecord> {
247            self.fetches.lock().clone()
248        }
249
250        fn targeted(&self) -> Vec<CodingTargetedFetch> {
251            self.targeted.lock().clone()
252        }
253    }
254
255    impl Resolver for RecordingResolver {
256        type Key = handler::Key<Commitment>;
257        type Subscriber = handler::Annotation;
258
259        fn fetch<F>(&mut self, fetch: F) -> Feedback
260        where
261            F: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
262        {
263            self.record_fetch(fetch.into());
264            Feedback::Ok
265        }
266
267        fn fetch_all<F>(&mut self, fetches: Vec<F>) -> Feedback
268        where
269            F: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
270        {
271            for fetch in fetches {
272                self.record_fetch(fetch.into());
273            }
274            Feedback::Ok
275        }
276
277        fn retain(
278            &mut self,
279            _predicate: impl Fn(&Self::Key, &Self::Subscriber) -> bool + Send + 'static,
280        ) -> Feedback {
281            Feedback::Ok
282        }
283    }
284
285    impl TargetedResolver for RecordingResolver {
286        type PublicKey = K;
287
288        fn fetch_targeted(
289            &mut self,
290            fetch: impl Into<Fetch<Self::Key, Self::Subscriber>> + Send,
291            targets: NonEmptyVec<Self::PublicKey>,
292        ) -> Feedback {
293            self.targeted.lock().push((fetch.into().key, targets));
294            Feedback::Ok
295        }
296
297        fn fetch_all_targeted<F>(
298            &mut self,
299            fetches: Vec<(F, NonEmptyVec<Self::PublicKey>)>,
300        ) -> Feedback
301        where
302            F: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
303        {
304            let mut targeted = self.targeted.lock();
305            for (fetch, targets) in fetches {
306                targeted.push((fetch.into().key, targets));
307            }
308            Feedback::Ok
309        }
310    }
311
    /// Starts a coding-variant marshal actor wired to the supplied recording buffer and a fresh
    /// [`RecordingResolver`].
    ///
    /// `partition_prefix` is used both for the actor config and as the prefix of every archive
    /// partition name created here.
    ///
    /// Returns the marshal mailbox, a clone of the resolver handed to the actor (sharing its
    /// records), and the handle of the started actor.
    ///
    /// # Panics
    ///
    /// Panics if either immutable archive fails to initialize.
    async fn start_coding_actor_with_recording(
        context: deterministic::Context,
        partition_prefix: &str,
        provider: ConstantProvider<S, Epoch>,
        buffer: RecordingCodingBuffer,
    ) -> (
        core::Mailbox<S, TestCodingVariant>,
        RecordingResolver,
        commonware_runtime::Handle<()>,
    ) {
        let config = Config {
            provider,
            epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
            start: Start::Genesis(CodingHarness::genesis_block(NUM_VALIDATORS as u16)),
            mailbox_size: NZUsize!(100),
            view_retention_timeout: ViewDelta::new(10),
            max_repair: NZUsize!(10),
            max_pending_acks: NZUsize!(1),
            block_codec_config: (),
            partition_prefix: partition_prefix.to_string(),
            prunable_items_per_section: NZU64!(10),
            replay_buffer: NZUsize!(1024),
            key_write_buffer: NZUsize!(1024),
            value_write_buffer: NZUsize!(1024),
            page_cache: CacheRef::from_pooler(
                &context,
                harness::PAGE_SIZE,
                harness::PAGE_CACHE_SIZE,
            ),
            strategy: Sequential,
        };

        // Archive of finalization certificates; it reuses the buffer sizes and page cache from
        // `config`, which is still borrowed here and only moved into the actor further below.
        let finalizations_by_height = immutable::Archive::init(
            context.child("finalizations_by_height"),
            immutable::Config {
                metadata_partition: format!("{partition_prefix}-finalizations-by-height-metadata"),
                freezer_table_partition: format!(
                    "{partition_prefix}-finalizations-by-height-freezer-table"
                ),
                freezer_table_initial_size: 64,
                freezer_table_resize_frequency: 10,
                freezer_table_resize_chunk_size: 10,
                freezer_key_partition: format!(
                    "{partition_prefix}-finalizations-by-height-freezer-key"
                ),
                freezer_key_page_cache: config.page_cache.clone(),
                freezer_value_partition: format!(
                    "{partition_prefix}-finalizations-by-height-freezer-value"
                ),
                freezer_value_target_size: 1024,
                freezer_value_compression: None,
                ordinal_partition: format!("{partition_prefix}-finalizations-by-height-ordinal"),
                items_per_section: NZU64!(10),
                codec_config: S::certificate_codec_config_unbounded(),
                replay_buffer: config.replay_buffer,
                freezer_key_write_buffer: config.key_write_buffer,
                freezer_value_write_buffer: config.value_write_buffer,
                ordinal_write_buffer: config.key_write_buffer,
            },
        )
        .await
        .expect("failed to initialize finalizations by height archive");

        // Archive of finalized blocks; same layout as above, but under its own partitions and
        // using the block codec config.
        let finalized_blocks = immutable::Archive::init(
            context.child("finalized_blocks"),
            immutable::Config {
                metadata_partition: format!("{partition_prefix}-finalized_blocks-metadata"),
                freezer_table_partition: format!(
                    "{partition_prefix}-finalized_blocks-freezer-table"
                ),
                freezer_table_initial_size: 64,
                freezer_table_resize_frequency: 10,
                freezer_table_resize_chunk_size: 10,
                freezer_key_partition: format!("{partition_prefix}-finalized_blocks-freezer-key"),
                freezer_key_page_cache: config.page_cache.clone(),
                freezer_value_partition: format!(
                    "{partition_prefix}-finalized_blocks-freezer-value"
                ),
                freezer_value_target_size: 1024,
                freezer_value_compression: None,
                ordinal_partition: format!("{partition_prefix}-finalized_blocks-ordinal"),
                items_per_section: NZU64!(10),
                codec_config: config.block_codec_config,
                replay_buffer: config.replay_buffer,
                freezer_key_write_buffer: config.key_write_buffer,
                freezer_value_write_buffer: config.value_write_buffer,
                ordinal_write_buffer: config.key_write_buffer,
            },
        )
        .await
        .expect("failed to initialize finalized blocks archive");

        // The third element of the init result is not needed by these tests.
        let (actor, mailbox, _) = core::Actor::init(
            context.child("actor"),
            finalizations_by_height,
            finalized_blocks,
            config,
        )
        .await;
        let (resolver_rx, resolver) = RecordingResolver::holding(context.child("resolver"));
        // The actor gets one resolver clone; the caller keeps another that shares its records.
        let actor_handle = actor.start(
            Application::<CodingB>::default(),
            buffer,
            (resolver_rx, resolver.clone()),
        );
        (mailbox, resolver, actor_handle)
    }
419
420    async fn start_shard_mailbox(
421        context: deterministic::Context,
422        participants: Vec<K>,
423        provider: ConstantProvider<S, Epoch>,
424    ) -> shards::Mailbox<CodingB, ReedSolomon<Sha256>, Sha256, K> {
425        let me = participants[0].clone();
426        let oracle =
427            setup_network_with_participants(context.child("network"), NZUsize!(1), participants)
428                .await;
429        let control = oracle.control(me.clone());
430        let shard_config: shards::Config<_, _, _, _, _, Sha256, _, _> = shards::Config {
431            scheme_provider: provider,
432            blocker: control.clone(),
433            shard_codec_cfg: CodecConfig {
434                maximum_shard_size: 1024 * 1024,
435            },
436            block_codec_cfg: (),
437            strategy: Sequential,
438            mailbox_size: NZUsize!(10),
439            peer_buffer_size: NZUsize!(64),
440            background_channel_capacity: NZUsize!(1024),
441            peer_provider: oracle.manager(),
442        };
443        let (shard_engine, shard_mailbox) =
444            shards::Engine::new(context.child("shards"), shard_config);
445        let network = control.register(0, TEST_QUOTA).await.unwrap();
446        shard_engine.start(network);
447        shard_mailbox
448    }
449
450    fn genesis_block() -> CodingB {
451        let genesis_ctx = CodingCtx {
452            round: Round::zero(),
453            leader: default_leader(),
454            parent: (View::zero(), genesis_commitment()),
455        };
456        make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0)
457    }
458
459    fn genesis_coding_commitment<H: Hasher, B: CertifiableBlock>(block: &B) -> Commitment {
460        Commitment::from((
461            block.digest(),
462            block.digest(),
463            hash_context::<H, _>(&block.context()),
464            GENESIS_CODING_CONFIG,
465        ))
466    }
467
468    fn missing_candidate(me: K) -> (CodingCtx, TestCodedBlock) {
469        let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
470        let genesis = genesis_block();
471        let genesis_parent_commitment = genesis_coding_commitment::<Sha256, _>(&genesis);
472        let round = Round::new(Epoch::zero(), View::new(1));
473        let candidate_ctx = CodingCtx {
474            round,
475            leader: me,
476            parent: (View::zero(), genesis_parent_commitment),
477        };
478        let candidate =
479            make_coding_block(candidate_ctx.clone(), genesis.digest(), Height::new(1), 100);
480        let coded_candidate: TestCodedBlock =
481            CodedBlock::new(candidate, coding_config, &Sequential);
482        (candidate_ctx, coded_candidate)
483    }
484
    // Subscribing to the parent of a block whose parent is unknown to marshal should register
    // exactly one commitment-keyed subscription on the coding buffer.
    #[test_traced("WARN")]
    fn test_coding_block_provider_parent_fetches_by_commitment() {
        let runner = deterministic::Runner::timed(Duration::from_secs(30));
        runner.start(|mut context| async move {
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
            let provider = ConstantProvider::new(schemes[0].clone());
            let buffer = RecordingCodingBuffer::default();
            // Keep the resolver and actor handle bound for the duration of the test.
            let (marshal, _resolver, _actor_handle) = start_coding_actor_with_recording(
                context.child("actor_stack"),
                "coding-provider-parent-commitment",
                provider,
                buffer.clone(),
            )
            .await;

            // The parent is built locally but never handed to marshal or the buffer.
            let (parent_ctx, parent) = missing_candidate(participants[0].clone());
            let child_ctx = CodingCtx {
                round: Round::new(Epoch::zero(), View::new(2)),
                leader: participants[0].clone(),
                parent: (parent_ctx.round.view(), parent.commitment()),
            };
            let child = make_coding_block(child_ctx, parent.digest(), Height::new(2), 200);
            let subscription = marshal.subscribe_parent(&child);

            // Give the actor time to process the subscription request.
            context.sleep(Duration::from_millis(100)).await;
            assert_eq!(
                buffer.commitment_subscription_count(),
                1,
                "parent walkback should use the coding parent commitment"
            );
            // Held until after the assertion so the subscription is still live when counted.
            drop(subscription);
        });
    }
522
    // Verifying a candidate that is not available locally should wait on the coding buffer and
    // must not trigger any resolver fetch, targeted or otherwise.
    #[test_traced("WARN")]
    fn test_coding_verify_missing_candidate_waits_without_fetching() {
        let runner = deterministic::Runner::timed(Duration::from_secs(30));
        runner.start(|mut context| async move {
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
            let provider = ConstantProvider::new(schemes[0].clone());
            let me = participants[0].clone();
            let buffer = RecordingCodingBuffer::default();
            let (marshal, resolver, _actor_handle) = start_coding_actor_with_recording(
                context.child("actor_stack"),
                "coding-verify-missing-candidate",
                provider.clone(),
                buffer.clone(),
            )
            .await;
            let shards =
                start_shard_mailbox(context.child("shard_stack"), participants, provider.clone())
                    .await;
            // The candidate is built locally but never handed to marshal, shards, or the buffer.
            let (candidate_ctx, candidate) = missing_candidate(me);
            let commitment = candidate.commitment();

            let cfg = MarshaledConfig {
                application: MockVerifyingApp::<CodingB, S>::new(),
                marshal,
                shards,
                scheme_provider: provider,
                epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
                strategy: Sequential,
            };
            let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);

            let verify_rx = marshaled.verify(candidate_ctx, commitment).await;
            // Give the verify request time to reach the actor and buffer.
            context.sleep(Duration::from_millis(100)).await;

            assert!(
                buffer.subscription_count() > 0,
                "missing candidate should register a local buffer wait"
            );
            assert!(
                resolver.fetches().is_empty(),
                "missing candidate verify must not fetch from peers"
            );
            assert!(
                resolver.targeted().is_empty(),
                "missing candidate verify must not issue targeted fetches"
            );
            // Held until after the assertions so the verify request stays pending throughout.
            drop(verify_rx);
        });
    }
576
    // Certifying a candidate that is not available locally should fetch the notarized block by
    // round through the resolver, certify once the delivery arrives, and issue no targeted
    // fetches.
    #[test_traced("WARN")]
    fn test_coding_certify_missing_candidate_fetches_by_round() {
        let runner = deterministic::Runner::timed(Duration::from_secs(30));
        runner.start(|mut context| async move {
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
            let provider = ConstantProvider::new(schemes[0].clone());
            let me = participants[0].clone();
            let buffer = RecordingCodingBuffer::default();
            let (marshal, resolver, _actor_handle) = start_coding_actor_with_recording(
                context.child("actor_stack"),
                "coding-certify-missing-candidate",
                provider.clone(),
                buffer.clone(),
            )
            .await;
            let shards =
                start_shard_mailbox(context.child("shard_stack"), participants, provider.clone())
                    .await;

            let cfg = MarshaledConfig {
                application: MockVerifyingApp::<CodingB, S>::new(),
                marshal,
                shards,
                scheme_provider: provider,
                epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
                strategy: Sequential,
            };
            let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);

            let (candidate_ctx, candidate) = missing_candidate(me);
            let commitment = candidate.commitment();
            let round = candidate_ctx.round;
            let proposal = Proposal::new(round, View::zero(), commitment);
            let notarization = CodingHarness::make_notarization(proposal, &schemes, QUORUM);
            // Arm the resolver before certifying so the first fetch is answered with the
            // encoded (notarization, block) pair.
            resolver.respond_to_next_fetch((notarization, candidate).encode());
            let certify_rx = marshaled.certify(round, commitment).await;

            let result = certify_rx.await.expect("certify result missing");
            assert!(result, "fetched notarized candidate should certify");
            assert!(
                resolver.wait_for_delivery_response().await,
                "notarized delivery should validate"
            );
            // The recorded fetch must be keyed and annotated by the candidate's round.
            assert!(
                resolver.fetches().iter().any(|fetch| matches!(
                    (&fetch.key, &fetch.subscriber),
                    (
                        handler::Key::Notarized { round: request_round },
                        handler::Annotation::Notarization { round: subscriber_round },
                    ) if *request_round == round && *subscriber_round == round
                )),
                "certify should fetch notarized block by round"
            );

            assert!(
                buffer.subscription_count() > 0,
                "missing candidate should register a local buffer wait"
            );
            assert!(
                resolver.targeted().is_empty(),
                "missing candidate certify must not issue targeted fetches"
            );
        });
    }
645
    // With a verify request already pending for a missing candidate, certifying the same round
    // should recover the block via a by-round notarized fetch and still issue no targeted
    // fetches.
    #[test_traced("WARN")]
    fn test_coding_certify_pending_verify_fetches_by_round() {
        let runner = deterministic::Runner::timed(Duration::from_secs(30));
        runner.start(|mut context| async move {
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
            let provider = ConstantProvider::new(schemes[0].clone());
            let me = participants[0].clone();
            let buffer = RecordingCodingBuffer::default();
            let (marshal, resolver, _actor_handle) = start_coding_actor_with_recording(
                context.child("actor_stack"),
                "coding-certify-pending-verify",
                provider.clone(),
                buffer,
            )
            .await;
            let shards =
                start_shard_mailbox(context.child("shard_stack"), participants, provider.clone())
                    .await;

            let cfg = MarshaledConfig {
                application: MockVerifyingApp::<CodingB, S>::new(),
                marshal,
                shards,
                scheme_provider: provider,
                epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
                strategy: Sequential,
            };
            let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);

            let (candidate_ctx, candidate) = missing_candidate(me);
            let commitment = candidate.commitment();
            let round = candidate_ctx.round;
            // Start a verify first and keep its receiver alive so the request stays pending.
            let _verify_rx = marshaled.verify(candidate_ctx, commitment).await;

            let proposal = Proposal::new(round, View::zero(), commitment);
            let notarization = CodingHarness::make_notarization(proposal, &schemes, QUORUM);
            // Arm the resolver before certifying so the first fetch is answered with the
            // encoded (notarization, block) pair.
            resolver.respond_to_next_fetch((notarization, candidate).encode());
            let certify_rx = marshaled.certify(round, commitment).await;

            let result = certify_rx.await.expect("certify result missing");
            assert!(
                result,
                "pending verify should complete after certification recovery"
            );
            assert!(
                resolver.wait_for_delivery_response().await,
                "notarized delivery should validate"
            );
            // The recorded fetch must be keyed and annotated by the candidate's round.
            assert!(
                resolver.fetches().iter().any(|fetch| matches!(
                    (&fetch.key, &fetch.subscriber),
                    (
                        handler::Key::Notarized { round: request_round },
                        handler::Annotation::Notarization { round: subscriber_round },
                    ) if *request_round == round && *subscriber_round == round
                )),
                "certify should recover a pending verify by notarized round"
            );
            assert!(
                resolver.targeted().is_empty(),
                "certify recovery must not issue targeted fetches"
            );
        });
    }
714
715    #[test_group("slow")]
716    #[test_traced("WARN")]
717    fn test_coding_finalize_good_links() {
718        for seed in 0..5 {
719            let r1 = harness::finalize::<CodingHarness>(seed, LINK, false);
720            let r2 = harness::finalize::<CodingHarness>(seed, LINK, false);
721            assert_eq!(r1, r2);
722        }
723    }
724
725    #[test_group("slow")]
726    #[test_traced("WARN")]
727    fn test_coding_finalize_bad_links() {
728        for seed in 0..5 {
729            let r1 = harness::finalize::<CodingHarness>(seed, UNRELIABLE_LINK, false);
730            let r2 = harness::finalize::<CodingHarness>(seed, UNRELIABLE_LINK, false);
731            assert_eq!(r1, r2);
732        }
733    }
734
735    #[test_group("slow")]
736    #[test_traced("WARN")]
737    fn test_coding_finalize_good_links_quorum_sees_finalization() {
738        for seed in 0..5 {
739            let r1 = harness::finalize::<CodingHarness>(seed, LINK, true);
740            let r2 = harness::finalize::<CodingHarness>(seed, LINK, true);
741            assert_eq!(r1, r2);
742        }
743    }
744
745    #[test_group("slow")]
746    #[test_traced("WARN")]
747    fn test_coding_finalize_bad_links_quorum_sees_finalization() {
748        for seed in 0..5 {
749            let r1 = harness::finalize::<CodingHarness>(seed, UNRELIABLE_LINK, true);
750            let r2 = harness::finalize::<CodingHarness>(seed, UNRELIABLE_LINK, true);
751            assert_eq!(r1, r2);
752        }
753    }
754
755    #[test_group("slow")]
756    #[test_traced("WARN")]
757    fn test_coding_hailstorm_restarts() {
758        for seed in 0..2 {
759            let r1 = harness::hailstorm::<CodingHarness>(seed, 4, 4, 1, LINK);
760            let r2 = harness::hailstorm::<CodingHarness>(seed, 4, 4, 1, LINK);
761            assert_eq!(r1, r2);
762        }
763    }
764
765    #[test_group("slow")]
766    #[test_traced("WARN")]
767    fn test_coding_hailstorm_multi_restarts() {
768        for seed in 0..2 {
769            let r1 = harness::hailstorm::<CodingHarness>(seed, 4, 4, 2, LINK);
770            let r2 = harness::hailstorm::<CodingHarness>(seed, 4, 4, 2, LINK);
771            assert_eq!(r1, r2);
772        }
773    }
774
775    #[test_traced("WARN")]
776    fn test_coding_ack_pipeline_backlog() {
777        harness::ack_pipeline_backlog::<CodingHarness>();
778    }
779
780    #[test_traced("WARN")]
781    fn test_coding_ack_pipeline_backlog_persists_on_restart() {
782        harness::ack_pipeline_backlog_persists_on_restart::<CodingHarness>();
783    }
784
785    #[test_traced("WARN")]
786    fn test_coding_genesis_emitted_once() {
787        harness::genesis_emitted_once::<CodingHarness>();
788    }
789
790    #[test_traced("WARN")]
791    fn test_coding_proposed_success_implies_recoverable_after_restart() {
792        harness::proposed_success_implies_recoverable_after_restart::<CodingHarness>(0..16);
793    }
794
795    #[test_traced("WARN")]
796    fn test_coding_verified_success_implies_recoverable_after_restart() {
797        harness::verified_success_implies_recoverable_after_restart::<CodingHarness>(0..16);
798    }
799
800    #[test_traced("WARN")]
801    fn test_coding_certified_success_implies_recoverable_after_restart() {
802        harness::certified_success_implies_recoverable_after_restart::<CodingHarness>(0..16);
803    }
804
805    #[test_traced("WARN")]
806    fn test_coding_delivery_visibility_implies_recoverable_after_restart() {
807        harness::delivery_visibility_implies_recoverable_after_restart::<CodingHarness>(0..16);
808    }
809
810    #[test_traced("WARN")]
811    fn test_coding_sync_height_floor() {
812        harness::sync_height_floor::<CodingHarness>();
813    }
814
815    #[test_traced("WARN")]
816    fn test_coding_prune_finalized_archives() {
817        harness::prune_finalized_archives::<CodingHarness>();
818    }
819
820    #[test_traced("WARN")]
821    fn test_coding_rejects_block_delivery_below_floor() {
822        harness::reject_stale_block_delivery_after_floor_update::<CodingHarness>();
823    }
824
825    #[test_traced("WARN")]
826    fn test_coding_commitment_fetch_height_hint_mismatch_wakes_subscriber() {
827        harness::commitment_fetch_height_hint_mismatch_wakes_subscriber::<CodingHarness>();
828    }
829
830    #[test_traced("WARN")]
831    fn test_coding_subscribe_basic_block_delivery() {
832        harness::subscribe_basic_block_delivery::<CodingHarness>();
833    }
834
835    #[test_traced("WARN")]
836    fn test_coding_subscribe_multiple_subscriptions() {
837        harness::subscribe_multiple_subscriptions::<CodingHarness>();
838    }
839
840    #[test_traced("WARN")]
841    fn test_coding_subscribe_canceled_subscriptions() {
842        harness::subscribe_canceled_subscriptions::<CodingHarness>();
843    }
844
845    #[test_traced("WARN")]
846    fn test_coding_subscribe_blocks_from_different_sources() {
847        harness::subscribe_blocks_from_different_sources::<CodingHarness>();
848    }
849
850    #[test_traced("WARN")]
851    fn test_coding_get_info_basic_queries_present_and_missing() {
852        harness::get_info_basic_queries_present_and_missing::<CodingHarness>();
853    }
854
855    #[test_traced("WARN")]
856    fn test_coding_get_info_latest_progression_multiple_finalizations() {
857        harness::get_info_latest_progression_multiple_finalizations::<CodingHarness>();
858    }
859
860    #[test_traced("WARN")]
861    fn test_coding_get_block_by_height_and_latest() {
862        harness::get_block_by_height_and_latest::<CodingHarness>();
863    }
864
865    #[test_traced("WARN")]
866    fn test_coding_get_block_by_commitment_from_sources_and_missing() {
867        harness::get_block_by_commitment_from_sources_and_missing::<CodingHarness>();
868    }
869
870    #[test_traced("WARN")]
871    fn test_coding_get_finalization_by_height() {
872        harness::get_finalization_by_height::<CodingHarness>();
873    }
874
875    #[test_traced("WARN")]
876    fn test_coding_hint_finalized_triggers_fetch() {
877        harness::hint_finalized_triggers_fetch::<CodingHarness>();
878    }
879
880    #[test_traced("WARN")]
881    fn test_coding_ancestry_stream() {
882        harness::ancestry_stream::<CodingHarness>();
883    }
884
885    #[test_traced("WARN")]
886    fn test_coding_finalize_same_height_different_views() {
887        harness::finalize_same_height_different_views::<CodingHarness>();
888    }
889
890    #[test_traced("WARN")]
891    fn test_coding_certify_persists_equivocated_block() {
892        harness::certify_persists_equivocated_block::<CodingHarness>();
893    }
894
895    #[test_traced("WARN")]
896    fn test_coding_certify_at_later_view_survives_earlier_view_pruning() {
897        harness::certify_at_later_view_survives_earlier_view_pruning::<CodingHarness>();
898    }
899
900    #[test_traced("WARN")]
901    fn test_coding_certify_first_block_fetches_genesis_parent() {
902        let runner = deterministic::Runner::timed(Duration::from_secs(60));
903        runner.start(|mut context| async move {
904            let Fixture {
905                participants,
906                schemes,
907                ..
908            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
909            let mut oracle = setup_network_with_participants(
910                context.child("network"),
911                NZUsize!(1),
912                participants.clone(),
913            )
914            .await;
915
916            let me = participants[0].clone();
917            let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
918
919            let setup = CodingHarness::setup_validator(
920                context.child("validator").with_attribute("index", 0),
921                &mut oracle,
922                me.clone(),
923                ConstantProvider::new(schemes[0].clone()),
924            )
925            .await;
926            let marshal = setup.mailbox;
927            let shards = setup.extra;
928
929            let genesis_ctx = CodingCtx {
930                round: Round::zero(),
931                leader: default_leader(),
932                parent: (View::zero(), genesis_commitment()),
933            };
934            let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
935            let genesis_parent_commitment = genesis_coding_commitment::<Sha256, _>(&genesis);
936
937            let round = Round::new(Epoch::zero(), View::new(1));
938            let block_ctx = CodingCtx {
939                round,
940                leader: me.clone(),
941                parent: (View::zero(), genesis_parent_commitment),
942            };
943            let block = make_coding_block(block_ctx.clone(), genesis.digest(), Height::new(1), 100);
944            let coded_block = CodedBlock::new(block, coding_config, &Sequential);
945            let commitment = coded_block.commitment();
946            shards.proposed(round, coded_block);
947
948            context.sleep(Duration::from_millis(10)).await;
949
950            let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new();
951            let cfg = MarshaledConfig {
952                application: mock_app,
953                marshal,
954                shards,
955                scheme_provider: ConstantProvider::new(schemes[0].clone()),
956                epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
957                strategy: Sequential,
958            };
959            let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
960
961            let shard_validity = marshaled
962                .verify(block_ctx, commitment)
963                .await
964                .await
965                .expect("verify result missing");
966            assert!(shard_validity, "shard validity should pass");
967
968            let certify_result = marshaled
969                .certify(round, commitment)
970                .await
971                .await
972                .expect("certify result missing");
973            assert!(
974                certify_result,
975                "height-1 block should certify with genesis as parent"
976            );
977        });
978    }
979
980    /// Finalizing a descendant must not height-prune the shard-engine buffer before
981    /// `try_repair_gaps` has consumed buffer-only ancestors.
982    ///
983    /// Places parent (height 1) and descendant (height 2) in the shard engine's
984    /// reconstructed-block cache via `proposed()`, then reports a finalization
985    /// for the descendant only.
986    #[test_traced("WARN")]
987    fn test_coding_store_finalization_does_not_prune_buffer_before_repair() {
988        let runner = deterministic::Runner::timed(Duration::from_secs(60));
989        runner.start(|mut context| async move {
990            let Fixture {
991                participants,
992                schemes,
993                ..
994            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
995            let mut oracle = setup_network_with_participants(
996                context.child("network"),
997                NZUsize!(1),
998                participants.clone(),
999            )
1000            .await;
1001
1002            let setup = CodingHarness::setup_validator(
1003                context.child("validator").with_attribute("index", 0),
1004                &mut oracle,
1005                participants[0].clone(),
1006                ConstantProvider::new(schemes[0].clone()),
1007            )
1008            .await;
1009            let mut handle = harness::ValidatorHandle::<CodingHarness> {
1010                mailbox: setup.mailbox,
1011                extra: setup.extra,
1012            };
1013
1014            // Build a 2-block chain: parent at height 1, descendant at height 2.
1015            let parent_block = CodingHarness::make_test_block(
1016                Sha256::hash(b""),
1017                CodingHarness::genesis_parent_commitment(NUM_VALIDATORS as u16),
1018                Height::new(1),
1019                1,
1020                NUM_VALIDATORS as u16,
1021            );
1022            let parent_digest = CodingHarness::digest(&parent_block);
1023            let parent_commitment = CodingHarness::commitment(&parent_block);
1024
1025            let descendant_block = CodingHarness::make_test_block(
1026                parent_digest,
1027                parent_commitment,
1028                Height::new(2),
1029                2,
1030                NUM_VALIDATORS as u16,
1031            );
1032            let descendant_commitment = CodingHarness::commitment(&descendant_block);
1033
1034            // Seed the shard engine's reconstructed-block cache with both blocks.
1035            CodingHarness::propose(
1036                &mut handle,
1037                Round::new(Epoch::new(0), View::new(1)),
1038                &parent_block,
1039            )
1040            .await;
1041            CodingHarness::propose(
1042                &mut handle,
1043                Round::new(Epoch::new(0), View::new(2)),
1044                &descendant_block,
1045            )
1046            .await;
1047
1048            // Report finalization for the descendant only. The parent has no
1049            // finalization certificate: it must be archived by walking the
1050            // parent link from the descendant and sourcing the block from the
1051            // shard-engine buffer.
1052            let descendant_proposal = Proposal {
1053                round: Round::new(Epoch::new(0), View::new(2)),
1054                parent: View::new(1),
1055                payload: descendant_commitment,
1056            };
1057            let descendant_finalization =
1058                CodingHarness::make_finalization(descendant_proposal, &schemes, QUORUM);
1059            CodingHarness::report_finalization(&mut handle.mailbox, descendant_finalization).await;
1060
1061            // Wait until the descendant is archived: that proves finalization processing
1062            // has completed, at which point the parent must already have been repaired
1063            // from the shard buffer.
1064            while handle.mailbox.get_block(Height::new(2)).await.is_none() {
1065                context.sleep(Duration::from_millis(10)).await;
1066            }
1067
1068            let parent = handle.mailbox.get_block(Height::new(1)).await;
1069            assert!(
1070                parent.is_some(),
1071                "parent must be archived from shard buffer before height-prune evicts it"
1072            );
1073        });
1074    }
1075
1076    #[test_traced("WARN")]
1077    fn test_coding_init_processed_height() {
1078        harness::init_processed_height::<CodingHarness>();
1079    }
1080
1081    #[test_traced("INFO")]
1082    fn test_coding_broadcast_caches_block() {
1083        harness::broadcast_caches_block::<CodingHarness>();
1084    }
1085
1086    /// Test that certifying a lower-view block after a higher-view block succeeds.
1087    ///
1088    /// This is a critical test for crash recovery scenarios where a validator may need
1089    /// to certify blocks in non-sequential view order.
1090    #[test_traced("INFO")]
1091    fn test_certify_lower_view_after_higher_view() {
1092        let runner = deterministic::Runner::timed(Duration::from_secs(60));
1093        runner.start(|mut context| async move {
1094            let Fixture {
1095                participants,
1096                schemes,
1097                ..
1098            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1099            let mut oracle = setup_network_with_participants(
1100                context.child("network"),
1101                NZUsize!(1),
1102                participants.clone(),
1103            )
1104            .await;
1105
1106            let me = participants[0].clone();
1107            let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
1108
1109            let setup = CodingHarness::setup_validator(
1110                context.child("validator").with_attribute("index", 0),
1111                &mut oracle,
1112                me.clone(),
1113                ConstantProvider::new(schemes[0].clone()),
1114            )
1115            .await;
1116            let marshal = setup.mailbox;
1117            let shards = setup.extra;
1118
1119            let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new();
1120
1121            let cfg = MarshaledConfig {
1122                application: mock_app,
1123                marshal: marshal.clone(),
1124                shards: shards.clone(),
1125                scheme_provider: ConstantProvider::new(schemes[0].clone()),
1126                epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
1127                strategy: Sequential,
1128            };
1129            let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
1130
1131            let genesis_ctx = CodingCtx {
1132                round: Round::zero(),
1133                leader: default_leader(),
1134                parent: (View::zero(), genesis_commitment()),
1135            };
1136            let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
1137
1138            // Create parent block at height 1
1139            let parent_ctx = CodingCtx {
1140                round: Round::new(Epoch::new(0), View::new(1)),
1141                leader: default_leader(),
1142                parent: (View::zero(), genesis_commitment()),
1143            };
1144            let parent = make_coding_block(parent_ctx, genesis.digest(), Height::new(1), 100);
1145            let parent_digest = parent.digest();
1146            let coded_parent = CodedBlock::new(parent.clone(), coding_config, &Sequential);
1147            let parent_commitment = coded_parent.commitment();
1148            shards.proposed(Round::new(Epoch::new(0), View::new(1)), coded_parent);
1149
1150            // Block A at view 5 (height 2) - create with context matching what verify will receive
1151            let round_a = Round::new(Epoch::new(0), View::new(5));
1152            let context_a = CodingCtx {
1153                round: round_a,
1154                leader: me.clone(),
1155                parent: (View::new(1), parent_commitment),
1156            };
1157            let block_a = make_coding_block(context_a.clone(), parent_digest, Height::new(2), 200);
1158            let coded_block_a = CodedBlock::new(block_a.clone(), coding_config, &Sequential);
1159            let commitment_a = coded_block_a.commitment();
1160            shards.proposed(round_a, coded_block_a);
1161
1162            // Block B at view 10 (height 2, different block same height - could happen with
1163            // different proposers or re-proposals)
1164            let round_b = Round::new(Epoch::new(0), View::new(10));
1165            let context_b = CodingCtx {
1166                round: round_b,
1167                leader: me.clone(),
1168                parent: (View::new(1), parent_commitment),
1169            };
1170            let block_b = make_coding_block(context_b.clone(), parent_digest, Height::new(2), 300);
1171            let coded_block_b = CodedBlock::new(block_b.clone(), coding_config, &Sequential);
1172            let commitment_b = coded_block_b.commitment();
1173            shards.proposed(round_b, coded_block_b);
1174
1175            context.sleep(Duration::from_millis(10)).await;
1176
1177            // Step 1: Verify block A at view 5
1178            let _ = marshaled.verify(context_a, commitment_a).await.await;
1179
1180            // Step 2: Verify block B at view 10
1181            let _ = marshaled.verify(context_b, commitment_b).await.await;
1182
1183            // Step 3: Certify block B at view 10 FIRST
1184            let certify_b = marshaled.certify(round_b, commitment_b).await;
1185            assert!(
1186                certify_b.await.unwrap(),
1187                "Block B certification should succeed"
1188            );
1189
1190            // Step 4: Certify block A at view 5 - should succeed
1191            let certify_a = marshaled.certify(round_a, commitment_a).await;
1192
1193            // Use select with timeout to detect never-resolving receiver
1194            select! {
1195                result = certify_a => {
1196                    assert!(result.unwrap(), "Block A certification should succeed");
1197                },
1198                _ = context.sleep(Duration::from_secs(5)) => {
1199                    panic!("Block A certification timed out");
1200                },
1201            }
1202        })
1203    }
1204
1205    /// Regression test for re-proposal validation in optimistic_verify.
1206    ///
1207    /// Verifies that:
1208    /// 1. Valid re-proposals at epoch boundaries are accepted
1209    /// 2. Invalid re-proposals (not at epoch boundary) are rejected
1210    ///
1211    /// A re-proposal occurs when the parent digest equals the block being verified,
1212    /// meaning the same block is being proposed again in a new view.
1213    #[test_traced("INFO")]
1214    fn test_marshaled_reproposal_validation() {
1215        let runner = deterministic::Runner::timed(Duration::from_secs(60));
1216        runner.start(|mut context| async move {
1217            let Fixture {
1218                participants,
1219                schemes,
1220                ..
1221            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1222            let mut oracle = setup_network_with_participants(
1223                context.child("network"),
1224                NZUsize!(1),
1225                participants.clone(),
1226            )
1227            .await;
1228
1229            let me = participants[0].clone();
1230            let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
1231
1232            let setup = CodingHarness::setup_validator(
1233                context.child("validator").with_attribute("index", 0),
1234                &mut oracle,
1235                me.clone(),
1236                ConstantProvider::new(schemes[0].clone()),
1237            )
1238            .await;
1239            let marshal = setup.mailbox;
1240            let shards = setup.extra;
1241
1242            let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new();
1243            let cfg = MarshaledConfig {
1244                application: mock_app,
1245                marshal: marshal.clone(),
1246                shards: shards.clone(),
1247                scheme_provider: ConstantProvider::new(schemes[0].clone()),
1248                epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
1249                strategy: Sequential,
1250            };
1251            let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
1252
1253            let genesis_ctx = CodingCtx {
1254                round: Round::zero(),
1255                leader: default_leader(),
1256                parent: (View::zero(), genesis_commitment()),
1257            };
1258            let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
1259
1260            // Build a chain up to the epoch boundary (height 19 is the last block in epoch 0
1261            // with BLOCKS_PER_EPOCH=20, since epoch 0 covers heights 0-19)
1262            let mut parent = genesis.digest();
1263            let mut last_view = View::zero();
1264            let mut last_commitment = genesis_commitment();
1265            for i in 1..BLOCKS_PER_EPOCH.get() {
1266                let round = Round::new(Epoch::new(0), View::new(i));
1267                let ctx = CodingCtx {
1268                    round,
1269                    leader: me.clone(),
1270                    parent: (last_view, last_commitment),
1271                };
1272                let block = make_coding_block(ctx.clone(), parent, Height::new(i), i * 100);
1273                let coded_block = CodedBlock::new(block.clone(), coding_config, &Sequential);
1274                last_commitment = coded_block.commitment();
1275                shards.proposed(round, coded_block);
1276                parent = block.digest();
1277                last_view = View::new(i);
1278            }
1279
1280            // Create the epoch boundary block (height 19, last block in epoch 0)
1281            let boundary_height = Height::new(BLOCKS_PER_EPOCH.get() - 1);
1282            let boundary_round = Round::new(Epoch::new(0), View::new(boundary_height.get()));
1283            let boundary_context = CodingCtx {
1284                round: boundary_round,
1285                leader: me.clone(),
1286                parent: (last_view, last_commitment),
1287            };
1288            let boundary_block = make_coding_block(
1289                boundary_context.clone(),
1290                parent,
1291                boundary_height,
1292                boundary_height.get() * 100,
1293            );
1294            let coded_boundary =
1295                CodedBlock::new(boundary_block.clone(), coding_config, &Sequential);
1296            let boundary_commitment = coded_boundary.commitment();
1297            shards.proposed(boundary_round, coded_boundary);
1298
1299            context.sleep(Duration::from_millis(10)).await;
1300
1301            // Test 1: Valid re-proposal at epoch boundary should be accepted
1302            // Re-proposal context: parent digest equals the block being verified
1303            // Re-proposals happen within the same epoch when the parent is the last block
1304            //
1305            // In the coding marshal, verify() returns shard validity while deferred_verify
1306            // runs in the background. We call verify() to register the verification task,
1307            // then certify() returns the deferred_verify result.
1308            let reproposal_round = Round::new(Epoch::new(0), View::new(20));
1309            let reproposal_context = CodingCtx {
1310                round: reproposal_round,
1311                leader: me.clone(),
1312                parent: (View::new(boundary_height.get()), boundary_commitment), // Parent IS the boundary block
1313            };
1314
1315            // Call verify to kick off deferred verification.
1316            // We must await the verify result to ensure the verification task is
1317            // registered before calling certify.
1318            let shard_validity = marshaled
1319                .verify(reproposal_context.clone(), boundary_commitment)
1320                .await
1321                .await;
1322            assert!(
1323                shard_validity.unwrap(),
1324                "Re-proposal verify should return true for shard validity"
1325            );
1326
1327            // Use certify to get the actual deferred_verify result
1328            let certify_result = marshaled
1329                .certify(reproposal_round, boundary_commitment)
1330                .await
1331                .await;
1332            assert!(
1333                certify_result.unwrap(),
1334                "Valid re-proposal at epoch boundary should be accepted"
1335            );
1336
1337            // Test 2: Invalid re-proposal (not at epoch boundary) should be rejected
1338            // Create a block at height 10 (not at epoch boundary)
1339            let non_boundary_height = Height::new(10);
1340            let non_boundary_round = Round::new(Epoch::new(0), View::new(10));
1341            // For simplicity, we'll create a fresh non-boundary block and test re-proposal
1342            let non_boundary_context = CodingCtx {
1343                round: non_boundary_round,
1344                leader: me.clone(),
1345                parent: (View::new(9), last_commitment), // Use a prior commitment
1346            };
1347            let non_boundary_block = make_coding_block(
1348                non_boundary_context.clone(),
1349                parent,
1350                non_boundary_height,
1351                1000,
1352            );
1353            let coded_non_boundary =
1354                CodedBlock::new(non_boundary_block.clone(), coding_config, &Sequential);
1355            let non_boundary_commitment = coded_non_boundary.commitment();
1356
1357            // Make the non-boundary block available
1358            shards.proposed(non_boundary_round, coded_non_boundary);
1359
1360            context.sleep(Duration::from_millis(10)).await;
1361
1362            // Attempt to re-propose the non-boundary block
1363            let invalid_reproposal_round = Round::new(Epoch::new(0), View::new(15));
1364            let invalid_reproposal_context = CodingCtx {
1365                round: invalid_reproposal_round,
1366                leader: me.clone(),
1367                parent: (View::new(10), non_boundary_commitment),
1368            };
1369
1370            // Call verify to kick off deferred verification.
1371            // We must await the verify result to ensure the verification task is
1372            // registered before calling certify.
1373            let shard_validity = marshaled
1374                .verify(invalid_reproposal_context, non_boundary_commitment)
1375                .await
1376                .await;
1377            assert!(
1378                !shard_validity.unwrap(),
1379                "Invalid re-proposal verify should return false"
1380            );
1381
1382            // Use certify to get the actual deferred_verify result
1383            let certify_result = marshaled
1384                .certify(invalid_reproposal_round, non_boundary_commitment)
1385                .await
1386                .await;
1387            assert!(
1388                !certify_result.unwrap(),
1389                "Invalid re-proposal (not at epoch boundary) should be rejected"
1390            );
1391
1392            // Test 3: Re-proposal with mismatched epoch should be rejected
1393            // This is a regression test - re-proposals must be in the same epoch as the block.
1394            let cross_epoch_reproposal_round = Round::new(Epoch::new(1), View::new(20));
1395            let cross_epoch_reproposal_context = CodingCtx {
1396                round: cross_epoch_reproposal_round,
1397                leader: me.clone(),
1398                parent: (View::new(boundary_height.get()), boundary_commitment),
1399            };
1400
1401            // Call verify to kick off deferred verification.
1402            // We must await the verify result to ensure the verification task is
1403            // registered before calling certify.
1404            let shard_validity = marshaled
1405                .verify(cross_epoch_reproposal_context.clone(), boundary_commitment)
1406                .await
1407                .await;
1408            assert!(
1409                !shard_validity.unwrap(),
1410                "Cross-epoch re-proposal verify should return false"
1411            );
1412
1413            // Use certify to get the actual deferred_verify result
1414            let certify_result = marshaled
1415                .certify(cross_epoch_reproposal_round, boundary_commitment)
1416                .await
1417                .await;
1418            assert!(
1419                !certify_result.unwrap(),
1420                "Re-proposal with mismatched epoch should be rejected"
1421            );
1422
1423            // Note: Tests for certify-only paths (crash recovery scenarios) are not included here
1424            // because they require multiple validators to reconstruct blocks from shards. In a
1425            // single-validator test setup, block reconstruction fails due to insufficient shards.
1426            // These paths are tested in integration tests with multiple validators.
1427        })
1428    }
1429
1430    #[test_traced("WARN")]
1431    fn test_marshaled_rejects_mismatched_context_digest() {
1432        let runner = deterministic::Runner::timed(Duration::from_secs(30));
1433        runner.start(|mut context| async move {
1434            let Fixture {
1435                participants,
1436                schemes,
1437                ..
1438            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1439            let mut oracle = setup_network_with_participants(
1440                context.child("network"),
1441                NZUsize!(1),
1442                participants.clone(),
1443            )
1444            .await;
1445
1446            let me = participants[0].clone();
1447            let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
1448
1449            let setup = CodingHarness::setup_validator(
1450                context.child("validator").with_attribute("index", 0),
1451                &mut oracle,
1452                me.clone(),
1453                ConstantProvider::new(schemes[0].clone()),
1454            )
1455            .await;
1456            let marshal = setup.mailbox;
1457            let shards = setup.extra;
1458
1459            let genesis_ctx = CodingCtx {
1460                round: Round::zero(),
1461                leader: default_leader(),
1462                parent: (View::zero(), genesis_commitment()),
1463            };
1464            let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
1465
1466            let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new();
1467            let cfg = MarshaledConfig {
1468                application: mock_app,
1469                marshal: marshal.clone(),
1470                shards: shards.clone(),
1471                scheme_provider: ConstantProvider::new(schemes[0].clone()),
1472                epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
1473                strategy: Sequential,
1474            };
1475            let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
1476
1477            // Create parent block at height 1 so the commitment is well-formed.
1478            let parent_ctx = CodingCtx {
1479                round: Round::new(Epoch::zero(), View::new(1)),
1480                leader: default_leader(),
1481                parent: (View::zero(), genesis_commitment()),
1482            };
1483            let parent = make_coding_block(parent_ctx, genesis.digest(), Height::new(1), 100);
1484            let coded_parent = CodedBlock::new(parent.clone(), coding_config, &Sequential);
1485            let parent_commitment = coded_parent.commitment();
1486            shards.proposed(Round::new(Epoch::zero(), View::new(1)), coded_parent);
1487
1488            // Build a block with context A (commitment hash uses this context).
1489            let round_a = Round::new(Epoch::zero(), View::new(2));
1490            let context_a = CodingCtx {
1491                round: round_a,
1492                leader: me.clone(),
1493                parent: (View::new(1), parent_commitment),
1494            };
1495            let block_a = make_coding_block(context_a, parent.digest(), Height::new(2), 200);
1496            let coded_block_a: CodedBlock<_, ReedSolomon<Sha256>, Sha256> =
1497                CodedBlock::new(block_a, coding_config, &Sequential);
1498            let commitment_a = coded_block_a.commitment();
1499
1500            // Verify using a different consensus context B (hash mismatch).
1501            let round_b = Round::new(Epoch::zero(), View::new(3));
1502            let context_b = CodingCtx {
1503                round: round_b,
1504                leader: participants[1].clone(),
1505                parent: (View::new(1), parent_commitment),
1506            };
1507
1508            let verify_rx = marshaled.verify(context_b, commitment_a).await;
1509            select! {
1510                result = verify_rx => {
1511                    assert!(
1512                        !result.unwrap(),
1513                        "mismatched context digest should be rejected"
1514                    );
1515                },
1516                _ = context.sleep(Duration::from_secs(5)) => {
1517                    panic!("verify should reject mismatched context digest promptly");
1518                },
1519            }
1520        })
1521    }
1522
1523    #[test_traced("WARN")]
1524    fn test_reproposal_certify_recovers_after_verify_receiver_drop() {
1525        let runner = deterministic::Runner::timed(Duration::from_secs(30));
1526        runner.start(|mut context| async move {
1527            let Fixture {
1528                participants,
1529                schemes,
1530                ..
1531            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1532            let mut oracle = setup_network_with_participants(
1533                context.child("network"),
1534                NZUsize!(1),
1535                participants.clone(),
1536            )
1537            .await;
1538
1539            let me = participants[0].clone();
1540            let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
1541
1542            let setup = CodingHarness::setup_validator(
1543                context.child("validator").with_attribute("index", 0),
1544                &mut oracle,
1545                me.clone(),
1546                ConstantProvider::new(schemes[0].clone()),
1547            )
1548            .await;
1549            let marshal = setup.mailbox;
1550            let shards = setup.extra;
1551
1552            let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new();
1553            let cfg = MarshaledConfig {
1554                application: mock_app,
1555                marshal: marshal.clone(),
1556                shards: shards.clone(),
1557                scheme_provider: ConstantProvider::new(schemes[0].clone()),
1558                epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
1559                strategy: Sequential,
1560            };
1561            let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
1562
1563            let genesis_ctx = CodingCtx {
1564                round: Round::zero(),
1565                leader: default_leader(),
1566                parent: (View::zero(), genesis_commitment()),
1567            };
1568            let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
1569
1570            // Build a valid boundary re-proposal, but keep it unavailable until
1571            // after the optimistic verify receiver has been dropped.
1572            let boundary_height = Height::new(BLOCKS_PER_EPOCH.get() - 1);
1573            let boundary_round = Round::new(Epoch::zero(), View::new(boundary_height.get()));
1574            let boundary_context = CodingCtx {
1575                round: boundary_round,
1576                leader: me.clone(),
1577                parent: (View::zero(), genesis_commitment()),
1578            };
1579            let boundary_block = make_coding_block(
1580                boundary_context,
1581                genesis.digest(),
1582                boundary_height,
1583                boundary_height.get() * 100,
1584            );
1585            let coded_boundary = CodedBlock::new(boundary_block, coding_config, &Sequential);
1586            let boundary_commitment = coded_boundary.commitment();
1587            let reproposal_round = Round::new(Epoch::zero(), View::new(boundary_height.get() + 1));
1588            let reproposal_context = CodingCtx {
1589                round: reproposal_round,
1590                leader: me,
1591                parent: (View::new(boundary_height.get()), boundary_commitment),
1592            };
1593
1594            // Start verify, then drop the receiver before the block is available.
1595            let verify_rx = marshaled
1596                .verify(reproposal_context, boundary_commitment)
1597                .await;
1598            drop(verify_rx);
1599            context.sleep(Duration::from_millis(10)).await;
1600
1601            shards.proposed(boundary_round, coded_boundary);
1602            context.sleep(Duration::from_millis(10)).await;
1603
1604            // Certify should not return the stale closed verification task; it
1605            // should recover through the embedded-context certification path.
1606            let certify_rx = marshaled
1607                .certify(reproposal_round, boundary_commitment)
1608                .await;
1609            select! {
1610                result = certify_rx => {
1611                    assert!(
1612                        result.expect("certify result missing"),
1613                        "certify should recover after verify receiver drop"
1614                    );
1615                },
1616                _ = context.sleep(Duration::from_secs(5)) => {
1617                    panic!("certify should recover after verify receiver drop");
1618                },
1619            }
1620        })
1621    }
1622
1623    #[test_traced("WARN")]
1624    fn test_reproposal_missing_block_does_not_synthesize_false() {
1625        let runner = deterministic::Runner::timed(Duration::from_secs(30));
1626        runner.start(|mut context| async move {
1627            let Fixture {
1628                participants,
1629                schemes,
1630                ..
1631            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1632            let mut oracle = setup_network_with_participants(
1633                context.child("network"),
1634                NZUsize!(1),
1635                participants.clone(),
1636            )
1637            .await;
1638
1639            let me = participants[0].clone();
1640            let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
1641
1642            let setup = CodingHarness::setup_validator(
1643                context.child("validator").with_attribute("index", 0),
1644                &mut oracle,
1645                me.clone(),
1646                ConstantProvider::new(schemes[0].clone()),
1647            )
1648            .await;
1649            let marshal = setup.mailbox;
1650            let shards = setup.extra;
1651
1652            let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new();
1653            let cfg = MarshaledConfig {
1654                application: mock_app,
1655                marshal: marshal.clone(),
1656                shards: shards.clone(),
1657                scheme_provider: ConstantProvider::new(schemes[0].clone()),
1658                epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
1659                strategy: Sequential,
1660            };
1661            let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
1662
1663            // Re-proposal payload with valid coding config, but no block available.
1664            let missing_payload = Commitment::from((
1665                Sha256::hash(b"missing_block"),
1666                Sha256::hash(b"missing_root"),
1667                Sha256::hash(b"missing_context"),
1668                coding_config,
1669            ));
1670            let round = Round::new(Epoch::zero(), View::new(1));
1671            let reproposal_context = CodingCtx {
1672                round,
1673                leader: me,
1674                parent: (View::zero(), missing_payload),
1675            };
1676
1677            // Verify must not synthesize `false` when the block cannot be fetched.
1678            let verify_rx = marshaled.verify(reproposal_context, missing_payload).await;
1679
1680            // Ensure the verification task has registered its subscription, then
1681            // force cancellation by pruning the missing commitment.
1682            context.sleep(Duration::from_millis(100)).await;
1683            shards.prune(missing_payload);
1684
1685            select! {
1686                result = verify_rx => {
1687                    assert!(
1688                        result.is_err(),
1689                        "verify should resolve without explicit false when re-proposal block is unavailable"
1690                    );
1691                },
1692                _ = context.sleep(Duration::from_secs(5)) => {
1693                    panic!("verify should resolve promptly when re-proposal block is unavailable");
1694                },
1695            }
1696
1697            // Certify should not surface the closed verification task as the final result.
1698            // With no block available, it remains pending on the recovery path until the
1699            // certifier's caller times out or data arrives.
1700            let mut certify_rx = marshaled.certify(round, missing_payload).await;
1701            context.sleep(Duration::from_millis(100)).await;
1702            assert!(
1703                matches!(
1704                    certify_rx.try_recv(),
1705                    Err(commonware_utils::channel::oneshot::error::TryRecvError::Empty)
1706                ),
1707                "certify should remain pending without explicit false or stale cancellation"
1708            );
1709            drop(certify_rx);
1710        })
1711    }
1712
1713    #[test_traced("WARN")]
1714    fn test_core_subscription_closes_when_coding_buffer_prunes_missing_commitment() {
1715        let runner = deterministic::Runner::timed(Duration::from_secs(30));
1716        runner.start(|mut context| async move {
1717            let Fixture {
1718                participants,
1719                schemes,
1720                ..
1721            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1722            let mut oracle = setup_network_with_participants(
1723                context.child("network"),
1724                NZUsize!(1),
1725                participants.clone(),
1726            )
1727            .await;
1728
1729            let setup = CodingHarness::setup_validator(
1730                context.child("validator").with_attribute("index", 0),
1731                &mut oracle,
1732                participants[0].clone(),
1733                ConstantProvider::new(schemes[0].clone()),
1734            )
1735            .await;
1736            let marshal = setup.mailbox;
1737            let shards = setup.extra;
1738
1739            let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
1740            let missing_commitment = Commitment::from((
1741                Sha256::hash(b"missing_block"),
1742                Sha256::hash(b"missing_root"),
1743                Sha256::hash(b"missing_context"),
1744                coding_config,
1745            ));
1746            let round = Round::new(Epoch::zero(), View::new(1));
1747
1748            // Subscribe through the core actor. This internally subscribes to the
1749            // coding shard buffer and registers local waiters.
1750            let block_rx = marshal.subscribe_by_commitment(
1751                missing_commitment,
1752                core::CommitmentFallback::FetchByRound { round },
1753            );
1754
1755            // Allow core actor to register the underlying buffer subscription.
1756            context.sleep(Duration::from_millis(100)).await;
1757
1758            // Prune the missing commitment in the shard engine, which should cancel
1759            // the underlying buffer subscription.
1760            shards.prune(missing_commitment);
1761
1762            // The core actor must surface cancellation by closing the subscription,
1763            // not by panicking or leaving the waiter parked indefinitely.
1764            select! {
1765                result = block_rx => {
1766                    assert!(
1767                        result.is_err(),
1768                        "core subscription should close when coding buffer drops subscription"
1769                    );
1770                },
1771                _ = context.sleep(Duration::from_secs(5)) => {
1772                    panic!("core subscription should resolve promptly after coding prune");
1773                },
1774            }
1775        })
1776    }
1777
1778    #[test_traced("WARN")]
1779    fn test_marshaled_rejects_unsupported_epoch() {
1780        #[derive(Clone)]
1781        struct LimitedEpocher {
1782            inner: FixedEpocher,
1783            max_epoch: u64,
1784        }
1785
1786        impl Epocher for LimitedEpocher {
1787            fn containing(&self, height: Height) -> Option<crate::types::EpochInfo> {
1788                let bounds = self.inner.containing(height)?;
1789                if bounds.epoch().get() > self.max_epoch {
1790                    None
1791                } else {
1792                    Some(bounds)
1793                }
1794            }
1795
1796            fn first(&self, epoch: Epoch) -> Option<Height> {
1797                if epoch.get() > self.max_epoch {
1798                    None
1799                } else {
1800                    self.inner.first(epoch)
1801                }
1802            }
1803
1804            fn last(&self, epoch: Epoch) -> Option<Height> {
1805                if epoch.get() > self.max_epoch {
1806                    None
1807                } else {
1808                    self.inner.last(epoch)
1809                }
1810            }
1811        }
1812
1813        let runner = deterministic::Runner::timed(Duration::from_secs(60));
1814        runner.start(|mut context| async move {
1815            let Fixture {
1816                participants,
1817                schemes,
1818                ..
1819            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1820            let mut oracle = setup_network_with_participants(
1821                context.child("network"),
1822                NZUsize!(1),
1823                participants.clone(),
1824            )
1825            .await;
1826
1827            let me = participants[0].clone();
1828            let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
1829
1830            let setup = CodingHarness::setup_validator(
1831                context.child("validator").with_attribute("index", 0),
1832                &mut oracle,
1833                me.clone(),
1834                ConstantProvider::new(schemes[0].clone()),
1835            )
1836            .await;
1837            let marshal = setup.mailbox;
1838            let shards = setup.extra;
1839
1840            let genesis_ctx = CodingCtx {
1841                round: Round::zero(),
1842                leader: default_leader(),
1843                parent: (View::zero(), genesis_commitment()),
1844            };
1845            let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
1846
1847            let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new();
1848            let limited_epocher = LimitedEpocher {
1849                inner: FixedEpocher::new(BLOCKS_PER_EPOCH),
1850                max_epoch: 0,
1851            };
1852            let cfg = MarshaledConfig {
1853                application: mock_app,
1854                marshal: marshal.clone(),
1855                shards: shards.clone(),
1856                scheme_provider: ConstantProvider::new(schemes[0].clone()),
1857                epocher: limited_epocher,
1858                strategy: Sequential,
1859            };
1860            let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
1861
1862            // Create a parent block at height 19 (last block in epoch 0, which is supported)
1863            let parent_ctx = CodingCtx {
1864                round: Round::new(Epoch::zero(), View::new(19)),
1865                leader: default_leader(),
1866                parent: (View::zero(), genesis_commitment()),
1867            };
1868            let parent = make_coding_block(parent_ctx, genesis.digest(), Height::new(19), 1000);
1869            let parent_digest = parent.digest();
1870            let coded_parent = CodedBlock::new(parent.clone(), coding_config, &Sequential);
1871            let parent_commitment = coded_parent.commitment();
1872            shards.proposed(Round::new(Epoch::zero(), View::new(19)), coded_parent);
1873
1874            // Create a block at height 20 (first block in epoch 1, which is NOT supported)
1875            let block_ctx = CodingCtx {
1876                round: Round::new(Epoch::new(1), View::new(20)),
1877                leader: default_leader(),
1878                parent: (View::new(19), parent_commitment),
1879            };
1880            let block = make_coding_block(block_ctx, parent_digest, Height::new(20), 2000);
1881            let coded_block = CodedBlock::new(block.clone(), coding_config, &Sequential);
1882            let block_commitment = coded_block.commitment();
1883            shards.proposed(Round::new(Epoch::new(1), View::new(20)), coded_block);
1884
1885            context.sleep(Duration::from_millis(10)).await;
1886
1887            // In the coding marshal, verify() returns shard validity while deferred_verify
1888            // runs in the background. We need to use certify() to get the deferred_verify result.
1889            let unsupported_round = Round::new(Epoch::new(1), View::new(20));
1890            let unsupported_context = CodingCtx {
1891                round: unsupported_round,
1892                leader: me.clone(),
1893                parent: (View::new(19), parent_commitment),
1894            };
1895
1896            // Call verify to kick off deferred verification
1897            let _shard_validity = marshaled
1898                .verify(unsupported_context, block_commitment)
1899                .await;
1900
1901            // Use certify to get the actual deferred_verify result
1902            let certify_result = marshaled
1903                .certify(unsupported_round, block_commitment)
1904                .await
1905                .await;
1906
1907            assert!(
1908                !certify_result.unwrap(),
1909                "Block in unsupported epoch should be rejected"
1910            );
1911        })
1912    }
1913
1914    #[test_traced("WARN")]
1915    fn test_marshaled_rejects_invalid_ancestry() {
1916        let runner = deterministic::Runner::timed(Duration::from_secs(60));
1917        runner.start(|mut context| async move {
1918            let Fixture {
1919                participants,
1920                schemes,
1921                ..
1922            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1923            let mut oracle = setup_network_with_participants(
1924                context.child("network"),
1925                NZUsize!(1),
1926                participants.clone(),
1927            )
1928            .await;
1929
1930            let me = participants[0].clone();
1931            let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
1932
1933            let setup = CodingHarness::setup_validator(
1934                context.child("validator").with_attribute("index", 0),
1935                &mut oracle,
1936                me.clone(),
1937                ConstantProvider::new(schemes[0].clone()),
1938            )
1939            .await;
1940            let marshal = setup.mailbox;
1941            let shards = setup.extra;
1942
1943            // Create genesis block
1944            let genesis_ctx = CodingCtx {
1945                round: Round::zero(),
1946                leader: default_leader(),
1947                parent: (View::zero(), genesis_commitment()),
1948            };
1949            let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
1950
1951            // Wrap with Marshaled verifier
1952            let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new();
1953            let cfg = MarshaledConfig {
1954                application: mock_app,
1955                marshal: marshal.clone(),
1956                shards: shards.clone(),
1957                scheme_provider: ConstantProvider::new(schemes[0].clone()),
1958                epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
1959                strategy: Sequential,
1960            };
1961            let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
1962
1963            // Test case 1: Non-contiguous height
1964            //
1965            // We need both blocks in the same epoch.
1966            // With BLOCKS_PER_EPOCH=20: epoch 0 is heights 0-19, epoch 1 is heights 20-39
1967            //
1968            // Store honest parent at height 21 (epoch 1)
1969            let honest_parent_ctx = CodingCtx {
1970                round: Round::new(Epoch::new(1), View::new(21)),
1971                leader: default_leader(),
1972                parent: (View::zero(), genesis_commitment()),
1973            };
1974            let honest_parent = make_coding_block(
1975                honest_parent_ctx,
1976                genesis.digest(),
1977                Height::new(BLOCKS_PER_EPOCH.get() + 1),
1978                1000,
1979            );
1980            let parent_digest = honest_parent.digest();
1981            let coded_parent = CodedBlock::new(honest_parent.clone(), coding_config, &Sequential);
1982            let parent_commitment = coded_parent.commitment();
1983            shards.proposed(Round::new(Epoch::new(1), View::new(21)), coded_parent);
1984
1985            // Byzantine proposer broadcasts malicious block at height 35
1986            // The block has the correct context (matching what consensus will provide)
1987            // but contains invalid content (non-contiguous height: 21 -> 35 instead of 21 -> 22)
1988            let byzantine_round = Round::new(Epoch::new(1), View::new(35));
1989            let byzantine_context = CodingCtx {
1990                round: byzantine_round,
1991                leader: me.clone(),
1992                parent: (View::new(21), parent_commitment), // Consensus says parent is at height 21
1993            };
1994            let malicious_block = make_coding_block(
1995                byzantine_context.clone(),
1996                parent_digest,
1997                Height::new(BLOCKS_PER_EPOCH.get() + 15), // Byzantine: non-contiguous height
1998                2000,
1999            );
2000            let coded_malicious =
2001                CodedBlock::new(malicious_block.clone(), coding_config, &Sequential);
2002            let malicious_commitment = coded_malicious.commitment();
2003            shards.proposed(byzantine_round, coded_malicious);
2004
2005            // Small delay to ensure broadcast is processed
2006            context.sleep(Duration::from_millis(10)).await;
2007
2008            // Marshaled.verify() kicks off deferred verification in the background.
2009            // The Marshaled verifier will:
2010            // 1. Fetch honest_parent (height 21) from marshal based on context.parent
2011            // 2. Fetch malicious_block (height 35) from marshal based on digest
2012            // 3. Validate height is contiguous (fail)
2013            // 4. Return false
2014            let _shard_validity = marshaled
2015                .verify(byzantine_context, malicious_commitment)
2016                .await;
2017
2018            // Use certify to get the actual deferred_verify result
2019            let certify_result = marshaled
2020                .certify(byzantine_round, malicious_commitment)
2021                .await
2022                .await;
2023
2024            assert!(
2025                !certify_result.unwrap(),
2026                "Byzantine block with non-contiguous heights should be rejected"
2027            );
2028
2029            // Test case 2: Mismatched parent digest
2030            //
2031            // Create another malicious block with correct context and height
2032            // but referencing the wrong parent digest (genesis instead of honest_parent)
2033            let byzantine_round2 = Round::new(Epoch::new(1), View::new(22));
2034            let byzantine_context2 = CodingCtx {
2035                round: byzantine_round2,
2036                leader: me.clone(),
2037                parent: (View::new(21), parent_commitment), // Consensus says parent is at height 21
2038            };
2039            let malicious_block2 = make_coding_block(
2040                byzantine_context2.clone(),
2041                genesis.digest(), // Byzantine: wrong parent digest
2042                Height::new(BLOCKS_PER_EPOCH.get() + 2),
2043                3000,
2044            );
2045            let coded_malicious2 =
2046                CodedBlock::new(malicious_block2.clone(), coding_config, &Sequential);
2047            let malicious_commitment2 = coded_malicious2.commitment();
2048            shards.proposed(byzantine_round2, coded_malicious2);
2049
2050            // Small delay to ensure broadcast is processed
2051            context.sleep(Duration::from_millis(10)).await;
2052
2053            // Marshaled.verify() kicks off deferred verification in the background.
2054            // The Marshaled verifier will:
2055            // 1. Fetch honest_parent (height 21) from marshal based on context.parent
2056            // 2. Fetch malicious_block (height 22) from marshal based on digest
2057            // 3. Validate height is contiguous
2058            // 4. Validate parent commitment matches (fail)
2059            // 5. Return false
2060            let _shard_validity = marshaled
2061                .verify(byzantine_context2, malicious_commitment2)
2062                .await;
2063
2064            // Use certify to get the actual deferred_verify result
2065            let certify_result = marshaled
2066                .certify(byzantine_round2, malicious_commitment2)
2067                .await
2068                .await;
2069
2070            assert!(
2071                !certify_result.unwrap(),
2072                "Byzantine block with mismatched parent commitment should be rejected"
2073            );
2074        })
2075    }
2076
2077    #[test_traced("WARN")]
2078    fn test_certify_without_prior_verify_crash_recovery() {
2079        // After a crash, consensus may call certify() without a prior verify().
2080        // The certify path (marshaled.rs:842-936) should:
2081        //   1. Find no in-progress verification task
2082        //   2. Subscribe to the block from the shard engine
2083        //   3. Use the block's embedded context for deferred_verify
2084        //   4. Return Ok(true) for a valid block
2085        let runner = deterministic::Runner::timed(Duration::from_secs(30));
2086        runner.start(|mut context| async move {
2087            let Fixture {
2088                participants,
2089                schemes,
2090                ..
2091            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
2092            let mut oracle = setup_network_with_participants(
2093                context.child("network"),
2094                NZUsize!(1),
2095                participants.clone(),
2096            )
2097            .await;
2098
2099            let me = participants[0].clone();
2100            let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
2101
2102            let setup = CodingHarness::setup_validator(
2103                context.child("validator").with_attribute("index", 0),
2104                &mut oracle,
2105                me.clone(),
2106                ConstantProvider::new(schemes[0].clone()),
2107            )
2108            .await;
2109            let marshal = setup.mailbox;
2110            let shards = setup.extra;
2111
2112            let genesis_ctx = CodingCtx {
2113                round: Round::zero(),
2114                leader: default_leader(),
2115                parent: (View::zero(), genesis_commitment()),
2116            };
2117            let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
2118
2119            let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new();
2120            let cfg = MarshaledConfig {
2121                application: mock_app,
2122                marshal: marshal.clone(),
2123                shards: shards.clone(),
2124                scheme_provider: ConstantProvider::new(schemes[0].clone()),
2125                epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
2126                strategy: Sequential,
2127            };
2128            let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
2129
2130            // Create parent at height 1.
2131            let parent_round = Round::new(Epoch::zero(), View::new(1));
2132            let parent_ctx = CodingCtx {
2133                round: parent_round,
2134                leader: default_leader(),
2135                parent: (View::zero(), genesis_commitment()),
2136            };
2137            let parent = make_coding_block(parent_ctx, genesis.digest(), Height::new(1), 100);
2138            let coded_parent = CodedBlock::new(parent.clone(), coding_config, &Sequential);
2139            let parent_commitment = coded_parent.commitment();
2140            shards.proposed(parent_round, coded_parent);
2141
2142            // Create child at height 2.
2143            let child_round = Round::new(Epoch::zero(), View::new(2));
2144            let child_ctx = CodingCtx {
2145                round: child_round,
2146                leader: me.clone(),
2147                parent: (View::new(1), parent_commitment),
2148            };
2149            let child = make_coding_block(child_ctx, parent.digest(), Height::new(2), 200);
2150            let coded_child = CodedBlock::new(child, coding_config, &Sequential);
2151            let child_commitment = coded_child.commitment();
2152            shards.proposed(child_round, coded_child);
2153
2154            context.sleep(Duration::from_millis(10)).await;
2155
2156            // Call certify directly without any prior verify (simulating crash recovery).
2157            let certify_rx = marshaled.certify(child_round, child_commitment).await;
2158            select! {
2159                result = certify_rx => {
2160                    assert!(
2161                        result.unwrap(),
2162                        "certify without prior verify should succeed for valid block"
2163                    );
2164                },
2165                _ = context.sleep(Duration::from_secs(5)) => {
2166                    panic!("certify should complete within timeout");
2167                },
2168            }
2169        })
2170    }
2171
2172    /// Regression test: a Byzantine leader must not be able to crash honest nodes
2173    /// by proposing a `Commitment` with invalid `CodingConfig` bytes (e.g.
2174    /// zero-valued `NonZeroU16` fields). The fix validates the embedded config
2175    /// during deserialization so malformed commitments are rejected at the codec
2176    /// level before reaching `verify()`.
2177    #[test_traced("WARN")]
2178    fn test_malformed_commitment_config_rejected_at_deserialization() {
2179        use commonware_codec::{Encode, ReadExt};
2180
2181        // Construct a Commitment with all-zero bytes (invalid CodingConfig:
2182        // minimum_shards=0, extra_shards=0). Serialize it and attempt to
2183        // deserialize -- this must fail.
2184        let malformed_bytes = [0u8; Commitment::SIZE];
2185        let result = Commitment::read(&mut &malformed_bytes[..]);
2186        assert!(
2187            result.is_err(),
2188            "deserialization of Commitment with zeroed CodingConfig must fail"
2189        );
2190
2191        // A validly-constructed Commitment must still round-trip.
2192        let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
2193        let valid = Commitment::from((
2194            Sha256::hash(b"block"),
2195            Sha256::hash(b"root"),
2196            Sha256::hash(b"context"),
2197            coding_config,
2198        ));
2199        let encoded = valid.encode();
2200        let decoded =
2201            Commitment::read(&mut &encoded[..]).expect("valid Commitment must deserialize");
2202        assert_eq!(valid, decoded);
2203    }
2204
2205    #[test_traced("WARN")]
2206    fn test_certify_propagates_application_verify_failure() {
2207        let runner = deterministic::Runner::timed(Duration::from_secs(30));
2208        runner.start(|mut context| async move {
2209            // 1) Set up a single validator marshal stack.
2210            let Fixture {
2211                participants,
2212                schemes,
2213                ..
2214            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
2215            let mut oracle = setup_network_with_participants(
2216                context.child("network"),
2217                NZUsize!(1),
2218                participants.clone(),
2219            )
2220            .await;
2221
2222            let me = participants[0].clone();
2223            let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
2224
2225            let setup = CodingHarness::setup_validator(
2226                context.child("validator").with_attribute("index", 0),
2227                &mut oracle,
2228                me.clone(),
2229                ConstantProvider::new(schemes[0].clone()),
2230            )
2231            .await;
2232            let marshal = setup.mailbox;
2233            let shards = setup.extra;
2234
2235            let genesis_ctx = CodingCtx {
2236                round: Round::zero(),
2237                leader: default_leader(),
2238                parent: (View::zero(), genesis_commitment()),
2239            };
2240            let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
2241            // 2) Force application verification to fail in deferred verification.
2242            let mock_app: MockVerifyingApp<CodingB, S> =
2243                MockVerifyingApp::with_verify_result(false);
2244
2245            let cfg = MarshaledConfig {
2246                application: mock_app,
2247                marshal: marshal.clone(),
2248                shards: shards.clone(),
2249                scheme_provider: ConstantProvider::new(schemes[0].clone()),
2250                epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
2251                strategy: Sequential,
2252            };
2253            let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
2254
2255            let parent_round = Round::new(Epoch::zero(), View::new(1));
2256            let parent_context = CodingCtx {
2257                round: parent_round,
2258                leader: me.clone(),
2259                parent: (View::zero(), genesis_commitment()),
2260            };
2261            let parent = make_coding_block(parent_context, genesis.digest(), Height::new(1), 100);
2262            let coded_parent = CodedBlock::new(parent.clone(), coding_config, &Sequential);
2263            let parent_commitment = coded_parent.commitment();
2264            shards.proposed(parent_round, coded_parent);
2265
2266            // 3) Publish a valid child so optimistic verify can succeed.
2267            let round = Round::new(Epoch::zero(), View::new(2));
2268            let verify_context = CodingCtx {
2269                round,
2270                leader: me,
2271                parent: (View::new(1), parent_commitment),
2272            };
2273            let block =
2274                make_coding_block(verify_context.clone(), parent.digest(), Height::new(2), 200);
2275            let coded_block = CodedBlock::new(block, coding_config, &Sequential);
2276            let commitment = coded_block.commitment();
2277            shards.proposed(round, coded_block);
2278
2279            context.sleep(Duration::from_millis(10)).await;
2280
2281            let optimistic = marshaled.verify(verify_context, commitment).await;
2282            assert!(
2283                optimistic.await.expect("verify result missing"),
2284                "optimistic verify should pass pre-checks and schedule deferred verification"
2285            );
2286
2287            // 4) Certify must observe the deferred application failure and return false.
2288            let certify = marshaled.certify(round, commitment).await;
2289            assert!(
2290                !certify.await.expect("certify result missing"),
2291                "certify should propagate deferred application verify failure"
2292            );
2293        })
2294    }
2295
2296    #[test_traced("WARN")]
2297    fn test_backfill_block_mismatched_commitment() {
2298        // Regression: when backfilling by Key::Block(commitment), a peer may return
2299        // a coded block with matching inner digest but a different coding commitment.
2300        // If a finalization for this digest is already cached, marshal must reject
2301        // the block unless V::commitment(block) matches the finalization payload.
2302        let runner = deterministic::Runner::timed(Duration::from_secs(30));
2303        runner.start(|mut context| async move {
2304            let Fixture {
2305                participants,
2306                schemes,
2307                ..
2308            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
2309            let mut oracle = setup_network_with_participants(
2310                context.child("network"),
2311                NZUsize!(1),
2312                participants[..2].iter().cloned(),
2313            )
2314            .await;
2315
2316            let coding_config_a = coding_config_for_participants(NUM_VALIDATORS as u16);
2317            // Same total shards (4) but different min/extra split produces a different
2318            // coding root and config bytes, yielding a different commitment.
2319            let coding_config_b = commonware_coding::Config {
2320                minimum_shards: coding_config_a.minimum_shards.checked_add(1).unwrap(),
2321                extra_shards: NZU16!(coding_config_a.extra_shards.get() - 1),
2322            };
2323
2324            let v0_setup = CodingHarness::setup_validator(
2325                context.child("validator").with_attribute("index", 0),
2326                &mut oracle,
2327                participants[0].clone(),
2328                ConstantProvider::new(schemes[0].clone()),
2329            )
2330            .await;
2331            let v1_setup = CodingHarness::setup_validator(
2332                context.child("validator").with_attribute("index", 1),
2333                &mut oracle,
2334                participants[1].clone(),
2335                ConstantProvider::new(schemes[1].clone()),
2336            )
2337            .await;
2338
2339            setup_network_links(&mut oracle, &participants[..2], LINK).await;
2340
2341            let mut v0_mailbox = v0_setup.mailbox;
2342            let v1_mailbox = v1_setup.mailbox;
2343
2344            let genesis_ctx = CodingCtx {
2345                round: Round::zero(),
2346                leader: default_leader(),
2347                parent: (View::zero(), genesis_commitment()),
2348            };
2349            let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
2350
2351            let round1 = Round::new(Epoch::zero(), View::new(1));
2352            let block1_ctx = CodingCtx {
2353                round: round1,
2354                leader: participants[0].clone(),
2355                parent: (View::zero(), genesis_commitment()),
2356            };
2357            let block1 = make_coding_block(block1_ctx, genesis.digest(), Height::new(1), 100);
2358
2359            let coded_block_a: CodedBlock<_, ReedSolomon<Sha256>, Sha256> =
2360                CodedBlock::new(block1.clone(), coding_config_a, &Sequential);
2361            let commitment_a = coded_block_a.commitment();
2362
2363            let coded_block_b: CodedBlock<_, ReedSolomon<Sha256>, Sha256> =
2364                CodedBlock::new(block1.clone(), coding_config_b, &Sequential);
2365            let commitment_b = coded_block_b.commitment();
2366
2367            assert_eq!(coded_block_a.digest(), coded_block_b.digest());
2368            assert_ne!(commitment_a, commitment_b);
2369
2370            // Validator 1 proposes coded_block_b (same inner block, different coding).
2371            // This stores it in v1's shard engine and actor cache.
2372            assert!(v1_mailbox.verified(round1, coded_block_b.clone()).await);
2373            context.sleep(Duration::from_millis(100)).await;
2374
2375            // Create finalization referencing commitment_a (the "correct" commitment).
2376            let proposal: Proposal<Commitment> = Proposal {
2377                round: round1,
2378                parent: View::zero(),
2379                payload: commitment_a,
2380            };
2381            let finalization = CodingHarness::make_finalization(proposal.clone(), &schemes, QUORUM);
2382
2383            // Report finalization to v0. v0 doesn't have the block:
2384            //   - it fetches Key::Block(commitment)
2385            //   - v1 responds with coded_block_b (same digest, wrong commitment)
2386            //   - deliver path must reject because the response commitment does not
2387            //     match the request key
2388            CodingHarness::report_finalization(&mut v0_mailbox, finalization).await;
2389
2390            // Wait for the fetch cycle to complete.
2391            context.sleep(Duration::from_secs(5)).await;
2392
2393            // The mismatched block must not be stored.
2394            let stored = v0_mailbox.get_block(Height::new(1)).await;
2395            assert!(
2396                stored.is_none(),
2397                "v0 should reject backfilled block with mismatched commitment"
2398            );
2399
2400            // Without the block, finalization should not be persisted by height yet.
2401            let stored_finalization = v0_mailbox.get_finalization(Height::new(1)).await;
2402            assert!(
2403                stored_finalization.is_none(),
2404                "finalization should not be archived until matching block is available"
2405            );
2406        })
2407    }
2408
2409    #[test_traced("WARN")]
2410    #[should_panic(expected = "floor block parent commitment mismatch")]
2411    fn test_coding_floor_anchor_panics_on_parent_commitment_mismatch() {
2412        let runner = deterministic::Runner::timed(Duration::from_secs(30));
2413        runner.start(|mut context| async move {
2414            let Fixture {
2415                participants,
2416                schemes,
2417                ..
2418            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
2419            let (mailbox, resolver, _actor_handle) = start_coding_actor_with_recording(
2420                context.child("validator"),
2421                "floor-parent-commitment-mismatch",
2422                ConstantProvider::new(schemes[0].clone()),
2423                RecordingCodingBuffer::default(),
2424            )
2425            .await;
2426
2427            let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
2428            let parent_round = Round::new(Epoch::zero(), View::new(1));
2429            let parent_context = CodingCtx {
2430                round: parent_round,
2431                leader: participants[0].clone(),
2432                parent: (View::zero(), genesis_commitment()),
2433            };
2434            let parent = make_coding_block(parent_context, Sha256::hash(b""), Height::new(1), 100);
2435
2436            let floor_round = Round::new(Epoch::zero(), View::new(2));
2437            let bad_context = CodingCtx {
2438                round: floor_round,
2439                leader: participants[0].clone(),
2440                parent: (View::new(1), genesis_commitment()),
2441            };
2442            let floor_block = make_coding_block(bad_context, parent.digest(), Height::new(2), 200);
2443            let coded_floor = CodedBlock::new(floor_block, coding_config, &Sequential);
2444            assert_ne!(
2445                coded_floor.parent(),
2446                coded_floor.context().parent.1.block::<D>()
2447            );
2448
2449            let finalization = CodingHarness::make_finalization(
2450                Proposal::new(
2451                    floor_round,
2452                    View::new(1),
2453                    CodingHarness::commitment(&coded_floor),
2454                ),
2455                &schemes,
2456                QUORUM,
2457            );
2458            resolver.respond_to_next_fetch(coded_floor.encode());
2459            mailbox.set_floor(finalization);
2460            context.sleep(Duration::from_secs(5)).await;
2461        })
2462    }
2463
2464    /// When the scheme provider has no entry for the current epoch,
2465    /// `Marshaled::propose` and `Marshaled::verify` must return a dropped
2466    /// receiver (the consensus engine treats `RecvError` as "abstain").
2467    #[test_traced("WARN")]
2468    fn test_marshaled_missing_scheme_skips_propose_and_verify() {
2469        let runner = deterministic::Runner::timed(Duration::from_secs(30));
2470        runner.start(|mut context| async move {
2471            let Fixture {
2472                participants,
2473                schemes,
2474                ..
2475            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
2476            let mut oracle = setup_network_with_participants(
2477                context.child("network"),
2478                NZUsize!(1),
2479                participants.clone(),
2480            )
2481            .await;
2482
2483            let me = participants[0].clone();
2484
2485            let setup = CodingHarness::setup_validator(
2486                context.child("validator").with_attribute("index", 0),
2487                &mut oracle,
2488                me.clone(),
2489                ConstantProvider::new(schemes[0].clone()),
2490            )
2491            .await;
2492
2493            let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new();
2494
2495            let cfg = MarshaledConfig {
2496                application: mock_app,
2497                marshal: setup.mailbox,
2498                shards: setup.extra,
2499                scheme_provider: EmptyProvider,
2500                epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
2501                strategy: Sequential,
2502            };
2503            let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
2504
2505            let ctx = CodingCtx {
2506                round: Round::new(Epoch::zero(), View::new(1)),
2507                leader: me.clone(),
2508                parent: (View::zero(), genesis_commitment()),
2509            };
2510
2511            // propose with a missing scheme returns a dropped sender
2512            let rx = marshaled.propose(ctx.clone()).await;
2513            assert!(rx.await.is_err());
2514
2515            // verify with a missing scheme returns a dropped sender
2516            let rx = marshaled.verify(ctx, genesis_commitment()).await;
2517            assert!(rx.await.is_err());
2518        });
2519    }
2520
2521    /// Regression: a validator must not vote finalize on a block that is not
2522    /// durably persisted. `certify` resolves true ⟹ block is on disk for
2523    /// this validator. We assert this by aborting the marshal actor the
2524    /// instant `certify` returns true; without the persist-before-certify
2525    /// fix, the actor may have only had the `Verified` message enqueued (not
2526    /// processed), and the block is lost on restart even though the validator
2527    /// would have proceeded to broadcast a finalize vote.
2528    #[test_traced("WARN")]
2529    fn test_marshaled_certify_persists_block_before_resolving() {
2530        for seed in 0u64..16 {
2531            certify_persists_block_before_resolving_at(seed);
2532        }
2533    }
2534
2535    fn certify_persists_block_before_resolving_at(seed: u64) {
2536        let runner = deterministic::Runner::new(
2537            deterministic::Config::new()
2538                .with_seed(seed)
2539                .with_timeout(Some(Duration::from_secs(60))),
2540        );
2541        runner.start(|mut context| async move {
2542            let Fixture {
2543                participants,
2544                schemes,
2545                ..
2546            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
2547            let mut oracle = setup_network_with_participants(
2548                context.child("network"),
2549                NZUsize!(1),
2550                participants.clone(),
2551            )
2552            .await;
2553
2554            let me = participants[0].clone();
2555            let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
2556
2557            let setup = CodingHarness::setup_validator(
2558                context.child("validator").with_attribute("index", 0),
2559                &mut oracle,
2560                me.clone(),
2561                ConstantProvider::new(schemes[0].clone()),
2562            )
2563            .await;
2564            let marshal = setup.mailbox;
2565            let shards = setup.extra;
2566            let marshal_actor_handle = setup.actor_handle;
2567
2568            let genesis_ctx = CodingCtx {
2569                round: Round::zero(),
2570                leader: default_leader(),
2571                parent: (View::zero(), genesis_commitment()),
2572            };
2573            let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
2574
2575            // Push parent (height 1) and child (height 2) into the shards
2576            // engine. These are reconstructable but NOT durably persisted.
2577            let parent_round = Round::new(Epoch::zero(), View::new(1));
2578            let parent_ctx = CodingCtx {
2579                round: parent_round,
2580                leader: default_leader(),
2581                parent: (View::zero(), genesis_commitment()),
2582            };
2583            let parent = make_coding_block(parent_ctx, genesis.digest(), Height::new(1), 100);
2584            let coded_parent = CodedBlock::new(parent.clone(), coding_config, &Sequential);
2585            let parent_commitment = coded_parent.commitment();
2586            shards.proposed(parent_round, coded_parent);
2587
2588            let child_round = Round::new(Epoch::zero(), View::new(2));
2589            let child_ctx = CodingCtx {
2590                round: child_round,
2591                leader: me.clone(),
2592                parent: (View::new(1), parent_commitment),
2593            };
2594            let child = make_coding_block(child_ctx.clone(), parent.digest(), Height::new(2), 200);
2595            let coded_child = CodedBlock::new(child.clone(), coding_config, &Sequential);
2596            let child_commitment = coded_child.commitment();
2597            let child_digest = coded_child.digest();
2598            shards.proposed(child_round, coded_child);
2599
2600            context.sleep(Duration::from_millis(10)).await;
2601
2602            let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new();
2603            let cfg = MarshaledConfig {
2604                application: mock_app,
2605                marshal: marshal.clone(),
2606                shards: shards.clone(),
2607                scheme_provider: ConstantProvider::new(schemes[0].clone()),
2608                epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
2609                strategy: Sequential,
2610            };
2611            let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
2612
2613            // Optimistic verify - returns shard validity (true).
2614            let shard_validity = marshaled
2615                .verify(child_ctx, child_commitment)
2616                .await
2617                .await
2618                .expect("verify result missing");
2619            assert!(shard_validity, "shard validity should pass");
2620
2621            // Certify - this is the safety gate before finalize voting.
2622            let certify_result = marshaled
2623                .certify(child_round, child_commitment)
2624                .await
2625                .await
2626                .expect("certify result missing");
2627            assert!(certify_result, "certify should succeed");
2628
2629            // Abort marshal immediately after certify returns to prove the
2630            // block is already persisted at that point.
2631            marshal_actor_handle.abort();
2632            drop(marshaled);
2633            drop(marshal);
2634            drop(shards);
2635
2636            // Restart from the same partition. The block must be durably
2637            // persisted - otherwise the validator would have voted finalize
2638            // for a block it cannot serve from local storage.
2639            let setup2 = CodingHarness::setup_validator(
2640                context
2641                    .child("validator_restart")
2642                    .with_attribute("index", 0),
2643                &mut oracle,
2644                me.clone(),
2645                ConstantProvider::new(schemes[0].clone()),
2646            )
2647            .await;
2648            let marshal2 = setup2.mailbox;
2649
2650            let post_restart = marshal2.get_block(&child_digest).await;
2651            assert!(
2652                post_restart.is_some(),
2653                "certify resolved true ⟹ block must be durably persisted"
2654            );
2655        });
2656    }
2657
2658    /// Regression: a proposer must be able to recover its own block after a
2659    /// crash that occurs immediately after `Marshaled::propose()` returns a
2660    /// commitment. `propose` is responsible for persisting the block via
2661    /// `marshal.verified`, so the block must survive restart even if
2662    /// `Relay::broadcast` never runs or marshal aborts in between.
2663    #[test_traced("WARN")]
2664    fn test_marshaled_proposed_block_persists_across_restart() {
2665        let runner = deterministic::Runner::timed(Duration::from_secs(60));
2666        runner.start(|mut context| async move {
2667            let Fixture {
2668                participants,
2669                schemes,
2670                ..
2671            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
2672            let mut oracle = setup_network_with_participants(
2673                context.child("network"),
2674                NZUsize!(1),
2675                participants.clone(),
2676            )
2677            .await;
2678
2679            let me = participants[0].clone();
2680            let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
2681
2682            let setup = CodingHarness::setup_validator(
2683                context.child("validator").with_attribute("index", 0),
2684                &mut oracle,
2685                me.clone(),
2686                ConstantProvider::new(schemes[0].clone()),
2687            )
2688            .await;
2689            let marshal = setup.mailbox;
2690            let shards = setup.extra;
2691            let marshal_actor_handle = setup.actor_handle;
2692
2693            let genesis_ctx = CodingCtx {
2694                round: Round::zero(),
2695                leader: default_leader(),
2696                parent: (View::zero(), genesis_commitment()),
2697            };
2698            let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
2699            let genesis_parent_commitment = genesis_coding_commitment::<Sha256, _>(&genesis);
2700
2701            // Build the block we want propose() to return. Its embedded context
2702            // uses the proper genesis commitment so the parent lookup matches
2703            // the cached genesis.
2704            let propose_round = Round::new(Epoch::zero(), View::new(1));
2705            let propose_context = CodingCtx {
2706                round: propose_round,
2707                leader: me.clone(),
2708                parent: (View::zero(), genesis_parent_commitment),
2709            };
2710            let block_to_propose = make_coding_block(
2711                propose_context.clone(),
2712                genesis.digest(),
2713                Height::new(1),
2714                100,
2715            );
2716            let block_digest = block_to_propose.digest();
2717            let expected_commitment = CodedBlock::<_, ReedSolomon<Sha256>, Sha256>::new(
2718                block_to_propose.clone(),
2719                coding_config,
2720                &Sequential,
2721            )
2722            .commitment();
2723
2724            let mock_app: MockVerifyingApp<CodingB, S> =
2725                MockVerifyingApp::new().with_propose_result(block_to_propose);
2726            let cfg = MarshaledConfig {
2727                application: mock_app,
2728                marshal: marshal.clone(),
2729                shards: shards.clone(),
2730                scheme_provider: ConstantProvider::new(schemes[0].clone()),
2731                epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
2732                strategy: Sequential,
2733            };
2734            let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
2735
2736            // Drive the leader-side propose path. `propose` must persist the
2737            // block before returning the commitment.
2738            let commitment = marshaled
2739                .propose(propose_context)
2740                .await
2741                .await
2742                .expect("propose should produce a commitment");
2743            assert_eq!(commitment, expected_commitment);
2744
2745            // Abort marshal immediately after propose returns; the propose
2746            // path must already have persisted the block.
2747            marshal_actor_handle.abort();
2748            drop(marshaled);
2749            drop(marshal);
2750            drop(shards);
2751
2752            let setup2 = CodingHarness::setup_validator(
2753                context
2754                    .child("validator_restart")
2755                    .with_attribute("index", 0),
2756                &mut oracle,
2757                me.clone(),
2758                ConstantProvider::new(schemes[0].clone()),
2759            )
2760            .await;
2761            let marshal2 = setup2.mailbox;
2762
2763            // The proposer must recover its own block after restart. Without
2764            // the broadcast-path persistence fix, the block lived only in the
2765            // shards engine's in-memory cache and is now gone.
2766            let post_restart = marshal2.get_block(&block_digest).await;
2767            assert!(
2768                post_restart.is_some(),
2769                "proposer should recover its own block after restart"
2770            );
2771        });
2772    }
2773
2774    /// Regression: if marshal already holds a verified block for a round
2775    /// (say, persisted by a pre-crash propose whose notarize vote never
2776    /// reached the journal), a restarted leader's `propose` must return
2777    /// that block's commitment instead of rebuilding. Otherwise the
2778    /// new block lands on the same view index in the prunable archive,
2779    /// gets silently dropped (`skip_if_index_exists=true`), and the
2780    /// leader's notarize targets a commitment no peer can serve.
2781    #[test_traced("WARN")]
2782    fn test_propose_reuses_verified_block_on_restart() {
2783        let runner = deterministic::Runner::timed(Duration::from_secs(60));
2784        runner.start(|mut context| async move {
2785            let Fixture {
2786                participants,
2787                schemes,
2788                ..
2789            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
2790            let mut oracle = setup_network_with_participants(
2791                context.child("network"),
2792                NZUsize!(1),
2793                participants.clone(),
2794            )
2795            .await;
2796
2797            let me = participants[0].clone();
2798            let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
2799
2800            let setup = CodingHarness::setup_validator(
2801                context.child("validator").with_attribute("index", 0),
2802                &mut oracle,
2803                me.clone(),
2804                ConstantProvider::new(schemes[0].clone()),
2805            )
2806            .await;
2807            let marshal = setup.mailbox;
2808            let shards = setup.extra;
2809
2810            let genesis_ctx = CodingCtx {
2811                round: Round::zero(),
2812                leader: default_leader(),
2813                parent: (View::zero(), genesis_commitment()),
2814            };
2815            let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
2816            let genesis_parent_commitment = genesis_coding_commitment::<Sha256, _>(&genesis);
2817
2818            let round = Round::new(Epoch::zero(), View::new(1));
2819            let ctx = CodingCtx {
2820                round,
2821                leader: me.clone(),
2822                parent: (View::zero(), genesis_parent_commitment),
2823            };
2824
2825            // Seed block A in marshal's verified cache for `round`.
2826            let block_a = make_coding_block(ctx.clone(), genesis.digest(), Height::new(1), 100);
2827            let coded_a: CodedBlock<_, ReedSolomon<Sha256>, Sha256> =
2828                CodedBlock::new(block_a.clone(), coding_config, &Sequential);
2829            let commitment_a = coded_a.commitment();
2830            assert!(marshal.verified(round, coded_a).await);
2831
2832            // After restart, a fresh application would build a different
2833            // block for the same round.
2834            let block_b = make_coding_block(ctx.clone(), genesis.digest(), Height::new(1), 200);
2835            let coded_b: CodedBlock<_, ReedSolomon<Sha256>, Sha256> =
2836                CodedBlock::new(block_b.clone(), coding_config, &Sequential);
2837            let commitment_b = coded_b.commitment();
2838            assert_ne!(
2839                commitment_a, commitment_b,
2840                "test requires distinct commitments"
2841            );
2842
2843            let mock_app: MockVerifyingApp<CodingB, S> =
2844                MockVerifyingApp::new().with_propose_result(block_b);
2845            let cfg = MarshaledConfig {
2846                application: mock_app,
2847                marshal: marshal.clone(),
2848                shards: shards.clone(),
2849                scheme_provider: ConstantProvider::new(schemes[0].clone()),
2850                epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
2851                strategy: Sequential,
2852            };
2853            let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
2854
2855            let commitment = marshaled
2856                .propose(ctx)
2857                .await
2858                .await
2859                .expect("propose must return a commitment");
2860            assert_eq!(
2861                commitment, commitment_a,
2862                "propose must reuse the block marshal already persisted for this round"
2863            );
2864        });
2865    }
2866
2867    /// Regression: if a pre-crash leader persisted a verified block for a
2868    /// round but the simplex `Notarize` never reached the journal, replay
2869    /// can recover a `consensus_context` whose parent differs from the one
2870    /// the cached block was built against. The restarted leader must then
2871    /// drop the receiver so the voter nullifies the view via
2872    /// `MissingProposal`, rather than broadcasting the stale cached block
2873    /// under a header that peers will reject.
2874    #[test_traced("WARN")]
2875    fn test_propose_skips_when_verified_block_context_changed() {
2876        let runner = deterministic::Runner::timed(Duration::from_secs(60));
2877        runner.start(|mut context| async move {
2878            let Fixture {
2879                participants,
2880                schemes,
2881                ..
2882            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
2883            let mut oracle = setup_network_with_participants(
2884                context.child("network"),
2885                NZUsize!(1),
2886                participants.clone(),
2887            )
2888            .await;
2889
2890            let me = participants[0].clone();
2891            let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
2892
2893            let setup = CodingHarness::setup_validator(
2894                context.child("validator").with_attribute("index", 0),
2895                &mut oracle,
2896                me.clone(),
2897                ConstantProvider::new(schemes[0].clone()),
2898            )
2899            .await;
2900            let marshal = setup.mailbox;
2901            let shards = setup.extra;
2902
2903            let genesis_ctx = CodingCtx {
2904                round: Round::zero(),
2905                leader: default_leader(),
2906                parent: (View::zero(), genesis_commitment()),
2907            };
2908            let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
2909            let genesis_parent_commitment = genesis_coding_commitment::<Sha256, _>(&genesis);
2910
2911            // Stash a stale block built against genesis as its parent at round V=2.
2912            let round = Round::new(Epoch::zero(), View::new(2));
2913            let stale_ctx = CodingCtx {
2914                round,
2915                leader: me.clone(),
2916                parent: (View::zero(), genesis_parent_commitment),
2917            };
2918            let stale_block = make_coding_block(stale_ctx, genesis.digest(), Height::new(1), 100);
2919            let stale_coded: CodedBlock<_, ReedSolomon<Sha256>, Sha256> =
2920                CodedBlock::new(stale_block, coding_config, &Sequential);
2921            assert!(marshal.verified(round, stale_coded).await);
2922
2923            // Simulate a replay where parent selection now points to a
2924            // different parent commitment than the cached block was built for.
2925            let new_parent_commitment = Commitment::from((
2926                Sha256::hash(b"different-parent-block"),
2927                Sha256::hash(b"different-parent-inner"),
2928                Sha256::hash(b"different-parent-ctx"),
2929                coding_config,
2930            ));
2931            let new_ctx = CodingCtx {
2932                round,
2933                leader: me.clone(),
2934                parent: (View::new(1), new_parent_commitment),
2935            };
2936
2937            let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new();
2938            let cfg = MarshaledConfig {
2939                application: mock_app,
2940                marshal: marshal.clone(),
2941                shards: shards.clone(),
2942                scheme_provider: ConstantProvider::new(schemes[0].clone()),
2943                epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
2944                strategy: Sequential,
2945            };
2946            let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
2947
2948            let commitment_rx = marshaled.propose(new_ctx).await;
2949            assert!(
2950                commitment_rx.await.is_err(),
2951                "propose must drop the receiver when the cached block's context no longer matches"
2952            );
2953        });
2954    }
2955}