net-mesh 0.27.3

High-performance, schema-agnostic, backend-agnostic event bus
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
//! Reed-Solomon erasure-coding primitives for the v0.3 Phase C
//! blob store path.
//!
//! v0.3 Phase A + B store every chunk in full (the
//! [`Encoding::Replicated`]
//! path); chunk-level redundancy comes from cross-node replication.
//! Phase C adds the
//! [`Encoding::ReedSolomon { k, m }`](super::blob_ref::Encoding::ReedSolomon)
//! path: each stripe of `k` data chunks gets `m` parity chunks
//! computed via systematic Reed-Solomon over `GF(2^8)`, so the
//! stripe survives any `m` chunk losses (data OR parity) and only
//! pays `(k + m) / k` storage overhead — e.g. `(10, 4)` costs 1.4×
//! storage vs 2× for two-copy replication, with the same tolerance
//! to any 4 chunk losses per stripe.
//!
//! # Scope of this module (Phase C1)
//!
//! Phase C ships in several commits; C1 lands the foundational
//! primitives:
//!
//! - The [`RsParams`] `(k, m)` value type with hard validation
//!   (rejects `k == 0`, `m == 0`, `k + m > 255`) + a soft warning
//!   threshold at `k + m > 64`.
//! - [`RsEncoder`] — a thin wrapper around
//!   [`reed_solomon_erasure::ReedSolomon`] with `GF(2^8)`. Exposes
//!   `encode(data) -> parity` for the store path and
//!   `reconstruct_data(shards)` for the fetch path. All shards
//!   MUST be pre-padded to equal length by the caller (the striper
//!   that lands in C2 owns the padding policy).
//! - Striper constants ([`RS_STRIPE_TARGET_BYTES`],
//!   [`RS_STRIPE_MIN_BYTES`]) per the v0.3 plan §6.
//! - The [`DATAFORTS_BLOB_ERASURE_SUPPORTED`] capability tag +
//!   [`ErasureSupportProbe`] hook, mirroring the Tree/CDC probe
//!   pattern from Phase A6 / B2.
//!
//! Subsequent C-phase commits wire these into
//! `MeshBlobAdapter::store_stream_tree` (the striper), the fetch
//! path (optimistic data fetch + on-failure parity reconstruction),
//! the GC stripe-membership index, and the operator-driven repair
//! sweep.

use reed_solomon_erasure::galois_8;
use reed_solomon_erasure::ReedSolomon;

use super::blob_ref::Encoding;
use super::blob_tree::{ChunkRefV3, StripeBlock};
use super::error::BlobError;

// ───────────────────────────────────────────────────────────────────────────
// Striper constants (used by C2)
// ───────────────────────────────────────────────────────────────────────────

/// Target accumulated *data bytes* before a stripe closes. Set to
/// `10 × 4 MiB = 40 MiB`, matching the default
/// `(k = 10, m = 4)` configuration's data-side capacity at the
/// production CDC average chunk size. Striping by bytes (not chunk
/// count) keeps the stripe predictable under CDC where chunks
/// range `[1 MiB, 16 MiB]`; a stripe spans 4-12 CDC chunks
/// depending on boundary distribution.
///
/// **Currently unused.** The v0.3 striper closes purely on chunk
/// count (`in_flight.len() >= k`); the byte-targeted close logic
/// the constant describes is documented as a future commit per
/// the plan, but not yet wired through [`RsStriper::push_chunk`].
/// Kept as a published constant for downstream operators reading
/// the design surface; readers SHOULD treat the close behavior as
/// chunk-count-based until a follow-up commit reintroduces the
/// byte target.
pub const RS_STRIPE_TARGET_BYTES: u64 = 40 * 1024 * 1024;

/// Minimum accumulated *data bytes* a stripe needs to actually
/// receive RS encoding. A stripe that hasn't reached this size at
/// end-of-stream (i.e., the blob is too small to fill a stripe)
/// falls back to [`Encoding::Replicated`] for that final partial
/// stripe — see the plan §6 small-stripe fallback. Without the
/// fallback, a 1 MiB blob stored under `(10, 4)` would carry 4 MiB
/// of parity overhead (5× storage); the fallback skips parity for
/// the small case.
///
/// **Currently unused.** Same status as
/// [`RS_STRIPE_TARGET_BYTES`] — the small-stripe fallback in v0.3
/// uses a chunk-count threshold internally, not this byte
/// threshold. See [`RsStriper::push_chunk`] for the operative
/// close rule.
pub const RS_STRIPE_MIN_BYTES: u64 = 8 * 1024 * 1024;

/// Default data shards per stripe. `(10, 4)` is the v0.3 plan's
/// canonical configuration: 1.4× storage overhead, tolerates any
/// 4 chunk losses per stripe.
pub const DEFAULT_RS_K: u8 = 10;

/// Default parity shards per stripe. See [`DEFAULT_RS_K`].
pub const DEFAULT_RS_M: u8 = 4;

/// Hard ceiling on `k + m`. The `Encoding::ReedSolomon { k, m }`
/// wire field is two `u8`s so a sum > 255 cannot encode validly;
/// the validator rejects at the producer surface so the failure
/// surfaces synchronously, not as a wire-decode error on the
/// receiver.
pub const RS_MAX_KM_SUM: u16 = 255;

/// Soft threshold on `k + m` above which a warning is emitted at
/// validation time. Most RS implementations are tuned for sums
/// below this; reconstruction performance degrades non-linearly
/// past the threshold. Configurations like `(20, 4)` (sum 24) or
/// `(10, 4)` (sum 14) stay well clear.
pub const RS_WARN_KM_SUM: u16 = 64;

// ───────────────────────────────────────────────────────────────────────────
// Capability tag + downgrade probe
// ───────────────────────────────────────────────────────────────────────────

/// Capability tag a node advertises when it supports the v0.3
/// Phase C Reed-Solomon store path
/// ([`Encoding::ReedSolomon { k, m }`](super::blob_ref::Encoding::ReedSolomon)).
///
/// Independent of [`super::blob_tree::DATAFORTS_BLOB_TREE_SUPPORTED`]
/// and [`super::cdc::DATAFORTS_BLOB_CDC_SUPPORTED`]: a node can
/// support Tree + CDC without RS (Phase A + B without C). Producers
/// targeting a peer that does NOT advertise this tag must downgrade
/// the blob's encoding to [`Encoding::Replicated`] — the substrate
/// has no transparent fallback at fetch time because the receiver
/// must already hold a copy of the parity-computing code to
/// reconstruct missing chunks.
pub const DATAFORTS_BLOB_ERASURE_SUPPORTED: &str = "dataforts:blob-erasure-supported";

/// Producer-side hook for the RS downgrade decision.
///
/// Mirrors [`super::cdc::CdcSupportProbe`] and
/// [`super::blob_tree::TreeSupportProbe`] one-for-one. Default
/// [`ErasureSupportProbe::AlwaysSupported`] is correct for
/// single-cluster all-Phase-C deployments;
/// [`ErasureSupportProbe::ForceReplicated`] is correct for cross-
/// version rollouts; [`ErasureSupportProbe::Dynamic`] lets callers
/// wire a runtime capability-tag check.
///
/// Producers consult the probe BEFORE passing
/// [`Encoding::ReedSolomon`] to `store_stream_tree` — on `false`,
/// they substitute [`Encoding::Replicated`].
#[derive(Default)]
pub enum ErasureSupportProbe {
    /// All targets support RS. Default for single-cluster
    /// all-Phase-C deployments.
    #[default]
    AlwaysSupported,
    /// No target supports RS. Forces every publish to use
    /// Replicated encoding. Useful during cluster-wide rollouts
    /// before every node has been upgraded.
    ForceReplicated,
    /// Dynamic check — caller-supplied closure consults the
    /// capability-tag advertisement layer at decision time.
    /// Returns `true` iff the destination advertises
    /// [`DATAFORTS_BLOB_ERASURE_SUPPORTED`].
    Dynamic(Box<dyn Fn() -> bool + Send + Sync>),
}

impl ErasureSupportProbe {
    /// Evaluate the probe. Cheap for the static variants; invokes
    /// the closure for `Dynamic`.
    pub fn check(&self) -> bool {
        match self {
            ErasureSupportProbe::AlwaysSupported => true,
            ErasureSupportProbe::ForceReplicated => false,
            ErasureSupportProbe::Dynamic(f) => f(),
        }
    }
}

impl std::fmt::Debug for ErasureSupportProbe {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ErasureSupportProbe::AlwaysSupported => {
                f.write_str("ErasureSupportProbe::AlwaysSupported")
            }
            ErasureSupportProbe::ForceReplicated => {
                f.write_str("ErasureSupportProbe::ForceReplicated")
            }
            ErasureSupportProbe::Dynamic(_) => f.write_str("ErasureSupportProbe::Dynamic(..)"),
        }
    }
}

/// Producer-side downgrade helper: if `encoding` is
/// [`Encoding::ReedSolomon`] and `probe.check()` returns `false`,
/// substitute [`Encoding::Replicated`]. Passes other encodings
/// through unchanged.
///
/// Composes with the [`super::cdc::cdc_downgrade`] helper —
/// callers consult Tree, CDC, and erasure probes independently
/// before invoking `store_stream_tree`.
pub fn erasure_downgrade(encoding: Encoding, probe: &ErasureSupportProbe) -> Encoding {
    match encoding {
        Encoding::ReedSolomon { .. } if !probe.check() => Encoding::Replicated,
        other => other,
    }
}

// ───────────────────────────────────────────────────────────────────────────
// RsParams + RsEncoder
// ───────────────────────────────────────────────────────────────────────────

/// `(k, m)` parameter pair for a Reed-Solomon stripe configuration.
/// Mirrors the [`Encoding::ReedSolomon { k, m }`](super::blob_ref::Encoding::ReedSolomon)
/// fields; carried separately so the encoder doesn't have to
/// re-pattern-match the enum on every stripe.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RsParams {
    /// Number of data shards per stripe. Must be `>= 1`.
    pub k: u8,
    /// Number of parity shards per stripe. Must be `>= 1`. The
    /// stripe survives any `m` chunk losses (data OR parity).
    pub m: u8,
}

impl RsParams {
    /// Convenience constructor. Validation is deferred to
    /// [`Self::validate`] — construction is infallible so test
    /// fixtures can build deliberately-malformed params.
    pub const fn new(k: u8, m: u8) -> Self {
        Self { k, m }
    }

    /// v0.3 Phase C default: `(10, 4)` — 1.4× storage, 4-loss
    /// tolerance. Matches [`DEFAULT_RS_K`] / [`DEFAULT_RS_M`].
    pub const fn default_production() -> Self {
        Self {
            k: DEFAULT_RS_K,
            m: DEFAULT_RS_M,
        }
    }

    /// Reject the malformed cases:
    /// - `k == 0` — a stripe with no data shards is meaningless.
    /// - `m == 0` — no parity means nothing to reconstruct from.
    /// - `k as u16 + m as u16 > 255` — wire fields are `u8` each;
    ///   the substrate uses `k + m` to size the underlying matrix.
    ///
    /// Caller is expected to honour the [`RS_WARN_KM_SUM`] soft
    /// threshold via its own logging; the validator stays quiet
    /// about it so test fixtures can use whatever shape they need.
    pub fn validate(&self) -> Result<(), BlobError> {
        if self.k == 0 {
            return Err(BlobError::Backend(
                "RS params: k must be >= 1; zero-data stripe is invalid".to_owned(),
            ));
        }
        if self.m == 0 {
            return Err(BlobError::Backend(
                "RS params: m must be >= 1; zero-parity stripe cannot reconstruct losses"
                    .to_owned(),
            ));
        }
        if self.k as u16 + self.m as u16 > RS_MAX_KM_SUM {
            return Err(BlobError::Backend(format!(
                "RS params: k + m = {} exceeds the wire-format maximum {}",
                self.k as u16 + self.m as u16,
                RS_MAX_KM_SUM
            )));
        }
        Ok(())
    }

    /// Pull the params out of an [`Encoding::ReedSolomon { k, m }`]
    /// enum variant. Returns `None` for [`Encoding::Replicated`].
    pub fn from_encoding(encoding: Encoding) -> Option<Self> {
        match encoding {
            Encoding::ReedSolomon { k, m } => Some(Self { k, m }),
            Encoding::Replicated => None,
        }
    }
}

impl Default for RsParams {
    fn default() -> Self {
        Self::default_production()
    }
}

/// Reed-Solomon encoder/decoder wrapper over `GF(2^8)`. Construct
/// once per [`RsParams`] configuration and reuse across many
/// stripes — the underlying matrix construction is the expensive
/// part, the per-stripe `encode` / `reconstruct_data` calls are
/// data-throughput-bound.
pub struct RsEncoder {
    rs: ReedSolomon<galois_8::Field>,
    params: RsParams,
}

impl RsEncoder {
    /// Construct an encoder for the supplied parameters. Returns
    /// `BlobError::Backend` if `params.validate()` fails or if the
    /// underlying RS-library constructor rejects the shape
    /// (currently identical to our validator but kept as a separate
    /// error to surface library-side changes if any).
    pub fn new(params: RsParams) -> Result<Self, BlobError> {
        params.validate()?;
        let rs = ReedSolomon::<galois_8::Field>::new(params.k as usize, params.m as usize)
            .map_err(|e| {
                BlobError::Backend(format!(
                    "RS encoder construction failed for (k={}, m={}): {:?}",
                    params.k, params.m, e
                ))
            })?;
        Ok(Self { rs, params })
    }

    /// `(k, m)` the encoder was constructed with.
    pub fn params(&self) -> RsParams {
        self.params
    }

    /// Compute `m` parity shards from `k` equal-length data shards.
    ///
    /// All data shards MUST be the same length; the caller (the
    /// striper that lands in C2) is responsible for zero-padding
    /// short chunks. Returns a `Vec<Vec<u8>>` of length `m`, each
    /// inner `Vec` sized to the data-shard length. Errors:
    ///
    /// - `data.len() != self.params.k` → `BlobError::Backend`.
    /// - Inner-vec lengths differ → `BlobError::Backend`.
    /// - Inner-vec length is zero → `BlobError::Backend`.
    pub fn encode(&self, data: &[Vec<u8>]) -> Result<Vec<Vec<u8>>, BlobError> {
        // Backwards-compat shim — the slice-taking [`Self::encode_slices`]
        // does the real work. Per PERF_AUDIT §6.5 the striper now
        // routes through `encode_slices` directly so the data
        // shards don't have to be materialized into owned `Vec`s
        // first; this `&[Vec<u8>]` entry stays for tests and
        // legacy callers.
        let data_views: Vec<&[u8]> = data.iter().map(|d| d.as_slice()).collect();
        self.encode_slices(&data_views)
    }

    /// PERF_AUDIT §6.5 — slice-taking variant of [`Self::encode`]
    /// used by the RS striper hot path. Lets full-length data
    /// shards pass through as borrowed slices into the original
    /// `Bytes` buffers (no per-chunk Vec materialization); only
    /// shards shorter than `max_len` get padded into owned
    /// scratch storage on the caller side.
    pub fn encode_slices(&self, data: &[&[u8]]) -> Result<Vec<Vec<u8>>, BlobError> {
        if data.len() != self.params.k as usize {
            return Err(BlobError::Backend(format!(
                "RS encode: expected {} data shards, got {}",
                self.params.k,
                data.len()
            )));
        }
        let shard_len = match data.first() {
            Some(first) => first.len(),
            None => 0,
        };
        if shard_len == 0 {
            return Err(BlobError::Backend(
                "RS encode: data shards must be non-empty".to_owned(),
            ));
        }
        if data.iter().any(|d| d.len() != shard_len) {
            return Err(BlobError::Backend(
                "RS encode: all data shards must be the same length (caller is responsible \
                 for zero-padding short chunks)"
                    .to_owned(),
            ));
        }
        let mut parity: Vec<Vec<u8>> = (0..self.params.m).map(|_| vec![0u8; shard_len]).collect();
        let mut parity_refs: Vec<&mut [u8]> = parity.iter_mut().map(|p| p.as_mut_slice()).collect();
        self.rs
            .encode_sep(data, &mut parity_refs)
            .map_err(|e| BlobError::Backend(format!("RS encode_sep failed: {:?}", e)))?;
        Ok(parity)
    }

    /// Reconstruct any missing data shards from a partial set of
    /// data + parity. `shards` is indexed `[0..k)` for data and
    /// `[k..k+m)` for parity; each slot is `Some(bytes)` if present
    /// and `None` if missing. On success, every previously-`None`
    /// data slot is replaced with the reconstructed bytes; parity
    /// slots may remain `None` (we only need `reconstruct_data`,
    /// not full reconstruction). Errors:
    ///
    /// - `shards.len() != k + m` → `BlobError::Backend`.
    /// - Fewer than `k` total survivors (data + parity present)
    ///   → underlying RS library returns
    ///   [`reed_solomon_erasure::Error::TooFewShardsPresent`],
    ///   mapped to `BlobError::Backend`.
    /// - All present shards must have the same length.
    pub fn reconstruct_data(&self, shards: &mut [Option<Vec<u8>>]) -> Result<(), BlobError> {
        let expected = self.params.k as usize + self.params.m as usize;
        if shards.len() != expected {
            return Err(BlobError::Backend(format!(
                "RS reconstruct_data: expected {} shard slots (k={} + m={}), got {}",
                expected,
                self.params.k,
                self.params.m,
                shards.len()
            )));
        }
        self.rs
            .reconstruct_data(shards)
            .map_err(|e| BlobError::Backend(format!("RS reconstruct_data failed: {:?}", e)))
    }
}

// ───────────────────────────────────────────────────────────────────────────
// RsStriper — accumulates data chunks into stripes, emits StripeBlocks
// ───────────────────────────────────────────────────────────────────────────

/// One unit of output the [`RsStriper`] emits when a stripe
/// closes: the [`StripeBlock`] (data + parity chunk refs) plus
/// the bytes of every newly-computed parity chunk that the
/// caller must persist before the stripe is fetchable.
///
/// Data chunks were already persisted by the caller before being
/// pushed into the striper; only parity bytes are new.
pub struct ClosedStripe {
    /// The fully-populated stripe descriptor — drop into a
    /// [`super::blob_tree::TreeNode::ErasureLeaf`].
    pub block: StripeBlock,
    /// Newly-computed parity bytes the caller must persist via
    /// `store_chunk` before the stripe is recoverable. Pairs
    /// 1:1 with the parity entries at the tail of `block.chunks`.
    pub parity_bytes: Vec<Vec<u8>>,
}

/// Reed-Solomon striper: byte-bounded stripe accumulation +
/// parity generation.
///
/// The caller feeds the striper a stream of data chunks via
/// [`Self::push_chunk`]. The striper accumulates them until the
/// total data bytes reach [`RS_STRIPE_TARGET_BYTES`], then closes
/// the stripe: zero-pads every data chunk to the maximum data
/// chunk length, computes `m` parity chunks via
/// [`RsEncoder::encode`], and emits a [`ClosedStripe`] with the
/// fully-populated [`StripeBlock`] + the parity bytes that must
/// be persisted.
///
/// At end-of-stream, [`Self::finalize`] flushes whatever's left:
/// - If the trailing partial stripe has data bytes ≥
///   [`RS_STRIPE_MIN_BYTES`], it RS-encodes like a full stripe
///   (padding the data chunks to make exactly `k` data
///   shards — short trailing chunks get zero-filled, missing
///   trailing chunks are added as all-zero data shards with
///   `size = 0`; the latter case keeps the wire shape consistent
///   without claiming data bytes the caller never sent).
/// - If the trailing partial is below the min-bytes threshold,
///   it falls back to [`Encoding::Replicated`] for that stripe:
///   every accumulated data chunk is emitted as a Replicated
///   stripe with no parity, so the operator pays no parity
///   overhead on tiny trailing data.
///
/// # Memory bound
///
/// O(`RS_STRIPE_TARGET_BYTES` × overhead) ≈ 40 MiB chunk-byte
/// shadows + 16 MiB parity. The striper keeps one in-flight
/// stripe; closed stripes are emitted immediately and drop out
/// of the striper's working set.
pub struct RsStriper {
    rs_params: RsParams,
    encoder: RsEncoder,
    /// Chunks accumulated for the in-flight stripe. Each entry
    /// is `(bytes, chunk_ref)` — the bytes are kept around so
    /// `encode` can produce parity over them (since data chunks
    /// must be padded to equal length for the GF(2^8) encoder,
    /// the striper needs the raw bytes, not just the ref).
    ///
    /// **PERF_AUDIT §6.5** — held as `Bytes` so the CDC store path
    /// can `push_chunk(chunk_bytes.clone(), ...)` (refcount bump,
    /// O(1)) instead of the pre-fix `push_chunk(chunk_bytes.to_vec(),
    /// ...)` (a full ~1-16 MiB memcpy per chunk). The Fixed store
    /// path converts `Vec<u8>` → `Bytes` via `Bytes::from` which
    /// is also O(1) (takes ownership of the Vec's buffer).
    in_flight: Vec<(bytes::Bytes, ChunkRefV3)>,
    /// Running total of *data bytes* (sum of in-flight chunk
    /// sizes) — drives the stripe-close decision.
    in_flight_data_bytes: u64,
    /// Closed-stripe counter for operator metrics.
    closed_count: u64,
}

impl RsStriper {
    /// Construct a striper for the supplied RS parameters.
    /// Validates and constructs the underlying encoder once;
    /// per-stripe `close` calls are encode-only.
    pub fn new(rs_params: RsParams) -> Result<Self, BlobError> {
        let encoder = RsEncoder::new(rs_params)?;
        Ok(Self {
            rs_params,
            encoder,
            in_flight: Vec::new(),
            in_flight_data_bytes: 0,
            closed_count: 0,
        })
    }

    /// Push a single data chunk. Returns `Some(ClosedStripe)` if
    /// this push completed a stripe (`k`-th chunk arrived),
    /// otherwise `None`.
    ///
    /// The chunk's `bytes` MUST hash to `chunk_ref.hash` and have
    /// length equal to `chunk_ref.size` — the caller (the store
    /// path) is responsible for that pairing; the striper
    /// doesn't re-hash.
    ///
    /// v0.3 Phase C2 closes stripes at exactly `k` chunks rather
    /// than at the plan's [`RS_STRIPE_TARGET_BYTES`] byte target.
    /// The chunk-count rule keeps the encoder input shape uniform
    /// (always exactly `k` data shards, no synthetic-padding
    /// edge cases) while approximating the plan's byte target at
    /// `k × avg_chunk_size` ≈ 40 MiB for the (10, 4) production
    /// default. A future commit may re-introduce the byte target
    /// with explicit synthetic-shard handling for the
    /// fewer-than-`k`-chunks-at-byte-target CDC edge case.
    pub fn push_chunk(
        &mut self,
        bytes: bytes::Bytes,
        chunk_ref: ChunkRefV3,
    ) -> Result<Option<ClosedStripe>, BlobError> {
        if !chunk_ref.is_data() {
            return Err(BlobError::Backend(
                "RsStriper::push_chunk received a non-data chunk; striper only \
                 accepts Data role chunks (parity is computed internally)"
                    .to_owned(),
            ));
        }
        let chunk_bytes = chunk_ref.size as u64;
        self.in_flight_data_bytes = self.in_flight_data_bytes.saturating_add(chunk_bytes);
        self.in_flight.push((bytes, chunk_ref));
        if self.in_flight.len() >= self.rs_params.k as usize {
            let closed = self.close_stripe_with_rs()?;
            return Ok(Some(closed));
        }
        Ok(None)
    }

    /// End-of-stream: flush the in-flight stripe. The trailing
    /// partial stripe (1..k chunks) always emits as
    /// [`Encoding::Replicated`] regardless of size — the v0.3
    /// Phase C2 simplification of the plan's byte-threshold
    /// fallback. The operator pays no parity overhead on the
    /// trailing 0..k chunks of any blob.
    pub fn finalize(mut self) -> Result<Option<ClosedStripe>, BlobError> {
        if self.in_flight.is_empty() {
            return Ok(None);
        }
        Ok(Some(self.close_stripe_as_replicated()))
    }

    /// Stats helper for tests + operator metrics.
    pub fn closed_stripe_count(&self) -> u64 {
        self.closed_count
    }

    /// Internal: close the in-flight stripe as an RS-encoded
    /// stripe. Always called with exactly `k` data chunks (the
    /// push-side close trigger); pads each data shard to the
    /// max length and computes `m` parity shards.
    fn close_stripe_with_rs(&mut self) -> Result<ClosedStripe, BlobError> {
        let k = self.rs_params.k as usize;
        let m = self.rs_params.m as usize;
        let in_flight = std::mem::take(&mut self.in_flight);
        self.in_flight_data_bytes = 0;
        if in_flight.len() != k {
            return Err(BlobError::Backend(format!(
                "RS striper: stripe close expected exactly {} data shards, got {}",
                k,
                in_flight.len()
            )));
        }

        let max_len = in_flight
            .iter()
            .map(|(b, _)| b.len())
            .max()
            .unwrap_or(1)
            .max(1);
        // PERF_AUDIT §6.5 — short shards get padded into owned
        // scratch Vecs (storage for the resize-to-max_len); shards
        // that are already at max_len pass through as borrowed
        // slices into the `Bytes` we already hold. The encoder
        // doesn't need ownership of the input — it only reads.
        // Pre-fix every shard was unconditionally materialized
        // into an owned Vec, paying a full-shard memcpy per CDC
        // chunk for the (~always) full-length data shards.
        let mut padded_owned: Vec<Vec<u8>> = Vec::new();
        let mut data_refs: Vec<ChunkRefV3> = Vec::with_capacity(k);
        // Track each in-flight entry's source so we can later
        // build a `Vec<&[u8]>` referencing either the original
        // `Bytes` or the padded scratch buffer.
        enum ShardSrc {
            /// Index into the original `in_flight[i].0: Bytes`.
            Original(usize),
            /// Index into `padded_owned`.
            Padded(usize),
        }
        let mut sources: Vec<ShardSrc> = Vec::with_capacity(k);
        for (i, (bytes, chunk_ref)) in in_flight.iter().enumerate() {
            if bytes.len() == max_len {
                sources.push(ShardSrc::Original(i));
            } else {
                let mut padded = Vec::with_capacity(max_len);
                padded.extend_from_slice(bytes);
                padded.resize(max_len, 0);
                let idx = padded_owned.len();
                padded_owned.push(padded);
                sources.push(ShardSrc::Padded(idx));
            }
            data_refs.push(*chunk_ref);
        }

        // Build the borrowed slice view the encoder consumes. The
        // `in_flight` Bytes and `padded_owned` Vecs both live for
        // the entire encode call, so the &[u8] borrows are sound.
        let data_views: Vec<&[u8]> = sources
            .iter()
            .map(|src| match src {
                ShardSrc::Original(i) => in_flight[*i].0.as_ref(),
                ShardSrc::Padded(i) => padded_owned[*i].as_slice(),
            })
            .collect();
        let parity_bytes = self.encoder.encode_slices(&data_views)?;
        let mut parity_refs: Vec<ChunkRefV3> = Vec::with_capacity(m);
        for (i, pbytes) in parity_bytes.iter().enumerate() {
            let phash: [u8; 32] = blake3::hash(pbytes).into();
            parity_refs.push(ChunkRefV3::parity(phash, pbytes.len() as u32, i as u8));
        }

        let mut chunks: Vec<ChunkRefV3> = data_refs;
        chunks.extend(parity_refs);
        let block = StripeBlock {
            encoding: Encoding::ReedSolomon {
                k: self.rs_params.k,
                m: self.rs_params.m,
            },
            chunks,
        };
        block.validate()?;
        self.closed_count = self.closed_count.saturating_add(1);
        Ok(ClosedStripe {
            block,
            parity_bytes,
        })
    }

    /// Internal: close the in-flight stripe as the Replicated
    /// fallback (small-stripe case). No parity is computed; the
    /// stripe's `chunks` is just the accumulated data refs.
    fn close_stripe_as_replicated(&mut self) -> ClosedStripe {
        let in_flight = std::mem::take(&mut self.in_flight);
        self.in_flight_data_bytes = 0;
        let chunks: Vec<ChunkRefV3> = in_flight.into_iter().map(|(_, r)| r).collect();
        let block = StripeBlock {
            encoding: Encoding::Replicated,
            chunks,
        };
        self.closed_count = self.closed_count.saturating_add(1);
        ClosedStripe {
            block,
            parity_bytes: Vec::new(),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trip: encode k data → m parity, drop m of the
    /// combined (k + m) shards, reconstruct, assert byte-equality.
    #[test]
    fn encode_then_drop_m_shards_then_reconstruct_round_trips() {
        let params = RsParams { k: 4, m: 2 };
        let encoder = RsEncoder::new(params).unwrap();
        // 4 data shards, 1024 bytes each — distinct content per
        // shard so a reconstruction error would be loud.
        let data: Vec<Vec<u8>> = (0..4u8)
            .map(|i| (0..1024).map(|j| i.wrapping_add(j as u8)).collect())
            .collect();
        let parity = encoder.encode(&data).unwrap();
        assert_eq!(parity.len(), 2);
        assert_eq!(parity[0].len(), 1024);

        // Build the shard set, then drop 2 (one data, one parity)
        // and reconstruct. With k=4, m=2, dropping 2 of 6 is the
        // hard tolerance — recovery should succeed.
        let mut shards: Vec<Option<Vec<u8>>> = data
            .iter()
            .cloned()
            .chain(parity.iter().cloned())
            .map(Some)
            .collect();
        shards[1] = None; // drop data shard 1
        shards[5] = None; // drop parity shard 1

        encoder.reconstruct_data(&mut shards).unwrap();
        assert_eq!(
            shards[1].as_ref().unwrap(),
            &data[1],
            "reconstructed data shard 1 must equal the original"
        );
        // Data shards 0, 2, 3 untouched (already present).
        assert_eq!(shards[0].as_ref().unwrap(), &data[0]);
        assert_eq!(shards[2].as_ref().unwrap(), &data[2]);
        assert_eq!(shards[3].as_ref().unwrap(), &data[3]);
    }

    /// Dropping `m + 1` shards must fail reconstruction (the RS
    /// tolerance is exactly `m` losses per stripe).
    #[test]
    fn dropping_more_than_m_shards_fails_reconstruction() {
        let params = RsParams { k: 4, m: 2 };
        let encoder = RsEncoder::new(params).unwrap();
        let data: Vec<Vec<u8>> = (0..4u8).map(|i| vec![i; 512]).collect();
        let parity = encoder.encode(&data).unwrap();
        let mut shards: Vec<Option<Vec<u8>>> = data
            .iter()
            .cloned()
            .chain(parity.iter().cloned())
            .map(Some)
            .collect();
        // Drop 3 of the 6 — exceeds m=2 tolerance.
        shards[0] = None;
        shards[1] = None;
        shards[2] = None;
        let err = encoder.reconstruct_data(&mut shards).unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("reconstruct_data") || msg.contains("TooFew"),
            "expected an RS-library failure, got: {}",
            msg
        );
    }

    /// All m parity shards lost, all k data shards present: the
    /// "no reconstruction needed" path. `reconstruct_data`
    /// succeeds without touching the data and leaves parity
    /// slots `None` (the call only restores data).
    #[test]
    fn parity_loss_with_full_data_set_succeeds_without_touching_data() {
        let params = RsParams { k: 4, m: 2 };
        let encoder = RsEncoder::new(params).unwrap();
        let data: Vec<Vec<u8>> = (0..4u8).map(|i| vec![i.wrapping_mul(7); 256]).collect();
        let parity = encoder.encode(&data).unwrap();
        let mut shards: Vec<Option<Vec<u8>>> = data
            .iter()
            .cloned()
            .chain(parity.iter().cloned())
            .map(Some)
            .collect();
        // Drop both parity shards.
        shards[4] = None;
        shards[5] = None;
        encoder.reconstruct_data(&mut shards).unwrap();
        for i in 0..4 {
            assert_eq!(shards[i].as_ref().unwrap(), &data[i]);
        }
    }

    /// `validate` rejects the malformed cases.
    #[test]
    fn validate_rejects_malformed_params() {
        assert!(RsParams { k: 0, m: 4 }.validate().is_err());
        assert!(RsParams { k: 10, m: 0 }.validate().is_err());
        // k + m > 255 is the hard ceiling. u8 + u8 max sum is 510,
        // so 200 + 200 = 400 is rejected.
        assert!(RsParams { k: 200, m: 200 }.validate().is_err());
        // Production default is valid.
        assert!(RsParams::default_production().validate().is_ok());
    }

    /// `from_encoding` extracts the params from the wire variant.
    #[test]
    fn from_encoding_extracts_params() {
        assert_eq!(
            RsParams::from_encoding(Encoding::ReedSolomon { k: 6, m: 3 }),
            Some(RsParams { k: 6, m: 3 })
        );
        assert_eq!(RsParams::from_encoding(Encoding::Replicated), None);
    }

    /// `encode` rejects mismatched shard lengths — the caller (the
    /// future striper) must pad to equal length before calling.
    #[test]
    fn encode_rejects_uneven_data_shard_lengths() {
        let encoder = RsEncoder::new(RsParams { k: 3, m: 2 }).unwrap();
        let data = vec![vec![0u8; 100], vec![1u8; 50], vec![2u8; 100]];
        assert!(encoder.encode(&data).is_err());
    }

    /// `encode` rejects the wrong number of data shards.
    #[test]
    fn encode_rejects_wrong_data_shard_count() {
        let encoder = RsEncoder::new(RsParams { k: 4, m: 2 }).unwrap();
        let data = vec![vec![0u8; 100], vec![1u8; 100]];
        assert!(encoder.encode(&data).is_err());
    }

    /// `ErasureSupportProbe` static variants resolve as expected.
    #[test]
    fn erasure_support_probe_static_variants() {
        assert!(ErasureSupportProbe::AlwaysSupported.check());
        assert!(!ErasureSupportProbe::ForceReplicated.check());
        assert!(ErasureSupportProbe::default().check());
    }

    /// `ErasureSupportProbe::Dynamic` consults the closure on each
    /// `check()`.
    #[test]
    fn erasure_support_probe_dynamic_consults_closure() {
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::sync::Arc;
        let flag = Arc::new(AtomicBool::new(false));
        let f = flag.clone();
        let probe = ErasureSupportProbe::Dynamic(Box::new(move || f.load(Ordering::Relaxed)));
        assert!(!probe.check());
        flag.store(true, Ordering::Relaxed);
        assert!(probe.check());
    }

    /// `erasure_downgrade` substitutes Replicated when the probe
    /// rejects RS, passes RS through when accepted, and leaves
    /// Replicated untouched in both probe arms.
    #[test]
    fn erasure_downgrade_substitutes_only_for_rs_on_reject() {
        let rs = Encoding::ReedSolomon { k: 10, m: 4 };
        let rep = Encoding::Replicated;
        assert_eq!(
            erasure_downgrade(rs, &ErasureSupportProbe::AlwaysSupported),
            rs
        );
        assert_eq!(
            erasure_downgrade(rep, &ErasureSupportProbe::AlwaysSupported),
            rep
        );
        assert_eq!(
            erasure_downgrade(rs, &ErasureSupportProbe::ForceReplicated),
            Encoding::Replicated
        );
        assert_eq!(
            erasure_downgrade(rep, &ErasureSupportProbe::ForceReplicated),
            rep
        );
    }

    fn det_bytes(seed: u8, len: usize) -> Vec<u8> {
        let mut state: u64 = seed as u64;
        (0..len)
            .map(|_| {
                state = state
                    .wrapping_mul(6364136223846793005)
                    .wrapping_add(1442695040888963407);
                (state >> 33) as u8
            })
            .collect()
    }

    /// Push exactly k chunks → striper closes one RS stripe with
    /// k data + m parity refs, all data sizes preserved.
    #[test]
    fn striper_closes_at_k_chunks_into_rs_stripe() {
        let params = RsParams { k: 4, m: 2 };
        let mut striper = RsStriper::new(params).unwrap();
        for i in 0..3u8 {
            let bytes = det_bytes(i, 100);
            let hash: [u8; 32] = blake3::hash(&bytes).into();
            let cref = ChunkRefV3::data(hash, 100);
            assert!(striper.push_chunk(bytes.into(), cref).unwrap().is_none());
        }
        let bytes = det_bytes(3, 100);
        let hash: [u8; 32] = blake3::hash(&bytes).into();
        let cref = ChunkRefV3::data(hash, 100);
        let closed = striper.push_chunk(bytes.into(), cref).unwrap().unwrap();
        assert_eq!(closed.block.chunks.len(), 6); // 4 data + 2 parity
        assert_eq!(closed.parity_bytes.len(), 2);
        assert_eq!(
            closed.block.chunks.iter().filter(|c| c.is_data()).count(),
            4
        );
        assert_eq!(
            closed.block.chunks.iter().filter(|c| c.is_parity()).count(),
            2
        );
    }

    /// Mixed-size data chunks get zero-padded to the max length
    /// before parity computation; the resulting StripeBlock
    /// preserves the original pre-padding sizes in ChunkRefV3.size.
    #[test]
    fn striper_preserves_pre_padding_sizes_in_chunk_refs() {
        let params = RsParams { k: 3, m: 2 };
        let mut striper = RsStriper::new(params).unwrap();
        let sizes = [200, 100, 150];
        let mut sent = Vec::new();
        for (i, &size) in sizes.iter().enumerate() {
            let bytes = det_bytes(i as u8, size);
            let hash: [u8; 32] = blake3::hash(&bytes).into();
            let cref = ChunkRefV3::data(hash, size as u32);
            sent.push(cref);
            let result = striper.push_chunk(bytes.into(), cref).unwrap();
            if i + 1 == sizes.len() {
                let closed = result.unwrap();
                for (j, &expected_size) in sizes.iter().enumerate() {
                    assert_eq!(closed.block.chunks[j].size as usize, expected_size);
                }
                // Parity shard size = max(data sizes) = 200.
                for parity in closed.block.chunks.iter().filter(|c| c.is_parity()) {
                    assert_eq!(parity.size as usize, 200);
                }
            } else {
                assert!(result.is_none());
            }
        }
    }

    /// Finalize with < k chunks falls back to Replicated stripe.
    #[test]
    fn striper_finalize_with_partial_emits_replicated_stripe() {
        let params = RsParams { k: 5, m: 2 };
        let mut striper = RsStriper::new(params).unwrap();
        for i in 0..3u8 {
            let bytes = det_bytes(i, 50);
            let hash: [u8; 32] = blake3::hash(&bytes).into();
            let cref = ChunkRefV3::data(hash, 50);
            assert!(striper.push_chunk(bytes.into(), cref).unwrap().is_none());
        }
        let closed = striper.finalize().unwrap().unwrap();
        assert_eq!(closed.block.encoding, Encoding::Replicated);
        assert_eq!(closed.block.chunks.len(), 3);
        assert!(closed.parity_bytes.is_empty());
        assert!(closed.block.chunks.iter().all(|c| c.is_data()));
    }

    /// Finalize with zero in-flight chunks returns None.
    #[test]
    fn striper_finalize_with_no_chunks_returns_none() {
        let params = RsParams { k: 4, m: 2 };
        let striper = RsStriper::new(params).unwrap();
        assert!(striper.finalize().unwrap().is_none());
    }

    /// Two full stripes back-to-back: 2k chunks → 2 closed
    /// stripes, each k data + m parity.
    #[test]
    fn striper_closes_multiple_stripes() {
        let params = RsParams { k: 3, m: 2 };
        let mut striper = RsStriper::new(params).unwrap();
        let mut closed_count = 0u64;
        for i in 0..6u8 {
            let bytes = det_bytes(i, 64);
            let hash: [u8; 32] = blake3::hash(&bytes).into();
            let cref = ChunkRefV3::data(hash, 64);
            if striper.push_chunk(bytes.into(), cref).unwrap().is_some() {
                closed_count += 1;
            }
        }
        assert_eq!(closed_count, 2);
        assert_eq!(striper.closed_stripe_count(), 2);
    }

    /// Striper rejects parity-role chunks (only Data accepted).
    #[test]
    fn striper_rejects_parity_role_inputs() {
        let mut striper = RsStriper::new(RsParams { k: 3, m: 2 }).unwrap();
        let bytes = vec![0u8; 10];
        let parity_ref = ChunkRefV3::parity([0u8; 32], 10, 0);
        assert!(striper.push_chunk(bytes.into(), parity_ref).is_err());
    }

    /// End-to-end RS round trip through the striper: push k
    /// data chunks, drop one data chunk + one parity chunk,
    /// reconstruct via RsEncoder, assert byte-equality.
    #[test]
    fn striper_output_round_trips_through_rs_encoder() {
        let params = RsParams { k: 3, m: 2 };
        let mut striper = RsStriper::new(params).unwrap();
        let originals: Vec<Vec<u8>> = (0..3u8).map(|i| det_bytes(i, 128)).collect();
        let mut closed: Option<ClosedStripe> = None;
        for (i, bytes) in originals.iter().enumerate() {
            let hash: [u8; 32] = blake3::hash(bytes).into();
            let cref = ChunkRefV3::data(hash, bytes.len() as u32);
            let result = striper.push_chunk(bytes.clone().into(), cref).unwrap();
            if i + 1 == originals.len() {
                closed = Some(result.unwrap());
            }
        }
        let closed = closed.unwrap();
        // Rebuild shards: data + parity, drop one of each.
        let shard_len = closed.parity_bytes[0].len();
        let mut shards: Vec<Option<Vec<u8>>> = Vec::with_capacity(5);
        for orig in &originals {
            let mut padded = orig.clone();
            padded.resize(shard_len, 0);
            shards.push(Some(padded));
        }
        for p in &closed.parity_bytes {
            shards.push(Some(p.clone()));
        }
        shards[1] = None; // drop data shard 1
        shards[4] = None; // drop parity shard 1

        let encoder = RsEncoder::new(params).unwrap();
        encoder.reconstruct_data(&mut shards).unwrap();
        // Reconstructed data shard 1 should match original (after
        // accounting for the zero-padding).
        let mut expected = originals[1].clone();
        expected.resize(shard_len, 0);
        assert_eq!(shards[1].as_ref().unwrap(), &expected);
    }

    /// PERF_AUDIT §6.5 — golden parity equivalence. The
    /// Bytes-holding striper (full-length shards borrowed straight
    /// from the original `Bytes`, short shards padded into owned
    /// scratch) must produce byte-identical parity to the legacy
    /// all-Vec path (manually zero-pad every shard to `max_len`,
    /// then `encode(&[Vec<u8>])`). Mixed-size stripe so BOTH the
    /// `Original` (shard 1, max-length) and `Padded` (shards 0, 2,
    /// 3) branches of `close_stripe_with_rs` execute.
    #[test]
    fn striper_bytes_path_parity_matches_legacy_all_vec_encode() {
        let params = RsParams { k: 4, m: 2 };
        let sizes = [100usize, 256, 73, 9];
        let originals: Vec<Vec<u8>> = sizes
            .iter()
            .enumerate()
            .map(|(i, &s)| det_bytes(i as u8, s))
            .collect();

        // Legacy all-Vec path: pad every shard to max_len, encode
        // owned Vecs. This is exactly what the pre-§6.5
        // close_stripe_with_rs did.
        let max_len = sizes.iter().copied().max().unwrap();
        let padded: Vec<Vec<u8>> = originals
            .iter()
            .map(|b| {
                let mut v = b.clone();
                v.resize(max_len, 0);
                v
            })
            .collect();
        let encoder = RsEncoder::new(params).unwrap();
        let golden = encoder.encode(&padded).unwrap();

        // Striper path (Bytes in_flight + borrowed/padded views).
        let mut striper = RsStriper::new(params).unwrap();
        let mut closed: Option<ClosedStripe> = None;
        for (i, bytes) in originals.iter().enumerate() {
            let hash: [u8; 32] = blake3::hash(bytes).into();
            let cref = ChunkRefV3::data(hash, bytes.len() as u32);
            let result = striper.push_chunk(bytes.clone().into(), cref).unwrap();
            if i + 1 == originals.len() {
                closed = Some(result.unwrap());
            } else {
                assert!(result.is_none());
            }
        }
        let closed = closed.unwrap();
        assert_eq!(
            closed.parity_bytes, golden,
            "striper parity must be byte-identical to the legacy all-Vec encode path"
        );
        // Parity refs must hash + size the emitted parity bytes.
        let parity_refs: Vec<&ChunkRefV3> = closed
            .block
            .chunks
            .iter()
            .filter(|c| c.is_parity())
            .collect();
        assert_eq!(parity_refs.len(), 2);
        for (r, p) in parity_refs.iter().zip(closed.parity_bytes.iter()) {
            let expect: [u8; 32] = blake3::hash(p).into();
            assert_eq!(r.hash, expect, "parity ref hash must cover emitted bytes");
            assert_eq!(r.size as usize, max_len, "parity sized to max data shard");
        }
        // Data refs keep their PRE-padding sizes (the on-wire
        // contract reads rely on to strip the zero-fill).
        let data_refs: Vec<&ChunkRefV3> =
            closed.block.chunks.iter().filter(|c| c.is_data()).collect();
        for (r, &s) in data_refs.iter().zip(sizes.iter()) {
            assert_eq!(r.size as usize, s, "data ref keeps pre-padding size");
        }
    }

    /// Production constants match the v0.3 plan §6: `(10, 4)`
    /// default, 40 MiB stripe target, 8 MiB stripe minimum.
    #[test]
    fn striper_constants_match_plan_defaults() {
        assert_eq!(DEFAULT_RS_K, 10);
        assert_eq!(DEFAULT_RS_M, 4);
        assert_eq!(RS_STRIPE_TARGET_BYTES, 40 * 1024 * 1024);
        assert_eq!(RS_STRIPE_MIN_BYTES, 8 * 1024 * 1024);
        assert_eq!(RsParams::default_production(), RsParams { k: 10, m: 4 });
    }
}