1use std::collections::BTreeMap;
16use std::sync::Arc;
17
18use fsqlite_error::{FrankenError, Result};
19use fsqlite_types::cx::{Cx, cap};
20use fsqlite_types::{
21 IdempotencyKey, ObjectId, Oti, RemoteCap, Saga, SymbolReadPath, SymbolRecord,
22 SystematicLayoutError, reconstruct_systematic_happy_path, source_symbol_count,
23};
24use tracing::{debug, info, warn};
25use xxhash_rust::xxh3::xxh3_64;
26
27use crate::decode_proofs::{EcsDecodeProof, RejectedSymbol, SymbolDigest, SymbolRejectionReason};
28use crate::remote_effects::Executor as RemoteAdmissionExecutor;
29
/// Identifier attached as the `bead_id` field of every tracing event emitted in this file.
const BEAD_ID: &str = "bd-1hi.29";
/// Computation name placed on remote symbol-fetch requests and passed to the remote executor.
const FETCH_SYMBOLS_COMPUTATION: &str = "fsqlite:tiered:fetch_symbols:v1";
/// Computation name placed on remote segment-upload requests and passed to the remote executor.
const UPLOAD_SEGMENT_COMPUTATION: &str = "fsqlite:tiered:upload_segment:v1";
/// Initial value of `TieredStorage::write_back_segment_id`, the L2 segment that collects
/// symbols written back after a remote fetch.
const DEFAULT_WRITE_BACK_SEGMENT_ID: u64 = u64::MAX - 1;
/// Number of accepted symbols, beyond the source-symbol count, that the fallback decode
/// path requires before it attempts reconstruction.
const DEFAULT_FALLBACK_DECODE_SLACK: usize = 2;
35
/// Durability policy consulted when a segment is committed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DurabilityMode {
    /// No remote participation is required (`requires_remote` is `false`).
    Local,
    /// Remote upload is required and at least `required` stores must acknowledge it.
    /// `total` is only used for validation in `DurabilityMode::quorum`.
    Quorum { required: u32, total: u32 },
}
44
45impl DurabilityMode {
46 #[must_use]
48 pub const fn local() -> Self {
49 Self::Local
50 }
51
52 pub fn quorum(required: u32, total: u32) -> Result<Self> {
54 if required == 0 || required > total {
55 return Err(FrankenError::OutOfRange {
56 what: "durability quorum".to_owned(),
57 value: format!("required={required}, total={total}"),
58 });
59 }
60 Ok(Self::Quorum { required, total })
61 }
62
63 #[must_use]
64 pub const fn requires_remote(self) -> bool {
65 matches!(self, Self::Quorum { .. })
66 }
67
68 #[must_use]
69 pub const fn quorum_satisfied(self, acked_stores: u32) -> bool {
70 match self {
71 Self::Local => true,
72 Self::Quorum { required, .. } => acked_stores >= required,
73 }
74 }
75}
76
/// Parameters handed to `RemoteTier::fetch_symbols` when symbols for one object are requested.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchSymbolsRequest {
    /// Object whose symbols are requested.
    pub object_id: ObjectId,
    /// ESIs the caller would like to receive; `TieredStorage::fetch_object` fills this
    /// with the source-symbol ESIs derived from a local record's OTI (empty if none is known).
    pub preferred_esis: Vec<u32>,
    /// Upper bound on the number of symbols wanted; `fetch_object` passes `usize::MAX`.
    /// NOTE(review): enforcement is presumably up to the `RemoteTier` implementation — confirm.
    pub max_symbols: usize,
    /// Idempotency key of the request; `fetch_object` derives it from the object id,
    /// the preferred ESIs and the epoch.
    pub idempotency_key: IdempotencyKey,
    /// ECS epoch the request is issued under.
    pub ecs_epoch: u64,
    /// Capability token supplied by the caller for the remote call.
    pub remote_cap: RemoteCap,
    /// Computation name; `fetch_object` sets it to `FETCH_SYMBOLS_COMPUTATION`.
    pub computation: &'static str,
}
88
/// Parameters handed to `RemoteTier::upload_segment` when a whole segment is uploaded.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UploadSegmentRequest {
    /// Identifier of the segment being uploaded.
    pub segment_id: u64,
    /// Symbol records that make up the segment.
    pub records: Vec<SymbolRecord>,
    /// Idempotency key of the upload; in this file it is derived from the epoch and
    /// the serialized segment contents.
    pub idempotency_key: IdempotencyKey,
    /// Saga associated with the upload; in this file it is created from the idempotency key.
    pub saga: Saga,
    /// ECS epoch the request is issued under.
    pub ecs_epoch: u64,
    /// Capability token supplied by the caller for the remote call.
    pub remote_cap: RemoteCap,
    /// Computation name; callers in this file set it to `UPLOAD_SEGMENT_COMPUTATION`.
    pub computation: &'static str,
}
100
/// Result reported by `RemoteTier::upload_segment`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct UploadSegmentReceipt {
    /// Number of stores that acknowledged the upload; compared against the quorum
    /// requirement in `TieredStorage::commit_segment`.
    pub acked_stores: u32,
    /// NOTE(review): presumably `true` when the remote recognised a repeated
    /// (segment, idempotency key) upload — confirm against the remote implementation.
    pub deduplicated: bool,
}
107
/// Operations `TieredStorage` needs from a remote (L3) tier.
pub trait RemoteTier {
    /// Fetches symbol records for the object described by `request`.
    ///
    /// `TieredStorage::fetch_object` treats an empty result as an error.
    fn fetch_symbols(&mut self, request: &FetchSymbolsRequest) -> Result<Vec<SymbolRecord>>;

    /// Uploads the segment described by `request` and returns the resulting receipt.
    fn upload_segment(&mut self, request: &UploadSegmentRequest) -> Result<UploadSegmentReceipt>;

    /// Reports whether `segment_id` can be recovered from the remote tier.
    ///
    /// `TieredStorage::evict_segment` only drops the local copy when this returns `true`.
    /// NOTE(review): `min_symbols_per_object` is presumably a per-object symbol-count
    /// threshold — confirm with implementors.
    fn segment_recoverable(&self, segment_id: u64, min_symbols_per_object: usize) -> bool;
}
119
/// Input to `TieredStorage::commit_segment`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommitRequest {
    /// Identifier of the segment being committed.
    pub segment_id: u64,
    /// Symbol records that make up the segment.
    pub records: Vec<SymbolRecord>,
    /// Idempotency key forwarded to the remote upload.
    pub idempotency_key: IdempotencyKey,
    /// Saga forwarded to the remote upload.
    pub saga: Saga,
    /// ECS epoch the commit is issued under.
    pub ecs_epoch: u64,
}
129
130impl CommitRequest {
131 #[must_use]
133 pub fn new(segment_id: u64, records: Vec<SymbolRecord>, ecs_epoch: u64) -> Self {
134 let request_bytes = segment_request_bytes(segment_id, &records);
135 let idempotency_key = IdempotencyKey::derive(ecs_epoch, &request_bytes);
136 let saga = Saga::new(idempotency_key);
137 Self {
138 segment_id,
139 records,
140 idempotency_key,
141 saga,
142 ecs_epoch,
143 }
144 }
145}
146
/// Result of `TieredStorage::commit_segment`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CommitOutcome {
    /// `true` when the commit uploaded to the remote tier; `false` for local-only commits.
    pub remote_io: bool,
    /// Receipt of the remote upload, or `None` when no upload took place.
    pub upload_receipt: Option<UploadSegmentReceipt>,
}
153
/// Result of `TieredStorage::fetch_object`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchOutcome {
    /// Recovered object bytes.
    pub bytes: Vec<u8>,
    /// Which reconstruction path produced `bytes`.
    pub read_path: SymbolReadPath,
    /// `true` when symbols were fetched from the remote tier to satisfy the read.
    pub remote_used: bool,
    /// Number of fetched symbols selected for write-back into L2 (zero for local reads).
    pub write_back_count: usize,
    /// Decode proof from the fallback path; `None` when the systematic fast path succeeded.
    pub decode_proof: Option<EcsDecodeProof>,
}
163
/// One recorded decode proof, as returned by `TieredStorage::take_decode_audit_entries`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DecodeAuditEntry {
    /// Sequence number assigned when the proof was recorded (the counter is incremented
    /// before use, so the first entry gets 1).
    pub seq: u64,
    /// Copy of `proof.object_id`.
    pub object_id: ObjectId,
    /// Copy of `proof.decode_success`.
    pub decode_success: bool,
    /// The recorded proof itself.
    pub proof: EcsDecodeProof,
}
172
/// Phase reached by `TieredStorage::evict_segment`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EvictionPhase {
    /// NOTE(review): not produced by the code visible in this file; presumably the state
    /// after upload and before retirement — confirm.
    Uploaded,
    /// Upload happened but cancellation was observed afterwards; the local segment is kept.
    CompensatedCancelled,
    /// Upload happened but the remote recoverability check failed; the local segment is kept.
    CompensatedPrecondition,
    /// The local segment was removed from L2.
    Retired,
}
181
/// Result of `TieredStorage::evict_segment`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EvictionOutcome {
    /// Phase the eviction ended in.
    pub phase: EvictionPhase,
    /// `true` only when the local segment was removed.
    pub evicted: bool,
    /// `true` when the local segment is still present after the call.
    pub local_retained: bool,
    /// Receipt of the upload performed as part of the eviction.
    pub upload_receipt: UploadSegmentReceipt,
}
190
/// Local (L2) segment store that can commit to, fetch from and evict to a remote tier.
#[derive(Debug)]
pub struct TieredStorage {
    /// Durability policy applied by `commit_segment`.
    durability_mode: DurabilityMode,
    /// Executor through which every remote call in this type is run.
    remote_executor: Arc<RemoteAdmissionExecutor>,
    /// Id of the L2 segment that receives symbols written back after remote fetches.
    write_back_segment_id: u64,
    /// L2 segments keyed by segment id.
    l2_segments: BTreeMap<u64, Vec<SymbolRecord>>,
    /// Last sequence number handed out to a decode-audit entry.
    decode_audit_seq: u64,
    /// Decode-audit entries accumulated since the last `take_decode_audit_entries`.
    decode_audit: Vec<DecodeAuditEntry>,
}
201
202impl Default for TieredStorage {
203 fn default() -> Self {
204 Self::new(DurabilityMode::Local)
205 }
206}
207
208impl TieredStorage {
209 fn new_with_remote_executor(
210 durability_mode: DurabilityMode,
211 remote_executor: RemoteAdmissionExecutor,
212 ) -> Self {
213 Self {
214 durability_mode,
215 remote_executor: Arc::new(remote_executor),
216 write_back_segment_id: DEFAULT_WRITE_BACK_SEGMENT_ID,
217 l2_segments: BTreeMap::new(),
218 decode_audit_seq: 0,
219 decode_audit: Vec::new(),
220 }
221 }
222
223 #[must_use]
225 pub fn new(durability_mode: DurabilityMode) -> Self {
226 Self::new_with_remote_executor(
227 durability_mode,
228 RemoteAdmissionExecutor::balanced_tiered_storage_default(),
229 )
230 }
231
    /// Returns the durability mode currently in effect.
    #[must_use]
    pub const fn durability_mode(&self) -> DurabilityMode {
        self.durability_mode
    }
237
    /// Replaces the durability mode used by subsequent commits.
    pub fn set_durability_mode(&mut self, mode: DurabilityMode) {
        self.durability_mode = mode;
    }
242
    /// Returns the id of the L2 segment that receives written-back symbols.
    #[must_use]
    pub const fn write_back_segment_id(&self) -> u64 {
        self.write_back_segment_id
    }
248
    /// Returns all accumulated decode-audit entries and leaves the internal log empty.
    ///
    /// The sequence counter is not reset, so later entries continue the numbering.
    pub fn take_decode_audit_entries(&mut self) -> Vec<DecodeAuditEntry> {
        std::mem::take(&mut self.decode_audit)
    }
253
    /// Stores `records` as L2 segment `segment_id`, replacing any segment already stored
    /// under that id.
    pub fn insert_l2_segment(&mut self, segment_id: u64, records: Vec<SymbolRecord>) {
        self.l2_segments.insert(segment_id, records);
    }
258
    /// Returns the number of segments currently held in L2.
    #[must_use]
    pub fn l2_segment_count(&self) -> usize {
        self.l2_segments.len()
    }
264
    /// Reports whether a segment with id `segment_id` is currently held in L2.
    #[must_use]
    pub fn l2_segment_exists(&self, segment_id: u64) -> bool {
        self.l2_segments.contains_key(&segment_id)
    }
270
271 #[must_use]
273 pub fn l2_records_for_object(&self, object_id: ObjectId) -> Vec<SymbolRecord> {
274 let mut by_esi = BTreeMap::<u32, SymbolRecord>::new();
275 for segment in self.l2_segments.values() {
276 for record in segment {
277 if record.object_id == object_id {
278 merge_symbol_record_by_esi(&mut by_esi, record.clone());
279 }
280 }
281 }
282 by_esi.into_values().collect()
283 }
284
    /// Commits a segment to L2 and, when the durability mode requires it, to the remote tier.
    ///
    /// The segment is always inserted into L2 first. In local mode the call returns
    /// immediately without remote I/O. Otherwise the segment is uploaded through the
    /// remote executor and the acknowledgement count is checked against the quorum.
    ///
    /// # Errors
    ///
    /// * `FrankenError::AuthDenied` when remote durability is required but `remote_cap`
    ///   or `remote` is `None`.
    /// * `FrankenError::Busy` when the upload was acknowledged by fewer stores than required.
    /// * Any error returned by the remote executor or the upload itself.
    ///
    /// NOTE(review): on every error path the segment has already been inserted into L2
    /// and is not removed again — confirm this is the intended contract.
    pub fn commit_segment<Caps, R>(
        &mut self,
        cx: &Cx<Caps>,
        request: CommitRequest,
        remote: Option<&mut R>,
        remote_cap: Option<RemoteCap>,
    ) -> Result<CommitOutcome>
    where
        Caps: cap::SubsetOf<cap::All>,
        R: RemoteTier,
    {
        // The records are cloned because the originals are moved into the upload request below.
        self.insert_l2_segment(request.segment_id, request.records.clone());

        if !self.durability_mode.requires_remote() {
            info!(
                bead_id = BEAD_ID,
                segment_id = request.segment_id,
                mode = "local",
                "commit satisfied by L2 only"
            );
            return Ok(CommitOutcome {
                remote_io: false,
                upload_receipt: None,
            });
        }

        // Remote durability needs both a capability token and a remote tier handle.
        let cap = remote_cap.ok_or(FrankenError::AuthDenied)?;
        let remote_store = remote.ok_or(FrankenError::AuthDenied)?;

        let upload_request = UploadSegmentRequest {
            segment_id: request.segment_id,
            records: request.records,
            idempotency_key: request.idempotency_key,
            saga: request.saga,
            ecs_epoch: request.ecs_epoch,
            remote_cap: cap,
            computation: UPLOAD_SEGMENT_COMPUTATION,
        };
        let receipt = self.remote_executor.run(
            cx,
            upload_request.computation,
            Some(upload_request.saga),
            Some(upload_request.idempotency_key),
            upload_request.ecs_epoch,
            || remote_store.upload_segment(&upload_request),
        )?;
        if !self.durability_mode.quorum_satisfied(receipt.acked_stores) {
            warn!(
                bead_id = BEAD_ID,
                segment_id = request.segment_id,
                acked_stores = receipt.acked_stores,
                "quorum durability not yet satisfied"
            );
            return Err(FrankenError::Busy);
        }

        Ok(CommitOutcome {
            remote_io: true,
            upload_receipt: Some(receipt),
        })
    }
350
    /// Recovers the bytes of `object_id`, using L2 first and the remote tier as a fallback.
    ///
    /// If L2 holds records for the object and they can be recovered locally, the result is
    /// returned without remote I/O. Otherwise symbols are fetched from the remote tier,
    /// merged with the local ones, recovery is retried, and fetched symbols that are
    /// missing locally (or that replace a locally corrupt record) are written back to the
    /// write-back segment. Every decode proof produced along the way, successful or not,
    /// is appended to the decode audit log.
    ///
    /// # Errors
    ///
    /// * `FrankenError::AuthDenied` when the remote tier is needed but `remote_cap` or
    ///   `remote` is `None`.
    /// * `FrankenError::Internal` when the remote tier returns no symbols.
    /// * `FrankenError::Busy` when the checkpoint after the fetch fails.
    /// * `FrankenError::DatabaseCorrupt` when the merged symbol set still cannot be recovered.
    /// * Any error returned by the remote executor or the fetch itself.
    pub fn fetch_object<Caps, R>(
        &mut self,
        cx: &Cx<Caps>,
        object_id: ObjectId,
        ecs_epoch: u64,
        remote: Option<&mut R>,
        remote_cap: Option<RemoteCap>,
    ) -> Result<FetchOutcome>
    where
        Caps: cap::SubsetOf<cap::All>,
        R: RemoteTier,
    {
        // Local attempt: only made when L2 has at least one record for the object.
        let local_records = self.l2_records_for_object(object_id);
        if !local_records.is_empty() {
            match recover_object_hybrid(&local_records) {
                Ok(local) => {
                    if let Some(proof) = local.decode_proof.clone() {
                        self.record_decode_proof(proof);
                    }
                    return Ok(FetchOutcome {
                        bytes: local.bytes,
                        read_path: local.read_path,
                        remote_used: false,
                        write_back_count: 0,
                        decode_proof: local.decode_proof,
                    });
                }
                Err(failure) => {
                    // The failed attempt is audited too, then we fall through to the remote tier.
                    self.record_decode_proof(failure.proof);
                    debug!(
                        bead_id = BEAD_ID,
                        object_id = %object_id,
                        reason = %failure.reason,
                        "local fallback decode attempt failed; escalating to remote tier"
                    );
                }
            }
        }

        let cap = remote_cap.ok_or(FrankenError::AuthDenied)?;
        let remote_store = remote.ok_or(FrankenError::AuthDenied)?;

        // Ask for the source symbols when a local record reveals the object's OTI;
        // with no local records the preference list is empty.
        let preferred_esis = preferred_source_esis(local_records.first().map(|record| record.oti));
        let idempotency_key = derive_fetch_key(object_id, &preferred_esis, ecs_epoch);
        let fetch_request = FetchSymbolsRequest {
            object_id,
            preferred_esis,
            max_symbols: usize::MAX,
            idempotency_key,
            ecs_epoch,
            remote_cap: cap,
            computation: FETCH_SYMBOLS_COMPUTATION,
        };
        let fetched = self.remote_executor.run(
            cx,
            fetch_request.computation,
            None,
            Some(fetch_request.idempotency_key),
            fetch_request.ecs_epoch,
            || remote_store.fetch_symbols(&fetch_request),
        )?;
        if fetched.is_empty() {
            return Err(FrankenError::Internal(format!(
                "remote tier returned no symbols for object {object_id}"
            )));
        }
        // A failed checkpoint after the fetch aborts before any decode or write-back.
        cx.checkpoint().map_err(|_| FrankenError::Busy)?;

        let merged = merge_symbol_sets(&local_records, &fetched);
        let recovered = match recover_object_hybrid(&merged) {
            Ok(value) => value,
            Err(failure) => {
                let detail = failure.reason.clone();
                self.record_decode_proof(failure.proof);
                return Err(FrankenError::DatabaseCorrupt {
                    detail: format!("unable to recover object {object_id}: {detail}"),
                });
            }
        };
        if let Some(proof) = recovered.decode_proof.clone() {
            self.record_decode_proof(proof);
        }
        // Write-back only happens after a successful recovery.
        let write_back_count = self.write_back_repairs(&local_records, &fetched);

        Ok(FetchOutcome {
            bytes: recovered.bytes,
            read_path: recovered.read_path,
            remote_used: true,
            write_back_count,
            decode_proof: recovered.decode_proof,
        })
    }
444
    /// Uploads L2 segment `segment_id` to the remote tier and, if safe, removes it locally.
    ///
    /// The local copy is only removed when, after the upload, no cancellation is observed
    /// and `remote.segment_recoverable(segment_id, min_symbols_per_object)` returns `true`.
    /// In the other two cases the call still returns `Ok`, with `evicted: false` and a
    /// `Compensated*` phase, and the local segment stays in place.
    ///
    /// # Errors
    ///
    /// * `FrankenError::AuthDenied` when `remote_cap` is `None`.
    /// * `FrankenError::Internal` when `segment_id` is not present in L2.
    /// * Any error returned by the remote executor or the upload itself.
    pub fn evict_segment<Caps, R>(
        &mut self,
        cx: &Cx<Caps>,
        segment_id: u64,
        min_symbols_per_object: usize,
        ecs_epoch: u64,
        remote: &mut R,
        remote_cap: Option<RemoteCap>,
    ) -> Result<EvictionOutcome>
    where
        Caps: cap::SubsetOf<cap::All>,
        R: RemoteTier,
    {
        let cap = remote_cap.ok_or(FrankenError::AuthDenied)?;
        // The records are cloned: the upload request owns its copy while L2 keeps the
        // original until eviction is confirmed.
        let records =
            self.l2_segments.get(&segment_id).cloned().ok_or_else(|| {
                FrankenError::Internal(format!("unknown L2 segment {segment_id}"))
            })?;

        let key = derive_evict_key(segment_id, &records, ecs_epoch);
        let upload_request = UploadSegmentRequest {
            segment_id,
            records,
            idempotency_key: key,
            saga: Saga::new(key),
            ecs_epoch,
            remote_cap: cap,
            computation: UPLOAD_SEGMENT_COMPUTATION,
        };
        let receipt = self.remote_executor.run(
            cx,
            upload_request.computation,
            Some(upload_request.saga),
            Some(upload_request.idempotency_key),
            upload_request.ecs_epoch,
            || remote.upload_segment(&upload_request),
        )?;
        debug!(
            bead_id = BEAD_ID,
            segment_id,
            acked_stores = receipt.acked_stores,
            "segment uploaded to L3"
        );

        // Cancellation after the upload: keep the local copy and report the compensation.
        if cx.is_cancel_requested() || cx.checkpoint().is_err() {
            warn!(
                bead_id = BEAD_ID,
                segment_id, "eviction cancelled; retaining local segment"
            );
            return Ok(EvictionOutcome {
                phase: EvictionPhase::CompensatedCancelled,
                evicted: false,
                local_retained: true,
                upload_receipt: receipt,
            });
        }

        // Never drop the local copy unless the remote tier says the segment is recoverable.
        if !remote.segment_recoverable(segment_id, min_symbols_per_object) {
            warn!(
                bead_id = BEAD_ID,
                segment_id,
                min_symbols_per_object,
                "eviction precondition failed; retaining local segment"
            );
            return Ok(EvictionOutcome {
                phase: EvictionPhase::CompensatedPrecondition,
                evicted: false,
                local_retained: true,
                upload_receipt: receipt,
            });
        }

        let _removed = self.l2_segments.remove(&segment_id);
        info!(bead_id = BEAD_ID, segment_id, "segment evicted from L2");
        Ok(EvictionOutcome {
            phase: EvictionPhase::Retired,
            evicted: true,
            local_retained: false,
            upload_receipt: receipt,
        })
    }
532
533 fn write_back_repairs(&mut self, local: &[SymbolRecord], fetched: &[SymbolRecord]) -> usize {
534 let local_by_esi: BTreeMap<u32, &SymbolRecord> =
535 local.iter().map(|record| (record.esi, record)).collect();
536 let mut repairs_by_esi = BTreeMap::<u32, SymbolRecord>::new();
537 for record in fetched {
538 let needs_repair = match local_by_esi.get(&record.esi) {
539 None => true,
540 Some(existing) => !existing.verify_integrity() && record.verify_integrity(),
541 };
542 if needs_repair {
543 merge_symbol_record_by_esi(&mut repairs_by_esi, record.clone());
544 }
545 }
546 let repairs: Vec<SymbolRecord> = repairs_by_esi.into_values().collect();
547 if repairs.is_empty() {
548 return 0;
549 }
550 let added = repairs.len();
551 let segment = self
552 .l2_segments
553 .entry(self.write_back_segment_id)
554 .or_default();
555
556 let mut deduped = BTreeMap::<(ObjectId, u32), SymbolRecord>::new();
559 for record in segment.drain(..) {
560 deduped.insert((record.object_id, record.esi), record);
561 }
562 for record in repairs {
563 let key = (record.object_id, record.esi);
564 match deduped.entry(key) {
565 std::collections::btree_map::Entry::Vacant(entry) => {
566 entry.insert(record);
567 }
568 std::collections::btree_map::Entry::Occupied(mut entry) => {
569 if prefer_symbol_record(entry.get(), &record) {
570 entry.insert(record);
571 }
572 }
573 }
574 }
575 *segment = deduped.into_values().collect();
576 added
577 }
578
579 fn record_decode_proof(&mut self, proof: EcsDecodeProof) {
580 self.decode_audit_seq = self.decode_audit_seq.saturating_add(1);
581 self.decode_audit.push(DecodeAuditEntry {
582 seq: self.decode_audit_seq,
583 object_id: proof.object_id,
584 decode_success: proof.decode_success,
585 proof,
586 });
587 }
588}
589
590fn preferred_source_esis(oti: Option<Oti>) -> Vec<u32> {
591 let Some(oti) = oti else {
592 return Vec::new();
593 };
594 let Ok(source_symbols) = source_symbol_count(oti) else {
595 return Vec::new();
596 };
597 let max_u32 = usize::try_from(u32::MAX).unwrap_or(usize::MAX);
598 let capped = source_symbols.min(max_u32);
599 let mut esis = Vec::with_capacity(capped);
600 for idx in 0..capped {
601 if let Ok(esi) = u32::try_from(idx) {
602 esis.push(esi);
603 }
604 }
605 esis
606}
607
608fn derive_fetch_key(object_id: ObjectId, preferred_esis: &[u32], ecs_epoch: u64) -> IdempotencyKey {
609 let mut bytes = Vec::with_capacity(16 + preferred_esis.len() * 4);
610 bytes.extend_from_slice(object_id.as_bytes());
611 for esi in preferred_esis {
612 bytes.extend_from_slice(&esi.to_le_bytes());
613 }
614 IdempotencyKey::derive(ecs_epoch, &bytes)
615}
616
617fn segment_request_bytes(segment_id: u64, records: &[SymbolRecord]) -> Vec<u8> {
618 let payload_bytes = records.iter().fold(0_usize, |acc, record| {
619 acc.saturating_add(record.to_bytes().len())
620 });
621 let mut bytes = Vec::with_capacity(16 + payload_bytes + records.len().saturating_mul(8));
622 bytes.extend_from_slice(&segment_id.to_le_bytes());
623 bytes.extend_from_slice(
624 &u64::try_from(records.len())
625 .unwrap_or(u64::MAX)
626 .to_le_bytes(),
627 );
628 for record in records {
629 let record_bytes = record.to_bytes();
630 bytes.extend_from_slice(
631 &u64::try_from(record_bytes.len())
632 .unwrap_or(u64::MAX)
633 .to_le_bytes(),
634 );
635 bytes.extend_from_slice(&record_bytes);
636 }
637 bytes
638}
639
640fn derive_evict_key(segment_id: u64, records: &[SymbolRecord], ecs_epoch: u64) -> IdempotencyKey {
641 let request_bytes = segment_request_bytes(segment_id, records);
642 IdempotencyKey::derive(ecs_epoch, &request_bytes)
643}
644
645fn prefer_symbol_record(existing: &SymbolRecord, candidate: &SymbolRecord) -> bool {
646 !existing.verify_integrity() && candidate.verify_integrity()
647}
648
649fn merge_symbol_record_by_esi(by_esi: &mut BTreeMap<u32, SymbolRecord>, record: SymbolRecord) {
650 merge_symbol_record_by_key(by_esi, record.esi, record);
651}
652
653fn merge_symbol_record_by_key<K: Ord>(
654 map: &mut BTreeMap<K, SymbolRecord>,
655 key: K,
656 record: SymbolRecord,
657) {
658 match map.entry(key) {
659 std::collections::btree_map::Entry::Vacant(entry) => {
660 entry.insert(record);
661 }
662 std::collections::btree_map::Entry::Occupied(mut entry) => {
663 if prefer_symbol_record(entry.get(), &record) {
664 entry.insert(record);
665 }
666 }
667 }
668}
669
670fn merge_symbol_sets(local: &[SymbolRecord], fetched: &[SymbolRecord]) -> Vec<SymbolRecord> {
671 let mut by_esi = BTreeMap::<u32, SymbolRecord>::new();
672 for record in local {
673 merge_symbol_record_by_esi(&mut by_esi, record.clone());
674 }
675 for record in fetched {
676 merge_symbol_record_by_esi(&mut by_esi, record.clone());
677 }
678 by_esi.into_values().collect()
679}
680
/// Successful result of `recover_object_hybrid`.
#[derive(Debug, Clone)]
struct HybridRecoverResult {
    /// Recovered object bytes.
    bytes: Vec<u8>,
    /// Which path produced the bytes (systematic fast path or full-decode fallback).
    read_path: SymbolReadPath,
    /// Proof from the fallback path; `None` when the fast path succeeded.
    decode_proof: Option<EcsDecodeProof>,
}
687
/// Successful result of `fallback_decode_records`.
#[derive(Debug, Clone)]
struct FallbackDecodeSuccess {
    /// Recovered object bytes, truncated to the transfer length.
    bytes: Vec<u8>,
    /// Proof describing the successful decode.
    proof: EcsDecodeProof,
}
693
/// Failure result of the fallback decode path.
#[derive(Debug, Clone)]
struct FallbackDecodeFailure {
    /// Human-readable failure detail; also the `Display` output of this type.
    reason: String,
    /// Proof describing the failed decode attempt (`decode_success` is `false`).
    proof: EcsDecodeProof,
}
699
700impl std::fmt::Display for FallbackDecodeFailure {
701 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
702 f.write_str(&self.reason)
703 }
704}
705
/// Symbols and metadata gathered by `collect_fallback_symbol_evidence` for a fallback decode.
#[derive(Debug, Clone)]
struct FallbackSymbolEvidence {
    /// Object id of the first record; every accepted record shares it.
    object_id: ObjectId,
    /// Source-symbol count computed from the first record's OTI.
    source_symbols: usize,
    /// Symbol size taken from the first record's OTI (`t`).
    symbol_size: usize,
    /// Transfer length taken from the first record's OTI (`f`).
    transfer_len: usize,
    /// Accepted records keyed by ESI.
    accepted_by_esi: BTreeMap<u32, SymbolRecord>,
    /// ESIs of the accepted records, in ascending order.
    accepted_esis: Vec<u32>,
    /// Records that were rejected, with the reason for each.
    rejected_symbols: Vec<RejectedSymbol>,
    /// xxh3 digests of the accepted records, sorted by ESI.
    symbol_digests: Vec<SymbolDigest>,
}
717
718fn recover_object_hybrid(
719 records: &[SymbolRecord],
720) -> std::result::Result<HybridRecoverResult, Box<FallbackDecodeFailure>> {
721 match reconstruct_systematic_happy_path(records) {
722 Ok(bytes) => Ok(HybridRecoverResult {
723 bytes,
724 read_path: SymbolReadPath::SystematicFastPath,
725 decode_proof: None,
726 }),
727 Err(reason) => {
728 let fallback = fallback_decode_records(records, &reason)?;
729 Ok(HybridRecoverResult {
730 bytes: fallback.bytes,
731 read_path: SymbolReadPath::FullDecodeFallback { reason },
732 decode_proof: Some(fallback.proof),
733 })
734 }
735 }
736}
737
738fn fallback_decode_records(
739 records: &[SymbolRecord],
740 systematic_reason: &SystematicLayoutError,
741) -> std::result::Result<FallbackDecodeSuccess, Box<FallbackDecodeFailure>> {
742 let evidence = collect_fallback_symbol_evidence(records)?;
743 let k_source = u32::try_from(evidence.source_symbols).unwrap_or(u32::MAX);
744 let available_symbols = evidence.accepted_esis.len();
745 let required_symbols = evidence
746 .source_symbols
747 .saturating_add(DEFAULT_FALLBACK_DECODE_SLACK);
748 if available_symbols < required_symbols {
749 let detail = format!(
750 "systematic_reason={systematic_reason}; insufficient_symbols_for_fallback: available={available_symbols} required={required_symbols} slack_decode={DEFAULT_FALLBACK_DECODE_SLACK}"
751 );
752 return Err(FallbackDecodeFailure {
753 reason: detail,
754 proof: build_fallback_decode_proof_from_parts(
755 evidence.object_id,
756 k_source,
757 &evidence.accepted_esis,
758 &evidence.rejected_symbols,
759 &evidence.symbol_digests,
760 false,
761 Some(u32::try_from(available_symbols).unwrap_or(u32::MAX)),
762 ),
763 }
764 .into());
765 }
766
767 let mut out = Vec::with_capacity(evidence.source_symbols.saturating_mul(evidence.symbol_size));
768 for expected_esi in 0..evidence.source_symbols {
769 let expected_esi_u32 = u32::try_from(expected_esi).unwrap_or(u32::MAX);
770 let Some(record) = evidence.accepted_by_esi.get(&expected_esi_u32) else {
771 let detail = format!(
772 "systematic_reason={systematic_reason}; missing_source_symbol: esi={expected_esi_u32}"
773 );
774 return Err(FallbackDecodeFailure {
775 reason: detail,
776 proof: build_fallback_decode_proof_from_parts(
777 evidence.object_id,
778 k_source,
779 &evidence.accepted_esis,
780 &evidence.rejected_symbols,
781 &evidence.symbol_digests,
782 false,
783 Some(u32::try_from(available_symbols).unwrap_or(u32::MAX)),
784 ),
785 }
786 .into());
787 };
788 out.extend_from_slice(&record.symbol_data);
789 }
790 out.truncate(evidence.transfer_len);
791
792 Ok(FallbackDecodeSuccess {
793 bytes: out,
794 proof: build_fallback_decode_proof_from_parts(
795 evidence.object_id,
796 k_source,
797 &evidence.accepted_esis,
798 &evidence.rejected_symbols,
799 &evidence.symbol_digests,
800 true,
801 Some(k_source),
802 ),
803 })
804}
805
806fn collect_fallback_symbol_evidence(
807 records: &[SymbolRecord],
808) -> std::result::Result<FallbackSymbolEvidence, Box<FallbackDecodeFailure>> {
809 let Some(first) = records.first() else {
810 return Err(FallbackDecodeFailure {
811 reason: String::from("empty_symbol_set"),
812 proof: build_fallback_decode_proof_from_parts(
813 ObjectId::from_bytes([0_u8; 16]),
814 0,
815 &[],
816 &[],
817 &[],
818 false,
819 Some(0),
820 ),
821 }
822 .into());
823 };
824
825 let source_symbols = source_symbol_count(first.oti).map_err(|err| {
826 Box::new(FallbackDecodeFailure {
827 reason: format!("invalid_source_symbol_count: {err}"),
828 proof: build_fallback_decode_proof_from_parts(
829 first.object_id,
830 0,
831 &[],
832 &[],
833 &[],
834 false,
835 Some(0),
836 ),
837 })
838 })?;
839
840 let symbol_size = usize::try_from(first.oti.t).map_err(|_| {
841 Box::new(FallbackDecodeFailure {
842 reason: String::from("invalid_symbol_size"),
843 proof: build_fallback_decode_proof_from_parts(
844 first.object_id,
845 u32::try_from(source_symbols).unwrap_or(u32::MAX),
846 &[],
847 &[],
848 &[],
849 false,
850 Some(0),
851 ),
852 })
853 })?;
854 let transfer_len = usize::try_from(first.oti.f).map_err(|_| {
855 Box::new(FallbackDecodeFailure {
856 reason: String::from("invalid_transfer_length"),
857 proof: build_fallback_decode_proof_from_parts(
858 first.object_id,
859 u32::try_from(source_symbols).unwrap_or(u32::MAX),
860 &[],
861 &[],
862 &[],
863 false,
864 Some(0),
865 ),
866 })
867 })?;
868
869 let mut ordered = records.to_vec();
870 ordered.sort_by_key(|record| record.esi);
871
872 let mut accepted_by_esi = BTreeMap::<u32, SymbolRecord>::new();
873 let mut rejected_symbols = Vec::new();
874 let mut symbol_digests = Vec::new();
875 for record in ordered {
876 let rejection = if record.object_id != first.object_id
877 || record.oti != first.oti
878 || record.symbol_data.len() != symbol_size
879 {
880 Some(SymbolRejectionReason::FormatViolation)
881 } else if !record.verify_integrity() {
882 Some(SymbolRejectionReason::HashMismatch)
883 } else if accepted_by_esi.contains_key(&record.esi) {
884 Some(SymbolRejectionReason::DuplicateEsi)
885 } else {
886 None
887 };
888
889 if let Some(reason) = rejection {
890 rejected_symbols.push(RejectedSymbol {
891 esi: record.esi,
892 reason,
893 });
894 continue;
895 }
896
897 symbol_digests.push(SymbolDigest {
898 esi: record.esi,
899 digest_xxh3: xxh3_64(&record.to_bytes()),
900 });
901 accepted_by_esi.insert(record.esi, record);
902 }
903
904 let accepted_esis = accepted_by_esi.keys().copied().collect();
905 symbol_digests.sort_by_key(|digest| digest.esi);
906
907 Ok(FallbackSymbolEvidence {
908 object_id: first.object_id,
909 source_symbols,
910 symbol_size,
911 transfer_len,
912 accepted_by_esi,
913 accepted_esis,
914 rejected_symbols,
915 symbol_digests,
916 })
917}
918
919fn build_fallback_decode_proof_from_parts(
920 object_id: ObjectId,
921 k_source: u32,
922 accepted_esis: &[u32],
923 rejected_symbols: &[RejectedSymbol],
924 symbol_digests: &[SymbolDigest],
925 decode_success: bool,
926 intermediate_rank: Option<u32>,
927) -> EcsDecodeProof {
928 let seed = deterministic_fallback_seed(object_id, k_source);
929 let timing_ns = deterministic_fallback_timing_ns(
930 object_id,
931 k_source,
932 accepted_esis,
933 rejected_symbols,
934 decode_success,
935 );
936 let proof = EcsDecodeProof::from_esis(
937 object_id,
938 k_source,
939 accepted_esis,
940 decode_success,
941 intermediate_rank,
942 timing_ns,
943 seed,
944 );
945 proof
946 .with_rejected_symbols(rejected_symbols.to_vec())
947 .with_symbol_digests(symbol_digests.to_vec())
948}
949
950fn deterministic_fallback_seed(object_id: ObjectId, k_source: u32) -> u64 {
951 let mut material = Vec::with_capacity(40);
952 material.extend_from_slice(b"fsqlite:tiered:fallback:seed:v1");
953 material.extend_from_slice(object_id.as_bytes());
954 material.extend_from_slice(&k_source.to_le_bytes());
955 xxh3_64(&material)
956}
957
958fn deterministic_fallback_timing_ns(
959 object_id: ObjectId,
960 k_source: u32,
961 accepted_esis: &[u32],
962 rejected_symbols: &[RejectedSymbol],
963 decode_success: bool,
964) -> u64 {
965 let mut material =
966 Vec::with_capacity(48 + accepted_esis.len() * 4 + rejected_symbols.len() * 5);
967 material.extend_from_slice(b"fsqlite:tiered:fallback:timing:v1");
968 material.extend_from_slice(object_id.as_bytes());
969 material.extend_from_slice(&k_source.to_le_bytes());
970 material.push(u8::from(decode_success));
971 for esi in accepted_esis {
972 material.extend_from_slice(&esi.to_le_bytes());
973 }
974 for item in rejected_symbols {
975 material.extend_from_slice(&item.esi.to_le_bytes());
976 material.push(rejection_reason_code(item.reason));
977 }
978 xxh3_64(&material)
979}
980
981fn rejection_reason_code(reason: SymbolRejectionReason) -> u8 {
982 match reason {
983 SymbolRejectionReason::HashMismatch => 1,
984 SymbolRejectionReason::InvalidAuthTag => 2,
985 SymbolRejectionReason::DuplicateEsi => 3,
986 SymbolRejectionReason::FormatViolation => 4,
987 }
988}
989
990#[cfg(test)]
991mod tests {
992 use std::collections::{BTreeSet, HashMap};
993 use std::time::Duration;
994
995 use fsqlite_types::cx::{Cx, cap};
996 use fsqlite_types::{ObjectId, Oti, SymbolRecordFlags};
997 use proptest::prelude::*;
998 use proptest::test_runner::{Config as ProptestConfig, RngAlgorithm, RngSeed, TestRunner};
999
1000 use super::*;
1001
    /// In-memory `RemoteTier` used by the tests in this module.
    #[derive(Debug, Default)]
    struct MockRemoteTier {
        /// Symbols served by `fetch_symbols`, keyed by object id.
        object_symbols: HashMap<ObjectId, Vec<SymbolRecord>>,
        /// Records of each uploaded segment, keyed by segment id.
        segment_symbols: HashMap<u64, Vec<SymbolRecord>>,
        /// Receipts of previous uploads, keyed by (segment id, idempotency key).
        upload_receipts: HashMap<(u64, IdempotencyKey), UploadSegmentReceipt>,
        /// Forced answers for `segment_recoverable`, keyed by segment id.
        segment_recoverability_overrides: HashMap<u64, bool>,
        /// Number of uploads that were not answered from `upload_receipts`.
        upload_calls: usize,
        /// Number of `fetch_symbols` calls.
        fetch_calls: usize,
        /// `acked_stores` value reported in new upload receipts.
        configured_acks: u32,
        /// Context to cancel once, right after the next non-deduplicated upload.
        cancel_after_upload: Option<Cx<cap::All>>,
        /// Context to cancel once, right after the next fetch that finds the object.
        cancel_after_fetch: Option<Cx<cap::All>>,
        /// `preferred_esis` of the most recent fetch request.
        last_fetch_preferred: Vec<u32>,
    }
1015
    impl MockRemoteTier {
        /// Sets the symbols served for `object_id`, replacing any previous set.
        fn set_object_symbols(&mut self, object_id: ObjectId, records: Vec<SymbolRecord>) {
            self.object_symbols.insert(object_id, records);
        }

        /// Sets the `acked_stores` value reported by subsequent new uploads.
        fn set_acked_stores(&mut self, acked_stores: u32) {
            self.configured_acks = acked_stores;
        }

        /// Forces `segment_recoverable` to return `recoverable` for `segment_id`.
        fn set_segment_recoverable(&mut self, segment_id: u64, recoverable: bool) {
            self.segment_recoverability_overrides
                .insert(segment_id, recoverable);
        }

        /// Arranges for `cx` to be cancelled right after the next non-deduplicated upload.
        fn set_cancel_after_upload(&mut self, cx: Cx<cap::All>) {
            self.cancel_after_upload = Some(cx);
        }

        /// Returns the number of uploads that were not deduplicated.
        fn upload_calls(&self) -> usize {
            self.upload_calls
        }

        /// Returns the number of `fetch_symbols` calls.
        fn fetch_calls(&self) -> usize {
            self.fetch_calls
        }
    }
1042
    impl RemoteTier for MockRemoteTier {
        /// Serves the stored symbols of the requested object.
        ///
        /// Preferred ESIs come first, then the rest, each group in ascending ESI order;
        /// the list is truncated to `max_symbols`. An unknown object yields an empty list.
        fn fetch_symbols(&mut self, request: &FetchSymbolsRequest) -> Result<Vec<SymbolRecord>> {
            self.fetch_calls = self.fetch_calls.saturating_add(1);
            self.last_fetch_preferred = request.preferred_esis.clone();
            let Some(records) = self.object_symbols.get(&request.object_id) else {
                return Ok(Vec::new());
            };

            let preferred: BTreeSet<u32> = request.preferred_esis.iter().copied().collect();
            let mut ordered = records.clone();
            // `false` sorts before `true`, so preferred ESIs lead the list.
            ordered.sort_by_key(|record| (!preferred.contains(&record.esi), record.esi));
            ordered.truncate(request.max_symbols);
            // One-shot cancellation hook, fired only when the object was found.
            if let Some(cx) = self.cancel_after_fetch.take() {
                cx.cancel();
            }
            Ok(ordered)
        }

        /// Stores the uploaded segment and returns a receipt with the configured ack count.
        ///
        /// A repeated (segment id, idempotency key) pair returns the earlier receipt with
        /// `deduplicated: true` and changes no state, including the upload counter.
        fn upload_segment(
            &mut self,
            request: &UploadSegmentRequest,
        ) -> Result<UploadSegmentReceipt> {
            let key = (request.segment_id, request.idempotency_key);
            if let Some(existing) = self.upload_receipts.get(&key).copied() {
                return Ok(UploadSegmentReceipt {
                    deduplicated: true,
                    ..existing
                });
            }

            self.upload_calls = self.upload_calls.saturating_add(1);
            self.segment_symbols
                .insert(request.segment_id, request.records.clone());

            // Make the uploaded symbols fetchable per object; an ESI already present for
            // an object is left untouched.
            for record in &request.records {
                let entry = self.object_symbols.entry(record.object_id).or_default();
                if entry.iter().all(|existing| existing.esi != record.esi) {
                    entry.push(record.clone());
                }
                entry.sort_by_key(|existing| existing.esi);
            }

            let receipt = UploadSegmentReceipt {
                acked_stores: self.configured_acks,
                deduplicated: false,
            };
            self.upload_receipts.insert(key, receipt);

            // One-shot cancellation hook, fired after the upload has been recorded.
            if let Some(cx) = self.cancel_after_upload.take() {
                cx.cancel();
            }

            Ok(receipt)
        }

        /// Reports recoverability of an uploaded segment.
        ///
        /// An override set for the segment wins. Otherwise the segment must have been
        /// uploaded and every object in it must have at least `min_symbols_per_object`
        /// records.
        fn segment_recoverable(&self, segment_id: u64, min_symbols_per_object: usize) -> bool {
            if let Some(override_value) = self.segment_recoverability_overrides.get(&segment_id) {
                return *override_value;
            }
            let Some(records) = self.segment_symbols.get(&segment_id) else {
                return false;
            };
            let mut per_object = HashMap::<ObjectId, usize>::new();
            for record in records {
                let entry = per_object.entry(record.object_id).or_insert(0);
                *entry = entry.saturating_add(1);
            }
            per_object
                .values()
                .all(|count| *count >= min_symbols_per_object)
        }
    }
1115
1116 fn object_id_from_u64(raw: u64) -> ObjectId {
1117 let mut bytes = [0_u8; 16];
1118 bytes[0..8].copy_from_slice(&raw.to_le_bytes());
1119 bytes[8..16].copy_from_slice(&raw.to_le_bytes());
1120 ObjectId::from_bytes(bytes)
1121 }
1122
1123 fn remote_cap(seed: u8) -> RemoteCap {
1124 RemoteCap::from_bytes([seed; 16])
1125 }
1126
1127 fn make_symbol_records(
1128 object_id: ObjectId,
1129 payload: &[u8],
1130 symbol_size: usize,
1131 repair_symbols: usize,
1132 ) -> Vec<SymbolRecord> {
1133 let symbol_size_u32 = u32::try_from(symbol_size).expect("symbol_size fits u32");
1134 let transfer_len_u64 = u64::try_from(payload.len()).expect("payload len fits u64");
1135 let oti = Oti {
1136 f: transfer_len_u64,
1137 al: 1,
1138 t: symbol_size_u32,
1139 z: 1,
1140 n: 1,
1141 };
1142
1143 let source_symbols = payload.len().div_ceil(symbol_size);
1144 let mut out = Vec::new();
1145 for idx in 0..source_symbols {
1146 let start = idx * symbol_size;
1147 let end = (start + symbol_size).min(payload.len());
1148 let mut symbol = vec![0_u8; symbol_size];
1149 symbol[..end - start].copy_from_slice(&payload[start..end]);
1150 let esi = u32::try_from(idx).expect("source esi fits u32");
1151 let flags = if idx == 0 {
1152 SymbolRecordFlags::SYSTEMATIC_RUN_START
1153 } else {
1154 SymbolRecordFlags::empty()
1155 };
1156 out.push(SymbolRecord::new(object_id, oti, esi, symbol, flags));
1157 }
1158
1159 for repair_idx in 0..repair_symbols {
1160 let repair_esi_usize = source_symbols.saturating_add(repair_idx);
1161 let esi = u32::try_from(repair_esi_usize).expect("repair esi fits u32");
1162 let mut symbol = vec![0_u8; symbol_size];
1163 let esi_low = u8::try_from(esi & 0xFF).expect("masked to u8");
1164 for (offset, byte) in symbol.iter_mut().enumerate() {
1165 let offset_low = u8::try_from(offset & 0xFF).expect("masked to u8");
1166 *byte = esi_low ^ offset_low;
1167 }
1168 out.push(SymbolRecord::new(
1169 object_id,
1170 oti,
1171 esi,
1172 symbol,
1173 SymbolRecordFlags::empty(),
1174 ));
1175 }
1176
1177 out
1178 }
1179
1180 fn rejected_esis_set(proof: &EcsDecodeProof) -> BTreeSet<u32> {
1181 proof
1182 .rejected_symbols
1183 .iter()
1184 .map(|entry| entry.esi)
1185 .collect()
1186 }
1187
1188 fn decode_proof_report_ok(proof: &EcsDecodeProof) -> bool {
1189 proof
1190 .verification_report(
1191 crate::decode_proofs::DecodeProofVerificationConfig::default(),
1192 &proof.symbol_digests,
1193 &proof.rejected_symbols,
1194 )
1195 .ok
1196 }
1197
/// A fetch that supplies a remote tier but no `RemoteCap` must fail with
/// `AuthDenied` and must not issue any remote fetch call.
#[test]
fn test_l3_fetch_requires_remote_cap() {
    let object_id = object_id_from_u64(1);
    let payload = b"tiered-fetch-requires-cap";

    // L2 holds the source symbols minus ESI 1.
    let local_symbols: Vec<SymbolRecord> = make_symbol_records(object_id, payload, 8, 0)
        .into_iter()
        .filter(|symbol| symbol.esi != 1)
        .collect();
    let mut storage = TieredStorage::new(DurabilityMode::local());
    storage.insert_l2_segment(1, local_symbols);

    let remote_symbols = make_symbol_records(object_id, payload, 8, 1);
    let mut remote = MockRemoteTier::default();
    remote.set_object_symbols(object_id, remote_symbols);

    let cx = Cx::<cap::All>::new();
    let fetched = storage.fetch_object(&cx, object_id, 7, Some(&mut remote), None);

    assert!(matches!(fetched, Err(FrankenError::AuthDenied)));
    assert_eq!(remote.fetch_calls(), 0);
}
1217
/// Committing the same request twice must reach the remote only once; the
/// second receipt is flagged as deduplicated and the first is not.
#[test]
fn test_l3_upload_idempotency_key() {
    let records = make_symbol_records(object_id_from_u64(2), b"idempotent-upload", 8, 1);

    let durability = DurabilityMode::quorum(1, 3).expect("valid quorum");
    let mut storage = TieredStorage::new(durability);
    let mut remote = MockRemoteTier::default();
    remote.set_acked_stores(2);
    let cx = Cx::<cap::All>::new();
    let upload_cap = Some(remote_cap(9));

    // Two byte-identical requests: the original and its clone.
    let original = CommitRequest::new(10, records, 11);
    let replay = original.clone();

    let first_outcome = storage
        .commit_segment(&cx, original, Some(&mut remote), upload_cap)
        .expect("first upload succeeds");
    let second_outcome = storage
        .commit_segment(&cx, replay, Some(&mut remote), upload_cap)
        .expect("second upload returns idempotent result");

    assert_eq!(remote.upload_calls(), 1);
    let fresh = first_outcome
        .upload_receipt
        .expect("first commit has upload receipt");
    let repeated = second_outcome
        .upload_receipt
        .expect("second commit has upload receipt");
    assert!(!fresh.deduplicated);
    assert!(repeated.deduplicated);
}
1248
/// Two commit requests with the same segment id and final argument but
/// different payloads must differ in both idempotency key and saga.
#[test]
fn test_l3_upload_idempotency_key_changes_with_segment_contents() {
    let object_id = object_id_from_u64(22);
    let build_request = |payload: &[u8]| {
        let records = make_symbol_records(object_id, payload, 8, 1);
        CommitRequest::new(10, records, 11)
    };

    let request_a = build_request(b"idempotent-upload-a");
    let request_b = build_request(b"idempotent-upload-b");

    assert_ne!(request_a.idempotency_key, request_b.idempotency_key);
    assert_ne!(request_a.saga, request_b.saga);
}
1263
/// `derive_evict_key` must produce different keys for different segment
/// contents when the other two arguments are identical.
#[test]
fn test_evict_idempotency_key_changes_with_segment_contents() {
    let object_id = object_id_from_u64(23);
    let key_for = |payload: &[u8]| {
        let records = make_symbol_records(object_id, payload, 8, 1);
        derive_evict_key(40, &records, 12)
    };

    let key_a = key_for(b"evict-segment-a");
    let key_b = key_for(b"evict-segment-b");

    assert_ne!(key_a, key_b);
}
1277
/// When the context is cancelled right after the remote upload, eviction must
/// end in `CompensatedCancelled` and the local segment must still be present.
#[test]
fn test_eviction_cancel_safety() {
    let segment_id = 20;
    let records = make_symbol_records(object_id_from_u64(3), b"eviction-cancel-safety", 8, 1);

    let mut storage = TieredStorage::new(DurabilityMode::local());
    storage.insert_l2_segment(segment_id, records);

    let cx = Cx::<cap::All>::new();
    let mut remote = MockRemoteTier::default();
    remote.set_acked_stores(3);
    // The mock cancels this context once its upload call has completed.
    remote.set_cancel_after_upload(cx.clone());

    let eviction = storage
        .evict_segment(&cx, segment_id, 1, 50, &mut remote, Some(remote_cap(7)))
        .expect("eviction call succeeds");

    assert_eq!(eviction.phase, EvictionPhase::CompensatedCancelled);
    assert!(!eviction.evicted);
    assert!(eviction.local_retained);
    assert!(storage.l2_segment_exists(segment_id));
}
1301
/// When the remote reports the segment as not recoverable, eviction must end in
/// `CompensatedPrecondition` and the local segment must still be present.
#[test]
fn test_eviction_precondition_check() {
    let segment_id = 30;
    let records = make_symbol_records(
        object_id_from_u64(4),
        b"eviction-precondition-check",
        8,
        1,
    );

    let mut storage = TieredStorage::new(DurabilityMode::local());
    storage.insert_l2_segment(segment_id, records);

    let cx = Cx::<cap::All>::new();
    let mut remote = MockRemoteTier::default();
    remote.set_acked_stores(3);
    // Force the mock's recoverability answer for this segment to `false`.
    remote.set_segment_recoverable(segment_id, false);

    let eviction = storage
        .evict_segment(&cx, segment_id, 2, 51, &mut remote, Some(remote_cap(8)))
        .expect("eviction call succeeds");

    assert_eq!(eviction.phase, EvictionPhase::CompensatedPrecondition);
    assert!(!eviction.evicted);
    assert!(eviction.local_retained);
    assert!(storage.l2_segment_exists(segment_id));
}
1325
/// With the complete symbol set in L2 and no remote tier, a fetch must return the
/// payload via `SystematicFastPath` with no remote use, write-back, decode proof
/// or decode audit entries.
#[test]
fn test_fetch_on_demand_systematic_fast_path() {
    let object_id = object_id_from_u64(5);
    let payload = b"systematic-fast-path";

    let mut storage = TieredStorage::new(DurabilityMode::local());
    storage.insert_l2_segment(40, make_symbol_records(object_id, payload, 8, 1));

    let cx = Cx::<cap::All>::new();
    let no_remote: Option<&mut MockRemoteTier> = None;
    let fetched = storage
        .fetch_object(&cx, object_id, 52, no_remote, None)
        .expect("local fast-path fetch succeeds");

    assert_eq!(fetched.bytes, payload);
    let took_fast_path = matches!(fetched.read_path, SymbolReadPath::SystematicFastPath);
    assert!(took_fast_path);
    assert!(!fetched.remote_used);
    assert_eq!(fetched.write_back_count, 0);
    assert!(fetched.decode_proof.is_none());

    let audit = storage.take_decode_audit_entries();
    assert!(audit.is_empty());
}
1356
/// Sixty-four consecutive local reads must all take the systematic fast path and
/// leave the decode audit log empty.
#[test]
fn test_fast_path_repeated_reads_emit_no_decode_artifacts() {
    let object_id = object_id_from_u64(55);
    let symbols = make_symbol_records(object_id, b"systematic-fast-path-repeat", 8, 1);

    let mut storage = TieredStorage::new(DurabilityMode::local());
    storage.insert_l2_segment(405, symbols);
    let cx = Cx::<cap::All>::new();

    for _attempt in 0..64 {
        let fetched = storage
            .fetch_object(&cx, object_id, 52, None::<&mut MockRemoteTier>, None)
            .expect("local fast-path fetch succeeds");
        let took_fast_path = matches!(fetched.read_path, SymbolReadPath::SystematicFastPath);
        assert!(took_fast_path);
        assert!(fetched.decode_proof.is_none());
        assert!(!fetched.remote_used);
    }

    let audit = storage.take_decode_audit_entries();
    assert!(
        audit.is_empty(),
        "fast path should never invoke fallback decoder/proof emission"
    );
}
1390
/// With only ESIs 0 and 2 stored locally, a fetch must pull the rest from the
/// remote, decode via `FullDecodeFallback`, write symbols back into the
/// write-back segment, and record both a failed and a successful decode proof.
#[test]
fn test_fetch_on_demand_repair_fallback() {
    let object_id = object_id_from_u64(6);
    let payload = b"repair-fallback-path";

    let mut all_symbols = make_symbol_records(object_id, payload, 8, 3);
    // Rebuild ESI 0 with empty flags.
    for head in all_symbols.iter_mut().filter(|symbol| symbol.esi == 0) {
        let stripped = SymbolRecord::new(
            head.object_id,
            head.oti,
            head.esi,
            head.symbol_data.clone(),
            SymbolRecordFlags::empty(),
        );
        *head = stripped;
    }

    // Split the set: ESIs 0 and 2 stay local, ESI 1 and ESIs >= 3 live remotely.
    let local_partial: Vec<SymbolRecord> = all_symbols
        .iter()
        .filter(|symbol| symbol.esi == 0 || symbol.esi == 2)
        .cloned()
        .collect();
    let remote_repairs: Vec<SymbolRecord> = all_symbols
        .into_iter()
        .filter(|symbol| symbol.esi == 1 || symbol.esi >= 3)
        .collect();

    let mut storage = TieredStorage::new(DurabilityMode::local());
    storage.insert_l2_segment(41, local_partial);

    let mut remote = MockRemoteTier::default();
    remote.set_object_symbols(object_id, remote_repairs);
    let cx = Cx::<cap::All>::new();

    let fetched = storage
        .fetch_object(&cx, object_id, 53, Some(&mut remote), Some(remote_cap(5)))
        .expect("fallback fetch succeeds");

    let used_fallback = matches!(
        fetched.read_path,
        SymbolReadPath::FullDecodeFallback { .. }
    );
    assert!(used_fallback);
    assert_eq!(fetched.bytes, payload);
    assert!(fetched.remote_used);
    assert!(fetched.write_back_count > 0);
    assert!(fetched.decode_proof.is_some());
    assert_eq!(remote.last_fetch_preferred, vec![0, 1, 2]);

    let write_back_segment = storage.write_back_segment_id();
    assert!(storage.l2_segment_exists(write_back_segment));

    let audit = storage.take_decode_audit_entries();
    let saw_success = audit.iter().any(|item| item.decode_success);
    assert!(
        saw_success,
        "expected at least one successful fallback proof"
    );
    let saw_failure = audit.iter().any(|item| !item.decode_success);
    assert!(
        saw_failure,
        "expected local failure proof before remote fallback success"
    );
}
1445
/// A corrupt local symbol must be repaired from the remote tier with exactly one
/// write-back; afterwards a local-only fetch must succeed and every local record
/// for the object must pass its integrity check.
#[test]
fn test_fetch_repairs_corrupt_local_symbol_and_persists_healed_copy() {
    let object_id = object_id_from_u64(72);
    let payload = b"repair-corrupt-local-symbol";
    let pristine = make_symbol_records(object_id, payload, 8, 2);

    // Damage the first byte of the local copy of ESI 1 and confirm the record
    // no longer verifies.
    let mut damaged = pristine.clone();
    {
        let victim = damaged
            .iter_mut()
            .find(|symbol| symbol.esi == 1)
            .expect("seeded symbol set must contain ESI 1");
        if let Some(leading_byte) = victim.symbol_data.first_mut() {
            *leading_byte ^= 0x5A;
        }
        assert!(
            !victim.verify_integrity(),
            "test fixture must actually corrupt the local symbol"
        );
    }

    let mut storage = TieredStorage::new(DurabilityMode::local());
    storage.insert_l2_segment(420, damaged);

    let mut remote = MockRemoteTier::default();
    remote.set_object_symbols(object_id, pristine.clone());
    let cx = Cx::<cap::All>::new();

    let healed = storage
        .fetch_object(&cx, object_id, 57, Some(&mut remote), Some(remote_cap(13)))
        .expect("remote repair should recover the object");
    assert_eq!(healed.bytes, payload);
    assert!(healed.remote_used);
    assert_eq!(healed.write_back_count, 1);

    // Second read uses no remote tier at all.
    let replayed = storage
        .fetch_object(&cx, object_id, 58, None::<&mut MockRemoteTier>, None)
        .expect("healed symbol should make later local-only reads succeed");
    assert_eq!(replayed.bytes, payload);
    assert!(!replayed.remote_used);

    let all_local_valid = storage
        .l2_records_for_object(object_id)
        .iter()
        .all(|symbol| symbol.verify_integrity());
    assert!(
        all_local_valid,
        "local self-healing must prefer the repaired symbol over the stale corrupt copy"
    );
}
1498
/// If the context is cancelled right after the remote fetch, the read must fail
/// with `Busy` after exactly one remote call, and no write-back segment may exist.
#[test]
fn test_fetch_cancelled_after_remote_read_does_not_write_back_repairs() {
    let object_id = object_id_from_u64(73);
    let all_symbols = make_symbol_records(object_id, b"cancel-after-remote-fetch", 8, 1);

    // Only ESI 0 is available locally.
    let local_only: Vec<SymbolRecord> = all_symbols
        .iter()
        .filter(|symbol| symbol.esi == 0)
        .cloned()
        .collect();

    let mut storage = TieredStorage::new(DurabilityMode::local());
    storage.insert_l2_segment(421, local_only);

    let cx = Cx::<cap::All>::new();
    let mut remote = MockRemoteTier::default();
    remote.set_object_symbols(object_id, all_symbols);
    remote.cancel_after_fetch = Some(cx.clone());

    let fetched =
        storage.fetch_object(&cx, object_id, 59, Some(&mut remote), Some(remote_cap(14)));

    assert!(matches!(fetched, Err(FrankenError::Busy)));
    assert_eq!(remote.fetch_calls(), 1);

    let write_back_segment = storage.write_back_segment_id();
    assert!(
        !storage.l2_segment_exists(write_back_segment),
        "cancelled fetch must not append repaired symbols into the write-back segment"
    );
}
1524
/// When local and remote symbols together are still not enough, the fetch must
/// fail with `DatabaseCorrupt` naming `insufficient_symbols_for_fallback`, and
/// the audit log must hold a failure proof listing at least two received symbols.
#[test]
fn test_fetch_fallback_failure_emits_decode_proof() {
    let object_id = object_id_from_u64(66);
    let mut all_symbols = make_symbol_records(object_id, b"fallback-threshold-failure", 8, 0);
    // Rebuild ESI 0 with empty flags.
    for head in all_symbols.iter_mut().filter(|symbol| symbol.esi == 0) {
        let stripped = SymbolRecord::new(
            head.object_id,
            head.oti,
            head.esi,
            head.symbol_data.clone(),
            SymbolRecordFlags::empty(),
        );
        *head = stripped;
    }

    // ESIs 0 and 2 are local, ESI 1 is remote, and no repair symbols exist.
    let local_partial: Vec<SymbolRecord> = all_symbols
        .iter()
        .filter(|symbol| symbol.esi == 0 || symbol.esi == 2)
        .cloned()
        .collect();
    let remote_source: Vec<SymbolRecord> = all_symbols
        .into_iter()
        .filter(|symbol| symbol.esi == 1)
        .collect();

    let mut storage = TieredStorage::new(DurabilityMode::local());
    storage.insert_l2_segment(416, local_partial);

    let mut remote = MockRemoteTier::default();
    remote.set_object_symbols(object_id, remote_source);
    let cx = Cx::<cap::All>::new();

    let fetched =
        storage.fetch_object(&cx, object_id, 54, Some(&mut remote), Some(remote_cap(6)));

    assert!(matches!(fetched, Err(FrankenError::DatabaseCorrupt { .. })));
    if let Err(FrankenError::DatabaseCorrupt { detail }) = fetched {
        let names_cause = detail.contains("insufficient_symbols_for_fallback");
        assert!(
            names_cause,
            "expected deterministic fallback-failure detail, got: {detail}"
        );
    }

    let audit = storage.take_decode_audit_entries();
    let saw_failure = audit.iter().any(|item| !item.decode_success);
    assert!(saw_failure, "expected at least one failure proof artifact");

    let failure_lists_symbols = audit
        .iter()
        .any(|item| !item.decode_success && item.proof.symbols_received.len() >= 2);
    assert!(
        failure_lists_symbols,
        "expected proof to capture available symbol cardinality"
    );
}
1575
/// Two objects repaired from the remote in sequence must both remain readable
/// from local storage afterwards, without a remote tier.
#[test]
fn test_write_back_segment_keeps_missing_symbols_per_object() {
    // Keeps only ESI 0 of a symbol set for the local tier.
    let first_symbol_only = |symbols: &[SymbolRecord]| -> Vec<SymbolRecord> {
        symbols
            .iter()
            .filter(|symbol| symbol.esi == 0)
            .cloned()
            .collect()
    };

    let object_a = object_id_from_u64(68);
    let payload_a = b"first-payload";
    let full_a = make_symbol_records(object_a, payload_a, 8, 0);
    let local_a = first_symbol_only(&full_a);

    let object_b = object_id_from_u64(69);
    let payload_b = b"second-bytes!";
    let full_b = make_symbol_records(object_b, payload_b, 8, 0);
    let local_b = first_symbol_only(&full_b);

    let mut storage = TieredStorage::new(DurabilityMode::local());
    storage.insert_l2_segment(418, local_a);
    storage.insert_l2_segment(419, local_b);

    let mut remote = MockRemoteTier::default();
    remote.set_object_symbols(object_a, full_a);
    remote.set_object_symbols(object_b, full_b);
    let cx = Cx::<cap::All>::new();

    // Both objects need the remote tier the first time around.
    let repaired_a = storage
        .fetch_object(&cx, object_a, 57, Some(&mut remote), Some(remote_cap(13)))
        .expect("first object fetch succeeds");
    assert!(repaired_a.remote_used);
    assert!(repaired_a.write_back_count > 0);

    let repaired_b = storage
        .fetch_object(&cx, object_b, 58, Some(&mut remote), Some(remote_cap(13)))
        .expect("second object fetch succeeds");
    assert!(repaired_b.remote_used);
    assert!(repaired_b.write_back_count > 0);

    // Afterwards both must be served locally.
    let local_a_again = storage
        .fetch_object(&cx, object_a, 59, None::<&mut MockRemoteTier>, None)
        .expect("first object should remain recoverable from local write-back");
    assert_eq!(local_a_again.bytes, payload_a);
    assert!(!local_a_again.remote_used);

    let local_b_again = storage
        .fetch_object(&cx, object_b, 60, None::<&mut MockRemoteTier>, None)
        .expect("second object should remain recoverable from local write-back");
    assert_eq!(local_b_again.bytes, payload_b);
    assert!(!local_b_again.remote_used);
}
1623
/// Running the same fallback scenario twice on fresh storage must yield equal
/// decode proofs.
#[test]
fn test_fallback_decode_proof_stable_for_same_inputs() {
    /// Executes one fallback fetch on fresh storage and returns its decode proof.
    fn run_once() -> EcsDecodeProof {
        let object_id = object_id_from_u64(67);
        let mut all_symbols =
            make_symbol_records(object_id, b"fallback-proof-stability", 8, 3);
        // Rebuild ESI 0 with empty flags.
        for head in all_symbols.iter_mut().filter(|symbol| symbol.esi == 0) {
            let stripped = SymbolRecord::new(
                head.object_id,
                head.oti,
                head.esi,
                head.symbol_data.clone(),
                SymbolRecordFlags::empty(),
            );
            *head = stripped;
        }

        let local_partial: Vec<SymbolRecord> = all_symbols
            .iter()
            .filter(|symbol| symbol.esi == 0 || symbol.esi == 2)
            .cloned()
            .collect();
        let remote_repairs: Vec<SymbolRecord> = all_symbols
            .into_iter()
            .filter(|symbol| symbol.esi == 1 || symbol.esi >= 3)
            .collect();

        let mut storage = TieredStorage::new(DurabilityMode::local());
        storage.insert_l2_segment(417, local_partial);

        let mut remote = MockRemoteTier::default();
        remote.set_object_symbols(object_id, remote_repairs);
        let cx = Cx::<cap::All>::new();

        let fetched = storage
            .fetch_object(&cx, object_id, 55, Some(&mut remote), Some(remote_cap(7)))
            .expect("fallback fetch succeeds");
        let used_fallback = matches!(
            fetched.read_path,
            SymbolReadPath::FullDecodeFallback { .. }
        );
        assert!(used_fallback);

        fetched
            .decode_proof
            .expect("fallback-success path should emit decode proof")
    }

    let first_proof = run_once();
    let second_proof = run_once();
    assert_eq!(
        first_proof, second_proof,
        "proof artifacts must be stable for identical fallback input sets"
    );
}
1673
/// Seeded property test: randomly drops and corrupts symbols served by the
/// remote tier and checks that `fetch_object` either decodes the payload through
/// the fallback path or fails with `DatabaseCorrupt`, with the emitted decode
/// proof listing exactly the corrupted ESIs as rejected in both cases.
#[test]
fn test_symbolrecord_corruption_erasures_seeded_property() {
    // 96 cases from a fixed ChaCha seed, with failure persistence disabled.
    let mut runner = TestRunner::new(ProptestConfig {
        cases: 96,
        failure_persistence: None,
        rng_algorithm: RngAlgorithm::ChaCha,
        rng_seed: RngSeed::Fixed(0x0BAD_C0DE_u64),
        ..ProptestConfig::default()
    });

    // (payload of 17..96 bytes, up to 3 ESIs to drop, up to 3 ESIs to corrupt);
    // candidate ESIs are drawn from 0..7.
    let strategy = (
        prop::collection::vec(proptest::num::u8::ANY, 17..96),
        prop::collection::vec(0_u8..7, 0..4),
        prop::collection::vec(0_u8..7, 0..4),
    );

    runner
        .run(&strategy, |(payload, dropped_raw, corrupted_raw)| {
            let object_id = object_id_from_u64(77);
            // 8-byte symbols plus four synthetic repair symbols.
            let full = make_symbol_records(object_id, &payload, 8, 4);

            let dropped: BTreeSet<u32> = dropped_raw.into_iter().map(u32::from).collect();
            let corrupted: BTreeSet<u32> = corrupted_raw.into_iter().map(u32::from).collect();

            let mut remote_records = Vec::new();
            let mut expected_rejected = BTreeSet::new();
            let mut accepted_esis = BTreeSet::new();

            for mut record in full {
                // Dropped symbols are never offered to the remote tier.
                if dropped.contains(&record.esi) {
                    continue;
                }
                if record.esi == 0 {
                    // Rebuild ESI 0 with empty flags, clearing the
                    // SYSTEMATIC_RUN_START flag that make_symbol_records sets.
                    // NOTE(review): presumably this keeps the read off the
                    // systematic fast path; the success branch below asserts
                    // that the fallback path was used.
                    record = SymbolRecord::new(
                        record.object_id,
                        record.oti,
                        record.esi,
                        record.symbol_data.clone(),
                        SymbolRecordFlags::empty(),
                    );
                }
                if corrupted.contains(&record.esi) {
                    // Flip bits in the first data byte after the record was built;
                    // the proof is expected to list this ESI as rejected.
                    if let Some(first) = record.symbol_data.first_mut() {
                        *first ^= 0x5A;
                    }
                    expected_rejected.insert(record.esi);
                } else {
                    accepted_esis.insert(record.esi);
                }
                remote_records.push(record);
            }

            // Expectation model: success needs at least source + slack intact
            // symbols AND every source ESI among the intact ones.
            let source_symbols = payload.len().div_ceil(8);
            let required_symbols = source_symbols.saturating_add(DEFAULT_FALLBACK_DECODE_SLACK);
            let has_complete_source_run = (0..source_symbols).all(|index| {
                let esi = u32::try_from(index).expect("source index fits in u32");
                accepted_esis.contains(&esi)
            });
            let expect_success =
                accepted_esis.len() >= required_symbols && has_complete_source_run;

            // No L2 segment is inserted, so every symbol has to come from the remote.
            let mut storage = TieredStorage::new(DurabilityMode::local());
            let mut remote = MockRemoteTier::default();
            remote.set_object_symbols(object_id, remote_records);
            let cx = Cx::<cap::All>::new();

            let result = storage.fetch_object(
                &cx,
                object_id,
                56,
                Some(&mut remote),
                Some(remote_cap(12)),
            );

            if expect_success {
                let outcome =
                    result.expect("decode should succeed when enough valid symbols remain");
                let used_fallback =
                    matches!(outcome.read_path, SymbolReadPath::FullDecodeFallback { .. });
                prop_assert!(used_fallback);
                prop_assert_eq!(outcome.bytes, payload);
                let proof = outcome
                    .decode_proof
                    .expect("fallback-success path should emit decode proof");
                prop_assert!(proof.decode_success);
                prop_assert_eq!(rejected_esis_set(&proof), expected_rejected);
                prop_assert!(decode_proof_report_ok(&proof));
            } else {
                let is_corrupt_error =
                    matches!(result, Err(FrankenError::DatabaseCorrupt { .. }));
                prop_assert!(is_corrupt_error);
                // The failure path must still leave a failed proof in the audit log.
                let audit = storage.take_decode_audit_entries();
                let Some(failure_entry) = audit.iter().find(|entry| !entry.decode_success)
                else {
                    return Err(proptest::test_runner::TestCaseError::fail(
                        "expected failure decode proof artifact",
                    ));
                };
                prop_assert_eq!(rejected_esis_set(&failure_entry.proof), expected_rejected);
                prop_assert!(decode_proof_report_ok(&failure_entry.proof));
            }

            Ok(())
        })
        .expect("seeded SymbolRecord corruption property should hold");
}
1782
/// Under `DurabilityMode::Local` a commit must not touch the remote tier even
/// when a remote and a capability are supplied; the segment lands in L2.
#[test]
fn test_durability_mode_local_no_remote() {
    let segment_id = 50;
    let records = make_symbol_records(
        object_id_from_u64(7),
        b"local-durability-no-remote",
        8,
        1,
    );

    let mut storage = TieredStorage::new(DurabilityMode::local());
    let mut remote = MockRemoteTier::default();
    remote.set_acked_stores(3);
    let cx = Cx::<cap::All>::new();

    let commit = CommitRequest::new(segment_id, records, 60);
    let committed = storage
        .commit_segment(&cx, commit, Some(&mut remote), Some(remote_cap(4)))
        .expect("local durability commit succeeds");

    assert!(!committed.remote_io);
    assert_eq!(remote.upload_calls(), 0);
    assert!(storage.l2_segment_exists(segment_id));
}
1803
/// With a 2-of-3 quorum, a commit acknowledged by one store must fail with `Busy`
/// and one acknowledged by two stores must succeed; both segments end up in L2.
#[test]
fn test_durability_mode_quorum_requires_ack() {
    let records = make_symbol_records(object_id_from_u64(8), b"quorum-durability", 8, 1);

    let durability = DurabilityMode::quorum(2, 3).expect("valid quorum");
    let mut storage = TieredStorage::new(durability);
    let mut remote = MockRemoteTier::default();
    let cx = Cx::<cap::All>::new();
    let upload_cap = Some(remote_cap(10));

    // One acknowledgement is below the required two.
    remote.set_acked_stores(1);
    let under_quorum = CommitRequest::new(60, records.clone(), 61);
    let rejected = storage.commit_segment(&cx, under_quorum, Some(&mut remote), upload_cap);
    assert!(matches!(rejected, Err(FrankenError::Busy)));
    assert!(storage.l2_segment_exists(60));

    // Two acknowledgements meet the quorum.
    remote.set_acked_stores(2);
    let at_quorum = CommitRequest::new(61, records, 62);
    let accepted = storage
        .commit_segment(&cx, at_quorum, Some(&mut remote), upload_cap)
        .expect("quorum commit succeeds after sufficient ACKs");
    assert!(accepted.remote_io);
    assert!(storage.l2_segment_exists(61));
}
1829
/// While the test holds a permit on the remote executor, a commit whose context
/// is cancelled from another thread must fail with `Busy` without calling the
/// remote, and the segment must still be present in L2.
#[test]
fn test_tiered_storage_remote_admission_cancelled_while_waiting() {
    let records = make_symbol_records(object_id_from_u64(9), b"tiered-admission-cancel", 8, 1);

    let durability = DurabilityMode::quorum(1, 2).expect("valid quorum");
    let executor = RemoteAdmissionExecutor::with_limits(
        crate::remote_effects::TIERED_STORAGE_EXECUTOR_NAME,
        1,
        1,
        Duration::from_secs(1),
    )
    .expect("executor config valid");
    let mut storage = TieredStorage::new_with_remote_executor(durability, executor);

    // Take a permit up front and keep it alive for the whole test.
    let held_executor = Arc::clone(&storage.remote_executor);
    let _held_permit = held_executor
        .try_acquire_for_testing()
        .expect("first permit must saturate admission");

    let cx = Cx::<cap::All>::new();
    // Cancel the context from a helper thread after a 20 ms delay.
    let cancel_cx = cx.clone();
    let canceller = std::thread::spawn(move || {
        std::thread::sleep(Duration::from_millis(20));
        cancel_cx.cancel();
    });

    let mut remote = MockRemoteTier::default();
    remote.set_acked_stores(2);
    let commit = CommitRequest::new(70, records, 71);
    let committed = storage.commit_segment(&cx, commit, Some(&mut remote), Some(remote_cap(12)));

    canceller.join().expect("canceller thread joins");

    assert!(matches!(committed, Err(FrankenError::Busy)));
    assert_eq!(remote.upload_calls(), 0);
    assert!(storage.l2_segment_exists(70));
}
1870
/// End-to-end: store 500 single-object segments in L2, evict all of them to the
/// remote tier, then fetch one object back and check its bytes and write-back.
#[test]
fn test_e2e_tiered_storage_evict_and_fetch() {
    let mut storage = TieredStorage::new(DurabilityMode::local());
    let mut remote = MockRemoteTier::default();
    remote.set_acked_stores(3);
    let cx = Cx::<cap::All>::new();
    let remote_access = Some(remote_cap(11));

    // Segment ids run 1..=500; object ids run 10_000..10_500.
    let mut expected = HashMap::<ObjectId, Vec<u8>>::new();
    for segment_id in 1_u64..=500_u64 {
        let object_id = object_id_from_u64(9_999 + segment_id);
        let payload = format!("commit-{segment_id:04}-payload").into_bytes();
        storage.insert_l2_segment(segment_id, make_symbol_records(object_id, &payload, 16, 2));
        expected.insert(object_id, payload);
    }

    for segment_id in 1_u64..=500_u64 {
        let eviction = storage
            .evict_segment(&cx, segment_id, 1, 70, &mut remote, remote_access)
            .expect("eviction succeeds");
        assert_eq!(eviction.phase, EvictionPhase::Retired);
        assert!(eviction.evicted);
    }
    assert_eq!(storage.l2_segment_count(), 0);

    let target_object = object_id_from_u64(10_321);
    let fetched = storage
        .fetch_object(&cx, target_object, 71, Some(&mut remote), remote_access)
        .expect("remote fetch after eviction succeeds");

    let expected_bytes = expected
        .get(&target_object)
        .expect("target payload available")
        .clone();
    assert_eq!(fetched.bytes, expected_bytes);
    assert!(fetched.remote_used);

    let write_back_segment = storage.write_back_segment_id();
    assert!(storage.l2_segment_exists(write_back_segment));
    assert!(fetched.write_back_count > 0);
}
1913}