use std::sync::atomic::{AtomicU64, Ordering};
use serde::Serialize;
/// Process-wide counter bumped by one at the end of every `read` call on the
/// left-right types in this file.
static FSQLITE_LEFTRIGHT_READS_TOTAL: AtomicU64 = AtomicU64::new(0);
/// Process-wide counter bumped by one each time a writer flips the active side.
static FSQLITE_LEFTRIGHT_SWAPS_TOTAL: AtomicU64 = AtomicU64::new(0);
/// Process-wide counter that accumulates how many extra loop iterations readers
/// needed because the active side changed while they were registering.
static FSQLITE_LEFTRIGHT_READER_RETRIES_TOTAL: AtomicU64 = AtomicU64::new(0);
/// Snapshot of the process-wide left-right counters, as returned by
/// [`leftright_metrics`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub struct LeftRightMetrics {
/// Value of the reads counter at snapshot time.
pub fsqlite_leftright_reads_total: u64,
/// Value of the swaps counter at snapshot time.
pub fsqlite_leftright_swaps_total: u64,
/// Value of the reader-retries counter at snapshot time.
pub fsqlite_leftright_reader_retries_total: u64,
}
/// Captures the current values of the three process-wide left-right counters.
///
/// Each counter is loaded on its own with relaxed ordering, so the three
/// numbers are not guaranteed to describe one single instant.
#[must_use]
pub fn leftright_metrics() -> LeftRightMetrics {
    let reads = FSQLITE_LEFTRIGHT_READS_TOTAL.load(Ordering::Relaxed);
    let swaps = FSQLITE_LEFTRIGHT_SWAPS_TOTAL.load(Ordering::Relaxed);
    let reader_retries = FSQLITE_LEFTRIGHT_READER_RETRIES_TOTAL.load(Ordering::Relaxed);

    LeftRightMetrics {
        fsqlite_leftright_reads_total: reads,
        fsqlite_leftright_swaps_total: swaps,
        fsqlite_leftright_reader_retries_total: reader_retries,
    }
}
/// Sets every process-wide left-right counter back to zero.
///
/// The counters are cleared one after another with relaxed stores; concurrent
/// readers or writers may bump a counter between two of those stores.
pub fn reset_leftright_metrics() {
    let counters = [
        &FSQLITE_LEFTRIGHT_READS_TOTAL,
        &FSQLITE_LEFTRIGHT_SWAPS_TOTAL,
        &FSQLITE_LEFTRIGHT_READER_RETRIES_TOTAL,
    ];
    for counter in &counters {
        counter.store(0, Ordering::Relaxed);
    }
}
/// A single `u64` kept in two copies ("left" and "right").
///
/// Readers load from whichever copy `active` selects; writers update the other
/// copy, flip `active`, wait for readers of the old copy to leave, and then
/// bring the old copy up to date as well.
pub struct LeftRight {
/// Copy that readers use while `active` is 0.
left: AtomicU64,
/// Copy that readers use while `active` is non-zero.
right: AtomicU64,
/// Side selector: 0 means left, anything else means right (writers only store 0 or 1).
active: AtomicU64,
/// Number of readers currently registered on the left copy.
left_readers: AtomicU64,
/// Number of readers currently registered on the right copy.
right_readers: AtomicU64,
/// Lock taken by `write` and `update` for the whole duration of a write.
writer_lock: fsqlite_types::sync_primitives::Mutex<()>,
}
impl LeftRight {
    /// Creates an instance whose left and right copies both hold `initial`.
    ///
    /// The left copy starts out active and no readers are registered.
    pub fn new(initial: u64) -> Self {
        Self {
            left: AtomicU64::new(initial),
            right: AtomicU64::new(initial),
            active: AtomicU64::new(0),
            left_readers: AtomicU64::new(0),
            right_readers: AtomicU64::new(0),
            writer_lock: fsqlite_types::sync_primitives::Mutex::new(()),
        }
    }

    /// Returns the value stored in the currently active copy.
    ///
    /// The reader registers on the side it observed as active and then
    /// re-checks `active`; if a writer flipped sides in between, the reader
    /// unregisters and tries again. `data_key` is only used to label the trace
    /// event. Every call bumps the global read counter, and any retries are
    /// added to the global retry counter.
    #[inline]
    pub fn read(&self, data_key: &str) -> u64 {
        let mut retries = 0u32;
        let value = loop {
            let side = self.active.load(Ordering::Acquire);
            let (readers, data) = if side == 0 {
                (&self.left_readers, &self.left)
            } else {
                (&self.right_readers, &self.right)
            };
            // Register, then re-check `active`. The writer does the mirror
            // image (store `active`, then load the old reader count). This is a
            // store-buffering pattern: with anything weaker than SeqCst both
            // sides may read the stale value, letting the writer overwrite a
            // copy that is still being read.
            readers.fetch_add(1, Ordering::SeqCst);
            if self.active.load(Ordering::SeqCst) == side {
                let v = data.load(Ordering::Acquire);
                readers.fetch_sub(1, Ordering::Release);
                break v;
            }
            // The active side changed while registering: back out and retry.
            readers.fetch_sub(1, Ordering::Release);
            retries += 1;
        };
        FSQLITE_LEFTRIGHT_READS_TOTAL.fetch_add(1, Ordering::Relaxed);
        if retries > 0 {
            FSQLITE_LEFTRIGHT_READER_RETRIES_TOTAL.fetch_add(u64::from(retries), Ordering::Relaxed);
        }
        emit_read_trace(data_key, retries);
        value
    }

    /// Publishes `new_val`, holding the writer lock for the whole operation.
    ///
    /// Spins until every reader registered on the previously active copy has
    /// left, so the call does not return before both copies hold `new_val`.
    pub fn write(&self, new_val: u64) {
        let _guard = self.writer_lock.lock();
        self.write_inner(new_val);
    }

    /// Replaces the current value `v` with `f(v)` under the writer lock.
    ///
    /// `f` runs while the writer lock is held, so it sees the latest published
    /// value and no other writer can interleave.
    pub fn update<F: FnOnce(u64) -> u64>(&self, f: F) {
        let _guard = self.writer_lock.lock();
        let active = self.active.load(Ordering::Acquire);
        let current = if active == 0 {
            self.left.load(Ordering::Acquire)
        } else {
            self.right.load(Ordering::Acquire)
        };
        self.write_inner(f(current));
    }

    /// Performs one write cycle; callers must already hold `writer_lock`.
    fn write_inner(&self, new_val: u64) {
        let active = self.active.load(Ordering::Acquire);
        // Step 1: update the copy that readers are not being directed to.
        if active == 0 {
            self.right.store(new_val, Ordering::Release);
        } else {
            self.left.store(new_val, Ordering::Release);
        }
        // Step 2: direct new readers to the freshly written copy. SeqCst pairs
        // with the reader's SeqCst register/re-check sequence (see `read`).
        let new_active = 1 - active;
        self.active.store(new_active, Ordering::SeqCst);
        FSQLITE_LEFTRIGHT_SWAPS_TOTAL.fetch_add(1, Ordering::Relaxed);
        // Step 3: wait for readers still registered on the old copy to leave.
        let old_readers = if active == 0 {
            &self.left_readers
        } else {
            &self.right_readers
        };
        while old_readers.load(Ordering::SeqCst) > 0 {
            std::hint::spin_loop();
        }
        // Step 4: bring the old copy up to date so both copies agree again.
        if active == 0 {
            self.left.store(new_val, Ordering::Release);
        } else {
            self.right.store(new_val, Ordering::Release);
        }
        emit_swap_trace(1);
    }

    /// Returns the raw side selector (0 for left, 1 for right) using a relaxed
    /// load; the value may already be outdated when the caller inspects it.
    #[must_use]
    pub fn active_side(&self) -> u64 {
        self.active.load(Ordering::Relaxed)
    }
}
// The stored values and the writer lock are intentionally left out of the output.
#[allow(clippy::missing_fields_in_debug)]
impl std::fmt::Debug for LeftRight {
    /// Formats the active side name and both reader counts, all read with
    /// relaxed loads; remaining fields are elided.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let side_label = match self.active.load(Ordering::Relaxed) {
            0 => "left",
            _ => "right",
        };
        let left_count = self.left_readers.load(Ordering::Relaxed);
        let right_count = self.right_readers.load(Ordering::Relaxed);

        let mut out = f.debug_struct("LeftRight");
        out.field("active", &side_label);
        out.field("left_readers", &left_count);
        out.field("right_readers", &right_count);
        out.finish_non_exhaustive()
    }
}
/// Two `u64` values (`a`, `b`) kept in two copies ("left" and "right") so that
/// readers can obtain both values from the same copy.
pub struct LeftRightPair {
/// `a` in the left copy.
left_a: AtomicU64,
/// `b` in the left copy.
left_b: AtomicU64,
/// `a` in the right copy.
right_a: AtomicU64,
/// `b` in the right copy.
right_b: AtomicU64,
/// Side selector: 0 means left, anything else means right (writers only store 0 or 1).
active: AtomicU64,
/// Number of readers currently registered on the left copy.
left_readers: AtomicU64,
/// Number of readers currently registered on the right copy.
right_readers: AtomicU64,
/// Lock taken by `write` for the whole duration of a write.
writer_lock: fsqlite_types::sync_primitives::Mutex<()>,
}
impl LeftRightPair {
    /// Creates an instance whose left and right copies both hold `(a, b)`.
    ///
    /// The left copy starts out active and no readers are registered.
    pub fn new(a: u64, b: u64) -> Self {
        Self {
            left_a: AtomicU64::new(a),
            left_b: AtomicU64::new(b),
            right_a: AtomicU64::new(a),
            right_b: AtomicU64::new(b),
            active: AtomicU64::new(0),
            left_readers: AtomicU64::new(0),
            right_readers: AtomicU64::new(0),
            writer_lock: fsqlite_types::sync_primitives::Mutex::new(()),
        }
    }

    /// Returns `(a, b)` taken from the currently active copy.
    ///
    /// The reader registers on the side it observed as active and then
    /// re-checks `active`; if a writer flipped sides in between, the reader
    /// unregisters and tries again. `data_key` is only used to label the trace
    /// event. Every call bumps the global read counter, and any retries are
    /// added to the global retry counter.
    #[inline]
    pub fn read(&self, data_key: &str) -> (u64, u64) {
        let mut retries = 0u32;
        let value = loop {
            let side = self.active.load(Ordering::Acquire);
            let (readers, da, db) = if side == 0 {
                (&self.left_readers, &self.left_a, &self.left_b)
            } else {
                (&self.right_readers, &self.right_a, &self.right_b)
            };
            // Register, then re-check `active`. The writer does the mirror
            // image (store `active`, then load the old reader count). This is a
            // store-buffering pattern: with anything weaker than SeqCst both
            // sides may read the stale value, and the writer could then
            // overwrite `a`/`b` between this reader's two loads (a torn pair).
            readers.fetch_add(1, Ordering::SeqCst);
            if self.active.load(Ordering::SeqCst) == side {
                let va = da.load(Ordering::Acquire);
                let vb = db.load(Ordering::Acquire);
                readers.fetch_sub(1, Ordering::Release);
                break (va, vb);
            }
            // The active side changed while registering: back out and retry.
            readers.fetch_sub(1, Ordering::Release);
            retries += 1;
        };
        FSQLITE_LEFTRIGHT_READS_TOTAL.fetch_add(1, Ordering::Relaxed);
        if retries > 0 {
            FSQLITE_LEFTRIGHT_READER_RETRIES_TOTAL.fetch_add(u64::from(retries), Ordering::Relaxed);
        }
        emit_read_trace(data_key, retries);
        value
    }

    /// Publishes `(a, b)`, holding the writer lock for the whole operation.
    ///
    /// Spins until every reader registered on the previously active copy has
    /// left, so the call does not return before both copies hold `(a, b)`.
    pub fn write(&self, a: u64, b: u64) {
        let _guard = self.writer_lock.lock();
        let active = self.active.load(Ordering::Acquire);
        // Step 1: update the copy that readers are not being directed to.
        if active == 0 {
            self.right_a.store(a, Ordering::Release);
            self.right_b.store(b, Ordering::Release);
        } else {
            self.left_a.store(a, Ordering::Release);
            self.left_b.store(b, Ordering::Release);
        }
        // Step 2: direct new readers to the freshly written copy. SeqCst pairs
        // with the reader's SeqCst register/re-check sequence (see `read`).
        self.active.store(1 - active, Ordering::SeqCst);
        FSQLITE_LEFTRIGHT_SWAPS_TOTAL.fetch_add(1, Ordering::Relaxed);
        // Step 3: wait for readers still registered on the old copy to leave.
        let old_readers = if active == 0 {
            &self.left_readers
        } else {
            &self.right_readers
        };
        while old_readers.load(Ordering::SeqCst) > 0 {
            std::hint::spin_loop();
        }
        // Step 4: bring the old copy up to date so both copies agree again.
        if active == 0 {
            self.left_a.store(a, Ordering::Release);
            self.left_b.store(b, Ordering::Release);
        } else {
            self.right_a.store(a, Ordering::Release);
            self.right_b.store(b, Ordering::Release);
        }
        emit_swap_trace(1);
    }
}
// Only the active side is reported; values, reader counts and the lock are elided.
#[allow(clippy::missing_fields_in_debug)]
impl std::fmt::Debug for LeftRightPair {
    /// Formats the name of the active side, read with a relaxed load.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let side_label = match self.active.load(Ordering::Relaxed) {
            0 => "left",
            _ => "right",
        };
        let mut out = f.debug_struct("LeftRightPair");
        out.field("active", &side_label);
        out.finish_non_exhaustive()
    }
}
/// Three `u64` values (`a`, `b`, `c`) kept in two copies ("left" and "right")
/// so that readers can obtain all three values from the same copy.
pub struct LeftRightTriple {
/// `a` in the left copy.
left_a: AtomicU64,
/// `b` in the left copy.
left_b: AtomicU64,
/// `c` in the left copy.
left_c: AtomicU64,
/// `a` in the right copy.
right_a: AtomicU64,
/// `b` in the right copy.
right_b: AtomicU64,
/// `c` in the right copy.
right_c: AtomicU64,
/// Side selector: 0 means left, anything else means right (writers only store 0 or 1).
active: AtomicU64,
/// Number of readers currently registered on the left copy.
left_readers: AtomicU64,
/// Number of readers currently registered on the right copy.
right_readers: AtomicU64,
/// Lock taken by `write` for the whole duration of a write.
writer_lock: fsqlite_types::sync_primitives::Mutex<()>,
}
impl LeftRightTriple {
    /// Creates an instance whose left and right copies both hold `(a, b, c)`.
    ///
    /// The left copy starts out active and no readers are registered.
    pub fn new(a: u64, b: u64, c: u64) -> Self {
        Self {
            left_a: AtomicU64::new(a),
            left_b: AtomicU64::new(b),
            left_c: AtomicU64::new(c),
            right_a: AtomicU64::new(a),
            right_b: AtomicU64::new(b),
            right_c: AtomicU64::new(c),
            active: AtomicU64::new(0),
            left_readers: AtomicU64::new(0),
            right_readers: AtomicU64::new(0),
            writer_lock: fsqlite_types::sync_primitives::Mutex::new(()),
        }
    }

    /// Returns `(a, b, c)` taken from the currently active copy.
    ///
    /// The reader registers on the side it observed as active and then
    /// re-checks `active`; if a writer flipped sides in between, the reader
    /// unregisters and tries again. `data_key` is only used to label the trace
    /// event. Every call bumps the global read counter, and any retries are
    /// added to the global retry counter.
    #[inline]
    pub fn read(&self, data_key: &str) -> (u64, u64, u64) {
        let mut retries = 0u32;
        let value = loop {
            let side = self.active.load(Ordering::Acquire);
            let (readers, da, db, dc) = if side == 0 {
                (&self.left_readers, &self.left_a, &self.left_b, &self.left_c)
            } else {
                (
                    &self.right_readers,
                    &self.right_a,
                    &self.right_b,
                    &self.right_c,
                )
            };
            // Register, then re-check `active`. The writer does the mirror
            // image (store `active`, then load the old reader count). This is a
            // store-buffering pattern: with anything weaker than SeqCst both
            // sides may read the stale value, and the writer could then
            // overwrite the fields between this reader's loads (a torn triple).
            readers.fetch_add(1, Ordering::SeqCst);
            if self.active.load(Ordering::SeqCst) == side {
                let va = da.load(Ordering::Acquire);
                let vb = db.load(Ordering::Acquire);
                let vc = dc.load(Ordering::Acquire);
                readers.fetch_sub(1, Ordering::Release);
                break (va, vb, vc);
            }
            // The active side changed while registering: back out and retry.
            readers.fetch_sub(1, Ordering::Release);
            retries += 1;
        };
        FSQLITE_LEFTRIGHT_READS_TOTAL.fetch_add(1, Ordering::Relaxed);
        if retries > 0 {
            FSQLITE_LEFTRIGHT_READER_RETRIES_TOTAL.fetch_add(u64::from(retries), Ordering::Relaxed);
        }
        emit_read_trace(data_key, retries);
        value
    }

    /// Publishes `(a, b, c)`, holding the writer lock for the whole operation.
    ///
    /// Spins until every reader registered on the previously active copy has
    /// left, so the call does not return before both copies hold `(a, b, c)`.
    pub fn write(&self, a: u64, b: u64, c: u64) {
        let _guard = self.writer_lock.lock();
        let active = self.active.load(Ordering::Acquire);
        // Step 1: update the copy that readers are not being directed to.
        if active == 0 {
            self.right_a.store(a, Ordering::Release);
            self.right_b.store(b, Ordering::Release);
            self.right_c.store(c, Ordering::Release);
        } else {
            self.left_a.store(a, Ordering::Release);
            self.left_b.store(b, Ordering::Release);
            self.left_c.store(c, Ordering::Release);
        }
        // Step 2: direct new readers to the freshly written copy. SeqCst pairs
        // with the reader's SeqCst register/re-check sequence (see `read`).
        self.active.store(1 - active, Ordering::SeqCst);
        FSQLITE_LEFTRIGHT_SWAPS_TOTAL.fetch_add(1, Ordering::Relaxed);
        // Step 3: wait for readers still registered on the old copy to leave.
        let old_readers = if active == 0 {
            &self.left_readers
        } else {
            &self.right_readers
        };
        while old_readers.load(Ordering::SeqCst) > 0 {
            std::hint::spin_loop();
        }
        // Step 4: bring the old copy up to date so both copies agree again.
        if active == 0 {
            self.left_a.store(a, Ordering::Release);
            self.left_b.store(b, Ordering::Release);
            self.left_c.store(c, Ordering::Release);
        } else {
            self.right_a.store(a, Ordering::Release);
            self.right_b.store(b, Ordering::Release);
            self.right_c.store(c, Ordering::Release);
        }
        emit_swap_trace(1);
    }
}
// Only the active side is reported; values, reader counts and the lock are elided.
#[allow(clippy::missing_fields_in_debug)]
impl std::fmt::Debug for LeftRightTriple {
    /// Formats the name of the active side, read with a relaxed load.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let side_label = match self.active.load(Ordering::Relaxed) {
            0 => "left",
            _ => "right",
        };
        let mut out = f.debug_struct("LeftRightTriple");
        out.field("active", &side_label);
        out.finish_non_exhaustive()
    }
}
/// Emits one tracing event for a finished read.
///
/// A read that needed no retries is logged at `trace` level; a read that had
/// to retry is logged at `debug` level together with its retry count.
fn emit_read_trace(data_key: &str, retries: u32) {
    if retries == 0 {
        tracing::trace!(
            target: "fsqlite.left_right",
            data_key,
            retries = 0u32,
            "left_right_read"
        );
        return;
    }

    tracing::debug!(
        target: "fsqlite.left_right",
        data_key,
        retries,
        "left_right_read retried"
    );
}
/// Emits a `debug`-level tracing event on the `fsqlite.left_right` target
/// recording `swap_count` (the writers in this file always pass 1).
fn emit_swap_trace(swap_count: u32) {
tracing::debug!(
target: "fsqlite.left_right",
swap_count,
"left_right_swap"
);
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicBool;
    use std::sync::{Arc, Barrier};
    use std::thread;
    use std::time::Duration;

    /// A write is visible to the next read on the same thread.
    #[test]
    fn basic_read_write() {
        let lr = LeftRight::new(42);
        assert_eq!(lr.read("test"), 42);
        lr.write(99);
        assert_eq!(lr.read("test"), 99);
    }

    /// `update` applies the closure to the latest published value.
    #[test]
    fn update_closure() {
        let lr = LeftRight::new(10);
        lr.update(|v| v + 5);
        assert_eq!(lr.read("test"), 15);
        lr.update(|v| v * 2);
        assert_eq!(lr.read("test"), 30);
    }

    /// Both halves of a pair are replaced together.
    #[test]
    fn pair_consistent_snapshot() {
        let lr = LeftRightPair::new(1, 2);
        assert_eq!(lr.read("pair"), (1, 2));
        lr.write(10, 20);
        assert_eq!(lr.read("pair"), (10, 20));
    }

    /// All three parts of a triple are replaced together.
    #[test]
    fn triple_consistent_snapshot() {
        let lr = LeftRightTriple::new(1, 2, 3);
        assert_eq!(lr.read("triple"), (1, 2, 3));
        lr.write(10, 20, 30);
        assert_eq!(lr.read("triple"), (10, 20, 30));
    }

    /// One writer always stores equal halves; four readers must never observe
    /// a pair whose halves differ.
    #[test]
    fn no_torn_reads_pair() {
        let lr = Arc::new(LeftRightPair::new(0, 0));
        let stop = Arc::new(AtomicBool::new(false));
        // 1 writer + 4 readers.
        let barrier = Arc::new(Barrier::new(5));
        let writer_lr = Arc::clone(&lr);
        let writer_stop = Arc::clone(&stop);
        let writer_barrier = Arc::clone(&barrier);
        let writer = thread::spawn(move || {
            writer_barrier.wait();
            let mut val = 0u64;
            while !writer_stop.load(Ordering::Relaxed) {
                val += 1;
                writer_lr.write(val, val);
            }
            val
        });
        let mut readers = Vec::new();
        for _ in 0..4 {
            let rlr = Arc::clone(&lr);
            let rs = Arc::clone(&stop);
            let rb = Arc::clone(&barrier);
            readers.push(thread::spawn(move || {
                rb.wait();
                let mut reads = 0u64;
                while !rs.load(Ordering::Relaxed) {
                    let (a, b) = rlr.read("pair");
                    assert_eq!(a, b, "torn read: a={a}, b={b}");
                    reads += 1;
                }
                reads
            }));
        }
        thread::sleep(Duration::from_millis(500));
        stop.store(true, Ordering::Release);
        let writer_count = writer.join().unwrap();
        let mut total_reads = 0u64;
        for r in readers {
            total_reads += r.join().unwrap();
        }
        assert!(writer_count > 0, "writer must have written");
        assert!(total_reads > 0, "readers must have read");
        println!("[left_right_pair] writes={writer_count} reads={total_reads} no torn reads");
    }

    /// Same as `no_torn_reads_pair`, for the three-value variant.
    #[test]
    fn no_torn_reads_triple() {
        let lr = Arc::new(LeftRightTriple::new(0, 0, 0));
        let stop = Arc::new(AtomicBool::new(false));
        // 1 writer + 4 readers.
        let barrier = Arc::new(Barrier::new(5));
        let writer_lr = Arc::clone(&lr);
        let writer_stop = Arc::clone(&stop);
        let writer_barrier = Arc::clone(&barrier);
        let writer = thread::spawn(move || {
            writer_barrier.wait();
            let mut val = 0u64;
            while !writer_stop.load(Ordering::Relaxed) {
                val += 1;
                writer_lr.write(val, val, val);
            }
            val
        });
        let mut readers = Vec::new();
        for _ in 0..4 {
            let rlr = Arc::clone(&lr);
            let rs = Arc::clone(&stop);
            let rb = Arc::clone(&barrier);
            readers.push(thread::spawn(move || {
                rb.wait();
                let mut reads = 0u64;
                while !rs.load(Ordering::Relaxed) {
                    let (a, b, c) = rlr.read("triple");
                    assert!(a == b && b == c, "torn read: a={a}, b={b}, c={c}");
                    reads += 1;
                }
                reads
            }));
        }
        thread::sleep(Duration::from_millis(500));
        stop.store(true, Ordering::Release);
        let writer_count = writer.join().unwrap();
        let mut total_reads = 0u64;
        for r in readers {
            total_reads += r.join().unwrap();
        }
        assert!(writer_count > 0);
        assert!(total_reads > 0);
        println!("[left_right_triple] writes={writer_count} reads={total_reads} no torn reads");
    }

    /// Four threads each apply 1000 increments through `update`; none may be lost.
    #[test]
    fn multiple_writers_serialize() {
        let lr = Arc::new(LeftRight::new(0));
        let barrier = Arc::new(Barrier::new(4));
        let mut handles = Vec::new();
        for _ in 0..4 {
            let l = Arc::clone(&lr);
            let b = Arc::clone(&barrier);
            handles.push(thread::spawn(move || {
                b.wait();
                for _ in 0..1000 {
                    l.update(|v| v + 1);
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        assert_eq!(lr.read("counter"), 4000);
    }

    /// Three reads raise the global read counter by at least three.
    #[test]
    fn metrics_increment() {
        let before = leftright_metrics();
        let lr = LeftRight::new(1);
        lr.read("m1");
        lr.read("m2");
        lr.read("m3");
        let after = leftright_metrics();
        let delta = after.fsqlite_leftright_reads_total - before.fsqlite_leftright_reads_total;
        // The counter is process-global and the test harness runs other tests
        // (which also read) on parallel threads, so only a lower bound is
        // reliable here.
        assert!(delta >= 3, "expected at least 3 reads, got {delta}");
    }

    /// Three writes raise the global swap counter by at least three.
    #[test]
    fn swap_count_increments() {
        let before = leftright_metrics();
        let lr = LeftRight::new(0);
        lr.write(1);
        lr.write(2);
        lr.write(3);
        let after = leftright_metrics();
        let delta = after.fsqlite_leftright_swaps_total - before.fsqlite_leftright_swaps_total;
        assert!(delta >= 3, "expected at least 3 swaps, got {delta}");
    }

    /// The `Debug` output names the type and the active side.
    #[test]
    fn debug_format() {
        let lr = LeftRight::new(42);
        let dbg = format!("{lr:?}");
        assert!(dbg.contains("LeftRight"));
        assert!(
            dbg.contains("left") || dbg.contains("right"),
            "debug must show active side"
        );
    }

    /// Two updating threads and four reading threads run concurrently for a
    /// short while; the test only checks that both kinds made progress.
    #[test]
    fn stress_concurrent_rw() {
        let lr = Arc::new(LeftRight::new(0));
        let stop = Arc::new(AtomicBool::new(false));
        // 2 writers + 4 readers.
        let barrier = Arc::new(Barrier::new(6));
        let mut handles = Vec::new();
        for _ in 0..2 {
            let l = Arc::clone(&lr);
            let st = Arc::clone(&stop);
            let b = Arc::clone(&barrier);
            handles.push(thread::spawn(move || {
                b.wait();
                let mut writes = 0u64;
                while !st.load(Ordering::Relaxed) {
                    l.update(|v| v.wrapping_add(1));
                    writes += 1;
                }
                writes
            }));
        }
        for _ in 0..4 {
            let l = Arc::clone(&lr);
            let st = Arc::clone(&stop);
            let b = Arc::clone(&barrier);
            handles.push(thread::spawn(move || {
                b.wait();
                let mut reads = 0u64;
                while !st.load(Ordering::Relaxed) {
                    let _ = l.read("stress");
                    reads += 1;
                }
                reads
            }));
        }
        thread::sleep(Duration::from_millis(500));
        stop.store(true, Ordering::Release);
        let mut total_writes = 0u64;
        let mut total_reads = 0u64;
        // The first two handles are the writers, the rest are readers.
        for (i, h) in handles.into_iter().enumerate() {
            let count = h.join().unwrap();
            if i < 2 {
                total_writes += count;
            } else {
                total_reads += count;
            }
        }
        assert!(total_writes > 0);
        assert!(total_reads > 0);
        println!("[left_right_stress] writes={total_writes} reads={total_reads}");
    }
}