// commonware_consensus/marshal/coding/mod.rs

//! Ordered delivery of erasure-coded blocks.
//!
//! # Overview
//!
//! The coding marshal couples the consensus pipeline with erasure-coded block broadcast.
//! Blocks are produced by an application, encoded into [`types::Shard`]s, fanned out to peers, and
//! later reconstructed when a notarization or finalization proves that the data is needed.
//! Compared to [`super::standard`], this variant makes more efficient use of the network's
//! bandwidth by spreading the load of block dissemination across all participants.
//!
//! # Components
//!
//! - [`crate::marshal::core::Actor`]: The unified marshal actor that orders finalized blocks,
//!   handles acknowledgements from the application, and requests repairs when gaps are detected.
//!   Used with [`Coding`] as the variant type parameter.
//! - [`crate::marshal::core::Mailbox`]: Accepts requests from other local subsystems and forwards
//!   them to the actor. Used with [`Coding`] as the variant type parameter.
//! - [`shards::Engine`]: Broadcasts shards, verifies locally held fragments, and reconstructs
//!   entire [`types::CodedBlock`]s on demand.
//! - [`crate::marshal::resolver`]: Issues outbound fetches to remote peers when marshal is missing
//!   a block, notarization, or finalization referenced by consensus.
//! - [`types`]: Defines commitments, distribution shards, and helper builders used across the
//!   module.
//! - [`Marshaled`]: Wraps a [`crate::Application`] implementation so it automatically enforces
//!   epoch boundaries and performs erasure encoding before a proposal leaves the application.
//!
//! # Data Flow
//!
//! 1. The application produces a block through [`Marshaled`], which encodes the payload and
//!    obtains a [`crate::types::coding::Commitment`] describing the shard layout.
//! 2. The block is broadcast via [`shards::Engine`]; each participant receives exactly one shard
//!    and reshares it to everyone else once it verifies the fragment.
//! 3. The actor ingests notarizations/finalizations from `simplex`, pulls reconstructed blocks
//!    from the shard engine or backfills them through [`crate::marshal::resolver`], and durably
//!    persists the ordered data.
//! 4. The actor reports finalized blocks to the node's [`crate::Reporter`] at-least-once and
//!    drives repair loops whenever notarizations reference yet-to-be-delivered payloads.
//!
//! # Storage and Repair
//!
//! Notarized data and certificates live in prunable archives managed internally, while finalized
//! blocks are migrated into immutable archives. Any gaps are filled by asking peers for specific
//! commitments through the resolver pipeline. The shard engine keeps only ephemeral, in-memory
//! caches; once a block is finalized it is evicted from the reconstruction map, reducing memory
//! pressure.
//!
//! # When to Use
//!
//! Choose this module when the consensus deployment wants erasure-coded dissemination with the
//! same ordering guarantees provided by [`super::standard`]. The API is a breaking change from
//! the standard marshal: applications must adapt to the coding-specific variant type and buffer
//! implementation required by this module.

54pub mod shards;
55pub mod types;
56pub(crate) mod validation;
57
58mod variant;
59pub use variant::Coding;
60
61mod marshaled;
62pub use marshaled::{Marshaled, MarshaledConfig};
63
64#[cfg(test)]
65mod tests {
66    use crate::{
67        marshal::{
68            coding::{
69                types::{coding_config_for_participants, CodedBlock},
70                Marshaled, MarshaledConfig,
71            },
72            mocks::{
73                harness::{
74                    self, default_leader, genesis_commitment, make_coding_block,
75                    setup_network_links, setup_network_with_participants, CodingB, CodingCtx,
76                    CodingHarness, EmptyProvider, TestHarness, BLOCKS_PER_EPOCH, LINK, NAMESPACE,
77                    NUM_VALIDATORS, QUORUM, S, UNRELIABLE_LINK, V,
78                },
79                verifying::MockVerifyingApp,
80            },
81        },
82        simplex::{scheme::bls12381_threshold::vrf as bls12381_threshold_vrf, types::Proposal},
83        types::{coding::Commitment, Epoch, Epocher, FixedEpocher, Height, Round, View},
84        Automaton, CertifiableAutomaton,
85    };
86    use commonware_codec::FixedSize;
87    use commonware_coding::ReedSolomon;
88    use commonware_cryptography::{
89        certificate::{mocks::Fixture, ConstantProvider},
90        sha256::Sha256,
91        Committable, Digestible, Hasher as _,
92    };
93    use commonware_macros::{select, test_group, test_traced};
94    use commonware_parallel::Sequential;
95    use commonware_runtime::{deterministic, Clock, Metrics, Runner};
96    use commonware_utils::{NZUsize, NZU16};
97    use std::time::Duration;
98
99    #[test_group("slow")]
100    #[test_traced("WARN")]
101    fn test_coding_finalize_good_links() {
102        for seed in 0..5 {
103            let r1 = harness::finalize::<CodingHarness>(seed, LINK, false);
104            let r2 = harness::finalize::<CodingHarness>(seed, LINK, false);
105            assert_eq!(r1, r2);
106        }
107    }
108
109    #[test_group("slow")]
110    #[test_traced("WARN")]
111    fn test_coding_finalize_bad_links() {
112        for seed in 0..5 {
113            let r1 = harness::finalize::<CodingHarness>(seed, UNRELIABLE_LINK, false);
114            let r2 = harness::finalize::<CodingHarness>(seed, UNRELIABLE_LINK, false);
115            assert_eq!(r1, r2);
116        }
117    }
118
119    #[test_group("slow")]
120    #[test_traced("WARN")]
121    fn test_coding_finalize_good_links_quorum_sees_finalization() {
122        for seed in 0..5 {
123            let r1 = harness::finalize::<CodingHarness>(seed, LINK, true);
124            let r2 = harness::finalize::<CodingHarness>(seed, LINK, true);
125            assert_eq!(r1, r2);
126        }
127    }
128
129    #[test_group("slow")]
130    #[test_traced("WARN")]
131    fn test_coding_finalize_bad_links_quorum_sees_finalization() {
132        for seed in 0..5 {
133            let r1 = harness::finalize::<CodingHarness>(seed, UNRELIABLE_LINK, true);
134            let r2 = harness::finalize::<CodingHarness>(seed, UNRELIABLE_LINK, true);
135            assert_eq!(r1, r2);
136        }
137    }
138
139    #[test_traced("WARN")]
140    fn test_coding_ack_pipeline_backlog() {
141        harness::ack_pipeline_backlog::<CodingHarness>();
142    }
143
144    #[test_traced("WARN")]
145    fn test_coding_ack_pipeline_backlog_persists_on_restart() {
146        harness::ack_pipeline_backlog_persists_on_restart::<CodingHarness>();
147    }
148
149    #[test_traced("WARN")]
150    fn test_coding_sync_height_floor() {
151        harness::sync_height_floor::<CodingHarness>();
152    }
153
154    #[test_traced("WARN")]
155    fn test_coding_prune_finalized_archives() {
156        harness::prune_finalized_archives::<CodingHarness>();
157    }
158
159    #[test_traced("WARN")]
160    fn test_coding_rejects_block_delivery_below_floor() {
161        harness::reject_stale_block_delivery_after_floor_update::<CodingHarness>();
162    }
163
164    #[test_traced("WARN")]
165    fn test_coding_subscribe_basic_block_delivery() {
166        harness::subscribe_basic_block_delivery::<CodingHarness>();
167    }
168
169    #[test_traced("WARN")]
170    fn test_coding_subscribe_multiple_subscriptions() {
171        harness::subscribe_multiple_subscriptions::<CodingHarness>();
172    }
173
174    #[test_traced("WARN")]
175    fn test_coding_subscribe_canceled_subscriptions() {
176        harness::subscribe_canceled_subscriptions::<CodingHarness>();
177    }
178
179    #[test_traced("WARN")]
180    fn test_coding_subscribe_blocks_from_different_sources() {
181        harness::subscribe_blocks_from_different_sources::<CodingHarness>();
182    }
183
184    #[test_traced("WARN")]
185    fn test_coding_get_info_basic_queries_present_and_missing() {
186        harness::get_info_basic_queries_present_and_missing::<CodingHarness>();
187    }
188
189    #[test_traced("WARN")]
190    fn test_coding_get_info_latest_progression_multiple_finalizations() {
191        harness::get_info_latest_progression_multiple_finalizations::<CodingHarness>();
192    }
193
194    #[test_traced("WARN")]
195    fn test_coding_get_block_by_height_and_latest() {
196        harness::get_block_by_height_and_latest::<CodingHarness>();
197    }
198
199    #[test_traced("WARN")]
200    fn test_coding_get_block_by_commitment_from_sources_and_missing() {
201        harness::get_block_by_commitment_from_sources_and_missing::<CodingHarness>();
202    }
203
204    #[test_traced("WARN")]
205    fn test_coding_get_finalization_by_height() {
206        harness::get_finalization_by_height::<CodingHarness>();
207    }
208
209    #[test_traced("WARN")]
210    fn test_coding_hint_finalized_triggers_fetch() {
211        harness::hint_finalized_triggers_fetch::<CodingHarness>();
212    }
213
214    #[test_traced("WARN")]
215    fn test_coding_ancestry_stream() {
216        harness::ancestry_stream::<CodingHarness>();
217    }
218
219    #[test_traced("WARN")]
220    fn test_coding_finalize_same_height_different_views() {
221        harness::finalize_same_height_different_views::<CodingHarness>();
222    }
223
224    #[test_traced("WARN")]
225    fn test_coding_init_processed_height() {
226        harness::init_processed_height::<CodingHarness>();
227    }
228
229    #[test_traced("INFO")]
230    fn test_coding_broadcast_caches_block() {
231        harness::broadcast_caches_block::<CodingHarness>();
232    }
233
234    /// Test that certifying a lower-view block after a higher-view block succeeds.
235    ///
236    /// This is a critical test for crash recovery scenarios where a validator may need
237    /// to certify blocks in non-sequential view order.
238    #[test_traced("INFO")]
239    fn test_certify_lower_view_after_higher_view() {
240        let runner = deterministic::Runner::timed(Duration::from_secs(60));
241        runner.start(|mut context| async move {
242            let Fixture {
243                participants,
244                schemes,
245                ..
246            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
247            let mut oracle =
248                setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
249                    .await;
250
251            let me = participants[0].clone();
252            let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
253
254            let setup = CodingHarness::setup_validator(
255                context.with_label("validator_0"),
256                &mut oracle,
257                me.clone(),
258                ConstantProvider::new(schemes[0].clone()),
259            )
260            .await;
261            let marshal = setup.mailbox;
262            let shards = setup.extra;
263
264            let genesis_ctx = CodingCtx {
265                round: Round::zero(),
266                leader: default_leader(),
267                parent: (View::zero(), genesis_commitment()),
268            };
269            let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
270
271            let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new(genesis.clone());
272
273            let cfg = MarshaledConfig {
274                application: mock_app,
275                marshal: marshal.clone(),
276                shards: shards.clone(),
277                scheme_provider: ConstantProvider::new(schemes[0].clone()),
278                epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
279                strategy: Sequential,
280            };
281            let mut marshaled = Marshaled::new(context.clone(), cfg);
282
283            // Create parent block at height 1
284            let parent_ctx = CodingCtx {
285                round: Round::new(Epoch::new(0), View::new(1)),
286                leader: default_leader(),
287                parent: (View::zero(), genesis_commitment()),
288            };
289            let parent = make_coding_block(parent_ctx, genesis.digest(), Height::new(1), 100);
290            let parent_digest = parent.digest();
291            let coded_parent = CodedBlock::new(parent.clone(), coding_config, &Sequential);
292            let parent_commitment = coded_parent.commitment();
293            shards
294                .clone()
295                .proposed(Round::new(Epoch::new(0), View::new(1)), coded_parent)
296                .await;
297
298            // Block A at view 5 (height 2) - create with context matching what verify will receive
299            let round_a = Round::new(Epoch::new(0), View::new(5));
300            let context_a = CodingCtx {
301                round: round_a,
302                leader: me.clone(),
303                parent: (View::new(1), parent_commitment),
304            };
305            let block_a = make_coding_block(context_a.clone(), parent_digest, Height::new(2), 200);
306            let coded_block_a = CodedBlock::new(block_a.clone(), coding_config, &Sequential);
307            let commitment_a = coded_block_a.commitment();
308            shards.clone().proposed(round_a, coded_block_a).await;
309
310            // Block B at view 10 (height 2, different block same height - could happen with
311            // different proposers or re-proposals)
312            let round_b = Round::new(Epoch::new(0), View::new(10));
313            let context_b = CodingCtx {
314                round: round_b,
315                leader: me.clone(),
316                parent: (View::new(1), parent_commitment),
317            };
318            let block_b = make_coding_block(context_b.clone(), parent_digest, Height::new(2), 300);
319            let coded_block_b = CodedBlock::new(block_b.clone(), coding_config, &Sequential);
320            let commitment_b = coded_block_b.commitment();
321            shards.clone().proposed(round_b, coded_block_b).await;
322
323            context.sleep(Duration::from_millis(10)).await;
324
325            // Step 1: Verify block A at view 5
326            let _ = marshaled.verify(context_a, commitment_a).await.await;
327
328            // Step 2: Verify block B at view 10
329            let _ = marshaled.verify(context_b, commitment_b).await.await;
330
331            // Step 3: Certify block B at view 10 FIRST
332            let certify_b = marshaled.certify(round_b, commitment_b).await;
333            assert!(
334                certify_b.await.unwrap(),
335                "Block B certification should succeed"
336            );
337
338            // Step 4: Certify block A at view 5 - should succeed
339            let certify_a = marshaled.certify(round_a, commitment_a).await;
340
341            // Use select with timeout to detect never-resolving receiver
342            select! {
343                result = certify_a => {
344                    assert!(result.unwrap(), "Block A certification should succeed");
345                },
346                _ = context.sleep(Duration::from_secs(5)) => {
347                    panic!("Block A certification timed out");
348                },
349            }
350        })
351    }
352
353    /// Regression test for re-proposal validation in optimistic_verify.
354    ///
355    /// Verifies that:
356    /// 1. Valid re-proposals at epoch boundaries are accepted
357    /// 2. Invalid re-proposals (not at epoch boundary) are rejected
358    ///
359    /// A re-proposal occurs when the parent digest equals the block being verified,
360    /// meaning the same block is being proposed again in a new view.
361    #[test_traced("INFO")]
362    fn test_marshaled_reproposal_validation() {
363        let runner = deterministic::Runner::timed(Duration::from_secs(60));
364        runner.start(|mut context| async move {
365            let Fixture {
366                participants,
367                schemes,
368                ..
369            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
370            let mut oracle =
371                setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
372                    .await;
373
374            let me = participants[0].clone();
375            let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
376
377            let setup = CodingHarness::setup_validator(
378                context.with_label("validator_0"),
379                &mut oracle,
380                me.clone(),
381                ConstantProvider::new(schemes[0].clone()),
382            )
383            .await;
384            let marshal = setup.mailbox;
385            let shards = setup.extra;
386
387            let genesis_ctx = CodingCtx {
388                round: Round::zero(),
389                leader: default_leader(),
390                parent: (View::zero(), genesis_commitment()),
391            };
392            let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
393
394            let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new(genesis.clone());
395            let cfg = MarshaledConfig {
396                application: mock_app,
397                marshal: marshal.clone(),
398                shards: shards.clone(),
399                scheme_provider: ConstantProvider::new(schemes[0].clone()),
400                epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
401                strategy: Sequential,
402            };
403            let mut marshaled = Marshaled::new(context.clone(), cfg);
404
405            // Build a chain up to the epoch boundary (height 19 is the last block in epoch 0
406            // with BLOCKS_PER_EPOCH=20, since epoch 0 covers heights 0-19)
407            let mut parent = genesis.digest();
408            let mut last_view = View::zero();
409            let mut last_commitment = genesis_commitment();
410            for i in 1..BLOCKS_PER_EPOCH.get() {
411                let round = Round::new(Epoch::new(0), View::new(i));
412                let ctx = CodingCtx {
413                    round,
414                    leader: me.clone(),
415                    parent: (last_view, last_commitment),
416                };
417                let block = make_coding_block(ctx.clone(), parent, Height::new(i), i * 100);
418                let coded_block = CodedBlock::new(block.clone(), coding_config, &Sequential);
419                last_commitment = coded_block.commitment();
420                shards.clone().proposed(round, coded_block).await;
421                parent = block.digest();
422                last_view = View::new(i);
423            }
424
425            // Create the epoch boundary block (height 19, last block in epoch 0)
426            let boundary_height = Height::new(BLOCKS_PER_EPOCH.get() - 1);
427            let boundary_round = Round::new(Epoch::new(0), View::new(boundary_height.get()));
428            let boundary_context = CodingCtx {
429                round: boundary_round,
430                leader: me.clone(),
431                parent: (last_view, last_commitment),
432            };
433            let boundary_block = make_coding_block(
434                boundary_context.clone(),
435                parent,
436                boundary_height,
437                boundary_height.get() * 100,
438            );
439            let coded_boundary =
440                CodedBlock::new(boundary_block.clone(), coding_config, &Sequential);
441            let boundary_commitment = coded_boundary.commitment();
442            shards
443                .clone()
444                .proposed(boundary_round, coded_boundary)
445                .await;
446
447            context.sleep(Duration::from_millis(10)).await;
448
449            // Test 1: Valid re-proposal at epoch boundary should be accepted
450            // Re-proposal context: parent digest equals the block being verified
451            // Re-proposals happen within the same epoch when the parent is the last block
452            //
453            // In the coding marshal, verify() returns shard validity while deferred_verify
454            // runs in the background. We call verify() to register the verification task,
455            // then certify() returns the deferred_verify result.
456            let reproposal_round = Round::new(Epoch::new(0), View::new(20));
457            let reproposal_context = CodingCtx {
458                round: reproposal_round,
459                leader: me.clone(),
460                parent: (View::new(boundary_height.get()), boundary_commitment), // Parent IS the boundary block
461            };
462
463            // Call verify to kick off deferred verification.
464            // We must await the verify result to ensure the verification task is
465            // registered before calling certify.
466            let shard_validity = marshaled
467                .verify(reproposal_context.clone(), boundary_commitment)
468                .await
469                .await;
470            assert!(
471                shard_validity.unwrap(),
472                "Re-proposal verify should return true for shard validity"
473            );
474
475            // Use certify to get the actual deferred_verify result
476            let certify_result = marshaled
477                .certify(reproposal_round, boundary_commitment)
478                .await
479                .await;
480            assert!(
481                certify_result.unwrap(),
482                "Valid re-proposal at epoch boundary should be accepted"
483            );
484
485            // Test 2: Invalid re-proposal (not at epoch boundary) should be rejected
486            // Create a block at height 10 (not at epoch boundary)
487            let non_boundary_height = Height::new(10);
488            let non_boundary_round = Round::new(Epoch::new(0), View::new(10));
489            // For simplicity, we'll create a fresh non-boundary block and test re-proposal
490            let non_boundary_context = CodingCtx {
491                round: non_boundary_round,
492                leader: me.clone(),
493                parent: (View::new(9), last_commitment), // Use a prior commitment
494            };
495            let non_boundary_block = make_coding_block(
496                non_boundary_context.clone(),
497                parent,
498                non_boundary_height,
499                1000,
500            );
501            let coded_non_boundary =
502                CodedBlock::new(non_boundary_block.clone(), coding_config, &Sequential);
503            let non_boundary_commitment = coded_non_boundary.commitment();
504
505            // Make the non-boundary block available
506            shards
507                .clone()
508                .proposed(non_boundary_round, coded_non_boundary)
509                .await;
510
511            context.sleep(Duration::from_millis(10)).await;
512
513            // Attempt to re-propose the non-boundary block
514            let invalid_reproposal_round = Round::new(Epoch::new(0), View::new(15));
515            let invalid_reproposal_context = CodingCtx {
516                round: invalid_reproposal_round,
517                leader: me.clone(),
518                parent: (View::new(10), non_boundary_commitment),
519            };
520
521            // Call verify to kick off deferred verification.
522            // We must await the verify result to ensure the verification task is
523            // registered before calling certify.
524            let shard_validity = marshaled
525                .verify(invalid_reproposal_context, non_boundary_commitment)
526                .await
527                .await;
528            assert!(
529                !shard_validity.unwrap(),
530                "Invalid re-proposal verify should return false"
531            );
532
533            // Use certify to get the actual deferred_verify result
534            let certify_result = marshaled
535                .certify(invalid_reproposal_round, non_boundary_commitment)
536                .await
537                .await;
538            assert!(
539                !certify_result.unwrap(),
540                "Invalid re-proposal (not at epoch boundary) should be rejected"
541            );
542
543            // Test 3: Re-proposal with mismatched epoch should be rejected
544            // This is a regression test - re-proposals must be in the same epoch as the block.
545            let cross_epoch_reproposal_round = Round::new(Epoch::new(1), View::new(20));
546            let cross_epoch_reproposal_context = CodingCtx {
547                round: cross_epoch_reproposal_round,
548                leader: me.clone(),
549                parent: (View::new(boundary_height.get()), boundary_commitment),
550            };
551
552            // Call verify to kick off deferred verification.
553            // We must await the verify result to ensure the verification task is
554            // registered before calling certify.
555            let shard_validity = marshaled
556                .verify(cross_epoch_reproposal_context.clone(), boundary_commitment)
557                .await
558                .await;
559            assert!(
560                !shard_validity.unwrap(),
561                "Cross-epoch re-proposal verify should return false"
562            );
563
564            // Use certify to get the actual deferred_verify result
565            let certify_result = marshaled
566                .certify(cross_epoch_reproposal_round, boundary_commitment)
567                .await
568                .await;
569            assert!(
570                !certify_result.unwrap(),
571                "Re-proposal with mismatched epoch should be rejected"
572            );
573
574            // Note: Tests for certify-only paths (crash recovery scenarios) are not included here
575            // because they require multiple validators to reconstruct blocks from shards. In a
576            // single-validator test setup, block reconstruction fails due to insufficient shards.
577            // These paths are tested in integration tests with multiple validators.
578        })
579    }
580
581    #[test_traced("WARN")]
582    fn test_marshaled_rejects_mismatched_context_digest() {
583        let runner = deterministic::Runner::timed(Duration::from_secs(30));
584        runner.start(|mut context| async move {
585            let Fixture {
586                participants,
587                schemes,
588                ..
589            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
590            let mut oracle =
591                setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
592                    .await;
593
594            let me = participants[0].clone();
595            let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
596
597            let setup = CodingHarness::setup_validator(
598                context.with_label("validator_0"),
599                &mut oracle,
600                me.clone(),
601                ConstantProvider::new(schemes[0].clone()),
602            )
603            .await;
604            let marshal = setup.mailbox;
605            let shards = setup.extra;
606
607            let genesis_ctx = CodingCtx {
608                round: Round::zero(),
609                leader: default_leader(),
610                parent: (View::zero(), genesis_commitment()),
611            };
612            let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
613
614            let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new(genesis.clone());
615            let cfg = MarshaledConfig {
616                application: mock_app,
617                marshal: marshal.clone(),
618                shards: shards.clone(),
619                scheme_provider: ConstantProvider::new(schemes[0].clone()),
620                epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
621                strategy: Sequential,
622            };
623            let mut marshaled = Marshaled::new(context.clone(), cfg);
624
625            // Create parent block at height 1 so the commitment is well-formed.
626            let parent_ctx = CodingCtx {
627                round: Round::new(Epoch::zero(), View::new(1)),
628                leader: default_leader(),
629                parent: (View::zero(), genesis_commitment()),
630            };
631            let parent = make_coding_block(parent_ctx, genesis.digest(), Height::new(1), 100);
632            let coded_parent = CodedBlock::new(parent.clone(), coding_config, &Sequential);
633            let parent_commitment = coded_parent.commitment();
634            shards
635                .clone()
636                .proposed(Round::new(Epoch::zero(), View::new(1)), coded_parent)
637                .await;
638
639            // Build a block with context A (commitment hash uses this context).
640            let round_a = Round::new(Epoch::zero(), View::new(2));
641            let context_a = CodingCtx {
642                round: round_a,
643                leader: me.clone(),
644                parent: (View::new(1), parent_commitment),
645            };
646            let block_a = make_coding_block(context_a, parent.digest(), Height::new(2), 200);
647            let coded_block_a: CodedBlock<_, ReedSolomon<Sha256>, Sha256> =
648                CodedBlock::new(block_a, coding_config, &Sequential);
649            let commitment_a = coded_block_a.commitment();
650
651            // Verify using a different consensus context B (hash mismatch).
652            let round_b = Round::new(Epoch::zero(), View::new(3));
653            let context_b = CodingCtx {
654                round: round_b,
655                leader: participants[1].clone(),
656                parent: (View::new(1), parent_commitment),
657            };
658
659            let verify_rx = marshaled.verify(context_b, commitment_a).await;
660            select! {
661                result = verify_rx => {
662                    assert!(
663                        !result.unwrap(),
664                        "mismatched context digest should be rejected"
665                    );
666                },
667                _ = context.sleep(Duration::from_secs(5)) => {
668                    panic!("verify should reject mismatched context digest promptly");
669                },
670            }
671        })
672    }
673
674    #[test_traced("WARN")]
675    fn test_reproposal_verify_receiver_drop_does_not_synthesize_false() {
676        let runner = deterministic::Runner::timed(Duration::from_secs(30));
677        runner.start(|mut context| async move {
678            let Fixture {
679                participants,
680                schemes,
681                ..
682            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
683            let mut oracle = setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone()).await;
684
685            let me = participants[0].clone();
686            let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
687
688            let setup = CodingHarness::setup_validator(
689                context.with_label("validator_0"),
690                &mut oracle,
691                me.clone(),
692                ConstantProvider::new(schemes[0].clone()),
693            )
694            .await;
695            let marshal = setup.mailbox;
696            let shards = setup.extra;
697
698            let genesis_ctx = CodingCtx {
699                round: Round::zero(),
700                leader: default_leader(),
701                parent: (View::zero(), genesis_commitment()),
702            };
703            let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
704
705            let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new(genesis.clone());
706            let cfg = MarshaledConfig {
707                application: mock_app,
708                marshal: marshal.clone(),
709                shards: shards.clone(),
710                scheme_provider: ConstantProvider::new(schemes[0].clone()),
711                epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
712                strategy: Sequential,
713            };
714            let mut marshaled = Marshaled::new(context.clone(), cfg);
715
716            // Re-proposal payload with valid coding config, but no block available.
717            let missing_payload = Commitment::from((
718                Sha256::hash(b"missing_block"),
719                Sha256::hash(b"missing_root"),
720                Sha256::hash(b"missing_context"),
721                coding_config,
722            ));
723            let round = Round::new(Epoch::zero(), View::new(1));
724            let reproposal_context = CodingCtx {
725                round,
726                leader: me,
727                parent: (View::zero(), missing_payload),
728            };
729
730            // Start verify, then drop the receiver immediately.
731            let verify_rx = marshaled.verify(reproposal_context, missing_payload).await;
732            drop(verify_rx);
733
734            // Certify should resolve promptly from the in-progress task, but must
735            // not synthesize `false` when verification was canceled before a verdict.
736            let certify_rx = marshaled.certify(round, missing_payload).await;
737            select! {
738                result = certify_rx => {
739                    assert!(
740                        result.is_err(),
741                        "certify should resolve without an explicit verdict when verify receiver is dropped"
742                    );
743                },
744                _ = context.sleep(Duration::from_secs(5)) => {
745                    panic!("certify task should resolve promptly after verify receiver drop");
746                },
747            }
748        })
749    }
750
    /// When a re-proposal references a commitment whose block can never be
    /// fetched, both `verify()` and `certify()` must resolve by closing their
    /// channels (error) rather than by synthesizing an explicit `false` verdict.
    #[test_traced("WARN")]
    fn test_reproposal_missing_block_does_not_synthesize_false() {
        let runner = deterministic::Runner::timed(Duration::from_secs(30));
        runner.start(|mut context| async move {
            // Single-validator stack: fixture keys, simulated network, and the
            // marshal mailbox plus coding shard engine for validator 0.
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
            let mut oracle = setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone()).await;

            let me = participants[0].clone();
            let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);

            let setup = CodingHarness::setup_validator(
                context.with_label("validator_0"),
                &mut oracle,
                me.clone(),
                ConstantProvider::new(schemes[0].clone()),
            )
            .await;
            let marshal = setup.mailbox;
            let shards = setup.extra;

            // Genesis block anchors the mock application.
            let genesis_ctx = CodingCtx {
                round: Round::zero(),
                leader: default_leader(),
                parent: (View::zero(), genesis_commitment()),
            };
            let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);

            let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new(genesis.clone());
            let cfg = MarshaledConfig {
                application: mock_app,
                marshal: marshal.clone(),
                shards: shards.clone(),
                scheme_provider: ConstantProvider::new(schemes[0].clone()),
                epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
                strategy: Sequential,
            };
            let mut marshaled = Marshaled::new(context.clone(), cfg);

            // Re-proposal payload with valid coding config, but no block available.
            let missing_payload = Commitment::from((
                Sha256::hash(b"missing_block"),
                Sha256::hash(b"missing_root"),
                Sha256::hash(b"missing_context"),
                coding_config,
            ));
            let round = Round::new(Epoch::zero(), View::new(1));
            let reproposal_context = CodingCtx {
                round,
                leader: me,
                parent: (View::zero(), missing_payload),
            };

            // Verify must not synthesize `false` when the block cannot be fetched.
            let verify_rx = marshaled.verify(reproposal_context, missing_payload).await;

            // Ensure the verification task has registered its subscription, then
            // force cancellation by pruning the missing commitment.
            context.sleep(Duration::from_millis(100)).await;
            shards.prune(missing_payload).await;

            // Verify channel should close (Err) — never deliver Ok(false).
            select! {
                result = verify_rx => {
                    assert!(
                        result.is_err(),
                        "verify should resolve without explicit false when re-proposal block is unavailable"
                    );
                },
                _ = context.sleep(Duration::from_secs(5)) => {
                    panic!("verify should resolve promptly when re-proposal block is unavailable");
                },
            }

            // Certify should consume the same unresolved verification task.
            let certify_rx = marshaled.certify(round, missing_payload).await;
            select! {
                result = certify_rx => {
                    assert!(
                        result.is_err(),
                        "certify should resolve without explicit false when re-proposal block is unavailable"
                    );
                },
                _ = context.sleep(Duration::from_secs(5)) => {
                    panic!("certify should resolve promptly when re-proposal block is unavailable");
                },
            }
        })
    }
842
843    #[test_traced("WARN")]
844    fn test_core_subscription_closes_when_coding_buffer_prunes_missing_commitment() {
845        let runner = deterministic::Runner::timed(Duration::from_secs(30));
846        runner.start(|mut context| async move {
847            let Fixture {
848                participants,
849                schemes,
850                ..
851            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
852            let mut oracle =
853                setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
854                    .await;
855
856            let setup = CodingHarness::setup_validator(
857                context.with_label("validator_0"),
858                &mut oracle,
859                participants[0].clone(),
860                ConstantProvider::new(schemes[0].clone()),
861            )
862            .await;
863            let marshal = setup.mailbox;
864            let shards = setup.extra;
865
866            let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
867            let missing_commitment = Commitment::from((
868                Sha256::hash(b"missing_block"),
869                Sha256::hash(b"missing_root"),
870                Sha256::hash(b"missing_context"),
871                coding_config,
872            ));
873            let round = Round::new(Epoch::zero(), View::new(1));
874
875            // Subscribe through the core actor. This internally subscribes to the
876            // coding shard buffer and registers local waiters.
877            let block_rx = marshal
878                .subscribe_by_commitment(Some(round), missing_commitment)
879                .await;
880
881            // Allow core actor to register the underlying buffer subscription.
882            context.sleep(Duration::from_millis(100)).await;
883
884            // Prune the missing commitment in the shard engine, which should cancel
885            // the underlying buffer subscription.
886            shards.prune(missing_commitment).await;
887
888            // The core actor must surface cancellation by closing the subscription,
889            // not by panicking or leaving the waiter parked indefinitely.
890            select! {
891                result = block_rx => {
892                    assert!(
893                        result.is_err(),
894                        "core subscription should close when coding buffer drops subscription"
895                    );
896                },
897                _ = context.sleep(Duration::from_secs(5)) => {
898                    panic!("core subscription should resolve promptly after coding prune");
899                },
900            }
901        })
902    }
903
    /// A block whose height falls in an epoch the local `Epocher` does not
    /// support must fail deferred verification, surfacing as `certify() == false`.
    #[test_traced("WARN")]
    fn test_marshaled_rejects_unsupported_epoch() {
        /// Wraps `FixedEpocher` but pretends every epoch above `max_epoch`
        /// does not exist, so blocks in later epochs look unsupported.
        #[derive(Clone)]
        struct LimitedEpocher {
            // Delegate providing the real epoch arithmetic.
            inner: FixedEpocher,
            // Highest epoch number this epocher will acknowledge.
            max_epoch: u64,
        }

        impl Epocher for LimitedEpocher {
            fn containing(&self, height: Height) -> Option<crate::types::EpochInfo> {
                let bounds = self.inner.containing(height)?;
                if bounds.epoch().get() > self.max_epoch {
                    None
                } else {
                    Some(bounds)
                }
            }

            fn first(&self, epoch: Epoch) -> Option<Height> {
                if epoch.get() > self.max_epoch {
                    None
                } else {
                    self.inner.first(epoch)
                }
            }

            fn last(&self, epoch: Epoch) -> Option<Height> {
                if epoch.get() > self.max_epoch {
                    None
                } else {
                    self.inner.last(epoch)
                }
            }
        }

        let runner = deterministic::Runner::timed(Duration::from_secs(60));
        runner.start(|mut context| async move {
            // Single-validator stack with the limited epocher installed below.
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
            let mut oracle =
                setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
                    .await;

            let me = participants[0].clone();
            let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);

            let setup = CodingHarness::setup_validator(
                context.with_label("validator_0"),
                &mut oracle,
                me.clone(),
                ConstantProvider::new(schemes[0].clone()),
            )
            .await;
            let marshal = setup.mailbox;
            let shards = setup.extra;

            let genesis_ctx = CodingCtx {
                round: Round::zero(),
                leader: default_leader(),
                parent: (View::zero(), genesis_commitment()),
            };
            let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);

            let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new(genesis.clone());
            // Only epoch 0 is supported; epoch 1 and beyond are rejected.
            let limited_epocher = LimitedEpocher {
                inner: FixedEpocher::new(BLOCKS_PER_EPOCH),
                max_epoch: 0,
            };
            let cfg = MarshaledConfig {
                application: mock_app,
                marshal: marshal.clone(),
                shards: shards.clone(),
                scheme_provider: ConstantProvider::new(schemes[0].clone()),
                epocher: limited_epocher,
                strategy: Sequential,
            };
            let mut marshaled = Marshaled::new(context.clone(), cfg);

            // Create a parent block at height 19 (last block in epoch 0, which is supported)
            let parent_ctx = CodingCtx {
                round: Round::new(Epoch::zero(), View::new(19)),
                leader: default_leader(),
                parent: (View::zero(), genesis_commitment()),
            };
            let parent = make_coding_block(parent_ctx, genesis.digest(), Height::new(19), 1000);
            let parent_digest = parent.digest();
            let coded_parent = CodedBlock::new(parent.clone(), coding_config, &Sequential);
            let parent_commitment = coded_parent.commitment();
            shards
                .clone()
                .proposed(Round::new(Epoch::zero(), View::new(19)), coded_parent)
                .await;

            // Create a block at height 20 (first block in epoch 1, which is NOT supported)
            let block_ctx = CodingCtx {
                round: Round::new(Epoch::new(1), View::new(20)),
                leader: default_leader(),
                parent: (View::new(19), parent_commitment),
            };
            let block = make_coding_block(block_ctx, parent_digest, Height::new(20), 2000);
            let coded_block = CodedBlock::new(block.clone(), coding_config, &Sequential);
            let block_commitment = coded_block.commitment();
            shards
                .clone()
                .proposed(Round::new(Epoch::new(1), View::new(20)), coded_block)
                .await;

            // Small delay to let the shard engine process both proposals.
            context.sleep(Duration::from_millis(10)).await;

            // In the coding marshal, verify() returns shard validity while deferred_verify
            // runs in the background. We need to use certify() to get the deferred_verify result.
            let unsupported_round = Round::new(Epoch::new(1), View::new(20));
            let unsupported_context = CodingCtx {
                round: unsupported_round,
                leader: me.clone(),
                parent: (View::new(19), parent_commitment),
            };

            // Call verify to kick off deferred verification
            let _shard_validity = marshaled
                .verify(unsupported_context, block_commitment)
                .await;

            // Use certify to get the actual deferred_verify result
            let certify_result = marshaled
                .certify(unsupported_round, block_commitment)
                .await
                .await;

            assert!(
                !certify_result.unwrap(),
                "Block in unsupported epoch should be rejected"
            );
        })
    }
1042
    /// A Byzantine leader who broadcasts a block whose ancestry does not match
    /// the consensus-provided parent — either a non-contiguous height or a
    /// mismatched parent digest — must fail deferred verification.
    #[test_traced("WARN")]
    fn test_marshaled_rejects_invalid_ancestry() {
        let runner = deterministic::Runner::timed(Duration::from_secs(60));
        runner.start(|mut context| async move {
            // Single-validator stack driving the Marshaled verifier directly.
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
            let mut oracle =
                setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
                    .await;

            let me = participants[0].clone();
            let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);

            let setup = CodingHarness::setup_validator(
                context.with_label("validator_0"),
                &mut oracle,
                me.clone(),
                ConstantProvider::new(schemes[0].clone()),
            )
            .await;
            let marshal = setup.mailbox;
            let shards = setup.extra;

            // Create genesis block
            let genesis_ctx = CodingCtx {
                round: Round::zero(),
                leader: default_leader(),
                parent: (View::zero(), genesis_commitment()),
            };
            let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);

            // Wrap with Marshaled verifier
            let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new(genesis.clone());
            let cfg = MarshaledConfig {
                application: mock_app,
                marshal: marshal.clone(),
                shards: shards.clone(),
                scheme_provider: ConstantProvider::new(schemes[0].clone()),
                epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
                strategy: Sequential,
            };
            let mut marshaled = Marshaled::new(context.clone(), cfg);

            // Test case 1: Non-contiguous height
            //
            // We need both blocks in the same epoch.
            // With BLOCKS_PER_EPOCH=20: epoch 0 is heights 0-19, epoch 1 is heights 20-39
            //
            // Store honest parent at height 21 (epoch 1)
            let honest_parent_ctx = CodingCtx {
                round: Round::new(Epoch::new(1), View::new(21)),
                leader: default_leader(),
                parent: (View::zero(), genesis_commitment()),
            };
            let honest_parent = make_coding_block(
                honest_parent_ctx,
                genesis.digest(),
                Height::new(BLOCKS_PER_EPOCH.get() + 1),
                1000,
            );
            let parent_digest = honest_parent.digest();
            let coded_parent = CodedBlock::new(honest_parent.clone(), coding_config, &Sequential);
            let parent_commitment = coded_parent.commitment();
            shards
                .clone()
                .proposed(Round::new(Epoch::new(1), View::new(21)), coded_parent)
                .await;

            // Byzantine proposer broadcasts malicious block at height 35
            // The block has the correct context (matching what consensus will provide)
            // but contains invalid content (non-contiguous height: 21 -> 35 instead of 21 -> 22)
            let byzantine_round = Round::new(Epoch::new(1), View::new(35));
            let byzantine_context = CodingCtx {
                round: byzantine_round,
                leader: me.clone(),
                parent: (View::new(21), parent_commitment), // Consensus says parent is at height 21
            };
            let malicious_block = make_coding_block(
                byzantine_context.clone(),
                parent_digest,
                Height::new(BLOCKS_PER_EPOCH.get() + 15), // Byzantine: non-contiguous height
                2000,
            );
            let coded_malicious =
                CodedBlock::new(malicious_block.clone(), coding_config, &Sequential);
            let malicious_commitment = coded_malicious.commitment();
            shards
                .clone()
                .proposed(byzantine_round, coded_malicious)
                .await;

            // Small delay to ensure broadcast is processed
            context.sleep(Duration::from_millis(10)).await;

            // Marshaled.verify() kicks off deferred verification in the background.
            // The Marshaled verifier will:
            // 1. Fetch honest_parent (height 21) from marshal based on context.parent
            // 2. Fetch malicious_block (height 35) from marshal based on digest
            // 3. Validate height is contiguous (fail)
            // 4. Return false
            let _shard_validity = marshaled
                .verify(byzantine_context, malicious_commitment)
                .await;

            // Use certify to get the actual deferred_verify result
            let certify_result = marshaled
                .certify(byzantine_round, malicious_commitment)
                .await
                .await;

            assert!(
                !certify_result.unwrap(),
                "Byzantine block with non-contiguous heights should be rejected"
            );

            // Test case 2: Mismatched parent digest
            //
            // Create another malicious block with correct context and height
            // but referencing the wrong parent digest (genesis instead of honest_parent)
            let byzantine_round2 = Round::new(Epoch::new(1), View::new(22));
            let byzantine_context2 = CodingCtx {
                round: byzantine_round2,
                leader: me.clone(),
                parent: (View::new(21), parent_commitment), // Consensus says parent is at height 21
            };
            let malicious_block2 = make_coding_block(
                byzantine_context2.clone(),
                genesis.digest(), // Byzantine: wrong parent digest
                Height::new(BLOCKS_PER_EPOCH.get() + 2),
                3000,
            );
            let coded_malicious2 =
                CodedBlock::new(malicious_block2.clone(), coding_config, &Sequential);
            let malicious_commitment2 = coded_malicious2.commitment();
            shards
                .clone()
                .proposed(byzantine_round2, coded_malicious2)
                .await;

            // Small delay to ensure broadcast is processed
            context.sleep(Duration::from_millis(10)).await;

            // Marshaled.verify() kicks off deferred verification in the background.
            // The Marshaled verifier will:
            // 1. Fetch honest_parent (height 21) from marshal based on context.parent
            // 2. Fetch malicious_block (height 22) from marshal based on digest
            // 3. Validate height is contiguous
            // 4. Validate parent commitment matches (fail)
            // 5. Return false
            let _shard_validity = marshaled
                .verify(byzantine_context2, malicious_commitment2)
                .await;

            // Use certify to get the actual deferred_verify result
            let certify_result = marshaled
                .certify(byzantine_round2, malicious_commitment2)
                .await
                .await;

            assert!(
                !certify_result.unwrap(),
                "Byzantine block with mismatched parent commitment should be rejected"
            );
        })
    }
1211
1212    #[test_traced("WARN")]
1213    fn test_certify_without_prior_verify_crash_recovery() {
1214        // After a crash, consensus may call certify() without a prior verify().
1215        // The certify path (marshaled.rs:842-936) should:
1216        //   1. Find no in-progress verification task
1217        //   2. Subscribe to the block from the shard engine
1218        //   3. Use the block's embedded context for deferred_verify
1219        //   4. Return Ok(true) for a valid block
1220        let runner = deterministic::Runner::timed(Duration::from_secs(30));
1221        runner.start(|mut context| async move {
1222            let Fixture {
1223                participants,
1224                schemes,
1225                ..
1226            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1227            let mut oracle =
1228                setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
1229                    .await;
1230
1231            let me = participants[0].clone();
1232            let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
1233
1234            let setup = CodingHarness::setup_validator(
1235                context.with_label("validator_0"),
1236                &mut oracle,
1237                me.clone(),
1238                ConstantProvider::new(schemes[0].clone()),
1239            )
1240            .await;
1241            let marshal = setup.mailbox;
1242            let shards = setup.extra;
1243
1244            let genesis_ctx = CodingCtx {
1245                round: Round::zero(),
1246                leader: default_leader(),
1247                parent: (View::zero(), genesis_commitment()),
1248            };
1249            let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
1250
1251            let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new(genesis.clone());
1252            let cfg = MarshaledConfig {
1253                application: mock_app,
1254                marshal: marshal.clone(),
1255                shards: shards.clone(),
1256                scheme_provider: ConstantProvider::new(schemes[0].clone()),
1257                epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
1258                strategy: Sequential,
1259            };
1260            let mut marshaled = Marshaled::new(context.clone(), cfg);
1261
1262            // Create parent at height 1.
1263            let parent_round = Round::new(Epoch::zero(), View::new(1));
1264            let parent_ctx = CodingCtx {
1265                round: parent_round,
1266                leader: default_leader(),
1267                parent: (View::zero(), genesis_commitment()),
1268            };
1269            let parent = make_coding_block(parent_ctx, genesis.digest(), Height::new(1), 100);
1270            let coded_parent = CodedBlock::new(parent.clone(), coding_config, &Sequential);
1271            let parent_commitment = coded_parent.commitment();
1272            shards.clone().proposed(parent_round, coded_parent).await;
1273
1274            // Create child at height 2.
1275            let child_round = Round::new(Epoch::zero(), View::new(2));
1276            let child_ctx = CodingCtx {
1277                round: child_round,
1278                leader: me.clone(),
1279                parent: (View::new(1), parent_commitment),
1280            };
1281            let child = make_coding_block(child_ctx, parent.digest(), Height::new(2), 200);
1282            let coded_child = CodedBlock::new(child, coding_config, &Sequential);
1283            let child_commitment = coded_child.commitment();
1284            shards.clone().proposed(child_round, coded_child).await;
1285
1286            context.sleep(Duration::from_millis(10)).await;
1287
1288            // Call certify directly without any prior verify (simulating crash recovery).
1289            let certify_rx = marshaled.certify(child_round, child_commitment).await;
1290            select! {
1291                result = certify_rx => {
1292                    assert!(
1293                        result.unwrap(),
1294                        "certify without prior verify should succeed for valid block"
1295                    );
1296                },
1297                _ = context.sleep(Duration::from_secs(5)) => {
1298                    panic!("certify should complete within timeout");
1299                },
1300            }
1301        })
1302    }
1303
1304    /// Regression test: a Byzantine leader must not be able to crash honest nodes
1305    /// by proposing a `Commitment` with invalid `CodingConfig` bytes (e.g.
1306    /// zero-valued `NonZeroU16` fields). The fix validates the embedded config
1307    /// during deserialization so malformed commitments are rejected at the codec
1308    /// level before reaching `verify()`.
1309    #[test_traced("WARN")]
1310    fn test_malformed_commitment_config_rejected_at_deserialization() {
1311        use commonware_codec::{Encode, ReadExt};
1312
1313        // Construct a Commitment with all-zero bytes (invalid CodingConfig:
1314        // minimum_shards=0, extra_shards=0). Serialize it and attempt to
1315        // deserialize -- this must fail.
1316        let malformed_bytes = [0u8; Commitment::SIZE];
1317        let result = Commitment::read(&mut &malformed_bytes[..]);
1318        assert!(
1319            result.is_err(),
1320            "deserialization of Commitment with zeroed CodingConfig must fail"
1321        );
1322
1323        // A validly-constructed Commitment must still round-trip.
1324        let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
1325        let valid = Commitment::from((
1326            Sha256::hash(b"block"),
1327            Sha256::hash(b"root"),
1328            Sha256::hash(b"context"),
1329            coding_config,
1330        ));
1331        let encoded = valid.encode();
1332        let decoded =
1333            Commitment::read(&mut &encoded[..]).expect("valid Commitment must deserialize");
1334        assert_eq!(valid, decoded);
1335    }
1336
    /// Optimistic `verify()` may pass (shards and context are valid) while the
    /// application's deferred verification fails; `certify()` must surface that
    /// failure as an explicit `false`.
    #[test_traced("WARN")]
    fn test_certify_propagates_application_verify_failure() {
        let runner = deterministic::Runner::timed(Duration::from_secs(30));
        runner.start(|mut context| async move {
            // 1) Set up a single validator marshal stack.
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
            let mut oracle =
                setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
                    .await;

            let me = participants[0].clone();
            let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);

            let setup = CodingHarness::setup_validator(
                context.with_label("validator_0"),
                &mut oracle,
                me.clone(),
                ConstantProvider::new(schemes[0].clone()),
            )
            .await;
            let marshal = setup.mailbox;
            let shards = setup.extra;

            let genesis_ctx = CodingCtx {
                round: Round::zero(),
                leader: default_leader(),
                parent: (View::zero(), genesis_commitment()),
            };
            let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
            // 2) Force application verification to fail in deferred verification.
            let mock_app: MockVerifyingApp<CodingB, S> =
                MockVerifyingApp::with_verify_result(genesis.clone(), false);

            let cfg = MarshaledConfig {
                application: mock_app,
                marshal: marshal.clone(),
                shards: shards.clone(),
                scheme_provider: ConstantProvider::new(schemes[0].clone()),
                epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
                strategy: Sequential,
            };
            let mut marshaled = Marshaled::new(context.clone(), cfg);

            // Parent at height 1 so the child's ancestry checks succeed.
            let parent_round = Round::new(Epoch::zero(), View::new(1));
            let parent_context = CodingCtx {
                round: parent_round,
                leader: me.clone(),
                parent: (View::zero(), genesis_commitment()),
            };
            let parent = make_coding_block(parent_context, genesis.digest(), Height::new(1), 100);
            let coded_parent = CodedBlock::new(parent.clone(), coding_config, &Sequential);
            let parent_commitment = coded_parent.commitment();
            shards.clone().proposed(parent_round, coded_parent).await;

            // 3) Publish a valid child so optimistic verify can succeed.
            let round = Round::new(Epoch::zero(), View::new(2));
            let verify_context = CodingCtx {
                round,
                leader: me,
                parent: (View::new(1), parent_commitment),
            };
            let block =
                make_coding_block(verify_context.clone(), parent.digest(), Height::new(2), 200);
            let coded_block = CodedBlock::new(block, coding_config, &Sequential);
            let commitment = coded_block.commitment();
            shards.clone().proposed(round, coded_block).await;

            context.sleep(Duration::from_millis(10)).await;

            // Optimistic verify passes: shards and context check out even though
            // the application will later reject the block.
            let optimistic = marshaled.verify(verify_context, commitment).await;
            assert!(
                optimistic.await.expect("verify result missing"),
                "optimistic verify should pass pre-checks and schedule deferred verification"
            );

            // 4) Certify must observe the deferred application failure and return false.
            let certify = marshaled.certify(round, commitment).await;
            assert!(
                !certify.await.expect("certify result missing"),
                "certify should propagate deferred application verify failure"
            );
        })
    }
1424
1425    #[test_traced("WARN")]
1426    fn test_backfill_block_mismatched_commitment() {
1427        // Regression: when backfilling by Request::Block(digest), a peer may return
1428        // a coded block with matching inner digest but a different coding commitment.
1429        // If a finalization for this digest is already cached, marshal must reject
1430        // the block unless V::commitment(block) matches the finalization payload.
1431        let runner = deterministic::Runner::timed(Duration::from_secs(30));
1432        runner.start(|mut context| async move {
1433            let Fixture {
1434                participants,
1435                schemes,
1436                ..
1437            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1438            let mut oracle = setup_network_with_participants(
1439                context.clone(),
1440                NZUsize!(1),
1441                participants[..2].iter().cloned(),
1442            )
1443            .await;
1444
1445            let coding_config_a = coding_config_for_participants(NUM_VALIDATORS as u16);
1446            // Same total shards (4) but different min/extra split produces a different
1447            // coding root and config bytes, yielding a different commitment.
1448            let coding_config_b = commonware_coding::Config {
1449                minimum_shards: coding_config_a.minimum_shards.checked_add(1).unwrap(),
1450                extra_shards: NZU16!(coding_config_a.extra_shards.get() - 1),
1451            };
1452
1453            let v0_setup = CodingHarness::setup_validator(
1454                context.with_label("validator_0"),
1455                &mut oracle,
1456                participants[0].clone(),
1457                ConstantProvider::new(schemes[0].clone()),
1458            )
1459            .await;
1460            let v1_setup = CodingHarness::setup_validator(
1461                context.with_label("validator_1"),
1462                &mut oracle,
1463                participants[1].clone(),
1464                ConstantProvider::new(schemes[1].clone()),
1465            )
1466            .await;
1467
1468            setup_network_links(&mut oracle, &participants[..2], LINK).await;
1469
1470            let mut v0_mailbox = v0_setup.mailbox;
1471            let v1_mailbox = v1_setup.mailbox;
1472
1473            let genesis_ctx = CodingCtx {
1474                round: Round::zero(),
1475                leader: default_leader(),
1476                parent: (View::zero(), genesis_commitment()),
1477            };
1478            let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
1479
1480            let round1 = Round::new(Epoch::zero(), View::new(1));
1481            let block1_ctx = CodingCtx {
1482                round: round1,
1483                leader: participants[0].clone(),
1484                parent: (View::zero(), genesis_commitment()),
1485            };
1486            let block1 = make_coding_block(block1_ctx, genesis.digest(), Height::new(1), 100);
1487
1488            let coded_block_a: CodedBlock<_, ReedSolomon<Sha256>, Sha256> =
1489                CodedBlock::new(block1.clone(), coding_config_a, &Sequential);
1490            let commitment_a = coded_block_a.commitment();
1491
1492            let coded_block_b: CodedBlock<_, ReedSolomon<Sha256>, Sha256> =
1493                CodedBlock::new(block1.clone(), coding_config_b, &Sequential);
1494            let commitment_b = coded_block_b.commitment();
1495
1496            assert_eq!(coded_block_a.digest(), coded_block_b.digest());
1497            assert_ne!(commitment_a, commitment_b);
1498
1499            // Validator 1 proposes coded_block_b (same inner block, different coding).
1500            // This stores it in v1's shard engine and actor cache.
1501            v1_mailbox.proposed(round1, coded_block_b.clone()).await;
1502            context.sleep(Duration::from_millis(100)).await;
1503
1504            // Create finalization referencing commitment_a (the "correct" commitment).
1505            let proposal: Proposal<Commitment> = Proposal {
1506                round: round1,
1507                parent: View::zero(),
1508                payload: commitment_a,
1509            };
1510            let finalization = CodingHarness::make_finalization(proposal.clone(), &schemes, QUORUM);
1511
1512            // Report finalization to v0. v0 doesn't have the block:
1513            //   - it fetches Request::Block(digest)
1514            //   - v1 responds with coded_block_b (same digest, wrong commitment)
1515            //   - finalization lookup is digest-indexed, so deliver path must still
1516            //     reject because cached finalization expects commitment_a
1517            CodingHarness::report_finalization(&mut v0_mailbox, finalization).await;
1518
1519            // Wait for the fetch cycle to complete.
1520            context.sleep(Duration::from_secs(5)).await;
1521
1522            // The mismatched block must not be stored.
1523            let stored = v0_mailbox.get_block(Height::new(1)).await;
1524            assert!(
1525                stored.is_none(),
1526                "v0 should reject backfilled block with mismatched commitment"
1527            );
1528
1529            // Without the block, finalization should not be persisted by height yet.
1530            let stored_finalization = v0_mailbox.get_finalization(Height::new(1)).await;
1531            assert!(
1532                stored_finalization.is_none(),
1533                "finalization should not be archived until matching block is available"
1534            );
1535        })
1536    }
1537
1538    /// When the scheme provider has no entry for the current epoch,
1539    /// `Marshaled::propose` and `Marshaled::verify` must return a dropped
1540    /// receiver (the consensus engine treats `RecvError` as "abstain").
1541    #[test_traced("WARN")]
1542    fn test_marshaled_missing_scheme_skips_propose_and_verify() {
1543        let runner = deterministic::Runner::timed(Duration::from_secs(30));
1544        runner.start(|mut context| async move {
1545            let Fixture {
1546                participants,
1547                schemes,
1548                ..
1549            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1550            let mut oracle =
1551                setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
1552                    .await;
1553
1554            let me = participants[0].clone();
1555
1556            let setup = CodingHarness::setup_validator(
1557                context.with_label("validator_0"),
1558                &mut oracle,
1559                me.clone(),
1560                ConstantProvider::new(schemes[0].clone()),
1561            )
1562            .await;
1563
1564            let genesis_ctx = CodingCtx {
1565                round: Round::zero(),
1566                leader: default_leader(),
1567                parent: (View::zero(), genesis_commitment()),
1568            };
1569            let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
1570
1571            let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new(genesis);
1572
1573            let cfg = MarshaledConfig {
1574                application: mock_app,
1575                marshal: setup.mailbox,
1576                shards: setup.extra,
1577                scheme_provider: EmptyProvider,
1578                epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
1579                strategy: Sequential,
1580            };
1581            let mut marshaled = Marshaled::new(context.clone(), cfg);
1582
1583            let ctx = CodingCtx {
1584                round: Round::new(Epoch::zero(), View::new(1)),
1585                leader: me.clone(),
1586                parent: (View::zero(), genesis_commitment()),
1587            };
1588
1589            // propose with a missing scheme returns a dropped sender
1590            let rx = marshaled.propose(ctx.clone()).await;
1591            assert!(rx.await.is_err());
1592
1593            // verify with a missing scheme returns a dropped sender
1594            let rx = marshaled.verify(ctx, genesis_commitment()).await;
1595            assert!(rx.await.is_err());
1596        });
1597    }
1598}