Skip to main content

fsqlite_core/
tiered_storage.rs

1//! Tiered storage controls for Native mode (ยง3.5.11, `bd-1hi.29`).
2//!
3//! This module models a three-tier object lifecycle:
4//! - L1: in-memory decoded bytes (returned by `fetch_object`)
5//! - L2: local append-only symbol segments (`l2_segments`)
6//! - L3: remote symbol store (`RemoteTier`)
7//!
8//! The implementation focuses on the normative safety rails:
9//! - remote I/O requires a `RemoteCap` token
10//! - `durability=local` performs no remote writes on commit
11//! - `durability=quorum(M/N)` requires remote ACK quorum before success
12//! - segment eviction is cancel-safe and precondition-checked
13//! - fetch path prefers systematic symbols then falls back to decode
14
15use std::collections::BTreeMap;
16use std::sync::Arc;
17
18use fsqlite_error::{FrankenError, Result};
19use fsqlite_types::cx::{Cx, cap};
20use fsqlite_types::{
21    IdempotencyKey, ObjectId, Oti, RemoteCap, Saga, SymbolReadPath, SymbolRecord,
22    SystematicLayoutError, reconstruct_systematic_happy_path, source_symbol_count,
23};
24use tracing::{debug, info, warn};
25use xxhash_rust::xxh3::xxh3_64;
26
27use crate::decode_proofs::{EcsDecodeProof, RejectedSymbol, SymbolDigest, SymbolRejectionReason};
28use crate::remote_effects::Executor as RemoteAdmissionExecutor;
29
30const BEAD_ID: &str = "bd-1hi.29";
31const FETCH_SYMBOLS_COMPUTATION: &str = "fsqlite:tiered:fetch_symbols:v1";
32const UPLOAD_SEGMENT_COMPUTATION: &str = "fsqlite:tiered:upload_segment:v1";
33const DEFAULT_WRITE_BACK_SEGMENT_ID: u64 = u64::MAX - 1;
34const DEFAULT_FALLBACK_DECODE_SLACK: usize = 2;
35
36/// Native-mode durability policy.
37#[derive(Debug, Clone, Copy, PartialEq, Eq)]
38pub enum DurabilityMode {
39    /// `PRAGMA durability = local`
40    Local,
41    /// `PRAGMA durability = quorum(M/N)`
42    Quorum { required: u32, total: u32 },
43}
44
45impl DurabilityMode {
46    /// Local-only durability policy.
47    #[must_use]
48    pub const fn local() -> Self {
49        Self::Local
50    }
51
52    /// Construct a quorum policy.
53    pub fn quorum(required: u32, total: u32) -> Result<Self> {
54        if required == 0 || required > total {
55            return Err(FrankenError::OutOfRange {
56                what: "durability quorum".to_owned(),
57                value: format!("required={required}, total={total}"),
58            });
59        }
60        Ok(Self::Quorum { required, total })
61    }
62
63    #[must_use]
64    pub const fn requires_remote(self) -> bool {
65        matches!(self, Self::Quorum { .. })
66    }
67
68    #[must_use]
69    pub const fn quorum_satisfied(self, acked_stores: u32) -> bool {
70        match self {
71            Self::Local => true,
72            Self::Quorum { required, .. } => acked_stores >= required,
73        }
74    }
75}
76
77/// Request for a remote fetch operation.
78#[derive(Debug, Clone, PartialEq, Eq)]
79pub struct FetchSymbolsRequest {
80    pub object_id: ObjectId,
81    pub preferred_esis: Vec<u32>,
82    pub max_symbols: usize,
83    pub idempotency_key: IdempotencyKey,
84    pub ecs_epoch: u64,
85    pub remote_cap: RemoteCap,
86    pub computation: &'static str,
87}
88
89/// Request for a remote upload operation.
90#[derive(Debug, Clone, PartialEq, Eq)]
91pub struct UploadSegmentRequest {
92    pub segment_id: u64,
93    pub records: Vec<SymbolRecord>,
94    pub idempotency_key: IdempotencyKey,
95    pub saga: Saga,
96    pub ecs_epoch: u64,
97    pub remote_cap: RemoteCap,
98    pub computation: &'static str,
99}
100
101/// Remote upload result.
102#[derive(Debug, Clone, Copy, PartialEq, Eq)]
103pub struct UploadSegmentReceipt {
104    pub acked_stores: u32,
105    pub deduplicated: bool,
106}
107
108/// Minimal remote tier contract used by tiered storage control logic.
109pub trait RemoteTier {
110    /// Fetch symbols for one object.
111    fn fetch_symbols(&mut self, request: &FetchSymbolsRequest) -> Result<Vec<SymbolRecord>>;
112
113    /// Upload one rotated segment.
114    fn upload_segment(&mut self, request: &UploadSegmentRequest) -> Result<UploadSegmentReceipt>;
115
116    /// Check whether every object in a segment is remotely recoverable.
117    fn segment_recoverable(&self, segment_id: u64, min_symbols_per_object: usize) -> bool;
118}
119
120/// Commit request for one L2 segment rotation.
121#[derive(Debug, Clone, PartialEq, Eq)]
122pub struct CommitRequest {
123    pub segment_id: u64,
124    pub records: Vec<SymbolRecord>,
125    pub idempotency_key: IdempotencyKey,
126    pub saga: Saga,
127    pub ecs_epoch: u64,
128}
129
130impl CommitRequest {
131    /// Build a deterministic commit request from segment + symbol records.
132    #[must_use]
133    pub fn new(segment_id: u64, records: Vec<SymbolRecord>, ecs_epoch: u64) -> Self {
134        let request_bytes = segment_request_bytes(segment_id, &records);
135        let idempotency_key = IdempotencyKey::derive(ecs_epoch, &request_bytes);
136        let saga = Saga::new(idempotency_key);
137        Self {
138            segment_id,
139            records,
140            idempotency_key,
141            saga,
142            ecs_epoch,
143        }
144    }
145}
146
147/// Commit result summary.
148#[derive(Debug, Clone, Copy, PartialEq, Eq)]
149pub struct CommitOutcome {
150    pub remote_io: bool,
151    pub upload_receipt: Option<UploadSegmentReceipt>,
152}
153
154/// Fetch result summary.
155#[derive(Debug, Clone, PartialEq, Eq)]
156pub struct FetchOutcome {
157    pub bytes: Vec<u8>,
158    pub read_path: SymbolReadPath,
159    pub remote_used: bool,
160    pub write_back_count: usize,
161    pub decode_proof: Option<EcsDecodeProof>,
162}
163
164/// Decode-proof audit event emitted by fallback decode paths.
165#[derive(Debug, Clone, PartialEq, Eq)]
166pub struct DecodeAuditEntry {
167    pub seq: u64,
168    pub object_id: ObjectId,
169    pub decode_success: bool,
170    pub proof: EcsDecodeProof,
171}
172
173/// Eviction saga phase.
174#[derive(Debug, Clone, Copy, PartialEq, Eq)]
175pub enum EvictionPhase {
176    Uploaded,
177    CompensatedCancelled,
178    CompensatedPrecondition,
179    Retired,
180}
181
182/// Segment-eviction result summary.
183#[derive(Debug, Clone, Copy, PartialEq, Eq)]
184pub struct EvictionOutcome {
185    pub phase: EvictionPhase,
186    pub evicted: bool,
187    pub local_retained: bool,
188    pub upload_receipt: UploadSegmentReceipt,
189}
190
191/// Tiered storage control plane state.
192#[derive(Debug)]
193pub struct TieredStorage {
194    durability_mode: DurabilityMode,
195    remote_executor: Arc<RemoteAdmissionExecutor>,
196    write_back_segment_id: u64,
197    l2_segments: BTreeMap<u64, Vec<SymbolRecord>>,
198    decode_audit_seq: u64,
199    decode_audit: Vec<DecodeAuditEntry>,
200}
201
202impl Default for TieredStorage {
203    fn default() -> Self {
204        Self::new(DurabilityMode::Local)
205    }
206}
207
208impl TieredStorage {
209    fn new_with_remote_executor(
210        durability_mode: DurabilityMode,
211        remote_executor: RemoteAdmissionExecutor,
212    ) -> Self {
213        Self {
214            durability_mode,
215            remote_executor: Arc::new(remote_executor),
216            write_back_segment_id: DEFAULT_WRITE_BACK_SEGMENT_ID,
217            l2_segments: BTreeMap::new(),
218            decode_audit_seq: 0,
219            decode_audit: Vec::new(),
220        }
221    }
222
223    /// Create a tiered-storage controller.
224    #[must_use]
225    pub fn new(durability_mode: DurabilityMode) -> Self {
226        Self::new_with_remote_executor(
227            durability_mode,
228            RemoteAdmissionExecutor::balanced_tiered_storage_default(),
229        )
230    }
231
232    /// Current durability mode.
233    #[must_use]
234    pub const fn durability_mode(&self) -> DurabilityMode {
235        self.durability_mode
236    }
237
238    /// Update durability mode.
239    pub fn set_durability_mode(&mut self, mode: DurabilityMode) {
240        self.durability_mode = mode;
241    }
242
243    /// Segment id used for self-healing write-back.
244    #[must_use]
245    pub const fn write_back_segment_id(&self) -> u64 {
246        self.write_back_segment_id
247    }
248
249    /// Drain deterministic decode-proof audit entries.
250    pub fn take_decode_audit_entries(&mut self) -> Vec<DecodeAuditEntry> {
251        std::mem::take(&mut self.decode_audit)
252    }
253
254    /// Insert or replace one L2 segment.
255    pub fn insert_l2_segment(&mut self, segment_id: u64, records: Vec<SymbolRecord>) {
256        self.l2_segments.insert(segment_id, records);
257    }
258
259    /// Number of L2 segments currently retained.
260    #[must_use]
261    pub fn l2_segment_count(&self) -> usize {
262        self.l2_segments.len()
263    }
264
265    /// Whether the L2 segment exists.
266    #[must_use]
267    pub fn l2_segment_exists(&self, segment_id: u64) -> bool {
268        self.l2_segments.contains_key(&segment_id)
269    }
270
271    /// Collect all L2 records for one object, deduplicated by ESI.
272    #[must_use]
273    pub fn l2_records_for_object(&self, object_id: ObjectId) -> Vec<SymbolRecord> {
274        let mut by_esi = BTreeMap::<u32, SymbolRecord>::new();
275        for segment in self.l2_segments.values() {
276            for record in segment {
277                if record.object_id == object_id {
278                    merge_symbol_record_by_esi(&mut by_esi, record.clone());
279                }
280            }
281        }
282        by_esi.into_values().collect()
283    }
284
285    /// Commit one rotated segment under the configured durability policy.
286    ///
287    /// Local symbols are staged first; remote durability is then enforced when
288    /// `durability=quorum`.
289    pub fn commit_segment<Caps, R>(
290        &mut self,
291        cx: &Cx<Caps>,
292        request: CommitRequest,
293        remote: Option<&mut R>,
294        remote_cap: Option<RemoteCap>,
295    ) -> Result<CommitOutcome>
296    where
297        Caps: cap::SubsetOf<cap::All>,
298        R: RemoteTier,
299    {
300        self.insert_l2_segment(request.segment_id, request.records.clone());
301
302        if !self.durability_mode.requires_remote() {
303            info!(
304                bead_id = BEAD_ID,
305                segment_id = request.segment_id,
306                mode = "local",
307                "commit satisfied by L2 only"
308            );
309            return Ok(CommitOutcome {
310                remote_io: false,
311                upload_receipt: None,
312            });
313        }
314
315        let cap = remote_cap.ok_or(FrankenError::AuthDenied)?;
316        let remote_store = remote.ok_or(FrankenError::AuthDenied)?;
317
318        let upload_request = UploadSegmentRequest {
319            segment_id: request.segment_id,
320            records: request.records,
321            idempotency_key: request.idempotency_key,
322            saga: request.saga,
323            ecs_epoch: request.ecs_epoch,
324            remote_cap: cap,
325            computation: UPLOAD_SEGMENT_COMPUTATION,
326        };
327        let receipt = self.remote_executor.run(
328            cx,
329            upload_request.computation,
330            Some(upload_request.saga),
331            Some(upload_request.idempotency_key),
332            upload_request.ecs_epoch,
333            || remote_store.upload_segment(&upload_request),
334        )?;
335        if !self.durability_mode.quorum_satisfied(receipt.acked_stores) {
336            warn!(
337                bead_id = BEAD_ID,
338                segment_id = request.segment_id,
339                acked_stores = receipt.acked_stores,
340                "quorum durability not yet satisfied"
341            );
342            return Err(FrankenError::Busy);
343        }
344
345        Ok(CommitOutcome {
346            remote_io: true,
347            upload_receipt: Some(receipt),
348        })
349    }
350
351    /// Fetch one object through tiered storage (L2 fast path, then L3 fallback).
352    pub fn fetch_object<Caps, R>(
353        &mut self,
354        cx: &Cx<Caps>,
355        object_id: ObjectId,
356        ecs_epoch: u64,
357        remote: Option<&mut R>,
358        remote_cap: Option<RemoteCap>,
359    ) -> Result<FetchOutcome>
360    where
361        Caps: cap::SubsetOf<cap::All>,
362        R: RemoteTier,
363    {
364        let local_records = self.l2_records_for_object(object_id);
365        if !local_records.is_empty() {
366            match recover_object_hybrid(&local_records) {
367                Ok(local) => {
368                    if let Some(proof) = local.decode_proof.clone() {
369                        self.record_decode_proof(proof);
370                    }
371                    return Ok(FetchOutcome {
372                        bytes: local.bytes,
373                        read_path: local.read_path,
374                        remote_used: false,
375                        write_back_count: 0,
376                        decode_proof: local.decode_proof,
377                    });
378                }
379                Err(failure) => {
380                    self.record_decode_proof(failure.proof);
381                    debug!(
382                        bead_id = BEAD_ID,
383                        object_id = %object_id,
384                        reason = %failure.reason,
385                        "local fallback decode attempt failed; escalating to remote tier"
386                    );
387                }
388            }
389        }
390
391        let cap = remote_cap.ok_or(FrankenError::AuthDenied)?;
392        let remote_store = remote.ok_or(FrankenError::AuthDenied)?;
393
394        let preferred_esis = preferred_source_esis(local_records.first().map(|record| record.oti));
395        let idempotency_key = derive_fetch_key(object_id, &preferred_esis, ecs_epoch);
396        let fetch_request = FetchSymbolsRequest {
397            object_id,
398            preferred_esis,
399            max_symbols: usize::MAX,
400            idempotency_key,
401            ecs_epoch,
402            remote_cap: cap,
403            computation: FETCH_SYMBOLS_COMPUTATION,
404        };
405        let fetched = self.remote_executor.run(
406            cx,
407            fetch_request.computation,
408            None,
409            Some(fetch_request.idempotency_key),
410            fetch_request.ecs_epoch,
411            || remote_store.fetch_symbols(&fetch_request),
412        )?;
413        if fetched.is_empty() {
414            return Err(FrankenError::Internal(format!(
415                "remote tier returned no symbols for object {object_id}"
416            )));
417        }
418        cx.checkpoint().map_err(|_| FrankenError::Busy)?;
419
420        let merged = merge_symbol_sets(&local_records, &fetched);
421        let recovered = match recover_object_hybrid(&merged) {
422            Ok(value) => value,
423            Err(failure) => {
424                let detail = failure.reason.clone();
425                self.record_decode_proof(failure.proof);
426                return Err(FrankenError::DatabaseCorrupt {
427                    detail: format!("unable to recover object {object_id}: {detail}"),
428                });
429            }
430        };
431        if let Some(proof) = recovered.decode_proof.clone() {
432            self.record_decode_proof(proof);
433        }
434        let write_back_count = self.write_back_repairs(&local_records, &fetched);
435
436        Ok(FetchOutcome {
437            bytes: recovered.bytes,
438            read_path: recovered.read_path,
439            remote_used: true,
440            write_back_count,
441            decode_proof: recovered.decode_proof,
442        })
443    }
444
445    /// Evict one rotated segment from L2 to L3 using a cancel-safe saga.
446    ///
447    /// The local segment is removed only when:
448    /// 1. remote upload succeeds, and
449    /// 2. cancellation is not requested, and
450    /// 3. remote recoverability preconditions are met.
451    pub fn evict_segment<Caps, R>(
452        &mut self,
453        cx: &Cx<Caps>,
454        segment_id: u64,
455        min_symbols_per_object: usize,
456        ecs_epoch: u64,
457        remote: &mut R,
458        remote_cap: Option<RemoteCap>,
459    ) -> Result<EvictionOutcome>
460    where
461        Caps: cap::SubsetOf<cap::All>,
462        R: RemoteTier,
463    {
464        let cap = remote_cap.ok_or(FrankenError::AuthDenied)?;
465        let records =
466            self.l2_segments.get(&segment_id).cloned().ok_or_else(|| {
467                FrankenError::Internal(format!("unknown L2 segment {segment_id}"))
468            })?;
469
470        let key = derive_evict_key(segment_id, &records, ecs_epoch);
471        let upload_request = UploadSegmentRequest {
472            segment_id,
473            records,
474            idempotency_key: key,
475            saga: Saga::new(key),
476            ecs_epoch,
477            remote_cap: cap,
478            computation: UPLOAD_SEGMENT_COMPUTATION,
479        };
480        let receipt = self.remote_executor.run(
481            cx,
482            upload_request.computation,
483            Some(upload_request.saga),
484            Some(upload_request.idempotency_key),
485            upload_request.ecs_epoch,
486            || remote.upload_segment(&upload_request),
487        )?;
488        debug!(
489            bead_id = BEAD_ID,
490            segment_id,
491            acked_stores = receipt.acked_stores,
492            "segment uploaded to L3"
493        );
494
495        if cx.is_cancel_requested() || cx.checkpoint().is_err() {
496            warn!(
497                bead_id = BEAD_ID,
498                segment_id, "eviction cancelled; retaining local segment"
499            );
500            return Ok(EvictionOutcome {
501                phase: EvictionPhase::CompensatedCancelled,
502                evicted: false,
503                local_retained: true,
504                upload_receipt: receipt,
505            });
506        }
507
508        if !remote.segment_recoverable(segment_id, min_symbols_per_object) {
509            warn!(
510                bead_id = BEAD_ID,
511                segment_id,
512                min_symbols_per_object,
513                "eviction precondition failed; retaining local segment"
514            );
515            return Ok(EvictionOutcome {
516                phase: EvictionPhase::CompensatedPrecondition,
517                evicted: false,
518                local_retained: true,
519                upload_receipt: receipt,
520            });
521        }
522
523        let _removed = self.l2_segments.remove(&segment_id);
524        info!(bead_id = BEAD_ID, segment_id, "segment evicted from L2");
525        Ok(EvictionOutcome {
526            phase: EvictionPhase::Retired,
527            evicted: true,
528            local_retained: false,
529            upload_receipt: receipt,
530        })
531    }
532
533    fn write_back_repairs(&mut self, local: &[SymbolRecord], fetched: &[SymbolRecord]) -> usize {
534        let local_by_esi: BTreeMap<u32, &SymbolRecord> =
535            local.iter().map(|record| (record.esi, record)).collect();
536        let mut repairs_by_esi = BTreeMap::<u32, SymbolRecord>::new();
537        for record in fetched {
538            let needs_repair = match local_by_esi.get(&record.esi) {
539                None => true,
540                Some(existing) => !existing.verify_integrity() && record.verify_integrity(),
541            };
542            if needs_repair {
543                merge_symbol_record_by_esi(&mut repairs_by_esi, record.clone());
544            }
545        }
546        let repairs: Vec<SymbolRecord> = repairs_by_esi.into_values().collect();
547        if repairs.is_empty() {
548            return 0;
549        }
550        let added = repairs.len();
551        let segment = self
552            .l2_segments
553            .entry(self.write_back_segment_id)
554            .or_default();
555
556        // Efficiently merge new repairs into the existing segment.
557        // We use a BTreeMap keyed by (ObjectId, ESI) to deduplicate.
558        let mut deduped = BTreeMap::<(ObjectId, u32), SymbolRecord>::new();
559        for record in segment.drain(..) {
560            deduped.insert((record.object_id, record.esi), record);
561        }
562        for record in repairs {
563            let key = (record.object_id, record.esi);
564            match deduped.entry(key) {
565                std::collections::btree_map::Entry::Vacant(entry) => {
566                    entry.insert(record);
567                }
568                std::collections::btree_map::Entry::Occupied(mut entry) => {
569                    if prefer_symbol_record(entry.get(), &record) {
570                        entry.insert(record);
571                    }
572                }
573            }
574        }
575        *segment = deduped.into_values().collect();
576        added
577    }
578
579    fn record_decode_proof(&mut self, proof: EcsDecodeProof) {
580        self.decode_audit_seq = self.decode_audit_seq.saturating_add(1);
581        self.decode_audit.push(DecodeAuditEntry {
582            seq: self.decode_audit_seq,
583            object_id: proof.object_id,
584            decode_success: proof.decode_success,
585            proof,
586        });
587    }
588}
589
590fn preferred_source_esis(oti: Option<Oti>) -> Vec<u32> {
591    let Some(oti) = oti else {
592        return Vec::new();
593    };
594    let Ok(source_symbols) = source_symbol_count(oti) else {
595        return Vec::new();
596    };
597    let max_u32 = usize::try_from(u32::MAX).unwrap_or(usize::MAX);
598    let capped = source_symbols.min(max_u32);
599    let mut esis = Vec::with_capacity(capped);
600    for idx in 0..capped {
601        if let Ok(esi) = u32::try_from(idx) {
602            esis.push(esi);
603        }
604    }
605    esis
606}
607
608fn derive_fetch_key(object_id: ObjectId, preferred_esis: &[u32], ecs_epoch: u64) -> IdempotencyKey {
609    let mut bytes = Vec::with_capacity(16 + preferred_esis.len() * 4);
610    bytes.extend_from_slice(object_id.as_bytes());
611    for esi in preferred_esis {
612        bytes.extend_from_slice(&esi.to_le_bytes());
613    }
614    IdempotencyKey::derive(ecs_epoch, &bytes)
615}
616
617fn segment_request_bytes(segment_id: u64, records: &[SymbolRecord]) -> Vec<u8> {
618    let payload_bytes = records.iter().fold(0_usize, |acc, record| {
619        acc.saturating_add(record.to_bytes().len())
620    });
621    let mut bytes = Vec::with_capacity(16 + payload_bytes + records.len().saturating_mul(8));
622    bytes.extend_from_slice(&segment_id.to_le_bytes());
623    bytes.extend_from_slice(
624        &u64::try_from(records.len())
625            .unwrap_or(u64::MAX)
626            .to_le_bytes(),
627    );
628    for record in records {
629        let record_bytes = record.to_bytes();
630        bytes.extend_from_slice(
631            &u64::try_from(record_bytes.len())
632                .unwrap_or(u64::MAX)
633                .to_le_bytes(),
634        );
635        bytes.extend_from_slice(&record_bytes);
636    }
637    bytes
638}
639
640fn derive_evict_key(segment_id: u64, records: &[SymbolRecord], ecs_epoch: u64) -> IdempotencyKey {
641    let request_bytes = segment_request_bytes(segment_id, records);
642    IdempotencyKey::derive(ecs_epoch, &request_bytes)
643}
644
645fn prefer_symbol_record(existing: &SymbolRecord, candidate: &SymbolRecord) -> bool {
646    !existing.verify_integrity() && candidate.verify_integrity()
647}
648
649fn merge_symbol_record_by_esi(by_esi: &mut BTreeMap<u32, SymbolRecord>, record: SymbolRecord) {
650    merge_symbol_record_by_key(by_esi, record.esi, record);
651}
652
653fn merge_symbol_record_by_key<K: Ord>(
654    map: &mut BTreeMap<K, SymbolRecord>,
655    key: K,
656    record: SymbolRecord,
657) {
658    match map.entry(key) {
659        std::collections::btree_map::Entry::Vacant(entry) => {
660            entry.insert(record);
661        }
662        std::collections::btree_map::Entry::Occupied(mut entry) => {
663            if prefer_symbol_record(entry.get(), &record) {
664                entry.insert(record);
665            }
666        }
667    }
668}
669
670fn merge_symbol_sets(local: &[SymbolRecord], fetched: &[SymbolRecord]) -> Vec<SymbolRecord> {
671    let mut by_esi = BTreeMap::<u32, SymbolRecord>::new();
672    for record in local {
673        merge_symbol_record_by_esi(&mut by_esi, record.clone());
674    }
675    for record in fetched {
676        merge_symbol_record_by_esi(&mut by_esi, record.clone());
677    }
678    by_esi.into_values().collect()
679}
680
681#[derive(Debug, Clone)]
682struct HybridRecoverResult {
683    bytes: Vec<u8>,
684    read_path: SymbolReadPath,
685    decode_proof: Option<EcsDecodeProof>,
686}
687
688#[derive(Debug, Clone)]
689struct FallbackDecodeSuccess {
690    bytes: Vec<u8>,
691    proof: EcsDecodeProof,
692}
693
694#[derive(Debug, Clone)]
695struct FallbackDecodeFailure {
696    reason: String,
697    proof: EcsDecodeProof,
698}
699
700impl std::fmt::Display for FallbackDecodeFailure {
701    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
702        f.write_str(&self.reason)
703    }
704}
705
706#[derive(Debug, Clone)]
707struct FallbackSymbolEvidence {
708    object_id: ObjectId,
709    source_symbols: usize,
710    symbol_size: usize,
711    transfer_len: usize,
712    accepted_by_esi: BTreeMap<u32, SymbolRecord>,
713    accepted_esis: Vec<u32>,
714    rejected_symbols: Vec<RejectedSymbol>,
715    symbol_digests: Vec<SymbolDigest>,
716}
717
718fn recover_object_hybrid(
719    records: &[SymbolRecord],
720) -> std::result::Result<HybridRecoverResult, Box<FallbackDecodeFailure>> {
721    match reconstruct_systematic_happy_path(records) {
722        Ok(bytes) => Ok(HybridRecoverResult {
723            bytes,
724            read_path: SymbolReadPath::SystematicFastPath,
725            decode_proof: None,
726        }),
727        Err(reason) => {
728            let fallback = fallback_decode_records(records, &reason)?;
729            Ok(HybridRecoverResult {
730                bytes: fallback.bytes,
731                read_path: SymbolReadPath::FullDecodeFallback { reason },
732                decode_proof: Some(fallback.proof),
733            })
734        }
735    }
736}
737
738fn fallback_decode_records(
739    records: &[SymbolRecord],
740    systematic_reason: &SystematicLayoutError,
741) -> std::result::Result<FallbackDecodeSuccess, Box<FallbackDecodeFailure>> {
742    let evidence = collect_fallback_symbol_evidence(records)?;
743    let k_source = u32::try_from(evidence.source_symbols).unwrap_or(u32::MAX);
744    let available_symbols = evidence.accepted_esis.len();
745    let required_symbols = evidence
746        .source_symbols
747        .saturating_add(DEFAULT_FALLBACK_DECODE_SLACK);
748    if available_symbols < required_symbols {
749        let detail = format!(
750            "systematic_reason={systematic_reason}; insufficient_symbols_for_fallback: available={available_symbols} required={required_symbols} slack_decode={DEFAULT_FALLBACK_DECODE_SLACK}"
751        );
752        return Err(FallbackDecodeFailure {
753            reason: detail,
754            proof: build_fallback_decode_proof_from_parts(
755                evidence.object_id,
756                k_source,
757                &evidence.accepted_esis,
758                &evidence.rejected_symbols,
759                &evidence.symbol_digests,
760                false,
761                Some(u32::try_from(available_symbols).unwrap_or(u32::MAX)),
762            ),
763        }
764        .into());
765    }
766
767    let mut out = Vec::with_capacity(evidence.source_symbols.saturating_mul(evidence.symbol_size));
768    for expected_esi in 0..evidence.source_symbols {
769        let expected_esi_u32 = u32::try_from(expected_esi).unwrap_or(u32::MAX);
770        let Some(record) = evidence.accepted_by_esi.get(&expected_esi_u32) else {
771            let detail = format!(
772                "systematic_reason={systematic_reason}; missing_source_symbol: esi={expected_esi_u32}"
773            );
774            return Err(FallbackDecodeFailure {
775                reason: detail,
776                proof: build_fallback_decode_proof_from_parts(
777                    evidence.object_id,
778                    k_source,
779                    &evidence.accepted_esis,
780                    &evidence.rejected_symbols,
781                    &evidence.symbol_digests,
782                    false,
783                    Some(u32::try_from(available_symbols).unwrap_or(u32::MAX)),
784                ),
785            }
786            .into());
787        };
788        out.extend_from_slice(&record.symbol_data);
789    }
790    out.truncate(evidence.transfer_len);
791
792    Ok(FallbackDecodeSuccess {
793        bytes: out,
794        proof: build_fallback_decode_proof_from_parts(
795            evidence.object_id,
796            k_source,
797            &evidence.accepted_esis,
798            &evidence.rejected_symbols,
799            &evidence.symbol_digests,
800            true,
801            Some(k_source),
802        ),
803    })
804}
805
806fn collect_fallback_symbol_evidence(
807    records: &[SymbolRecord],
808) -> std::result::Result<FallbackSymbolEvidence, Box<FallbackDecodeFailure>> {
809    let Some(first) = records.first() else {
810        return Err(FallbackDecodeFailure {
811            reason: String::from("empty_symbol_set"),
812            proof: build_fallback_decode_proof_from_parts(
813                ObjectId::from_bytes([0_u8; 16]),
814                0,
815                &[],
816                &[],
817                &[],
818                false,
819                Some(0),
820            ),
821        }
822        .into());
823    };
824
825    let source_symbols = source_symbol_count(first.oti).map_err(|err| {
826        Box::new(FallbackDecodeFailure {
827            reason: format!("invalid_source_symbol_count: {err}"),
828            proof: build_fallback_decode_proof_from_parts(
829                first.object_id,
830                0,
831                &[],
832                &[],
833                &[],
834                false,
835                Some(0),
836            ),
837        })
838    })?;
839
840    let symbol_size = usize::try_from(first.oti.t).map_err(|_| {
841        Box::new(FallbackDecodeFailure {
842            reason: String::from("invalid_symbol_size"),
843            proof: build_fallback_decode_proof_from_parts(
844                first.object_id,
845                u32::try_from(source_symbols).unwrap_or(u32::MAX),
846                &[],
847                &[],
848                &[],
849                false,
850                Some(0),
851            ),
852        })
853    })?;
854    let transfer_len = usize::try_from(first.oti.f).map_err(|_| {
855        Box::new(FallbackDecodeFailure {
856            reason: String::from("invalid_transfer_length"),
857            proof: build_fallback_decode_proof_from_parts(
858                first.object_id,
859                u32::try_from(source_symbols).unwrap_or(u32::MAX),
860                &[],
861                &[],
862                &[],
863                false,
864                Some(0),
865            ),
866        })
867    })?;
868
869    let mut ordered = records.to_vec();
870    ordered.sort_by_key(|record| record.esi);
871
872    let mut accepted_by_esi = BTreeMap::<u32, SymbolRecord>::new();
873    let mut rejected_symbols = Vec::new();
874    let mut symbol_digests = Vec::new();
875    for record in ordered {
876        let rejection = if record.object_id != first.object_id
877            || record.oti != first.oti
878            || record.symbol_data.len() != symbol_size
879        {
880            Some(SymbolRejectionReason::FormatViolation)
881        } else if !record.verify_integrity() {
882            Some(SymbolRejectionReason::HashMismatch)
883        } else if accepted_by_esi.contains_key(&record.esi) {
884            Some(SymbolRejectionReason::DuplicateEsi)
885        } else {
886            None
887        };
888
889        if let Some(reason) = rejection {
890            rejected_symbols.push(RejectedSymbol {
891                esi: record.esi,
892                reason,
893            });
894            continue;
895        }
896
897        symbol_digests.push(SymbolDigest {
898            esi: record.esi,
899            digest_xxh3: xxh3_64(&record.to_bytes()),
900        });
901        accepted_by_esi.insert(record.esi, record);
902    }
903
904    let accepted_esis = accepted_by_esi.keys().copied().collect();
905    symbol_digests.sort_by_key(|digest| digest.esi);
906
907    Ok(FallbackSymbolEvidence {
908        object_id: first.object_id,
909        source_symbols,
910        symbol_size,
911        transfer_len,
912        accepted_by_esi,
913        accepted_esis,
914        rejected_symbols,
915        symbol_digests,
916    })
917}
918
919fn build_fallback_decode_proof_from_parts(
920    object_id: ObjectId,
921    k_source: u32,
922    accepted_esis: &[u32],
923    rejected_symbols: &[RejectedSymbol],
924    symbol_digests: &[SymbolDigest],
925    decode_success: bool,
926    intermediate_rank: Option<u32>,
927) -> EcsDecodeProof {
928    let seed = deterministic_fallback_seed(object_id, k_source);
929    let timing_ns = deterministic_fallback_timing_ns(
930        object_id,
931        k_source,
932        accepted_esis,
933        rejected_symbols,
934        decode_success,
935    );
936    let proof = EcsDecodeProof::from_esis(
937        object_id,
938        k_source,
939        accepted_esis,
940        decode_success,
941        intermediate_rank,
942        timing_ns,
943        seed,
944    );
945    proof
946        .with_rejected_symbols(rejected_symbols.to_vec())
947        .with_symbol_digests(symbol_digests.to_vec())
948}
949
950fn deterministic_fallback_seed(object_id: ObjectId, k_source: u32) -> u64 {
951    let mut material = Vec::with_capacity(40);
952    material.extend_from_slice(b"fsqlite:tiered:fallback:seed:v1");
953    material.extend_from_slice(object_id.as_bytes());
954    material.extend_from_slice(&k_source.to_le_bytes());
955    xxh3_64(&material)
956}
957
958fn deterministic_fallback_timing_ns(
959    object_id: ObjectId,
960    k_source: u32,
961    accepted_esis: &[u32],
962    rejected_symbols: &[RejectedSymbol],
963    decode_success: bool,
964) -> u64 {
965    let mut material =
966        Vec::with_capacity(48 + accepted_esis.len() * 4 + rejected_symbols.len() * 5);
967    material.extend_from_slice(b"fsqlite:tiered:fallback:timing:v1");
968    material.extend_from_slice(object_id.as_bytes());
969    material.extend_from_slice(&k_source.to_le_bytes());
970    material.push(u8::from(decode_success));
971    for esi in accepted_esis {
972        material.extend_from_slice(&esi.to_le_bytes());
973    }
974    for item in rejected_symbols {
975        material.extend_from_slice(&item.esi.to_le_bytes());
976        material.push(rejection_reason_code(item.reason));
977    }
978    xxh3_64(&material)
979}
980
981fn rejection_reason_code(reason: SymbolRejectionReason) -> u8 {
982    match reason {
983        SymbolRejectionReason::HashMismatch => 1,
984        SymbolRejectionReason::InvalidAuthTag => 2,
985        SymbolRejectionReason::DuplicateEsi => 3,
986        SymbolRejectionReason::FormatViolation => 4,
987    }
988}
989
990#[cfg(test)]
991mod tests {
992    use std::collections::{BTreeSet, HashMap};
993    use std::time::Duration;
994
995    use fsqlite_types::cx::{Cx, cap};
996    use fsqlite_types::{ObjectId, Oti, SymbolRecordFlags};
997    use proptest::prelude::*;
998    use proptest::test_runner::{Config as ProptestConfig, RngAlgorithm, RngSeed, TestRunner};
999
1000    use super::*;
1001
1002    #[derive(Debug, Default)]
1003    struct MockRemoteTier {
1004        object_symbols: HashMap<ObjectId, Vec<SymbolRecord>>,
1005        segment_symbols: HashMap<u64, Vec<SymbolRecord>>,
1006        upload_receipts: HashMap<(u64, IdempotencyKey), UploadSegmentReceipt>,
1007        segment_recoverability_overrides: HashMap<u64, bool>,
1008        upload_calls: usize,
1009        fetch_calls: usize,
1010        configured_acks: u32,
1011        cancel_after_upload: Option<Cx<cap::All>>,
1012        cancel_after_fetch: Option<Cx<cap::All>>,
1013        last_fetch_preferred: Vec<u32>,
1014    }
1015
1016    impl MockRemoteTier {
1017        fn set_object_symbols(&mut self, object_id: ObjectId, records: Vec<SymbolRecord>) {
1018            self.object_symbols.insert(object_id, records);
1019        }
1020
1021        fn set_acked_stores(&mut self, acked_stores: u32) {
1022            self.configured_acks = acked_stores;
1023        }
1024
1025        fn set_segment_recoverable(&mut self, segment_id: u64, recoverable: bool) {
1026            self.segment_recoverability_overrides
1027                .insert(segment_id, recoverable);
1028        }
1029
1030        fn set_cancel_after_upload(&mut self, cx: Cx<cap::All>) {
1031            self.cancel_after_upload = Some(cx);
1032        }
1033
1034        fn upload_calls(&self) -> usize {
1035            self.upload_calls
1036        }
1037
1038        fn fetch_calls(&self) -> usize {
1039            self.fetch_calls
1040        }
1041    }
1042
1043    impl RemoteTier for MockRemoteTier {
1044        fn fetch_symbols(&mut self, request: &FetchSymbolsRequest) -> Result<Vec<SymbolRecord>> {
1045            self.fetch_calls = self.fetch_calls.saturating_add(1);
1046            self.last_fetch_preferred = request.preferred_esis.clone();
1047            let Some(records) = self.object_symbols.get(&request.object_id) else {
1048                return Ok(Vec::new());
1049            };
1050
1051            let preferred: BTreeSet<u32> = request.preferred_esis.iter().copied().collect();
1052            let mut ordered = records.clone();
1053            ordered.sort_by_key(|record| (!preferred.contains(&record.esi), record.esi));
1054            ordered.truncate(request.max_symbols);
1055            if let Some(cx) = self.cancel_after_fetch.take() {
1056                cx.cancel();
1057            }
1058            Ok(ordered)
1059        }
1060
1061        fn upload_segment(
1062            &mut self,
1063            request: &UploadSegmentRequest,
1064        ) -> Result<UploadSegmentReceipt> {
1065            let key = (request.segment_id, request.idempotency_key);
1066            if let Some(existing) = self.upload_receipts.get(&key).copied() {
1067                return Ok(UploadSegmentReceipt {
1068                    deduplicated: true,
1069                    ..existing
1070                });
1071            }
1072
1073            self.upload_calls = self.upload_calls.saturating_add(1);
1074            self.segment_symbols
1075                .insert(request.segment_id, request.records.clone());
1076
1077            for record in &request.records {
1078                let entry = self.object_symbols.entry(record.object_id).or_default();
1079                if entry.iter().all(|existing| existing.esi != record.esi) {
1080                    entry.push(record.clone());
1081                }
1082                entry.sort_by_key(|existing| existing.esi);
1083            }
1084
1085            let receipt = UploadSegmentReceipt {
1086                acked_stores: self.configured_acks,
1087                deduplicated: false,
1088            };
1089            self.upload_receipts.insert(key, receipt);
1090
1091            if let Some(cx) = self.cancel_after_upload.take() {
1092                cx.cancel();
1093            }
1094
1095            Ok(receipt)
1096        }
1097
1098        fn segment_recoverable(&self, segment_id: u64, min_symbols_per_object: usize) -> bool {
1099            if let Some(override_value) = self.segment_recoverability_overrides.get(&segment_id) {
1100                return *override_value;
1101            }
1102            let Some(records) = self.segment_symbols.get(&segment_id) else {
1103                return false;
1104            };
1105            let mut per_object = HashMap::<ObjectId, usize>::new();
1106            for record in records {
1107                let entry = per_object.entry(record.object_id).or_insert(0);
1108                *entry = entry.saturating_add(1);
1109            }
1110            per_object
1111                .values()
1112                .all(|count| *count >= min_symbols_per_object)
1113        }
1114    }
1115
1116    fn object_id_from_u64(raw: u64) -> ObjectId {
1117        let mut bytes = [0_u8; 16];
1118        bytes[0..8].copy_from_slice(&raw.to_le_bytes());
1119        bytes[8..16].copy_from_slice(&raw.to_le_bytes());
1120        ObjectId::from_bytes(bytes)
1121    }
1122
1123    fn remote_cap(seed: u8) -> RemoteCap {
1124        RemoteCap::from_bytes([seed; 16])
1125    }
1126
1127    fn make_symbol_records(
1128        object_id: ObjectId,
1129        payload: &[u8],
1130        symbol_size: usize,
1131        repair_symbols: usize,
1132    ) -> Vec<SymbolRecord> {
1133        let symbol_size_u32 = u32::try_from(symbol_size).expect("symbol_size fits u32");
1134        let transfer_len_u64 = u64::try_from(payload.len()).expect("payload len fits u64");
1135        let oti = Oti {
1136            f: transfer_len_u64,
1137            al: 1,
1138            t: symbol_size_u32,
1139            z: 1,
1140            n: 1,
1141        };
1142
1143        let source_symbols = payload.len().div_ceil(symbol_size);
1144        let mut out = Vec::new();
1145        for idx in 0..source_symbols {
1146            let start = idx * symbol_size;
1147            let end = (start + symbol_size).min(payload.len());
1148            let mut symbol = vec![0_u8; symbol_size];
1149            symbol[..end - start].copy_from_slice(&payload[start..end]);
1150            let esi = u32::try_from(idx).expect("source esi fits u32");
1151            let flags = if idx == 0 {
1152                SymbolRecordFlags::SYSTEMATIC_RUN_START
1153            } else {
1154                SymbolRecordFlags::empty()
1155            };
1156            out.push(SymbolRecord::new(object_id, oti, esi, symbol, flags));
1157        }
1158
1159        for repair_idx in 0..repair_symbols {
1160            let repair_esi_usize = source_symbols.saturating_add(repair_idx);
1161            let esi = u32::try_from(repair_esi_usize).expect("repair esi fits u32");
1162            let mut symbol = vec![0_u8; symbol_size];
1163            let esi_low = u8::try_from(esi & 0xFF).expect("masked to u8");
1164            for (offset, byte) in symbol.iter_mut().enumerate() {
1165                let offset_low = u8::try_from(offset & 0xFF).expect("masked to u8");
1166                *byte = esi_low ^ offset_low;
1167            }
1168            out.push(SymbolRecord::new(
1169                object_id,
1170                oti,
1171                esi,
1172                symbol,
1173                SymbolRecordFlags::empty(),
1174            ));
1175        }
1176
1177        out
1178    }
1179
1180    fn rejected_esis_set(proof: &EcsDecodeProof) -> BTreeSet<u32> {
1181        proof
1182            .rejected_symbols
1183            .iter()
1184            .map(|entry| entry.esi)
1185            .collect()
1186    }
1187
1188    fn decode_proof_report_ok(proof: &EcsDecodeProof) -> bool {
1189        proof
1190            .verification_report(
1191                crate::decode_proofs::DecodeProofVerificationConfig::default(),
1192                &proof.symbol_digests,
1193                &proof.rejected_symbols,
1194            )
1195            .ok
1196    }
1197
1198    #[test]
1199    fn test_l3_fetch_requires_remote_cap() {
1200        let object_id = object_id_from_u64(1);
1201        let payload = b"tiered-fetch-requires-cap";
1202
1203        let mut local = make_symbol_records(object_id, payload, 8, 0);
1204        local.retain(|record| record.esi != 1);
1205
1206        let mut storage = TieredStorage::new(DurabilityMode::local());
1207        storage.insert_l2_segment(1, local);
1208
1209        let mut remote = MockRemoteTier::default();
1210        remote.set_object_symbols(object_id, make_symbol_records(object_id, payload, 8, 1));
1211
1212        let cx = Cx::<cap::All>::new();
1213        let result = storage.fetch_object(&cx, object_id, 7, Some(&mut remote), None);
1214        assert!(matches!(result, Err(FrankenError::AuthDenied)));
1215        assert_eq!(remote.fetch_calls(), 0);
1216    }
1217
1218    #[test]
1219    fn test_l3_upload_idempotency_key() {
1220        let object_id = object_id_from_u64(2);
1221        let payload = b"idempotent-upload";
1222        let records = make_symbol_records(object_id, payload, 8, 1);
1223
1224        let mut storage = TieredStorage::new(DurabilityMode::quorum(1, 3).expect("valid quorum"));
1225        let mut remote = MockRemoteTier::default();
1226        remote.set_acked_stores(2);
1227        let cx = Cx::<cap::All>::new();
1228        let cap = Some(remote_cap(9));
1229
1230        let request = CommitRequest::new(10, records, 11);
1231        let first = storage
1232            .commit_segment(&cx, request.clone(), Some(&mut remote), cap)
1233            .expect("first upload succeeds");
1234        let second = storage
1235            .commit_segment(&cx, request, Some(&mut remote), cap)
1236            .expect("second upload returns idempotent result");
1237
1238        assert_eq!(remote.upload_calls(), 1);
1239        let first_receipt = first
1240            .upload_receipt
1241            .expect("first commit has upload receipt");
1242        let second_receipt = second
1243            .upload_receipt
1244            .expect("second commit has upload receipt");
1245        assert!(!first_receipt.deduplicated);
1246        assert!(second_receipt.deduplicated);
1247    }
1248
1249    #[test]
1250    fn test_l3_upload_idempotency_key_changes_with_segment_contents() {
1251        let object_id = object_id_from_u64(22);
1252        let payload_a = b"idempotent-upload-a";
1253        let payload_b = b"idempotent-upload-b";
1254        let records_a = make_symbol_records(object_id, payload_a, 8, 1);
1255        let records_b = make_symbol_records(object_id, payload_b, 8, 1);
1256
1257        let request_a = CommitRequest::new(10, records_a, 11);
1258        let request_b = CommitRequest::new(10, records_b, 11);
1259
1260        assert_ne!(request_a.idempotency_key, request_b.idempotency_key);
1261        assert_ne!(request_a.saga, request_b.saga);
1262    }
1263
1264    #[test]
1265    fn test_evict_idempotency_key_changes_with_segment_contents() {
1266        let object_id = object_id_from_u64(23);
1267        let payload_a = b"evict-segment-a";
1268        let payload_b = b"evict-segment-b";
1269        let records_a = make_symbol_records(object_id, payload_a, 8, 1);
1270        let records_b = make_symbol_records(object_id, payload_b, 8, 1);
1271
1272        let key_a = derive_evict_key(40, &records_a, 12);
1273        let key_b = derive_evict_key(40, &records_b, 12);
1274
1275        assert_ne!(key_a, key_b);
1276    }
1277
1278    #[test]
1279    fn test_eviction_cancel_safety() {
1280        let object_id = object_id_from_u64(3);
1281        let payload = b"eviction-cancel-safety";
1282        let records = make_symbol_records(object_id, payload, 8, 1);
1283
1284        let mut storage = TieredStorage::new(DurabilityMode::local());
1285        storage.insert_l2_segment(20, records);
1286
1287        let cx = Cx::<cap::All>::new();
1288        let mut remote = MockRemoteTier::default();
1289        remote.set_acked_stores(3);
1290        remote.set_cancel_after_upload(cx.clone());
1291
1292        let outcome = storage
1293            .evict_segment(&cx, 20, 1, 50, &mut remote, Some(remote_cap(7)))
1294            .expect("eviction call succeeds");
1295
1296        assert_eq!(outcome.phase, EvictionPhase::CompensatedCancelled);
1297        assert!(!outcome.evicted);
1298        assert!(outcome.local_retained);
1299        assert!(storage.l2_segment_exists(20));
1300    }
1301
1302    #[test]
1303    fn test_eviction_precondition_check() {
1304        let object_id = object_id_from_u64(4);
1305        let payload = b"eviction-precondition-check";
1306        let records = make_symbol_records(object_id, payload, 8, 1);
1307
1308        let mut storage = TieredStorage::new(DurabilityMode::local());
1309        storage.insert_l2_segment(30, records);
1310
1311        let cx = Cx::<cap::All>::new();
1312        let mut remote = MockRemoteTier::default();
1313        remote.set_acked_stores(3);
1314        remote.set_segment_recoverable(30, false);
1315
1316        let outcome = storage
1317            .evict_segment(&cx, 30, 2, 51, &mut remote, Some(remote_cap(8)))
1318            .expect("eviction call succeeds");
1319
1320        assert_eq!(outcome.phase, EvictionPhase::CompensatedPrecondition);
1321        assert!(!outcome.evicted);
1322        assert!(outcome.local_retained);
1323        assert!(storage.l2_segment_exists(30));
1324    }
1325
1326    #[test]
1327    fn test_fetch_on_demand_systematic_fast_path() {
1328        let object_id = object_id_from_u64(5);
1329        let payload = b"systematic-fast-path";
1330        let records = make_symbol_records(object_id, payload, 8, 1);
1331
1332        let mut storage = TieredStorage::new(DurabilityMode::local());
1333        storage.insert_l2_segment(40, records);
1334
1335        let cx = Cx::<cap::All>::new();
1336        let outcome = storage
1337            .fetch_object(
1338                &cx,
1339                object_id,
1340                52,
1341                Option::<&mut MockRemoteTier>::None,
1342                None,
1343            )
1344            .expect("local fast-path fetch succeeds");
1345
1346        assert_eq!(outcome.bytes, payload);
1347        assert!(matches!(
1348            outcome.read_path,
1349            SymbolReadPath::SystematicFastPath
1350        ));
1351        assert!(!outcome.remote_used);
1352        assert_eq!(outcome.write_back_count, 0);
1353        assert!(outcome.decode_proof.is_none());
1354        assert!(storage.take_decode_audit_entries().is_empty());
1355    }
1356
1357    #[test]
1358    fn test_fast_path_repeated_reads_emit_no_decode_artifacts() {
1359        let object_id = object_id_from_u64(55);
1360        let payload = b"systematic-fast-path-repeat";
1361        let records = make_symbol_records(object_id, payload, 8, 1);
1362
1363        let mut storage = TieredStorage::new(DurabilityMode::local());
1364        storage.insert_l2_segment(405, records);
1365        let cx = Cx::<cap::All>::new();
1366
1367        for _ in 0..64 {
1368            let outcome = storage
1369                .fetch_object(
1370                    &cx,
1371                    object_id,
1372                    52,
1373                    Option::<&mut MockRemoteTier>::None,
1374                    None,
1375                )
1376                .expect("local fast-path fetch succeeds");
1377            assert!(matches!(
1378                outcome.read_path,
1379                SymbolReadPath::SystematicFastPath
1380            ));
1381            assert!(outcome.decode_proof.is_none());
1382            assert!(!outcome.remote_used);
1383        }
1384
1385        assert!(
1386            storage.take_decode_audit_entries().is_empty(),
1387            "fast path should never invoke fallback decoder/proof emission"
1388        );
1389    }
1390
1391    #[test]
1392    fn test_fetch_on_demand_repair_fallback() {
1393        let object_id = object_id_from_u64(6);
1394        let payload = b"repair-fallback-path";
1395        let mut full = make_symbol_records(object_id, payload, 8, 3);
1396        for record in &mut full {
1397            if record.esi == 0 {
1398                *record = SymbolRecord::new(
1399                    record.object_id,
1400                    record.oti,
1401                    record.esi,
1402                    record.symbol_data.clone(),
1403                    SymbolRecordFlags::empty(),
1404                );
1405            }
1406        }
1407
1408        let mut local_partial = full.clone();
1409        local_partial.retain(|record| record.esi == 0 || record.esi == 2);
1410
1411        let mut remote_repairs = full;
1412        remote_repairs.retain(|record| record.esi == 1 || record.esi >= 3);
1413
1414        let mut storage = TieredStorage::new(DurabilityMode::local());
1415        storage.insert_l2_segment(41, local_partial);
1416
1417        let mut remote = MockRemoteTier::default();
1418        remote.set_object_symbols(object_id, remote_repairs);
1419        let cx = Cx::<cap::All>::new();
1420
1421        let outcome = storage
1422            .fetch_object(&cx, object_id, 53, Some(&mut remote), Some(remote_cap(5)))
1423            .expect("fallback fetch succeeds");
1424
1425        assert!(matches!(
1426            outcome.read_path,
1427            SymbolReadPath::FullDecodeFallback { .. }
1428        ));
1429        assert_eq!(outcome.bytes, payload);
1430        assert!(outcome.remote_used);
1431        assert!(outcome.write_back_count > 0);
1432        assert!(outcome.decode_proof.is_some());
1433        assert_eq!(remote.last_fetch_preferred, vec![0, 1, 2]);
1434        assert!(storage.l2_segment_exists(storage.write_back_segment_id()));
1435        let audit = storage.take_decode_audit_entries();
1436        assert!(
1437            audit.iter().any(|entry| entry.decode_success),
1438            "expected at least one successful fallback proof"
1439        );
1440        assert!(
1441            audit.iter().any(|entry| !entry.decode_success),
1442            "expected local failure proof before remote fallback success"
1443        );
1444    }
1445
1446    #[test]
1447    fn test_fetch_repairs_corrupt_local_symbol_and_persists_healed_copy() {
1448        let object_id = object_id_from_u64(72);
1449        let payload = b"repair-corrupt-local-symbol";
1450        let full = make_symbol_records(object_id, payload, 8, 2);
1451
1452        let mut local = full.clone();
1453        let corrupt = local
1454            .iter_mut()
1455            .find(|record| record.esi == 1)
1456            .expect("seeded symbol set must contain ESI 1");
1457        if let Some(first) = corrupt.symbol_data.first_mut() {
1458            *first ^= 0x5A;
1459        }
1460        assert!(
1461            !corrupt.verify_integrity(),
1462            "test fixture must actually corrupt the local symbol"
1463        );
1464
1465        let mut storage = TieredStorage::new(DurabilityMode::local());
1466        storage.insert_l2_segment(420, local);
1467
1468        let mut remote = MockRemoteTier::default();
1469        remote.set_object_symbols(object_id, full.clone());
1470        let cx = Cx::<cap::All>::new();
1471
1472        let repaired = storage
1473            .fetch_object(&cx, object_id, 57, Some(&mut remote), Some(remote_cap(13)))
1474            .expect("remote repair should recover the object");
1475        assert_eq!(repaired.bytes, payload);
1476        assert!(repaired.remote_used);
1477        assert_eq!(repaired.write_back_count, 1);
1478
1479        let local_replay = storage
1480            .fetch_object(
1481                &cx,
1482                object_id,
1483                58,
1484                Option::<&mut MockRemoteTier>::None,
1485                None,
1486            )
1487            .expect("healed symbol should make later local-only reads succeed");
1488        assert_eq!(local_replay.bytes, payload);
1489        assert!(!local_replay.remote_used);
1490        assert!(
1491            storage
1492                .l2_records_for_object(object_id)
1493                .iter()
1494                .all(SymbolRecord::verify_integrity),
1495            "local self-healing must prefer the repaired symbol over the stale corrupt copy"
1496        );
1497    }
1498
1499    #[test]
1500    fn test_fetch_cancelled_after_remote_read_does_not_write_back_repairs() {
1501        let object_id = object_id_from_u64(73);
1502        let payload = b"cancel-after-remote-fetch";
1503        let full = make_symbol_records(object_id, payload, 8, 1);
1504        let mut local = full.clone();
1505        local.retain(|record| record.esi == 0);
1506
1507        let mut storage = TieredStorage::new(DurabilityMode::local());
1508        storage.insert_l2_segment(421, local);
1509
1510        let cx = Cx::<cap::All>::new();
1511        let mut remote = MockRemoteTier::default();
1512        remote.set_object_symbols(object_id, full);
1513        remote.cancel_after_fetch = Some(cx.clone());
1514
1515        let result =
1516            storage.fetch_object(&cx, object_id, 59, Some(&mut remote), Some(remote_cap(14)));
1517        assert!(matches!(result, Err(FrankenError::Busy)));
1518        assert_eq!(remote.fetch_calls(), 1);
1519        assert!(
1520            !storage.l2_segment_exists(storage.write_back_segment_id()),
1521            "cancelled fetch must not append repaired symbols into the write-back segment"
1522        );
1523    }
1524
1525    #[test]
1526    fn test_fetch_fallback_failure_emits_decode_proof() {
1527        let object_id = object_id_from_u64(66);
1528        let payload = b"fallback-threshold-failure";
1529        let mut full = make_symbol_records(object_id, payload, 8, 0);
1530        for record in &mut full {
1531            if record.esi == 0 {
1532                *record = SymbolRecord::new(
1533                    record.object_id,
1534                    record.oti,
1535                    record.esi,
1536                    record.symbol_data.clone(),
1537                    SymbolRecordFlags::empty(),
1538                );
1539            }
1540        }
1541
1542        let mut local_partial = full.clone();
1543        local_partial.retain(|record| record.esi == 0 || record.esi == 2);
1544        let mut remote_source = full;
1545        remote_source.retain(|record| record.esi == 1);
1546
1547        let mut storage = TieredStorage::new(DurabilityMode::local());
1548        storage.insert_l2_segment(416, local_partial);
1549
1550        let mut remote = MockRemoteTier::default();
1551        remote.set_object_symbols(object_id, remote_source);
1552        let cx = Cx::<cap::All>::new();
1553
1554        let result =
1555            storage.fetch_object(&cx, object_id, 54, Some(&mut remote), Some(remote_cap(6)));
1556        assert!(matches!(result, Err(FrankenError::DatabaseCorrupt { .. })));
1557        if let Err(FrankenError::DatabaseCorrupt { detail }) = result {
1558            assert!(
1559                detail.contains("insufficient_symbols_for_fallback"),
1560                "expected deterministic fallback-failure detail, got: {detail}"
1561            );
1562        }
1563        let audit = storage.take_decode_audit_entries();
1564        assert!(
1565            audit.iter().any(|entry| !entry.decode_success),
1566            "expected at least one failure proof artifact"
1567        );
1568        assert!(
1569            audit
1570                .iter()
1571                .any(|entry| !entry.decode_success && entry.proof.symbols_received.len() >= 2),
1572            "expected proof to capture available symbol cardinality"
1573        );
1574    }
1575
1576    #[test]
1577    fn test_write_back_segment_keeps_missing_symbols_per_object() {
1578        let object_a = object_id_from_u64(68);
1579        let payload_a = b"first-payload";
1580        let full_a = make_symbol_records(object_a, payload_a, 8, 0);
1581        let mut local_a = full_a.clone();
1582        local_a.retain(|record| record.esi == 0);
1583
1584        let object_b = object_id_from_u64(69);
1585        let payload_b = b"second-bytes!";
1586        let full_b = make_symbol_records(object_b, payload_b, 8, 0);
1587        let mut local_b = full_b.clone();
1588        local_b.retain(|record| record.esi == 0);
1589
1590        let mut storage = TieredStorage::new(DurabilityMode::local());
1591        storage.insert_l2_segment(418, local_a);
1592        storage.insert_l2_segment(419, local_b);
1593
1594        let mut remote = MockRemoteTier::default();
1595        remote.set_object_symbols(object_a, full_a);
1596        remote.set_object_symbols(object_b, full_b);
1597        let cx = Cx::<cap::All>::new();
1598
1599        let first_fetch = storage
1600            .fetch_object(&cx, object_a, 57, Some(&mut remote), Some(remote_cap(13)))
1601            .expect("first object fetch succeeds");
1602        assert!(first_fetch.remote_used);
1603        assert!(first_fetch.write_back_count > 0);
1604
1605        let second_fetch = storage
1606            .fetch_object(&cx, object_b, 58, Some(&mut remote), Some(remote_cap(13)))
1607            .expect("second object fetch succeeds");
1608        assert!(second_fetch.remote_used);
1609        assert!(second_fetch.write_back_count > 0);
1610
1611        let replay_a = storage
1612            .fetch_object(&cx, object_a, 59, Option::<&mut MockRemoteTier>::None, None)
1613            .expect("first object should remain recoverable from local write-back");
1614        assert_eq!(replay_a.bytes, payload_a);
1615        assert!(!replay_a.remote_used);
1616
1617        let replay_b = storage
1618            .fetch_object(&cx, object_b, 60, Option::<&mut MockRemoteTier>::None, None)
1619            .expect("second object should remain recoverable from local write-back");
1620        assert_eq!(replay_b.bytes, payload_b);
1621        assert!(!replay_b.remote_used);
1622    }
1623
1624    #[test]
1625    fn test_fallback_decode_proof_stable_for_same_inputs() {
1626        let run_once = || -> EcsDecodeProof {
1627            let object_id = object_id_from_u64(67);
1628            let payload = b"fallback-proof-stability";
1629            let mut full = make_symbol_records(object_id, payload, 8, 3);
1630            for record in &mut full {
1631                if record.esi == 0 {
1632                    *record = SymbolRecord::new(
1633                        record.object_id,
1634                        record.oti,
1635                        record.esi,
1636                        record.symbol_data.clone(),
1637                        SymbolRecordFlags::empty(),
1638                    );
1639                }
1640            }
1641
1642            let mut local_partial = full.clone();
1643            local_partial.retain(|record| record.esi == 0 || record.esi == 2);
1644            let mut remote_repairs = full;
1645            remote_repairs.retain(|record| record.esi == 1 || record.esi >= 3);
1646
1647            let mut storage = TieredStorage::new(DurabilityMode::local());
1648            storage.insert_l2_segment(417, local_partial);
1649
1650            let mut remote = MockRemoteTier::default();
1651            remote.set_object_symbols(object_id, remote_repairs);
1652            let cx = Cx::<cap::All>::new();
1653
1654            let outcome = storage
1655                .fetch_object(&cx, object_id, 55, Some(&mut remote), Some(remote_cap(7)))
1656                .expect("fallback fetch succeeds");
1657            assert!(matches!(
1658                outcome.read_path,
1659                SymbolReadPath::FullDecodeFallback { .. }
1660            ));
1661            outcome
1662                .decode_proof
1663                .expect("fallback-success path should emit decode proof")
1664        };
1665
1666        let proof_a = run_once();
1667        let proof_b = run_once();
1668        assert_eq!(
1669            proof_a, proof_b,
1670            "proof artifacts must be stable for identical fallback input sets"
1671        );
1672    }
1673
1674    #[test]
1675    fn test_symbolrecord_corruption_erasures_seeded_property() {
1676        let mut runner = TestRunner::new(ProptestConfig {
1677            cases: 96,
1678            failure_persistence: None,
1679            rng_algorithm: RngAlgorithm::ChaCha,
1680            rng_seed: RngSeed::Fixed(0x0BAD_C0DE_u64),
1681            ..ProptestConfig::default()
1682        });
1683
1684        let strategy = (
1685            prop::collection::vec(proptest::num::u8::ANY, 17..96),
1686            prop::collection::vec(0_u8..7, 0..4),
1687            prop::collection::vec(0_u8..7, 0..4),
1688        );
1689
1690        runner
1691            .run(&strategy, |(payload, dropped_raw, corrupted_raw)| {
1692                let object_id = object_id_from_u64(77);
1693                let full = make_symbol_records(object_id, &payload, 8, 4);
1694
1695                let dropped: BTreeSet<u32> = dropped_raw.into_iter().map(u32::from).collect();
1696                let corrupted: BTreeSet<u32> = corrupted_raw.into_iter().map(u32::from).collect();
1697
1698                let mut remote_records = Vec::new();
1699                let mut expected_rejected = BTreeSet::new();
1700                let mut accepted_esis = BTreeSet::new();
1701
1702                for mut record in full {
1703                    if dropped.contains(&record.esi) {
1704                        continue;
1705                    }
1706                    // Always force the non-systematic fallback path while preserving
1707                    // source-symbol availability semantics for success/failure checks.
1708                    if record.esi == 0 {
1709                        record = SymbolRecord::new(
1710                            record.object_id,
1711                            record.oti,
1712                            record.esi,
1713                            record.symbol_data.clone(),
1714                            SymbolRecordFlags::empty(),
1715                        );
1716                    }
1717                    if corrupted.contains(&record.esi) {
1718                        if let Some(first) = record.symbol_data.first_mut() {
1719                            *first ^= 0x5A;
1720                        }
1721                        expected_rejected.insert(record.esi);
1722                    } else {
1723                        accepted_esis.insert(record.esi);
1724                    }
1725                    remote_records.push(record);
1726                }
1727
1728                let source_symbols = payload.len().div_ceil(8);
1729                let required_symbols = source_symbols.saturating_add(DEFAULT_FALLBACK_DECODE_SLACK);
1730                let has_complete_source_run = (0..source_symbols).all(|index| {
1731                    let esi = u32::try_from(index).expect("source index fits in u32");
1732                    accepted_esis.contains(&esi)
1733                });
1734                let expect_success =
1735                    accepted_esis.len() >= required_symbols && has_complete_source_run;
1736
1737                let mut storage = TieredStorage::new(DurabilityMode::local());
1738                let mut remote = MockRemoteTier::default();
1739                remote.set_object_symbols(object_id, remote_records);
1740                let cx = Cx::<cap::All>::new();
1741
1742                let result = storage.fetch_object(
1743                    &cx,
1744                    object_id,
1745                    56,
1746                    Some(&mut remote),
1747                    Some(remote_cap(12)),
1748                );
1749
1750                if expect_success {
1751                    let outcome =
1752                        result.expect("decode should succeed when enough valid symbols remain");
1753                    let used_fallback =
1754                        matches!(outcome.read_path, SymbolReadPath::FullDecodeFallback { .. });
1755                    prop_assert!(used_fallback);
1756                    prop_assert_eq!(outcome.bytes, payload);
1757                    let proof = outcome
1758                        .decode_proof
1759                        .expect("fallback-success path should emit decode proof");
1760                    prop_assert!(proof.decode_success);
1761                    prop_assert_eq!(rejected_esis_set(&proof), expected_rejected);
1762                    prop_assert!(decode_proof_report_ok(&proof));
1763                } else {
1764                    let is_corrupt_error =
1765                        matches!(result, Err(FrankenError::DatabaseCorrupt { .. }));
1766                    prop_assert!(is_corrupt_error);
1767                    let audit = storage.take_decode_audit_entries();
1768                    let Some(failure_entry) = audit.iter().find(|entry| !entry.decode_success)
1769                    else {
1770                        return Err(proptest::test_runner::TestCaseError::fail(
1771                            "expected failure decode proof artifact",
1772                        ));
1773                    };
1774                    prop_assert_eq!(rejected_esis_set(&failure_entry.proof), expected_rejected);
1775                    prop_assert!(decode_proof_report_ok(&failure_entry.proof));
1776                }
1777
1778                Ok(())
1779            })
1780            .expect("seeded SymbolRecord corruption property should hold");
1781    }
1782
1783    #[test]
1784    fn test_durability_mode_local_no_remote() {
1785        let object_id = object_id_from_u64(7);
1786        let payload = b"local-durability-no-remote";
1787        let records = make_symbol_records(object_id, payload, 8, 1);
1788
1789        let mut storage = TieredStorage::new(DurabilityMode::local());
1790        let mut remote = MockRemoteTier::default();
1791        remote.set_acked_stores(3);
1792        let cx = Cx::<cap::All>::new();
1793        let request = CommitRequest::new(50, records, 60);
1794
1795        let outcome = storage
1796            .commit_segment(&cx, request, Some(&mut remote), Some(remote_cap(4)))
1797            .expect("local durability commit succeeds");
1798
1799        assert!(!outcome.remote_io);
1800        assert_eq!(remote.upload_calls(), 0);
1801        assert!(storage.l2_segment_exists(50));
1802    }
1803
1804    #[test]
1805    fn test_durability_mode_quorum_requires_ack() {
1806        let object_id = object_id_from_u64(8);
1807        let payload = b"quorum-durability";
1808        let records = make_symbol_records(object_id, payload, 8, 1);
1809
1810        let mut storage = TieredStorage::new(DurabilityMode::quorum(2, 3).expect("valid quorum"));
1811        let mut remote = MockRemoteTier::default();
1812        let cx = Cx::<cap::All>::new();
1813        let cap = Some(remote_cap(10));
1814
1815        remote.set_acked_stores(1);
1816        let req_fail = CommitRequest::new(60, records.clone(), 61);
1817        let fail = storage.commit_segment(&cx, req_fail, Some(&mut remote), cap);
1818        assert!(matches!(fail, Err(FrankenError::Busy)));
1819        assert!(storage.l2_segment_exists(60));
1820
1821        remote.set_acked_stores(2);
1822        let req_ok = CommitRequest::new(61, records, 62);
1823        let ok = storage
1824            .commit_segment(&cx, req_ok, Some(&mut remote), cap)
1825            .expect("quorum commit succeeds after sufficient ACKs");
1826        assert!(ok.remote_io);
1827        assert!(storage.l2_segment_exists(61));
1828    }
1829
1830    #[test]
1831    fn test_tiered_storage_remote_admission_cancelled_while_waiting() {
1832        let object_id = object_id_from_u64(9);
1833        let payload = b"tiered-admission-cancel";
1834        let records = make_symbol_records(object_id, payload, 8, 1);
1835        let mut storage = TieredStorage::new_with_remote_executor(
1836            DurabilityMode::quorum(1, 2).expect("valid quorum"),
1837            RemoteAdmissionExecutor::with_limits(
1838                crate::remote_effects::TIERED_STORAGE_EXECUTOR_NAME,
1839                1,
1840                1,
1841                Duration::from_secs(1),
1842            )
1843            .expect("executor config valid"),
1844        );
1845        let held_executor = Arc::clone(&storage.remote_executor);
1846        let _held_permit = held_executor
1847            .try_acquire_for_testing()
1848            .expect("first permit must saturate admission");
1849
1850        let cx = Cx::<cap::All>::new();
1851        let canceller = {
1852            let cancel_cx = cx.clone();
1853            std::thread::spawn(move || {
1854                std::thread::sleep(Duration::from_millis(20));
1855                cancel_cx.cancel();
1856            })
1857        };
1858
1859        let mut remote = MockRemoteTier::default();
1860        remote.set_acked_stores(2);
1861        let request = CommitRequest::new(70, records, 71);
1862        let result = storage.commit_segment(&cx, request, Some(&mut remote), Some(remote_cap(12)));
1863
1864        canceller.join().expect("canceller thread joins");
1865
1866        assert!(matches!(result, Err(FrankenError::Busy)));
1867        assert_eq!(remote.upload_calls(), 0);
1868        assert!(storage.l2_segment_exists(70));
1869    }
1870
1871    #[test]
1872    fn test_e2e_tiered_storage_evict_and_fetch() {
1873        let mut storage = TieredStorage::new(DurabilityMode::local());
1874        let mut remote = MockRemoteTier::default();
1875        remote.set_acked_stores(3);
1876        let cx = Cx::<cap::All>::new();
1877        let cap = Some(remote_cap(11));
1878
1879        let mut expected = HashMap::<ObjectId, Vec<u8>>::new();
1880        for idx in 0_u64..500_u64 {
1881            let segment_id = idx + 1;
1882            let object_id = object_id_from_u64(10_000 + idx);
1883            let payload = format!("commit-{segment_id:04}-payload").into_bytes();
1884            let records = make_symbol_records(object_id, &payload, 16, 2);
1885            storage.insert_l2_segment(segment_id, records);
1886            expected.insert(object_id, payload);
1887        }
1888
1889        for segment_id in 1_u64..=500_u64 {
1890            let outcome = storage
1891                .evict_segment(&cx, segment_id, 1, 70, &mut remote, cap)
1892                .expect("eviction succeeds");
1893            assert_eq!(outcome.phase, EvictionPhase::Retired);
1894            assert!(outcome.evicted);
1895        }
1896        assert_eq!(storage.l2_segment_count(), 0);
1897
1898        let target_object = object_id_from_u64(10_321);
1899        let outcome = storage
1900            .fetch_object(&cx, target_object, 71, Some(&mut remote), cap)
1901            .expect("remote fetch after eviction succeeds");
1902        assert_eq!(
1903            outcome.bytes,
1904            expected
1905                .get(&target_object)
1906                .expect("target payload available")
1907                .clone()
1908        );
1909        assert!(outcome.remote_used);
1910        assert!(storage.l2_segment_exists(storage.write_back_segment_id()));
1911        assert!(outcome.write_back_count > 0);
1912    }
1913}