use std::collections::BTreeMap;
use std::sync::Arc;
use fsqlite_error::{FrankenError, Result};
use fsqlite_types::cx::{Cx, cap};
use fsqlite_types::{
IdempotencyKey, ObjectId, Oti, RemoteCap, Saga, SymbolReadPath, SymbolRecord,
SystematicLayoutError, reconstruct_systematic_happy_path, source_symbol_count,
};
use tracing::{debug, info, warn};
use xxhash_rust::xxh3::xxh3_64;
use crate::decode_proofs::{EcsDecodeProof, RejectedSymbol, SymbolDigest, SymbolRejectionReason};
use crate::remote_effects::Executor as RemoteAdmissionExecutor;
const BEAD_ID: &str = "bd-1hi.29";
const FETCH_SYMBOLS_COMPUTATION: &str = "fsqlite:tiered:fetch_symbols:v1";
const UPLOAD_SEGMENT_COMPUTATION: &str = "fsqlite:tiered:upload_segment:v1";
const DEFAULT_WRITE_BACK_SEGMENT_ID: u64 = u64::MAX - 1;
const DEFAULT_FALLBACK_DECODE_SLACK: usize = 2;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DurabilityMode {
Local,
Quorum { required: u32, total: u32 },
}
impl DurabilityMode {
#[must_use]
pub const fn local() -> Self {
Self::Local
}
pub fn quorum(required: u32, total: u32) -> Result<Self> {
if required == 0 || required > total {
return Err(FrankenError::OutOfRange {
what: "durability quorum".to_owned(),
value: format!("required={required}, total={total}"),
});
}
Ok(Self::Quorum { required, total })
}
#[must_use]
pub const fn requires_remote(self) -> bool {
matches!(self, Self::Quorum { .. })
}
#[must_use]
pub const fn quorum_satisfied(self, acked_stores: u32) -> bool {
match self {
Self::Local => true,
Self::Quorum { required, .. } => acked_stores >= required,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchSymbolsRequest {
pub object_id: ObjectId,
pub preferred_esis: Vec<u32>,
pub max_symbols: usize,
pub idempotency_key: IdempotencyKey,
pub ecs_epoch: u64,
pub remote_cap: RemoteCap,
pub computation: &'static str,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UploadSegmentRequest {
pub segment_id: u64,
pub records: Vec<SymbolRecord>,
pub idempotency_key: IdempotencyKey,
pub saga: Saga,
pub ecs_epoch: u64,
pub remote_cap: RemoteCap,
pub computation: &'static str,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct UploadSegmentReceipt {
pub acked_stores: u32,
pub deduplicated: bool,
}
pub trait RemoteTier {
fn fetch_symbols(&mut self, request: &FetchSymbolsRequest) -> Result<Vec<SymbolRecord>>;
fn upload_segment(&mut self, request: &UploadSegmentRequest) -> Result<UploadSegmentReceipt>;
fn segment_recoverable(&self, segment_id: u64, min_symbols_per_object: usize) -> bool;
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommitRequest {
pub segment_id: u64,
pub records: Vec<SymbolRecord>,
pub idempotency_key: IdempotencyKey,
pub saga: Saga,
pub ecs_epoch: u64,
}
impl CommitRequest {
#[must_use]
pub fn new(segment_id: u64, records: Vec<SymbolRecord>, ecs_epoch: u64) -> Self {
let request_bytes = segment_request_bytes(segment_id, &records);
let idempotency_key = IdempotencyKey::derive(ecs_epoch, &request_bytes);
let saga = Saga::new(idempotency_key);
Self {
segment_id,
records,
idempotency_key,
saga,
ecs_epoch,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CommitOutcome {
pub remote_io: bool,
pub upload_receipt: Option<UploadSegmentReceipt>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchOutcome {
pub bytes: Vec<u8>,
pub read_path: SymbolReadPath,
pub remote_used: bool,
pub write_back_count: usize,
pub decode_proof: Option<EcsDecodeProof>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DecodeAuditEntry {
pub seq: u64,
pub object_id: ObjectId,
pub decode_success: bool,
pub proof: EcsDecodeProof,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EvictionPhase {
Uploaded,
CompensatedCancelled,
CompensatedPrecondition,
Retired,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EvictionOutcome {
pub phase: EvictionPhase,
pub evicted: bool,
pub local_retained: bool,
pub upload_receipt: UploadSegmentReceipt,
}
#[derive(Debug)]
pub struct TieredStorage {
durability_mode: DurabilityMode,
remote_executor: Arc<RemoteAdmissionExecutor>,
write_back_segment_id: u64,
l2_segments: BTreeMap<u64, Vec<SymbolRecord>>,
decode_audit_seq: u64,
decode_audit: Vec<DecodeAuditEntry>,
}
impl Default for TieredStorage {
fn default() -> Self {
Self::new(DurabilityMode::Local)
}
}
impl TieredStorage {
fn new_with_remote_executor(
durability_mode: DurabilityMode,
remote_executor: RemoteAdmissionExecutor,
) -> Self {
Self {
durability_mode,
remote_executor: Arc::new(remote_executor),
write_back_segment_id: DEFAULT_WRITE_BACK_SEGMENT_ID,
l2_segments: BTreeMap::new(),
decode_audit_seq: 0,
decode_audit: Vec::new(),
}
}
#[must_use]
pub fn new(durability_mode: DurabilityMode) -> Self {
Self::new_with_remote_executor(
durability_mode,
RemoteAdmissionExecutor::balanced_tiered_storage_default(),
)
}
#[must_use]
pub const fn durability_mode(&self) -> DurabilityMode {
self.durability_mode
}
pub fn set_durability_mode(&mut self, mode: DurabilityMode) {
self.durability_mode = mode;
}
#[must_use]
pub const fn write_back_segment_id(&self) -> u64 {
self.write_back_segment_id
}
pub fn take_decode_audit_entries(&mut self) -> Vec<DecodeAuditEntry> {
std::mem::take(&mut self.decode_audit)
}
pub fn insert_l2_segment(&mut self, segment_id: u64, records: Vec<SymbolRecord>) {
self.l2_segments.insert(segment_id, records);
}
#[must_use]
pub fn l2_segment_count(&self) -> usize {
self.l2_segments.len()
}
#[must_use]
pub fn l2_segment_exists(&self, segment_id: u64) -> bool {
self.l2_segments.contains_key(&segment_id)
}
#[must_use]
pub fn l2_records_for_object(&self, object_id: ObjectId) -> Vec<SymbolRecord> {
let mut by_esi = BTreeMap::<u32, SymbolRecord>::new();
for segment in self.l2_segments.values() {
for record in segment {
if record.object_id == object_id {
merge_symbol_record_by_esi(&mut by_esi, record.clone());
}
}
}
by_esi.into_values().collect()
}
pub fn commit_segment<Caps, R>(
&mut self,
cx: &Cx<Caps>,
request: CommitRequest,
remote: Option<&mut R>,
remote_cap: Option<RemoteCap>,
) -> Result<CommitOutcome>
where
Caps: cap::SubsetOf<cap::All>,
R: RemoteTier,
{
self.insert_l2_segment(request.segment_id, request.records.clone());
if !self.durability_mode.requires_remote() {
info!(
bead_id = BEAD_ID,
segment_id = request.segment_id,
mode = "local",
"commit satisfied by L2 only"
);
return Ok(CommitOutcome {
remote_io: false,
upload_receipt: None,
});
}
let cap = remote_cap.ok_or(FrankenError::AuthDenied)?;
let remote_store = remote.ok_or(FrankenError::AuthDenied)?;
let upload_request = UploadSegmentRequest {
segment_id: request.segment_id,
records: request.records,
idempotency_key: request.idempotency_key,
saga: request.saga,
ecs_epoch: request.ecs_epoch,
remote_cap: cap,
computation: UPLOAD_SEGMENT_COMPUTATION,
};
let receipt = self.remote_executor.run(
cx,
upload_request.computation,
Some(upload_request.saga),
Some(upload_request.idempotency_key),
upload_request.ecs_epoch,
|| remote_store.upload_segment(&upload_request),
)?;
if !self.durability_mode.quorum_satisfied(receipt.acked_stores) {
warn!(
bead_id = BEAD_ID,
segment_id = request.segment_id,
acked_stores = receipt.acked_stores,
"quorum durability not yet satisfied"
);
return Err(FrankenError::Busy);
}
Ok(CommitOutcome {
remote_io: true,
upload_receipt: Some(receipt),
})
}
pub fn fetch_object<Caps, R>(
&mut self,
cx: &Cx<Caps>,
object_id: ObjectId,
ecs_epoch: u64,
remote: Option<&mut R>,
remote_cap: Option<RemoteCap>,
) -> Result<FetchOutcome>
where
Caps: cap::SubsetOf<cap::All>,
R: RemoteTier,
{
let local_records = self.l2_records_for_object(object_id);
if !local_records.is_empty() {
match recover_object_hybrid(&local_records) {
Ok(local) => {
if let Some(proof) = local.decode_proof.clone() {
self.record_decode_proof(proof);
}
return Ok(FetchOutcome {
bytes: local.bytes,
read_path: local.read_path,
remote_used: false,
write_back_count: 0,
decode_proof: local.decode_proof,
});
}
Err(failure) => {
self.record_decode_proof(failure.proof);
debug!(
bead_id = BEAD_ID,
object_id = %object_id,
reason = %failure.reason,
"local fallback decode attempt failed; escalating to remote tier"
);
}
}
}
let cap = remote_cap.ok_or(FrankenError::AuthDenied)?;
let remote_store = remote.ok_or(FrankenError::AuthDenied)?;
let preferred_esis = preferred_source_esis(local_records.first().map(|record| record.oti));
let idempotency_key = derive_fetch_key(object_id, &preferred_esis, ecs_epoch);
let fetch_request = FetchSymbolsRequest {
object_id,
preferred_esis,
max_symbols: usize::MAX,
idempotency_key,
ecs_epoch,
remote_cap: cap,
computation: FETCH_SYMBOLS_COMPUTATION,
};
let fetched = self.remote_executor.run(
cx,
fetch_request.computation,
None,
Some(fetch_request.idempotency_key),
fetch_request.ecs_epoch,
|| remote_store.fetch_symbols(&fetch_request),
)?;
if fetched.is_empty() {
return Err(FrankenError::Internal(format!(
"remote tier returned no symbols for object {object_id}"
)));
}
cx.checkpoint().map_err(|_| FrankenError::Busy)?;
let merged = merge_symbol_sets(&local_records, &fetched);
let recovered = match recover_object_hybrid(&merged) {
Ok(value) => value,
Err(failure) => {
let detail = failure.reason.clone();
self.record_decode_proof(failure.proof);
return Err(FrankenError::DatabaseCorrupt {
detail: format!("unable to recover object {object_id}: {detail}"),
});
}
};
if let Some(proof) = recovered.decode_proof.clone() {
self.record_decode_proof(proof);
}
let write_back_count = self.write_back_repairs(&local_records, &fetched);
Ok(FetchOutcome {
bytes: recovered.bytes,
read_path: recovered.read_path,
remote_used: true,
write_back_count,
decode_proof: recovered.decode_proof,
})
}
pub fn evict_segment<Caps, R>(
&mut self,
cx: &Cx<Caps>,
segment_id: u64,
min_symbols_per_object: usize,
ecs_epoch: u64,
remote: &mut R,
remote_cap: Option<RemoteCap>,
) -> Result<EvictionOutcome>
where
Caps: cap::SubsetOf<cap::All>,
R: RemoteTier,
{
let cap = remote_cap.ok_or(FrankenError::AuthDenied)?;
let records =
self.l2_segments.get(&segment_id).cloned().ok_or_else(|| {
FrankenError::Internal(format!("unknown L2 segment {segment_id}"))
})?;
let key = derive_evict_key(segment_id, &records, ecs_epoch);
let upload_request = UploadSegmentRequest {
segment_id,
records,
idempotency_key: key,
saga: Saga::new(key),
ecs_epoch,
remote_cap: cap,
computation: UPLOAD_SEGMENT_COMPUTATION,
};
let receipt = self.remote_executor.run(
cx,
upload_request.computation,
Some(upload_request.saga),
Some(upload_request.idempotency_key),
upload_request.ecs_epoch,
|| remote.upload_segment(&upload_request),
)?;
debug!(
bead_id = BEAD_ID,
segment_id,
acked_stores = receipt.acked_stores,
"segment uploaded to L3"
);
if cx.is_cancel_requested() || cx.checkpoint().is_err() {
warn!(
bead_id = BEAD_ID,
segment_id, "eviction cancelled; retaining local segment"
);
return Ok(EvictionOutcome {
phase: EvictionPhase::CompensatedCancelled,
evicted: false,
local_retained: true,
upload_receipt: receipt,
});
}
if !remote.segment_recoverable(segment_id, min_symbols_per_object) {
warn!(
bead_id = BEAD_ID,
segment_id,
min_symbols_per_object,
"eviction precondition failed; retaining local segment"
);
return Ok(EvictionOutcome {
phase: EvictionPhase::CompensatedPrecondition,
evicted: false,
local_retained: true,
upload_receipt: receipt,
});
}
let _removed = self.l2_segments.remove(&segment_id);
info!(bead_id = BEAD_ID, segment_id, "segment evicted from L2");
Ok(EvictionOutcome {
phase: EvictionPhase::Retired,
evicted: true,
local_retained: false,
upload_receipt: receipt,
})
}
fn write_back_repairs(&mut self, local: &[SymbolRecord], fetched: &[SymbolRecord]) -> usize {
let local_by_esi: BTreeMap<u32, &SymbolRecord> =
local.iter().map(|record| (record.esi, record)).collect();
let mut repairs_by_esi = BTreeMap::<u32, SymbolRecord>::new();
for record in fetched {
let needs_repair = match local_by_esi.get(&record.esi) {
None => true,
Some(existing) => !existing.verify_integrity() && record.verify_integrity(),
};
if needs_repair {
merge_symbol_record_by_esi(&mut repairs_by_esi, record.clone());
}
}
let repairs: Vec<SymbolRecord> = repairs_by_esi.into_values().collect();
if repairs.is_empty() {
return 0;
}
let added = repairs.len();
let segment = self
.l2_segments
.entry(self.write_back_segment_id)
.or_default();
let mut deduped = BTreeMap::<(ObjectId, u32), SymbolRecord>::new();
for record in segment.drain(..) {
deduped.insert((record.object_id, record.esi), record);
}
for record in repairs {
let key = (record.object_id, record.esi);
match deduped.entry(key) {
std::collections::btree_map::Entry::Vacant(entry) => {
entry.insert(record);
}
std::collections::btree_map::Entry::Occupied(mut entry) => {
if prefer_symbol_record(entry.get(), &record) {
entry.insert(record);
}
}
}
}
*segment = deduped.into_values().collect();
added
}
fn record_decode_proof(&mut self, proof: EcsDecodeProof) {
self.decode_audit_seq = self.decode_audit_seq.saturating_add(1);
self.decode_audit.push(DecodeAuditEntry {
seq: self.decode_audit_seq,
object_id: proof.object_id,
decode_success: proof.decode_success,
proof,
});
}
}
fn preferred_source_esis(oti: Option<Oti>) -> Vec<u32> {
let Some(oti) = oti else {
return Vec::new();
};
let Ok(source_symbols) = source_symbol_count(oti) else {
return Vec::new();
};
let max_u32 = usize::try_from(u32::MAX).unwrap_or(usize::MAX);
let capped = source_symbols.min(max_u32);
let mut esis = Vec::with_capacity(capped);
for idx in 0..capped {
if let Ok(esi) = u32::try_from(idx) {
esis.push(esi);
}
}
esis
}
fn derive_fetch_key(object_id: ObjectId, preferred_esis: &[u32], ecs_epoch: u64) -> IdempotencyKey {
let mut bytes = Vec::with_capacity(16 + preferred_esis.len() * 4);
bytes.extend_from_slice(object_id.as_bytes());
for esi in preferred_esis {
bytes.extend_from_slice(&esi.to_le_bytes());
}
IdempotencyKey::derive(ecs_epoch, &bytes)
}
fn segment_request_bytes(segment_id: u64, records: &[SymbolRecord]) -> Vec<u8> {
let payload_bytes = records.iter().fold(0_usize, |acc, record| {
acc.saturating_add(record.to_bytes().len())
});
let mut bytes = Vec::with_capacity(16 + payload_bytes + records.len().saturating_mul(8));
bytes.extend_from_slice(&segment_id.to_le_bytes());
bytes.extend_from_slice(
&u64::try_from(records.len())
.unwrap_or(u64::MAX)
.to_le_bytes(),
);
for record in records {
let record_bytes = record.to_bytes();
bytes.extend_from_slice(
&u64::try_from(record_bytes.len())
.unwrap_or(u64::MAX)
.to_le_bytes(),
);
bytes.extend_from_slice(&record_bytes);
}
bytes
}
fn derive_evict_key(segment_id: u64, records: &[SymbolRecord], ecs_epoch: u64) -> IdempotencyKey {
let request_bytes = segment_request_bytes(segment_id, records);
IdempotencyKey::derive(ecs_epoch, &request_bytes)
}
fn prefer_symbol_record(existing: &SymbolRecord, candidate: &SymbolRecord) -> bool {
!existing.verify_integrity() && candidate.verify_integrity()
}
fn merge_symbol_record_by_esi(by_esi: &mut BTreeMap<u32, SymbolRecord>, record: SymbolRecord) {
merge_symbol_record_by_key(by_esi, record.esi, record);
}
fn merge_symbol_record_by_key<K: Ord>(
map: &mut BTreeMap<K, SymbolRecord>,
key: K,
record: SymbolRecord,
) {
match map.entry(key) {
std::collections::btree_map::Entry::Vacant(entry) => {
entry.insert(record);
}
std::collections::btree_map::Entry::Occupied(mut entry) => {
if prefer_symbol_record(entry.get(), &record) {
entry.insert(record);
}
}
}
}
fn merge_symbol_sets(local: &[SymbolRecord], fetched: &[SymbolRecord]) -> Vec<SymbolRecord> {
let mut by_esi = BTreeMap::<u32, SymbolRecord>::new();
for record in local {
merge_symbol_record_by_esi(&mut by_esi, record.clone());
}
for record in fetched {
merge_symbol_record_by_esi(&mut by_esi, record.clone());
}
by_esi.into_values().collect()
}
#[derive(Debug, Clone)]
struct HybridRecoverResult {
bytes: Vec<u8>,
read_path: SymbolReadPath,
decode_proof: Option<EcsDecodeProof>,
}
#[derive(Debug, Clone)]
struct FallbackDecodeSuccess {
bytes: Vec<u8>,
proof: EcsDecodeProof,
}
#[derive(Debug, Clone)]
struct FallbackDecodeFailure {
reason: String,
proof: EcsDecodeProof,
}
impl std::fmt::Display for FallbackDecodeFailure {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.reason)
}
}
#[derive(Debug, Clone)]
struct FallbackSymbolEvidence {
object_id: ObjectId,
source_symbols: usize,
symbol_size: usize,
transfer_len: usize,
accepted_by_esi: BTreeMap<u32, SymbolRecord>,
accepted_esis: Vec<u32>,
rejected_symbols: Vec<RejectedSymbol>,
symbol_digests: Vec<SymbolDigest>,
}
fn recover_object_hybrid(
records: &[SymbolRecord],
) -> std::result::Result<HybridRecoverResult, Box<FallbackDecodeFailure>> {
match reconstruct_systematic_happy_path(records) {
Ok(bytes) => Ok(HybridRecoverResult {
bytes,
read_path: SymbolReadPath::SystematicFastPath,
decode_proof: None,
}),
Err(reason) => {
let fallback = fallback_decode_records(records, &reason)?;
Ok(HybridRecoverResult {
bytes: fallback.bytes,
read_path: SymbolReadPath::FullDecodeFallback { reason },
decode_proof: Some(fallback.proof),
})
}
}
}
fn fallback_decode_records(
records: &[SymbolRecord],
systematic_reason: &SystematicLayoutError,
) -> std::result::Result<FallbackDecodeSuccess, Box<FallbackDecodeFailure>> {
let evidence = collect_fallback_symbol_evidence(records)?;
let k_source = u32::try_from(evidence.source_symbols).unwrap_or(u32::MAX);
let available_symbols = evidence.accepted_esis.len();
let required_symbols = evidence
.source_symbols
.saturating_add(DEFAULT_FALLBACK_DECODE_SLACK);
if available_symbols < required_symbols {
let detail = format!(
"systematic_reason={systematic_reason}; insufficient_symbols_for_fallback: available={available_symbols} required={required_symbols} slack_decode={DEFAULT_FALLBACK_DECODE_SLACK}"
);
return Err(FallbackDecodeFailure {
reason: detail,
proof: build_fallback_decode_proof_from_parts(
evidence.object_id,
k_source,
&evidence.accepted_esis,
&evidence.rejected_symbols,
&evidence.symbol_digests,
false,
Some(u32::try_from(available_symbols).unwrap_or(u32::MAX)),
),
}
.into());
}
let mut out = Vec::with_capacity(evidence.source_symbols.saturating_mul(evidence.symbol_size));
for expected_esi in 0..evidence.source_symbols {
let expected_esi_u32 = u32::try_from(expected_esi).unwrap_or(u32::MAX);
let Some(record) = evidence.accepted_by_esi.get(&expected_esi_u32) else {
let detail = format!(
"systematic_reason={systematic_reason}; missing_source_symbol: esi={expected_esi_u32}"
);
return Err(FallbackDecodeFailure {
reason: detail,
proof: build_fallback_decode_proof_from_parts(
evidence.object_id,
k_source,
&evidence.accepted_esis,
&evidence.rejected_symbols,
&evidence.symbol_digests,
false,
Some(u32::try_from(available_symbols).unwrap_or(u32::MAX)),
),
}
.into());
};
out.extend_from_slice(&record.symbol_data);
}
out.truncate(evidence.transfer_len);
Ok(FallbackDecodeSuccess {
bytes: out,
proof: build_fallback_decode_proof_from_parts(
evidence.object_id,
k_source,
&evidence.accepted_esis,
&evidence.rejected_symbols,
&evidence.symbol_digests,
true,
Some(k_source),
),
})
}
fn collect_fallback_symbol_evidence(
records: &[SymbolRecord],
) -> std::result::Result<FallbackSymbolEvidence, Box<FallbackDecodeFailure>> {
let Some(first) = records.first() else {
return Err(FallbackDecodeFailure {
reason: String::from("empty_symbol_set"),
proof: build_fallback_decode_proof_from_parts(
ObjectId::from_bytes([0_u8; 16]),
0,
&[],
&[],
&[],
false,
Some(0),
),
}
.into());
};
let source_symbols = source_symbol_count(first.oti).map_err(|err| {
Box::new(FallbackDecodeFailure {
reason: format!("invalid_source_symbol_count: {err}"),
proof: build_fallback_decode_proof_from_parts(
first.object_id,
0,
&[],
&[],
&[],
false,
Some(0),
),
})
})?;
let symbol_size = usize::try_from(first.oti.t).map_err(|_| {
Box::new(FallbackDecodeFailure {
reason: String::from("invalid_symbol_size"),
proof: build_fallback_decode_proof_from_parts(
first.object_id,
u32::try_from(source_symbols).unwrap_or(u32::MAX),
&[],
&[],
&[],
false,
Some(0),
),
})
})?;
let transfer_len = usize::try_from(first.oti.f).map_err(|_| {
Box::new(FallbackDecodeFailure {
reason: String::from("invalid_transfer_length"),
proof: build_fallback_decode_proof_from_parts(
first.object_id,
u32::try_from(source_symbols).unwrap_or(u32::MAX),
&[],
&[],
&[],
false,
Some(0),
),
})
})?;
let mut ordered = records.to_vec();
ordered.sort_by_key(|record| record.esi);
let mut accepted_by_esi = BTreeMap::<u32, SymbolRecord>::new();
let mut rejected_symbols = Vec::new();
let mut symbol_digests = Vec::new();
for record in ordered {
let rejection = if record.object_id != first.object_id
|| record.oti != first.oti
|| record.symbol_data.len() != symbol_size
{
Some(SymbolRejectionReason::FormatViolation)
} else if !record.verify_integrity() {
Some(SymbolRejectionReason::HashMismatch)
} else if accepted_by_esi.contains_key(&record.esi) {
Some(SymbolRejectionReason::DuplicateEsi)
} else {
None
};
if let Some(reason) = rejection {
rejected_symbols.push(RejectedSymbol {
esi: record.esi,
reason,
});
continue;
}
symbol_digests.push(SymbolDigest {
esi: record.esi,
digest_xxh3: xxh3_64(&record.to_bytes()),
});
accepted_by_esi.insert(record.esi, record);
}
let accepted_esis = accepted_by_esi.keys().copied().collect();
symbol_digests.sort_by_key(|digest| digest.esi);
Ok(FallbackSymbolEvidence {
object_id: first.object_id,
source_symbols,
symbol_size,
transfer_len,
accepted_by_esi,
accepted_esis,
rejected_symbols,
symbol_digests,
})
}
fn build_fallback_decode_proof_from_parts(
object_id: ObjectId,
k_source: u32,
accepted_esis: &[u32],
rejected_symbols: &[RejectedSymbol],
symbol_digests: &[SymbolDigest],
decode_success: bool,
intermediate_rank: Option<u32>,
) -> EcsDecodeProof {
let seed = deterministic_fallback_seed(object_id, k_source);
let timing_ns = deterministic_fallback_timing_ns(
object_id,
k_source,
accepted_esis,
rejected_symbols,
decode_success,
);
let proof = EcsDecodeProof::from_esis(
object_id,
k_source,
accepted_esis,
decode_success,
intermediate_rank,
timing_ns,
seed,
);
proof
.with_rejected_symbols(rejected_symbols.to_vec())
.with_symbol_digests(symbol_digests.to_vec())
}
fn deterministic_fallback_seed(object_id: ObjectId, k_source: u32) -> u64 {
let mut material = Vec::with_capacity(40);
material.extend_from_slice(b"fsqlite:tiered:fallback:seed:v1");
material.extend_from_slice(object_id.as_bytes());
material.extend_from_slice(&k_source.to_le_bytes());
xxh3_64(&material)
}
fn deterministic_fallback_timing_ns(
object_id: ObjectId,
k_source: u32,
accepted_esis: &[u32],
rejected_symbols: &[RejectedSymbol],
decode_success: bool,
) -> u64 {
let mut material =
Vec::with_capacity(48 + accepted_esis.len() * 4 + rejected_symbols.len() * 5);
material.extend_from_slice(b"fsqlite:tiered:fallback:timing:v1");
material.extend_from_slice(object_id.as_bytes());
material.extend_from_slice(&k_source.to_le_bytes());
material.push(u8::from(decode_success));
for esi in accepted_esis {
material.extend_from_slice(&esi.to_le_bytes());
}
for item in rejected_symbols {
material.extend_from_slice(&item.esi.to_le_bytes());
material.push(rejection_reason_code(item.reason));
}
xxh3_64(&material)
}
fn rejection_reason_code(reason: SymbolRejectionReason) -> u8 {
match reason {
SymbolRejectionReason::HashMismatch => 1,
SymbolRejectionReason::InvalidAuthTag => 2,
SymbolRejectionReason::DuplicateEsi => 3,
SymbolRejectionReason::FormatViolation => 4,
}
}
#[cfg(test)]
mod tests {
use std::collections::{BTreeSet, HashMap};
use std::time::Duration;
use fsqlite_types::cx::{Cx, cap};
use fsqlite_types::{ObjectId, Oti, SymbolRecordFlags};
use proptest::prelude::*;
use proptest::test_runner::{Config as ProptestConfig, RngAlgorithm, RngSeed, TestRunner};
use super::*;
#[derive(Debug, Default)]
struct MockRemoteTier {
object_symbols: HashMap<ObjectId, Vec<SymbolRecord>>,
segment_symbols: HashMap<u64, Vec<SymbolRecord>>,
upload_receipts: HashMap<(u64, IdempotencyKey), UploadSegmentReceipt>,
segment_recoverability_overrides: HashMap<u64, bool>,
upload_calls: usize,
fetch_calls: usize,
configured_acks: u32,
cancel_after_upload: Option<Cx<cap::All>>,
cancel_after_fetch: Option<Cx<cap::All>>,
last_fetch_preferred: Vec<u32>,
}
impl MockRemoteTier {
fn set_object_symbols(&mut self, object_id: ObjectId, records: Vec<SymbolRecord>) {
self.object_symbols.insert(object_id, records);
}
fn set_acked_stores(&mut self, acked_stores: u32) {
self.configured_acks = acked_stores;
}
fn set_segment_recoverable(&mut self, segment_id: u64, recoverable: bool) {
self.segment_recoverability_overrides
.insert(segment_id, recoverable);
}
fn set_cancel_after_upload(&mut self, cx: Cx<cap::All>) {
self.cancel_after_upload = Some(cx);
}
fn upload_calls(&self) -> usize {
self.upload_calls
}
fn fetch_calls(&self) -> usize {
self.fetch_calls
}
}
impl RemoteTier for MockRemoteTier {
fn fetch_symbols(&mut self, request: &FetchSymbolsRequest) -> Result<Vec<SymbolRecord>> {
self.fetch_calls = self.fetch_calls.saturating_add(1);
self.last_fetch_preferred = request.preferred_esis.clone();
let Some(records) = self.object_symbols.get(&request.object_id) else {
return Ok(Vec::new());
};
let preferred: BTreeSet<u32> = request.preferred_esis.iter().copied().collect();
let mut ordered = records.clone();
ordered.sort_by_key(|record| (!preferred.contains(&record.esi), record.esi));
ordered.truncate(request.max_symbols);
if let Some(cx) = self.cancel_after_fetch.take() {
cx.cancel();
}
Ok(ordered)
}
fn upload_segment(
&mut self,
request: &UploadSegmentRequest,
) -> Result<UploadSegmentReceipt> {
let key = (request.segment_id, request.idempotency_key);
if let Some(existing) = self.upload_receipts.get(&key).copied() {
return Ok(UploadSegmentReceipt {
deduplicated: true,
..existing
});
}
self.upload_calls = self.upload_calls.saturating_add(1);
self.segment_symbols
.insert(request.segment_id, request.records.clone());
for record in &request.records {
let entry = self.object_symbols.entry(record.object_id).or_default();
if entry.iter().all(|existing| existing.esi != record.esi) {
entry.push(record.clone());
}
entry.sort_by_key(|existing| existing.esi);
}
let receipt = UploadSegmentReceipt {
acked_stores: self.configured_acks,
deduplicated: false,
};
self.upload_receipts.insert(key, receipt);
if let Some(cx) = self.cancel_after_upload.take() {
cx.cancel();
}
Ok(receipt)
}
fn segment_recoverable(&self, segment_id: u64, min_symbols_per_object: usize) -> bool {
if let Some(override_value) = self.segment_recoverability_overrides.get(&segment_id) {
return *override_value;
}
let Some(records) = self.segment_symbols.get(&segment_id) else {
return false;
};
let mut per_object = HashMap::<ObjectId, usize>::new();
for record in records {
let entry = per_object.entry(record.object_id).or_insert(0);
*entry = entry.saturating_add(1);
}
per_object
.values()
.all(|count| *count >= min_symbols_per_object)
}
}
fn object_id_from_u64(raw: u64) -> ObjectId {
let mut bytes = [0_u8; 16];
bytes[0..8].copy_from_slice(&raw.to_le_bytes());
bytes[8..16].copy_from_slice(&raw.to_le_bytes());
ObjectId::from_bytes(bytes)
}
fn remote_cap(seed: u8) -> RemoteCap {
RemoteCap::from_bytes([seed; 16])
}
fn make_symbol_records(
object_id: ObjectId,
payload: &[u8],
symbol_size: usize,
repair_symbols: usize,
) -> Vec<SymbolRecord> {
let symbol_size_u32 = u32::try_from(symbol_size).expect("symbol_size fits u32");
let transfer_len_u64 = u64::try_from(payload.len()).expect("payload len fits u64");
let oti = Oti {
f: transfer_len_u64,
al: 1,
t: symbol_size_u32,
z: 1,
n: 1,
};
let source_symbols = payload.len().div_ceil(symbol_size);
let mut out = Vec::new();
for idx in 0..source_symbols {
let start = idx * symbol_size;
let end = (start + symbol_size).min(payload.len());
let mut symbol = vec![0_u8; symbol_size];
symbol[..end - start].copy_from_slice(&payload[start..end]);
let esi = u32::try_from(idx).expect("source esi fits u32");
let flags = if idx == 0 {
SymbolRecordFlags::SYSTEMATIC_RUN_START
} else {
SymbolRecordFlags::empty()
};
out.push(SymbolRecord::new(object_id, oti, esi, symbol, flags));
}
for repair_idx in 0..repair_symbols {
let repair_esi_usize = source_symbols.saturating_add(repair_idx);
let esi = u32::try_from(repair_esi_usize).expect("repair esi fits u32");
let mut symbol = vec![0_u8; symbol_size];
let esi_low = u8::try_from(esi & 0xFF).expect("masked to u8");
for (offset, byte) in symbol.iter_mut().enumerate() {
let offset_low = u8::try_from(offset & 0xFF).expect("masked to u8");
*byte = esi_low ^ offset_low;
}
out.push(SymbolRecord::new(
object_id,
oti,
esi,
symbol,
SymbolRecordFlags::empty(),
));
}
out
}
fn rejected_esis_set(proof: &EcsDecodeProof) -> BTreeSet<u32> {
proof
.rejected_symbols
.iter()
.map(|entry| entry.esi)
.collect()
}
fn decode_proof_report_ok(proof: &EcsDecodeProof) -> bool {
proof
.verification_report(
crate::decode_proofs::DecodeProofVerificationConfig::default(),
&proof.symbol_digests,
&proof.rejected_symbols,
)
.ok
}
#[test]
fn test_l3_fetch_requires_remote_cap() {
let object_id = object_id_from_u64(1);
let payload = b"tiered-fetch-requires-cap";
let mut local = make_symbol_records(object_id, payload, 8, 0);
local.retain(|record| record.esi != 1);
let mut storage = TieredStorage::new(DurabilityMode::local());
storage.insert_l2_segment(1, local);
let mut remote = MockRemoteTier::default();
remote.set_object_symbols(object_id, make_symbol_records(object_id, payload, 8, 1));
let cx = Cx::<cap::All>::new();
let result = storage.fetch_object(&cx, object_id, 7, Some(&mut remote), None);
assert!(matches!(result, Err(FrankenError::AuthDenied)));
assert_eq!(remote.fetch_calls(), 0);
}
#[test]
fn test_l3_upload_idempotency_key() {
let object_id = object_id_from_u64(2);
let payload = b"idempotent-upload";
let records = make_symbol_records(object_id, payload, 8, 1);
let mut storage = TieredStorage::new(DurabilityMode::quorum(1, 3).expect("valid quorum"));
let mut remote = MockRemoteTier::default();
remote.set_acked_stores(2);
let cx = Cx::<cap::All>::new();
let cap = Some(remote_cap(9));
let request = CommitRequest::new(10, records, 11);
let first = storage
.commit_segment(&cx, request.clone(), Some(&mut remote), cap)
.expect("first upload succeeds");
let second = storage
.commit_segment(&cx, request, Some(&mut remote), cap)
.expect("second upload returns idempotent result");
assert_eq!(remote.upload_calls(), 1);
let first_receipt = first
.upload_receipt
.expect("first commit has upload receipt");
let second_receipt = second
.upload_receipt
.expect("second commit has upload receipt");
assert!(!first_receipt.deduplicated);
assert!(second_receipt.deduplicated);
}
#[test]
fn test_l3_upload_idempotency_key_changes_with_segment_contents() {
let object_id = object_id_from_u64(22);
let payload_a = b"idempotent-upload-a";
let payload_b = b"idempotent-upload-b";
let records_a = make_symbol_records(object_id, payload_a, 8, 1);
let records_b = make_symbol_records(object_id, payload_b, 8, 1);
let request_a = CommitRequest::new(10, records_a, 11);
let request_b = CommitRequest::new(10, records_b, 11);
assert_ne!(request_a.idempotency_key, request_b.idempotency_key);
assert_ne!(request_a.saga, request_b.saga);
}
#[test]
fn test_evict_idempotency_key_changes_with_segment_contents() {
let object_id = object_id_from_u64(23);
let payload_a = b"evict-segment-a";
let payload_b = b"evict-segment-b";
let records_a = make_symbol_records(object_id, payload_a, 8, 1);
let records_b = make_symbol_records(object_id, payload_b, 8, 1);
let key_a = derive_evict_key(40, &records_a, 12);
let key_b = derive_evict_key(40, &records_b, 12);
assert_ne!(key_a, key_b);
}
#[test]
fn test_eviction_cancel_safety() {
let object_id = object_id_from_u64(3);
let payload = b"eviction-cancel-safety";
let records = make_symbol_records(object_id, payload, 8, 1);
let mut storage = TieredStorage::new(DurabilityMode::local());
storage.insert_l2_segment(20, records);
let cx = Cx::<cap::All>::new();
let mut remote = MockRemoteTier::default();
remote.set_acked_stores(3);
remote.set_cancel_after_upload(cx.clone());
let outcome = storage
.evict_segment(&cx, 20, 1, 50, &mut remote, Some(remote_cap(7)))
.expect("eviction call succeeds");
assert_eq!(outcome.phase, EvictionPhase::CompensatedCancelled);
assert!(!outcome.evicted);
assert!(outcome.local_retained);
assert!(storage.l2_segment_exists(20));
}
#[test]
fn test_eviction_precondition_check() {
let object_id = object_id_from_u64(4);
let payload = b"eviction-precondition-check";
let records = make_symbol_records(object_id, payload, 8, 1);
let mut storage = TieredStorage::new(DurabilityMode::local());
storage.insert_l2_segment(30, records);
let cx = Cx::<cap::All>::new();
let mut remote = MockRemoteTier::default();
remote.set_acked_stores(3);
remote.set_segment_recoverable(30, false);
let outcome = storage
.evict_segment(&cx, 30, 2, 51, &mut remote, Some(remote_cap(8)))
.expect("eviction call succeeds");
assert_eq!(outcome.phase, EvictionPhase::CompensatedPrecondition);
assert!(!outcome.evicted);
assert!(outcome.local_retained);
assert!(storage.l2_segment_exists(30));
}
#[test]
fn test_fetch_on_demand_systematic_fast_path() {
let object_id = object_id_from_u64(5);
let payload = b"systematic-fast-path";
let records = make_symbol_records(object_id, payload, 8, 1);
let mut storage = TieredStorage::new(DurabilityMode::local());
storage.insert_l2_segment(40, records);
let cx = Cx::<cap::All>::new();
let outcome = storage
.fetch_object(
&cx,
object_id,
52,
Option::<&mut MockRemoteTier>::None,
None,
)
.expect("local fast-path fetch succeeds");
assert_eq!(outcome.bytes, payload);
assert!(matches!(
outcome.read_path,
SymbolReadPath::SystematicFastPath
));
assert!(!outcome.remote_used);
assert_eq!(outcome.write_back_count, 0);
assert!(outcome.decode_proof.is_none());
assert!(storage.take_decode_audit_entries().is_empty());
}
#[test]
fn test_fast_path_repeated_reads_emit_no_decode_artifacts() {
let object_id = object_id_from_u64(55);
let payload = b"systematic-fast-path-repeat";
let records = make_symbol_records(object_id, payload, 8, 1);
let mut storage = TieredStorage::new(DurabilityMode::local());
storage.insert_l2_segment(405, records);
let cx = Cx::<cap::All>::new();
for _ in 0..64 {
let outcome = storage
.fetch_object(
&cx,
object_id,
52,
Option::<&mut MockRemoteTier>::None,
None,
)
.expect("local fast-path fetch succeeds");
assert!(matches!(
outcome.read_path,
SymbolReadPath::SystematicFastPath
));
assert!(outcome.decode_proof.is_none());
assert!(!outcome.remote_used);
}
assert!(
storage.take_decode_audit_entries().is_empty(),
"fast path should never invoke fallback decoder/proof emission"
);
}
#[test]
fn test_fetch_on_demand_repair_fallback() {
let object_id = object_id_from_u64(6);
let payload = b"repair-fallback-path";
let mut full = make_symbol_records(object_id, payload, 8, 3);
for record in &mut full {
if record.esi == 0 {
*record = SymbolRecord::new(
record.object_id,
record.oti,
record.esi,
record.symbol_data.clone(),
SymbolRecordFlags::empty(),
);
}
}
let mut local_partial = full.clone();
local_partial.retain(|record| record.esi == 0 || record.esi == 2);
let mut remote_repairs = full;
remote_repairs.retain(|record| record.esi == 1 || record.esi >= 3);
let mut storage = TieredStorage::new(DurabilityMode::local());
storage.insert_l2_segment(41, local_partial);
let mut remote = MockRemoteTier::default();
remote.set_object_symbols(object_id, remote_repairs);
let cx = Cx::<cap::All>::new();
let outcome = storage
.fetch_object(&cx, object_id, 53, Some(&mut remote), Some(remote_cap(5)))
.expect("fallback fetch succeeds");
assert!(matches!(
outcome.read_path,
SymbolReadPath::FullDecodeFallback { .. }
));
assert_eq!(outcome.bytes, payload);
assert!(outcome.remote_used);
assert!(outcome.write_back_count > 0);
assert!(outcome.decode_proof.is_some());
assert_eq!(remote.last_fetch_preferred, vec![0, 1, 2]);
assert!(storage.l2_segment_exists(storage.write_back_segment_id()));
let audit = storage.take_decode_audit_entries();
assert!(
audit.iter().any(|entry| entry.decode_success),
"expected at least one successful fallback proof"
);
assert!(
audit.iter().any(|entry| !entry.decode_success),
"expected local failure proof before remote fallback success"
);
}
#[test]
fn test_fetch_repairs_corrupt_local_symbol_and_persists_healed_copy() {
let object_id = object_id_from_u64(72);
let payload = b"repair-corrupt-local-symbol";
let full = make_symbol_records(object_id, payload, 8, 2);
let mut local = full.clone();
let corrupt = local
.iter_mut()
.find(|record| record.esi == 1)
.expect("seeded symbol set must contain ESI 1");
if let Some(first) = corrupt.symbol_data.first_mut() {
*first ^= 0x5A;
}
assert!(
!corrupt.verify_integrity(),
"test fixture must actually corrupt the local symbol"
);
let mut storage = TieredStorage::new(DurabilityMode::local());
storage.insert_l2_segment(420, local);
let mut remote = MockRemoteTier::default();
remote.set_object_symbols(object_id, full.clone());
let cx = Cx::<cap::All>::new();
let repaired = storage
.fetch_object(&cx, object_id, 57, Some(&mut remote), Some(remote_cap(13)))
.expect("remote repair should recover the object");
assert_eq!(repaired.bytes, payload);
assert!(repaired.remote_used);
assert_eq!(repaired.write_back_count, 1);
let local_replay = storage
.fetch_object(
&cx,
object_id,
58,
Option::<&mut MockRemoteTier>::None,
None,
)
.expect("healed symbol should make later local-only reads succeed");
assert_eq!(local_replay.bytes, payload);
assert!(!local_replay.remote_used);
assert!(
storage
.l2_records_for_object(object_id)
.iter()
.all(SymbolRecord::verify_integrity),
"local self-healing must prefer the repaired symbol over the stale corrupt copy"
);
}
#[test]
fn test_fetch_cancelled_after_remote_read_does_not_write_back_repairs() {
let object_id = object_id_from_u64(73);
let payload = b"cancel-after-remote-fetch";
let full = make_symbol_records(object_id, payload, 8, 1);
let mut local = full.clone();
local.retain(|record| record.esi == 0);
let mut storage = TieredStorage::new(DurabilityMode::local());
storage.insert_l2_segment(421, local);
let cx = Cx::<cap::All>::new();
let mut remote = MockRemoteTier::default();
remote.set_object_symbols(object_id, full);
remote.cancel_after_fetch = Some(cx.clone());
let result =
storage.fetch_object(&cx, object_id, 59, Some(&mut remote), Some(remote_cap(14)));
assert!(matches!(result, Err(FrankenError::Busy)));
assert_eq!(remote.fetch_calls(), 1);
assert!(
!storage.l2_segment_exists(storage.write_back_segment_id()),
"cancelled fetch must not append repaired symbols into the write-back segment"
);
}
#[test]
fn test_fetch_fallback_failure_emits_decode_proof() {
let object_id = object_id_from_u64(66);
let payload = b"fallback-threshold-failure";
let mut full = make_symbol_records(object_id, payload, 8, 0);
for record in &mut full {
if record.esi == 0 {
*record = SymbolRecord::new(
record.object_id,
record.oti,
record.esi,
record.symbol_data.clone(),
SymbolRecordFlags::empty(),
);
}
}
let mut local_partial = full.clone();
local_partial.retain(|record| record.esi == 0 || record.esi == 2);
let mut remote_source = full;
remote_source.retain(|record| record.esi == 1);
let mut storage = TieredStorage::new(DurabilityMode::local());
storage.insert_l2_segment(416, local_partial);
let mut remote = MockRemoteTier::default();
remote.set_object_symbols(object_id, remote_source);
let cx = Cx::<cap::All>::new();
let result =
storage.fetch_object(&cx, object_id, 54, Some(&mut remote), Some(remote_cap(6)));
assert!(matches!(result, Err(FrankenError::DatabaseCorrupt { .. })));
if let Err(FrankenError::DatabaseCorrupt { detail }) = result {
assert!(
detail.contains("insufficient_symbols_for_fallback"),
"expected deterministic fallback-failure detail, got: {detail}"
);
}
let audit = storage.take_decode_audit_entries();
assert!(
audit.iter().any(|entry| !entry.decode_success),
"expected at least one failure proof artifact"
);
assert!(
audit
.iter()
.any(|entry| !entry.decode_success && entry.proof.symbols_received.len() >= 2),
"expected proof to capture available symbol cardinality"
);
}
#[test]
fn test_write_back_segment_keeps_missing_symbols_per_object() {
let object_a = object_id_from_u64(68);
let payload_a = b"first-payload";
let full_a = make_symbol_records(object_a, payload_a, 8, 0);
let mut local_a = full_a.clone();
local_a.retain(|record| record.esi == 0);
let object_b = object_id_from_u64(69);
let payload_b = b"second-bytes!";
let full_b = make_symbol_records(object_b, payload_b, 8, 0);
let mut local_b = full_b.clone();
local_b.retain(|record| record.esi == 0);
let mut storage = TieredStorage::new(DurabilityMode::local());
storage.insert_l2_segment(418, local_a);
storage.insert_l2_segment(419, local_b);
let mut remote = MockRemoteTier::default();
remote.set_object_symbols(object_a, full_a);
remote.set_object_symbols(object_b, full_b);
let cx = Cx::<cap::All>::new();
let first_fetch = storage
.fetch_object(&cx, object_a, 57, Some(&mut remote), Some(remote_cap(13)))
.expect("first object fetch succeeds");
assert!(first_fetch.remote_used);
assert!(first_fetch.write_back_count > 0);
let second_fetch = storage
.fetch_object(&cx, object_b, 58, Some(&mut remote), Some(remote_cap(13)))
.expect("second object fetch succeeds");
assert!(second_fetch.remote_used);
assert!(second_fetch.write_back_count > 0);
let replay_a = storage
.fetch_object(&cx, object_a, 59, Option::<&mut MockRemoteTier>::None, None)
.expect("first object should remain recoverable from local write-back");
assert_eq!(replay_a.bytes, payload_a);
assert!(!replay_a.remote_used);
let replay_b = storage
.fetch_object(&cx, object_b, 60, Option::<&mut MockRemoteTier>::None, None)
.expect("second object should remain recoverable from local write-back");
assert_eq!(replay_b.bytes, payload_b);
assert!(!replay_b.remote_used);
}
#[test]
fn test_fallback_decode_proof_stable_for_same_inputs() {
let run_once = || -> EcsDecodeProof {
let object_id = object_id_from_u64(67);
let payload = b"fallback-proof-stability";
let mut full = make_symbol_records(object_id, payload, 8, 3);
for record in &mut full {
if record.esi == 0 {
*record = SymbolRecord::new(
record.object_id,
record.oti,
record.esi,
record.symbol_data.clone(),
SymbolRecordFlags::empty(),
);
}
}
let mut local_partial = full.clone();
local_partial.retain(|record| record.esi == 0 || record.esi == 2);
let mut remote_repairs = full;
remote_repairs.retain(|record| record.esi == 1 || record.esi >= 3);
let mut storage = TieredStorage::new(DurabilityMode::local());
storage.insert_l2_segment(417, local_partial);
let mut remote = MockRemoteTier::default();
remote.set_object_symbols(object_id, remote_repairs);
let cx = Cx::<cap::All>::new();
let outcome = storage
.fetch_object(&cx, object_id, 55, Some(&mut remote), Some(remote_cap(7)))
.expect("fallback fetch succeeds");
assert!(matches!(
outcome.read_path,
SymbolReadPath::FullDecodeFallback { .. }
));
outcome
.decode_proof
.expect("fallback-success path should emit decode proof")
};
let proof_a = run_once();
let proof_b = run_once();
assert_eq!(
proof_a, proof_b,
"proof artifacts must be stable for identical fallback input sets"
);
}
#[test]
fn test_symbolrecord_corruption_erasures_seeded_property() {
let mut runner = TestRunner::new(ProptestConfig {
cases: 96,
failure_persistence: None,
rng_algorithm: RngAlgorithm::ChaCha,
rng_seed: RngSeed::Fixed(0x0BAD_C0DE_u64),
..ProptestConfig::default()
});
let strategy = (
prop::collection::vec(proptest::num::u8::ANY, 17..96),
prop::collection::vec(0_u8..7, 0..4),
prop::collection::vec(0_u8..7, 0..4),
);
runner
.run(&strategy, |(payload, dropped_raw, corrupted_raw)| {
let object_id = object_id_from_u64(77);
let full = make_symbol_records(object_id, &payload, 8, 4);
let dropped: BTreeSet<u32> = dropped_raw.into_iter().map(u32::from).collect();
let corrupted: BTreeSet<u32> = corrupted_raw.into_iter().map(u32::from).collect();
let mut remote_records = Vec::new();
let mut expected_rejected = BTreeSet::new();
let mut accepted_esis = BTreeSet::new();
for mut record in full {
if dropped.contains(&record.esi) {
continue;
}
if record.esi == 0 {
record = SymbolRecord::new(
record.object_id,
record.oti,
record.esi,
record.symbol_data.clone(),
SymbolRecordFlags::empty(),
);
}
if corrupted.contains(&record.esi) {
if let Some(first) = record.symbol_data.first_mut() {
*first ^= 0x5A;
}
expected_rejected.insert(record.esi);
} else {
accepted_esis.insert(record.esi);
}
remote_records.push(record);
}
let source_symbols = payload.len().div_ceil(8);
let required_symbols = source_symbols.saturating_add(DEFAULT_FALLBACK_DECODE_SLACK);
let has_complete_source_run = (0..source_symbols).all(|index| {
let esi = u32::try_from(index).expect("source index fits in u32");
accepted_esis.contains(&esi)
});
let expect_success =
accepted_esis.len() >= required_symbols && has_complete_source_run;
let mut storage = TieredStorage::new(DurabilityMode::local());
let mut remote = MockRemoteTier::default();
remote.set_object_symbols(object_id, remote_records);
let cx = Cx::<cap::All>::new();
let result = storage.fetch_object(
&cx,
object_id,
56,
Some(&mut remote),
Some(remote_cap(12)),
);
if expect_success {
let outcome =
result.expect("decode should succeed when enough valid symbols remain");
let used_fallback =
matches!(outcome.read_path, SymbolReadPath::FullDecodeFallback { .. });
prop_assert!(used_fallback);
prop_assert_eq!(outcome.bytes, payload);
let proof = outcome
.decode_proof
.expect("fallback-success path should emit decode proof");
prop_assert!(proof.decode_success);
prop_assert_eq!(rejected_esis_set(&proof), expected_rejected);
prop_assert!(decode_proof_report_ok(&proof));
} else {
let is_corrupt_error =
matches!(result, Err(FrankenError::DatabaseCorrupt { .. }));
prop_assert!(is_corrupt_error);
let audit = storage.take_decode_audit_entries();
let Some(failure_entry) = audit.iter().find(|entry| !entry.decode_success)
else {
return Err(proptest::test_runner::TestCaseError::fail(
"expected failure decode proof artifact",
));
};
prop_assert_eq!(rejected_esis_set(&failure_entry.proof), expected_rejected);
prop_assert!(decode_proof_report_ok(&failure_entry.proof));
}
Ok(())
})
.expect("seeded SymbolRecord corruption property should hold");
}
#[test]
fn test_durability_mode_local_no_remote() {
let object_id = object_id_from_u64(7);
let payload = b"local-durability-no-remote";
let records = make_symbol_records(object_id, payload, 8, 1);
let mut storage = TieredStorage::new(DurabilityMode::local());
let mut remote = MockRemoteTier::default();
remote.set_acked_stores(3);
let cx = Cx::<cap::All>::new();
let request = CommitRequest::new(50, records, 60);
let outcome = storage
.commit_segment(&cx, request, Some(&mut remote), Some(remote_cap(4)))
.expect("local durability commit succeeds");
assert!(!outcome.remote_io);
assert_eq!(remote.upload_calls(), 0);
assert!(storage.l2_segment_exists(50));
}
#[test]
fn test_durability_mode_quorum_requires_ack() {
let object_id = object_id_from_u64(8);
let payload = b"quorum-durability";
let records = make_symbol_records(object_id, payload, 8, 1);
let mut storage = TieredStorage::new(DurabilityMode::quorum(2, 3).expect("valid quorum"));
let mut remote = MockRemoteTier::default();
let cx = Cx::<cap::All>::new();
let cap = Some(remote_cap(10));
remote.set_acked_stores(1);
let req_fail = CommitRequest::new(60, records.clone(), 61);
let fail = storage.commit_segment(&cx, req_fail, Some(&mut remote), cap);
assert!(matches!(fail, Err(FrankenError::Busy)));
assert!(storage.l2_segment_exists(60));
remote.set_acked_stores(2);
let req_ok = CommitRequest::new(61, records, 62);
let ok = storage
.commit_segment(&cx, req_ok, Some(&mut remote), cap)
.expect("quorum commit succeeds after sufficient ACKs");
assert!(ok.remote_io);
assert!(storage.l2_segment_exists(61));
}
#[test]
fn test_tiered_storage_remote_admission_cancelled_while_waiting() {
let object_id = object_id_from_u64(9);
let payload = b"tiered-admission-cancel";
let records = make_symbol_records(object_id, payload, 8, 1);
let mut storage = TieredStorage::new_with_remote_executor(
DurabilityMode::quorum(1, 2).expect("valid quorum"),
RemoteAdmissionExecutor::with_limits(
crate::remote_effects::TIERED_STORAGE_EXECUTOR_NAME,
1,
1,
Duration::from_secs(1),
)
.expect("executor config valid"),
);
let held_executor = Arc::clone(&storage.remote_executor);
let _held_permit = held_executor
.try_acquire_for_testing()
.expect("first permit must saturate admission");
let cx = Cx::<cap::All>::new();
let canceller = {
let cancel_cx = cx.clone();
std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(20));
cancel_cx.cancel();
})
};
let mut remote = MockRemoteTier::default();
remote.set_acked_stores(2);
let request = CommitRequest::new(70, records, 71);
let result = storage.commit_segment(&cx, request, Some(&mut remote), Some(remote_cap(12)));
canceller.join().expect("canceller thread joins");
assert!(matches!(result, Err(FrankenError::Busy)));
assert_eq!(remote.upload_calls(), 0);
assert!(storage.l2_segment_exists(70));
}
#[test]
fn test_e2e_tiered_storage_evict_and_fetch() {
let mut storage = TieredStorage::new(DurabilityMode::local());
let mut remote = MockRemoteTier::default();
remote.set_acked_stores(3);
let cx = Cx::<cap::All>::new();
let cap = Some(remote_cap(11));
let mut expected = HashMap::<ObjectId, Vec<u8>>::new();
for idx in 0_u64..500_u64 {
let segment_id = idx + 1;
let object_id = object_id_from_u64(10_000 + idx);
let payload = format!("commit-{segment_id:04}-payload").into_bytes();
let records = make_symbol_records(object_id, &payload, 16, 2);
storage.insert_l2_segment(segment_id, records);
expected.insert(object_id, payload);
}
for segment_id in 1_u64..=500_u64 {
let outcome = storage
.evict_segment(&cx, segment_id, 1, 70, &mut remote, cap)
.expect("eviction succeeds");
assert_eq!(outcome.phase, EvictionPhase::Retired);
assert!(outcome.evicted);
}
assert_eq!(storage.l2_segment_count(), 0);
let target_object = object_id_from_u64(10_321);
let outcome = storage
.fetch_object(&cx, target_object, 71, Some(&mut remote), cap)
.expect("remote fetch after eviction succeeds");
assert_eq!(
outcome.bytes,
expected
.get(&target_object)
.expect("target payload available")
.clone()
);
assert!(outcome.remote_used);
assert!(storage.l2_segment_exists(storage.write_back_segment_id()));
assert!(outcome.write_back_count > 0);
}
}