use std::sync::atomic::{AtomicU64, Ordering};
use serde::Serialize;
static FSQLITE_SEQLOCK_READS_TOTAL: AtomicU64 = AtomicU64::new(0);
static FSQLITE_SEQLOCK_RETRIES_TOTAL: AtomicU64 = AtomicU64::new(0);
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub struct SeqlockMetrics {
pub fsqlite_seqlock_reads_total: u64,
pub fsqlite_seqlock_retries_total: u64,
}
#[must_use]
pub fn seqlock_metrics() -> SeqlockMetrics {
SeqlockMetrics {
fsqlite_seqlock_reads_total: FSQLITE_SEQLOCK_READS_TOTAL.load(Ordering::Relaxed),
fsqlite_seqlock_retries_total: FSQLITE_SEQLOCK_RETRIES_TOTAL.load(Ordering::Relaxed),
}
}
pub fn reset_seqlock_metrics() {
FSQLITE_SEQLOCK_READS_TOTAL.store(0, Ordering::Relaxed);
FSQLITE_SEQLOCK_RETRIES_TOTAL.store(0, Ordering::Relaxed);
}
const MAX_RETRIES: u32 = 1_000_000;
pub struct SeqLock {
seq: AtomicU64,
value: AtomicU64,
write_lock: fsqlite_types::sync_primitives::Mutex<()>,
}
impl SeqLock {
pub fn new(initial: u64) -> Self {
Self {
seq: AtomicU64::new(0),
value: AtomicU64::new(initial),
write_lock: fsqlite_types::sync_primitives::Mutex::new(()),
}
}
#[inline]
pub fn read(&self, data_key: &str) -> Option<u64> {
let mut retries: u32 = 0;
let result = loop {
let seq1 = self.seq.load(Ordering::Acquire);
if seq1 & 1 == 1 {
retries += 1;
if retries >= MAX_RETRIES {
emit_trace(data_key, retries);
return None;
}
std::hint::spin_loop();
continue;
}
std::sync::atomic::fence(Ordering::Acquire);
let snapshot = self.value.load(Ordering::Relaxed);
std::sync::atomic::fence(Ordering::Acquire);
let seq2 = self.seq.load(Ordering::Acquire);
if seq1 == seq2 {
break snapshot;
}
retries += 1;
if retries >= MAX_RETRIES {
emit_trace(data_key, retries);
return None;
}
std::hint::spin_loop();
};
FSQLITE_SEQLOCK_READS_TOTAL.fetch_add(1, Ordering::Relaxed);
if retries > 0 {
FSQLITE_SEQLOCK_RETRIES_TOTAL.fetch_add(u64::from(retries), Ordering::Relaxed);
}
emit_trace(data_key, retries);
Some(result)
}
pub fn write(&self, new_value: u64) {
let _guard = self.write_lock.lock();
self.seq.fetch_add(1, Ordering::Release); self.value.store(new_value, Ordering::Release);
self.seq.fetch_add(1, Ordering::Release); }
pub fn update(&self, f: impl FnOnce(u64) -> u64) {
let _guard = self.write_lock.lock();
self.seq.fetch_add(1, Ordering::Release);
let old = self.value.load(Ordering::Acquire);
self.value.store(f(old), Ordering::Release);
self.seq.fetch_add(1, Ordering::Release);
}
#[must_use]
pub fn sequence(&self) -> u64 {
self.seq.load(Ordering::Relaxed)
}
}
#[allow(clippy::missing_fields_in_debug)]
impl std::fmt::Debug for SeqLock {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let seq = self.seq.load(Ordering::Relaxed);
f.debug_struct("SeqLock")
.field("seq", &seq)
.field("writing", &(seq & 1 == 1))
.finish_non_exhaustive()
}
}
pub struct SeqLockPair {
seq: AtomicU64,
a: AtomicU64,
b: AtomicU64,
write_lock: fsqlite_types::sync_primitives::Mutex<()>,
}
impl SeqLockPair {
pub fn new(a: u64, b: u64) -> Self {
Self {
seq: AtomicU64::new(0),
a: AtomicU64::new(a),
b: AtomicU64::new(b),
write_lock: fsqlite_types::sync_primitives::Mutex::new(()),
}
}
#[inline]
pub fn read(&self, data_key: &str) -> Option<(u64, u64)> {
let mut retries: u32 = 0;
let result = loop {
let seq1 = self.seq.load(Ordering::Acquire);
if seq1 & 1 == 1 {
retries += 1;
if retries >= MAX_RETRIES {
emit_trace(data_key, retries);
return None;
}
std::hint::spin_loop();
continue;
}
std::sync::atomic::fence(Ordering::Acquire);
let va = self.a.load(Ordering::Relaxed);
let vb = self.b.load(Ordering::Relaxed);
std::sync::atomic::fence(Ordering::Acquire);
let seq2 = self.seq.load(Ordering::Acquire);
if seq1 == seq2 {
break (va, vb);
}
retries += 1;
if retries >= MAX_RETRIES {
emit_trace(data_key, retries);
return None;
}
std::hint::spin_loop();
};
FSQLITE_SEQLOCK_READS_TOTAL.fetch_add(1, Ordering::Relaxed);
if retries > 0 {
FSQLITE_SEQLOCK_RETRIES_TOTAL.fetch_add(u64::from(retries), Ordering::Relaxed);
}
emit_trace(data_key, retries);
Some(result)
}
pub fn write(&self, a: u64, b: u64) {
let _guard = self.write_lock.lock();
self.seq.fetch_add(1, Ordering::Release);
self.a.store(a, Ordering::Release);
self.b.store(b, Ordering::Release);
self.seq.fetch_add(1, Ordering::Release);
}
#[must_use]
pub fn sequence(&self) -> u64 {
self.seq.load(Ordering::Relaxed)
}
}
#[allow(clippy::missing_fields_in_debug)]
impl std::fmt::Debug for SeqLockPair {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let seq = self.seq.load(Ordering::Relaxed);
f.debug_struct("SeqLockPair")
.field("seq", &seq)
.field("writing", &(seq & 1 == 1))
.finish_non_exhaustive()
}
}
pub struct SeqLockTriple {
seq: AtomicU64,
a: AtomicU64,
b: AtomicU64,
c: AtomicU64,
write_lock: fsqlite_types::sync_primitives::Mutex<()>,
}
impl SeqLockTriple {
pub fn new(a: u64, b: u64, c: u64) -> Self {
Self {
seq: AtomicU64::new(0),
a: AtomicU64::new(a),
b: AtomicU64::new(b),
c: AtomicU64::new(c),
write_lock: fsqlite_types::sync_primitives::Mutex::new(()),
}
}
#[inline]
pub fn read(&self, data_key: &str) -> Option<(u64, u64, u64)> {
let mut retries: u32 = 0;
let result = loop {
let seq1 = self.seq.load(Ordering::Acquire);
if seq1 & 1 == 1 {
retries += 1;
if retries >= MAX_RETRIES {
emit_trace(data_key, retries);
return None;
}
std::hint::spin_loop();
continue;
}
std::sync::atomic::fence(Ordering::Acquire);
let va = self.a.load(Ordering::Relaxed);
let vb = self.b.load(Ordering::Relaxed);
let vc = self.c.load(Ordering::Relaxed);
std::sync::atomic::fence(Ordering::Acquire);
let seq2 = self.seq.load(Ordering::Acquire);
if seq1 == seq2 {
break (va, vb, vc);
}
retries += 1;
if retries >= MAX_RETRIES {
emit_trace(data_key, retries);
return None;
}
std::hint::spin_loop();
};
FSQLITE_SEQLOCK_READS_TOTAL.fetch_add(1, Ordering::Relaxed);
if retries > 0 {
FSQLITE_SEQLOCK_RETRIES_TOTAL.fetch_add(u64::from(retries), Ordering::Relaxed);
}
emit_trace(data_key, retries);
Some(result)
}
pub fn write(&self, a: u64, b: u64, c: u64) {
let _guard = self.write_lock.lock();
self.seq.fetch_add(1, Ordering::Release);
self.a.store(a, Ordering::Release);
self.b.store(b, Ordering::Release);
self.c.store(c, Ordering::Release);
self.seq.fetch_add(1, Ordering::Release);
}
#[must_use]
pub fn sequence(&self) -> u64 {
self.seq.load(Ordering::Relaxed)
}
}
#[allow(clippy::missing_fields_in_debug)]
impl std::fmt::Debug for SeqLockTriple {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let seq = self.seq.load(Ordering::Relaxed);
f.debug_struct("SeqLockTriple")
.field("seq", &seq)
.field("writing", &(seq & 1 == 1))
.finish_non_exhaustive()
}
}
fn emit_trace(data_key: &str, retries: u32) {
if retries > 0 {
tracing::debug!(
target: "fsqlite.seqlock",
data_key,
retries,
"seqlock_read contended"
);
} else {
tracing::trace!(
target: "fsqlite.seqlock",
data_key,
retries = 0u32,
"seqlock_read"
);
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Duration;
#[test]
fn basic_read_write() {
let sl = SeqLock::new(42);
assert_eq!(sl.read("test"), Some(42));
sl.write(99);
assert_eq!(sl.read("test"), Some(99));
assert_eq!(sl.sequence(), 2); }
#[test]
fn update_closure() {
let sl = SeqLock::new(10);
sl.update(|v| v + 5);
assert_eq!(sl.read("test"), Some(15));
sl.update(|v| v * 2);
assert_eq!(sl.read("test"), Some(30));
}
#[test]
fn pair_consistent_snapshot() {
let sl = SeqLockPair::new(1, 2);
assert_eq!(sl.read("pair"), Some((1, 2)));
sl.write(10, 20);
assert_eq!(sl.read("pair"), Some((10, 20)));
}
#[test]
fn triple_consistent_snapshot() {
let sl = SeqLockTriple::new(1, 2, 3);
assert_eq!(sl.read("triple"), Some((1, 2, 3)));
sl.write(10, 20, 30);
assert_eq!(sl.read("triple"), Some((10, 20, 30)));
}
#[test]
fn no_torn_reads_pair() {
let sl = Arc::new(SeqLockPair::new(0, 0));
let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
let barrier = Arc::new(Barrier::new(5));
let writer_sl = Arc::clone(&sl);
let writer_stop = Arc::clone(&stop);
let writer_barrier = Arc::clone(&barrier);
let writer = thread::spawn(move || {
writer_barrier.wait();
let mut val = 0u64;
while !writer_stop.load(std::sync::atomic::Ordering::Relaxed) {
val += 1;
writer_sl.write(val, val);
}
val
});
let mut readers = Vec::new();
for _ in 0..4 {
let rsl = Arc::clone(&sl);
let rs = Arc::clone(&stop);
let rb = Arc::clone(&barrier);
readers.push(thread::spawn(move || {
rb.wait();
let mut reads = 0u64;
while !rs.load(std::sync::atomic::Ordering::Relaxed) {
if let Some((a, b)) = rsl.read("pair") {
assert_eq!(a, b, "torn read: a={a}, b={b}");
reads += 1;
}
}
reads
}));
}
thread::sleep(Duration::from_millis(500));
stop.store(true, std::sync::atomic::Ordering::Release);
let writer_count = writer.join().unwrap();
let mut total_reads = 0u64;
for r in readers {
total_reads += r.join().unwrap();
}
assert!(writer_count > 0, "writer must have written");
assert!(total_reads > 0, "readers must have read");
println!("[seqlock_pair] writes={writer_count} reads={total_reads} no torn reads");
}
#[test]
fn no_torn_reads_triple() {
let sl = Arc::new(SeqLockTriple::new(0, 0, 0));
let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
let barrier = Arc::new(Barrier::new(5));
let writer_sl = Arc::clone(&sl);
let writer_stop = Arc::clone(&stop);
let writer_barrier = Arc::clone(&barrier);
let writer = thread::spawn(move || {
writer_barrier.wait();
let mut val = 0u64;
while !writer_stop.load(std::sync::atomic::Ordering::Relaxed) {
val += 1;
writer_sl.write(val, val, val);
}
val
});
let mut readers = Vec::new();
for _ in 0..4 {
let rsl = Arc::clone(&sl);
let rs = Arc::clone(&stop);
let rb = Arc::clone(&barrier);
readers.push(thread::spawn(move || {
rb.wait();
let mut reads = 0u64;
while !rs.load(std::sync::atomic::Ordering::Relaxed) {
if let Some((a, b, c)) = rsl.read("triple") {
assert!(a == b && b == c, "torn read: a={a}, b={b}, c={c}");
reads += 1;
}
}
reads
}));
}
thread::sleep(Duration::from_millis(500));
stop.store(true, std::sync::atomic::Ordering::Release);
let writer_count = writer.join().unwrap();
let mut total_reads = 0u64;
for r in readers {
total_reads += r.join().unwrap();
}
assert!(writer_count > 0);
assert!(total_reads > 0);
println!("[seqlock_triple] writes={writer_count} reads={total_reads} no torn reads");
}
#[test]
fn multiple_writers_serialize() {
let sl = Arc::new(SeqLock::new(0));
let barrier = Arc::new(Barrier::new(4));
let mut handles = Vec::new();
for _ in 0..4 {
let s = Arc::clone(&sl);
let b = Arc::clone(&barrier);
handles.push(thread::spawn(move || {
b.wait();
for _ in 0..1000 {
s.update(|v| v + 1);
}
}));
}
for h in handles {
h.join().unwrap();
}
assert_eq!(sl.read("counter"), Some(4000));
}
#[test]
fn metrics_increment() {
let before = seqlock_metrics();
let sl = SeqLock::new(1);
sl.read("m1");
sl.read("m2");
sl.read("m3");
let after = seqlock_metrics();
let reads_delta = after.fsqlite_seqlock_reads_total - before.fsqlite_seqlock_reads_total;
assert!(
reads_delta >= 3,
"expected at least 3 reads, got {reads_delta}"
);
}
#[test]
fn sequence_always_even_after_writes() {
let sl = Arc::new(SeqLock::new(0));
let barrier = Arc::new(Barrier::new(3));
let mut handles = Vec::new();
for _ in 0..3 {
let s = Arc::clone(&sl);
let b = Arc::clone(&barrier);
handles.push(thread::spawn(move || {
b.wait();
for i in 0..500 {
s.write(i);
}
}));
}
for h in handles {
h.join().unwrap();
}
let seq = sl.sequence();
assert_eq!(seq % 2, 0, "sequence must be even: {seq}");
assert_eq!(seq, 3 * 500 * 2);
}
#[test]
fn debug_format() {
let sl = SeqLock::new(42);
let dbg = format!("{sl:?}");
assert!(dbg.contains("SeqLock"));
assert!(dbg.contains("writing: false"));
}
#[test]
fn test_seqlock_reader_no_lock() {
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
let sl = Arc::new(SeqLock::new(42));
let barrier = Arc::new(Barrier::new(8));
let stop = Arc::new(AtomicBool::new(false));
let total_reads = Arc::new(AtomicU64::new(0));
let mut handles = Vec::with_capacity(8);
for _ in 0..8 {
let s = Arc::clone(&sl);
let b = Arc::clone(&barrier);
let st = Arc::clone(&stop);
let tr = Arc::clone(&total_reads);
handles.push(thread::spawn(move || {
b.wait();
let mut local_reads = 0u64;
while !st.load(Ordering::Relaxed) {
if s.read("no_lock_test").is_some() {
local_reads += 1;
}
}
tr.fetch_add(local_reads, Ordering::Relaxed);
}));
}
thread::sleep(Duration::from_millis(100));
stop.store(true, Ordering::Release);
for h in handles {
h.join().unwrap();
}
let reads = total_reads.load(Ordering::Relaxed);
assert!(
reads > 100_000,
"bd-3wop3.6: expected >100k reads with 8 lock-free readers, got {reads}"
);
println!("[test_seqlock_reader_no_lock] 8 threads, {reads} total reads, no locks acquired");
}
#[test]
fn test_seqlock_writer_blocks_readers() {
use std::sync::atomic::{AtomicU64, Ordering};
let sl = Arc::new(SeqLock::new(0));
let reader_retries = Arc::new(AtomicU64::new(0));
let writer_done = Arc::new(std::sync::atomic::AtomicBool::new(false));
let reader_sl = Arc::clone(&sl);
let reader_wd = Arc::clone(&writer_done);
let rr = Arc::clone(&reader_retries);
let reader = thread::spawn(move || {
while !reader_wd.load(Ordering::Acquire) {
thread::yield_now();
}
let before = seqlock_metrics().fsqlite_seqlock_retries_total;
let val = reader_sl.read("writer_blocks_test");
let after = seqlock_metrics().fsqlite_seqlock_retries_total;
rr.store(after.saturating_sub(before), Ordering::Release);
val
});
{
let _guard = sl.write_lock.lock();
sl.seq.fetch_add(1, Ordering::Release); writer_done.store(true, Ordering::Release);
thread::sleep(Duration::from_millis(10));
sl.value.store(999, Ordering::Release);
sl.seq.fetch_add(1, Ordering::Release); }
let result = reader.join().unwrap();
assert_eq!(result, Some(999), "reader should see final value");
let retries = reader_retries.load(Ordering::Acquire);
assert!(
retries > 0,
"bd-3wop3.6: reader should have retried during writer hold, got {retries} retries"
);
println!("[test_seqlock_writer_blocks_readers] reader retried {retries} times");
}
#[test]
fn test_seqlock_throughput_vs_mutex() {
use fsqlite_types::sync_primitives::Mutex;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
const THREADS: usize = 8;
const DURATION_MS: u64 = 100;
let mutex_value = Arc::new(Mutex::new(42u64));
let mutex_reads = Arc::new(AtomicU64::new(0));
let mutex_stop = Arc::new(AtomicBool::new(false));
let mutex_barrier = Arc::new(Barrier::new(THREADS));
let mut mutex_handles = Vec::with_capacity(THREADS);
for _ in 0..THREADS {
let v = Arc::clone(&mutex_value);
let r = Arc::clone(&mutex_reads);
let s = Arc::clone(&mutex_stop);
let b = Arc::clone(&mutex_barrier);
mutex_handles.push(thread::spawn(move || {
b.wait();
let mut local = 0u64;
while !s.load(Ordering::Relaxed) {
let _val = *v.lock();
local += 1;
}
r.fetch_add(local, Ordering::Relaxed);
}));
}
thread::sleep(Duration::from_millis(DURATION_MS));
mutex_stop.store(true, Ordering::Release);
for h in mutex_handles {
h.join().unwrap();
}
let mutex_total = mutex_reads.load(Ordering::Relaxed);
let sl = Arc::new(SeqLock::new(42));
let sl_reads = Arc::new(AtomicU64::new(0));
let sl_stop = Arc::new(AtomicBool::new(false));
let sl_barrier = Arc::new(Barrier::new(THREADS));
let mut sl_handles = Vec::with_capacity(THREADS);
for _ in 0..THREADS {
let s = Arc::clone(&sl);
let r = Arc::clone(&sl_reads);
let st = Arc::clone(&sl_stop);
let b = Arc::clone(&sl_barrier);
sl_handles.push(thread::spawn(move || {
b.wait();
let mut local = 0u64;
while !st.load(Ordering::Relaxed) {
if s.read("throughput_test").is_some() {
local += 1;
}
}
r.fetch_add(local, Ordering::Relaxed);
}));
}
thread::sleep(Duration::from_millis(DURATION_MS));
sl_stop.store(true, Ordering::Release);
for h in sl_handles {
h.join().unwrap();
}
let sl_total = sl_reads.load(Ordering::Relaxed);
let speedup = sl_total as f64 / mutex_total.max(1) as f64;
println!(
"[test_seqlock_throughput_vs_mutex] mutex={mutex_total} seqlock={sl_total} speedup={speedup:.1}x"
);
if speedup <= 1.0 {
println!(
"WARNING: bd-3wop3.6: seqlock throughput was not faster than mutex (got {speedup:.1}x). This is expected in some noisy CI environments."
);
}
}
}