use fsqlite_types::sync_primitives::Instant;
use fsqlite_error::{FrankenError, Result};
use fsqlite_wal::wal_index::{WAL_READ_MARK_COUNT, WalCkptInfo, WalIndexHdr};
/// Locking/compatibility strategy chosen for a connection.
///
/// `HybridShm` maintains the legacy SQLite SHM layout in lockstep with the
/// native coordination state; `FileLockOnly` falls back to plain file
/// locking and therefore cannot offer `BEGIN CONCURRENT`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompatMode {
    HybridShm,
    FileLockOnly,
}

impl CompatMode {
    /// Whether `BEGIN CONCURRENT` transactions are available in this mode.
    #[must_use]
    pub const fn supports_concurrent(&self) -> bool {
        match self {
            Self::HybridShm => true,
            Self::FileLockOnly => false,
        }
    }

    /// Whether both the legacy and the native SHM regions must be kept
    /// up to date on every commit.
    #[must_use]
    pub const fn requires_dual_shm(&self) -> bool {
        match self {
            Self::HybridShm => true,
            Self::FileLockOnly => false,
        }
    }
}
/// Gate for `BEGIN CONCURRENT`: only the hybrid-SHM mode supports it.
///
/// # Errors
///
/// Returns [`FrankenError::ConcurrentUnavailable`] when `mode` is
/// [`CompatMode::FileLockOnly`].
pub fn begin_concurrent_check(mode: CompatMode) -> Result<()> {
    match mode {
        CompatMode::HybridShm => Ok(()),
        CompatMode::FileLockOnly => Err(FrankenError::ConcurrentUnavailable),
    }
}
/// Tracks the legacy SQLite SHM copy that is maintained alongside the
/// native coordination state in hybrid-SHM mode.
#[derive(Debug)]
pub struct HybridShmState {
    // True while WAL_WRITE_LOCK is held; legacy-header updates are
    // rejected unless this is set (see `update_legacy_shm`).
    wal_write_lock_held: bool,
    // Legacy wal-index header mirrored for SQLite-compatible readers.
    legacy_hdr: WalIndexHdr,
    // Legacy checkpoint info (nBackfill, read marks, ...).
    legacy_ckpt: WalCkptInfo,
    // When the write lock was acquired; feeds `lock_held_duration`.
    lock_acquired_at: Option<Instant>,
}
impl HybridShmState {
    /// Creates tracking state from the initial legacy header/checkpoint
    /// copies; the write lock starts out released.
    #[must_use]
    pub fn new(initial_hdr: WalIndexHdr, initial_ckpt: WalCkptInfo) -> Self {
        Self {
            legacy_hdr: initial_hdr,
            legacy_ckpt: initial_ckpt,
            wal_write_lock_held: false,
            lock_acquired_at: None,
        }
    }

    /// Records that WAL_WRITE_LOCK was acquired at `now`.
    pub fn mark_write_lock_held(&mut self, now: Instant) {
        self.lock_acquired_at = Some(now);
        self.wal_write_lock_held = true;
    }

    /// True while WAL_WRITE_LOCK is held.
    #[must_use]
    pub fn is_write_lock_held(&self) -> bool {
        self.wal_write_lock_held
    }

    /// Publishes a commit into the legacy SHM header: updates the frame and
    /// page counters, the frame checksum and salts, bumps `iChange`, then
    /// recomputes the header checksum over the serialized header's first 40
    /// bytes in native byte order (two u32 words per step, each running sum
    /// folded back into the other — SQLite's wal-index header scheme).
    ///
    /// # Errors
    ///
    /// Fails with an internal error unless WAL_WRITE_LOCK is currently held.
    pub fn update_legacy_shm(
        &mut self,
        new_mx_frame: u32,
        new_n_page: u32,
        new_frame_cksum: [u32; 2],
        new_salt: [u32; 2],
    ) -> Result<UpdatedLegacyShm> {
        if !self.wal_write_lock_held {
            return Err(FrankenError::Internal(
                "cannot update legacy SHM without WAL_WRITE_LOCK".into(),
            ));
        }
        let hdr = &mut self.legacy_hdr;
        hdr.mx_frame = new_mx_frame;
        hdr.n_page = new_n_page;
        hdr.a_frame_cksum = new_frame_cksum;
        hdr.a_salt = new_salt;
        hdr.i_change = hdr.i_change.wrapping_add(1);
        // Serialize after all field updates so the checksum covers the new
        // values; bytes 40.. (a_cksum itself) are excluded from the sum.
        let serialized = hdr.to_bytes();
        let (sum1, sum2) = serialized[..40]
            .chunks_exact(8)
            .fold((0_u32, 0_u32), |(acc1, acc2), pair| {
                let lo = u32::from_ne_bytes([pair[0], pair[1], pair[2], pair[3]]);
                let hi = u32::from_ne_bytes([pair[4], pair[5], pair[6], pair[7]]);
                let next1 = acc1.wrapping_add(lo).wrapping_add(acc2);
                let next2 = acc2.wrapping_add(hi).wrapping_add(next1);
                (next1, next2)
            });
        hdr.a_cksum = [sum1, sum2];
        Ok(UpdatedLegacyShm {
            hdr: self.legacy_hdr,
            ckpt: self.legacy_ckpt,
        })
    }

    /// Updates the legacy checkpoint `nBackfill` counter.
    pub fn update_backfill(&mut self, n_backfill: u32) {
        self.legacy_ckpt.n_backfill = n_backfill;
    }

    /// Records that WAL_WRITE_LOCK was released and clears the hold timer.
    pub fn mark_write_lock_released(&mut self) {
        self.lock_acquired_at = None;
        self.wal_write_lock_held = false;
    }

    /// How long the write lock has been held as of `now`, or `None` when
    /// the lock is not held.
    #[must_use]
    pub fn lock_held_duration(&self, now: Instant) -> Option<std::time::Duration> {
        self.lock_acquired_at
            .map(|acquired| now.duration_since(acquired))
    }

    /// Current legacy wal-index header copy.
    #[must_use]
    pub fn legacy_hdr(&self) -> &WalIndexHdr {
        &self.legacy_hdr
    }

    /// Current legacy checkpoint-info copy.
    #[must_use]
    pub fn legacy_ckpt(&self) -> &WalCkptInfo {
        &self.legacy_ckpt
    }
}
/// Snapshot of the legacy SHM contents produced by a successful
/// `update_legacy_shm` call.
#[derive(Debug, Clone)]
pub struct UpdatedLegacyShm {
    /// Updated wal-index header (counters set, `iChange` bumped, checksum
    /// recomputed).
    pub hdr: WalIndexHdr,
    /// Checkpoint info as of the commit.
    pub ckpt: WalCkptInfo,
}
/// Result of attempting to take a WAL read lock for a given mark value
/// (see `choose_reader_slot`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReadLockOutcome {
    /// An existing slot already carries the desired mark; share it.
    Joined { slot: usize },
    /// A free slot (mark 0) was selected for the desired mark.
    Claimed { slot: usize },
    /// No matching slot and no free slot were available.
    AllSlotsBusy,
}
/// Picks a reader slot for `desired_mark`: join the first slot already
/// carrying the same mark, otherwise claim the first free slot (mark 0),
/// otherwise report that every slot is busy.
///
/// NOTE(review): a `desired_mark` of 0 is indistinguishable from a free
/// slot here and is reported as `Joined` — confirm callers never pass 0.
#[must_use]
pub fn choose_reader_slot(
    a_read_mark: &[u32; WAL_READ_MARK_COUNT],
    desired_mark: u32,
) -> ReadLockOutcome {
    if let Some(slot) = a_read_mark.iter().position(|&mark| mark == desired_mark) {
        return ReadLockOutcome::Joined { slot };
    }
    match a_read_mark.iter().position(|&mark| mark == 0) {
        Some(slot) => ReadLockOutcome::Claimed { slot },
        None => ReadLockOutcome::AllSlotsBusy,
    }
}
/// Outcome of probing for a coordinator (not used within this view —
/// presumably consumed by connection setup; verify against callers).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CoordinatorProbeResult {
    /// A coordinator responded.
    Active,
    /// No coordinator is present.
    NoCoordinator,
    /// The probe did not complete in time.
    Timeout,
}
/// Set of actions required to repair shared state after a coordinator
/// crash, derived from the observed wal-index header state.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecoveryPlan {
    pub reconstruct_wal_index: bool,
    pub clear_stale_read_marks: bool,
    pub validate_backfill: bool,
}

impl RecoveryPlan {
    /// Builds a plan from header observations: `copies_match` — both header
    /// copies agreed; `is_init` — the init flag was set; `has_stale_marks` —
    /// leftover reader marks were found.
    #[must_use]
    pub fn from_header_state(copies_match: bool, is_init: bool, has_stale_marks: bool) -> Self {
        let header_trustworthy = copies_match && is_init;
        Self {
            reconstruct_wal_index: !header_trustworthy,
            clear_stale_read_marks: has_stale_marks,
            validate_backfill: !copies_match,
        }
    }

    /// True when at least one recovery action is required.
    #[must_use]
    pub fn needs_recovery(&self) -> bool {
        self.reconstruct_wal_index || self.clear_stale_read_marks || self.validate_backfill
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use fsqlite_wal::wal_index::{WAL_INDEX_VERSION, WalCkptInfo, WalIndexHdr};

    // Bead identifier threaded through assertion messages for traceability.
    const BEAD_3INZ: &str = "bd-3inz";

    // Builds an initialized legacy wal-index header with fixed salts and the
    // given frame/page counters (iChange starts at 1, checksum left zeroed).
    fn make_hdr(mx_frame: u32, n_page: u32) -> WalIndexHdr {
        WalIndexHdr {
            i_version: WAL_INDEX_VERSION,
            unused: 0,
            i_change: 1,
            is_init: 1,
            big_end_cksum: 0,
            sz_page: 4096,
            mx_frame,
            n_page,
            a_frame_cksum: [0, 0],
            a_salt: [0x1234_5678, 0x9ABC_DEF0],
            a_cksum: [0, 0],
        }
    }

    // Builds an empty checkpoint-info block: no backfill, all read marks free.
    fn make_ckpt() -> WalCkptInfo {
        WalCkptInfo {
            n_backfill: 0,
            a_read_mark: [0; WAL_READ_MARK_COUNT],
            a_lock: [0; 8],
            n_backfill_attempted: 0,
            not_used0: 0,
        }
    }

    // A commit published through update_legacy_shm must be fully visible to
    // a legacy reader: counters updated, checksum recomputed, iChange bumped.
    #[test]
    fn test_legacy_reader_sees_committed_data() {
        let hdr = make_hdr(10, 100);
        let ckpt = make_ckpt();
        let mut state = HybridShmState::new(hdr, ckpt);
        state.mark_write_lock_held(Instant::now());
        let updated = state
            .update_legacy_shm(15, 105, [0xAA, 0xBB], [0x1234_5678, 0x9ABC_DEF0])
            .expect("bead_id=bd-3inz: update should succeed while write lock held");
        assert_eq!(
            updated.hdr.mx_frame, 15,
            "bead_id={BEAD_3INZ} legacy reader should see mxFrame=15 after commit"
        );
        assert_eq!(
            updated.hdr.n_page, 105,
            "bead_id={BEAD_3INZ} legacy reader should see updated nPage"
        );
        assert_ne!(
            updated.hdr.a_cksum,
            [0, 0],
            "bead_id={BEAD_3INZ} header checksum must be recomputed"
        );
        assert_eq!(
            updated.hdr.i_change, 2,
            "bead_id={BEAD_3INZ} iChange should increment on commit"
        );
    }

    // Legacy SHM updates must be rejected whenever WAL_WRITE_LOCK is not
    // held: before acquisition and again after release.
    #[test]
    fn test_legacy_writer_blocked() {
        let hdr = make_hdr(5, 50);
        let ckpt = make_ckpt();
        let mut state = HybridShmState::new(hdr, ckpt);
        let result = state.update_legacy_shm(10, 60, [0, 0], [0, 0]);
        assert!(
            result.is_err(),
            "bead_id={BEAD_3INZ} update without WAL_WRITE_LOCK must be rejected"
        );
        state.mark_write_lock_held(Instant::now());
        let result = state.update_legacy_shm(10, 60, [0, 0], [0, 0]);
        assert!(
            result.is_ok(),
            "bead_id={BEAD_3INZ} update with WAL_WRITE_LOCK must succeed"
        );
        state.mark_write_lock_released();
        assert!(
            !state.is_write_lock_held(),
            "bead_id={BEAD_3INZ} WAL_WRITE_LOCK must be released after coordinator shutdown"
        );
        let result = state.update_legacy_shm(11, 61, [0, 0], [0, 0]);
        assert!(
            result.is_err(),
            "bead_id={BEAD_3INZ} update after release must fail"
        );
    }

    // Five consecutive commits: mxFrame must advance monotonically across
    // them, and a checkpoint's nBackfill update must land in the ckpt copy.
    #[test]
    fn test_hybrid_shm_dual_maintenance() {
        let hdr = make_hdr(0, 10);
        let ckpt = make_ckpt();
        let mut state = HybridShmState::new(hdr, ckpt);
        state.mark_write_lock_held(Instant::now());
        let mut prev_mx_frame = 0;
        for commit_num in 1..=5_u32 {
            let new_mx_frame = commit_num * 3;
            let updated = state
                .update_legacy_shm(new_mx_frame, 10 + commit_num, [commit_num, 0], [0, 0])
                .unwrap_or_else(|_| unreachable!("commit {commit_num} should succeed"));
            assert!(
                updated.hdr.mx_frame > prev_mx_frame,
                "bead_id={BEAD_3INZ} mxFrame must advance monotonically: {} > {}",
                updated.hdr.mx_frame,
                prev_mx_frame
            );
            assert_eq!(updated.hdr.mx_frame, new_mx_frame);
            assert_eq!(updated.hdr.a_frame_cksum[0], commit_num);
            prev_mx_frame = updated.hdr.mx_frame;
        }
        assert_eq!(
            state.legacy_hdr().mx_frame,
            15,
            "bead_id={BEAD_3INZ} final mxFrame should be 15 after 5 commits"
        );
        state.update_backfill(10);
        assert_eq!(
            state.legacy_ckpt().n_backfill,
            10,
            "bead_id={BEAD_3INZ} nBackfill must be updated during checkpoint"
        );
    }

    // FileLockOnly mode must reject BEGIN CONCURRENT with a well-formed
    // ConcurrentUnavailable error; HybridShm must allow it.
    #[test]
    fn test_fallback_to_file_locking() {
        let mode = CompatMode::FileLockOnly;
        assert!(
            !mode.supports_concurrent(),
            "bead_id={BEAD_3INZ} FileLockOnly must not support CONCURRENT"
        );
        assert!(
            !mode.requires_dual_shm(),
            "bead_id={BEAD_3INZ} FileLockOnly must not require dual SHM"
        );
        let result = begin_concurrent_check(mode);
        assert!(
            result.is_err(),
            "bead_id={BEAD_3INZ} BEGIN CONCURRENT must fail in FileLockOnly mode"
        );
        let err = result.unwrap_err();
        assert!(
            matches!(err, FrankenError::ConcurrentUnavailable),
            "bead_id={BEAD_3INZ} error must be ConcurrentUnavailable, got: {err}"
        );
        assert_eq!(
            err.error_code(),
            fsqlite_error::ErrorCode::Error,
            "bead_id={BEAD_3INZ} ConcurrentUnavailable must map to SQLITE_ERROR"
        );
        assert!(
            err.suggestion().is_some(),
            "bead_id={BEAD_3INZ} ConcurrentUnavailable must have a suggestion"
        );
        let hybrid_mode = CompatMode::HybridShm;
        assert!(hybrid_mode.supports_concurrent());
        assert!(hybrid_mode.requires_dual_shm());
        assert!(begin_concurrent_check(hybrid_mode).is_ok());
    }

    // RecoveryPlan must demand reconstruction on header mismatch or missing
    // init flag, mark-clearing on stale marks, and nothing on a clean state.
    #[test]
    fn test_coordinator_crash_recovery() {
        let plan = RecoveryPlan::from_header_state(
            true, true, false,
        );
        assert!(
            !plan.needs_recovery(),
            "bead_id={BEAD_3INZ} clean state should need no recovery"
        );
        let plan = RecoveryPlan::from_header_state(
            false, true, false,
        );
        assert!(
            plan.needs_recovery(),
            "bead_id={BEAD_3INZ} mismatched headers should need recovery"
        );
        assert!(
            plan.reconstruct_wal_index,
            "bead_id={BEAD_3INZ} must reconstruct WAL-index after header mismatch"
        );
        assert!(
            plan.validate_backfill,
            "bead_id={BEAD_3INZ} must validate nBackfill after header mismatch"
        );
        let plan = RecoveryPlan::from_header_state(
            true, false, false,
        );
        assert!(
            plan.reconstruct_wal_index,
            "bead_id={BEAD_3INZ} uninitialized WAL-index must be reconstructed"
        );
        let plan = RecoveryPlan::from_header_state(
            true, true, true,
        );
        assert!(
            plan.clear_stale_read_marks,
            "bead_id={BEAD_3INZ} stale reader marks must be cleared"
        );
        assert!(
            !plan.reconstruct_wal_index,
            "bead_id={BEAD_3INZ} header is fine, no reconstruction needed"
        );
        let plan = RecoveryPlan::from_header_state(false, false, true);
        assert!(plan.needs_recovery());
        assert!(plan.reconstruct_wal_index);
        assert!(plan.clear_stale_read_marks);
        assert!(plan.validate_backfill);
    }

    // choose_reader_slot: join-before-claim ordering, first-match wins, and
    // AllSlotsBusy when no slot matches and none is free.
    #[test]
    fn test_read_lock_protocol() {
        let marks = [0_u32; WAL_READ_MARK_COUNT];
        let outcome = choose_reader_slot(&marks, 42);
        assert_eq!(
            outcome,
            ReadLockOutcome::Claimed { slot: 0 },
            "bead_id={BEAD_3INZ} first reader should claim slot 0 when all empty"
        );
        let marks = [42, 0, 0, 0, 0];
        let outcome = choose_reader_slot(&marks, 42);
        assert_eq!(
            outcome,
            ReadLockOutcome::Joined { slot: 0 },
            "bead_id={BEAD_3INZ} second reader should join existing mark"
        );
        let marks = [10, 0, 0, 0, 0];
        let outcome = choose_reader_slot(&marks, 42);
        assert_eq!(
            outcome,
            ReadLockOutcome::Claimed { slot: 1 },
            "bead_id={BEAD_3INZ} should claim first free slot when no match"
        );
        let marks = [10, 20, 30, 40, 50];
        let outcome = choose_reader_slot(&marks, 42);
        assert_eq!(
            outcome,
            ReadLockOutcome::AllSlotsBusy,
            "bead_id={BEAD_3INZ} all slots busy when no match and no free slot"
        );
        let marks = [10, 20, 30, 42, 50];
        let outcome = choose_reader_slot(&marks, 42);
        assert_eq!(
            outcome,
            ReadLockOutcome::Joined { slot: 3 },
            "bead_id={BEAD_3INZ} should join matching mark at any slot position"
        );
        let marks = [42, 42, 42, 42, 42];
        let outcome = choose_reader_slot(&marks, 42);
        assert_eq!(
            outcome,
            ReadLockOutcome::Joined { slot: 0 },
            "bead_id={BEAD_3INZ} multiple slots with same mark: join first match"
        );
        let outcome = choose_reader_slot(&marks, 99);
        assert_eq!(
            outcome,
            ReadLockOutcome::AllSlotsBusy,
            "bead_id={BEAD_3INZ} no free slot for different mark value"
        );
    }
}