#![allow(dead_code)]
#![allow(unexpected_cfgs)]
use std::collections::BTreeMap;
#[cfg(loom)]
use loom::sync::{
atomic::{AtomicU64, Ordering},
Mutex,
};
#[cfg(not(loom))]
use std::sync::{
atomic::{AtomicU64, Ordering},
Mutex,
};
/// A reserved byte range `[start, end)` in the ring's monotonically growing
/// address space, as returned by `WalRing::reserve`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) struct ReserveTicket {
    /// First reserved address (inclusive).
    pub(crate) start: u64,
    /// One past the last reserved address (exclusive).
    pub(crate) end: u64,
}

impl ReserveTicket {
    /// Number of bytes the ticket covers.
    #[inline]
    fn len(self) -> usize {
        let Self { start, end } = self;
        let span = end - start;
        span as usize
    }
}
/// Bookkeeping guarded by `WalRing::advance`.
///
/// Published ranges that cannot yet join the committed prefix wait in
/// `pending`. `committed_count` counts every range folded into that prefix.
#[derive(Debug)]
struct Advancer {
    /// Published ranges keyed by start address, each mapping to its end address.
    pending: BTreeMap<u64, u64>,
    /// Running total of ranges (records) folded into the committed prefix.
    committed_count: u64,
}
/// Multi-producer byte ring that stages encoded WAL records.
///
/// Addresses grow monotonically from `tail`, and address `a` lives in slot
/// `a & mask`. Producers reserve, fill and publish ranges. The contiguous
/// published prefix ends at `committed_addr`, and the flusher copies
/// `[flush_cursor, committed_addr)` out of the ring.
pub(crate) struct WalRing {
    /// Ring storage. It is written through raw pointers, hence `UnsafeCell`.
    buf: Box<[std::cell::UnsafeCell<u8>]>,
    /// `capacity - 1`; the capacity is a power of two.
    mask: u64,
    /// Length of `buf` in bytes.
    capacity: u64,
    /// Next unreserved byte address.
    tail: AtomicU64,
    /// End of the contiguous published prefix.
    committed_addr: AtomicU64,
    /// Number of records inside the committed prefix.
    committed_records: AtomicU64,
    /// Address up to which the flusher has copied bytes out.
    flush_cursor: AtomicU64,
    /// Serialises publication and holds ranges published out of order.
    advance: Mutex<Advancer>,
}

// SAFETY: The ring owns its buffer outright. Its other fields are plain
// integers, atomics and a mutex, so moving a `WalRing` to another thread
// carries no thread-affine state with it.
unsafe impl Send for WalRing {}
// SAFETY: `buf` is the only field without its own synchronisation. Shared
// access to it follows the reservation protocol:
// - A producer writes only the slots of its own ticket, and only once
//   `reserve_space_ready` reports the flusher has drained them.
// - The flusher reads only `[flush_cursor, committed_addr)`, whose bytes were
//   published under the `advance` lock and made visible by a Release store.
// NOTE(review): this relies on there being a single flusher, which the type
// itself does not enforce.
unsafe impl Sync for WalRing {}
impl WalRing {
    /// Builds an empty ring.
    ///
    /// The capacity is `capacity_bytes` raised to at least 64 and rounded up
    /// to the next power of two. An address therefore maps to a buffer offset
    /// with a single `& mask`.
    pub(crate) fn with_capacity(capacity_bytes: usize) -> Self {
        let cap = capacity_bytes.max(64).next_power_of_two();
        let buf = (0..cap)
            .map(|_| std::cell::UnsafeCell::new(0u8))
            .collect::<Vec<_>>()
            .into_boxed_slice();
        WalRing {
            buf,
            mask: (cap as u64) - 1,
            capacity: cap as u64,
            tail: AtomicU64::new(0),
            committed_addr: AtomicU64::new(0),
            committed_records: AtomicU64::new(0),
            flush_cursor: AtomicU64::new(0),
            advance: Mutex::new(Advancer {
                pending: BTreeMap::new(),
                committed_count: 0,
            }),
        }
    }

    /// Ring size in bytes (a power of two, at least 64).
    #[inline]
    pub(crate) fn capacity(&self) -> u64 {
        self.capacity
    }

    /// Pointer to the first byte of the ring storage.
    ///
    /// It is derived from the whole slice rather than from one element
    /// (`self.buf[i].get()`). Its provenance therefore covers every byte of
    /// the buffer, and offsets `< self.buf.len()` may be added to it.
    #[inline]
    fn base_ptr(&self) -> *mut u8 {
        std::cell::UnsafeCell::raw_get(self.buf.as_ptr())
    }

    /// Claims the next `total_len` bytes of the address space.
    ///
    /// The returned range may not be writable yet. Multi-threaded callers
    /// must spin on [`Self::reserve_space_ready`] before calling
    /// [`Self::fill`]. Every ticket must eventually be published, or the
    /// committed prefix stalls at its start.
    ///
    /// # Panics
    /// Panics if `total_len` is zero or larger than the ring. A zero-length
    /// ticket shares its start with the next reservation and could clobber
    /// that ticket's entry in the fold map. An oversized ticket can never
    /// become fillable. Either would wedge `committed_addr`, so this is
    /// checked in release builds too.
    #[inline]
    pub(crate) fn reserve(&self, total_len: u64) -> ReserveTicket {
        assert!(
            total_len > 0 && total_len <= self.capacity,
            "reservation of {total_len} bytes outside 1..={}",
            self.capacity
        );
        let start = self.tail.fetch_add(total_len, Ordering::Relaxed);
        ReserveTicket {
            start,
            end: start + total_len,
        }
    }

    /// Returns `true` once the flusher has drained far enough that writing
    /// `ticket` cannot overwrite bytes still awaiting flush.
    #[inline]
    pub(crate) fn reserve_space_ready(&self, ticket: &ReserveTicket) -> bool {
        ticket.end <= self.flush_cursor.load(Ordering::Acquire) + self.capacity
    }

    /// Copies `bytes` into the slots covered by `ticket`, wrapping at the end
    /// of the buffer. Does not publish; call [`Self::publish`] afterwards.
    ///
    /// # Panics
    /// Panics if `bytes.len()` differs from the ticket length, exceeds the
    /// ring, or if the ticket's space is not ready yet. These checks guard
    /// the raw copies below. As debug-only assertions, a release build would
    /// instead write past the buffer or race with the flusher.
    pub(crate) fn fill(&self, ticket: &ReserveTicket, bytes: &[u8]) {
        assert_eq!(
            bytes.len(),
            ticket.len(),
            "payload length must match the ticket"
        );
        assert!(
            bytes.len() as u64 <= self.capacity,
            "record larger than the ring"
        );
        assert!(
            self.reserve_space_ready(ticket),
            "ring overrun: caller must wait for reserved space"
        );
        let cap = self.buf.len();
        let off = (ticket.start & self.mask) as usize;
        let first = bytes.len().min(cap - off);
        let base = self.base_ptr();
        // SAFETY: Bounds and exclusivity both hold:
        // - `off < cap` because `mask == cap - 1`, and `first <= cap - off`,
        //   so `[off, off + first)` is in bounds.
        // - The wrapped remainder is `bytes.len() - first <= cap` bytes
        //   starting at offset 0.
        // - `base` carries provenance for the whole buffer.
        // - The space-ready check means the flusher has consumed these slots,
        //   and unflushed tickets occupy disjoint slots within one capacity
        //   window. With a single flusher, no other thread touches them now.
        unsafe {
            std::ptr::copy_nonoverlapping(bytes.as_ptr(), base.add(off), first);
            if first < bytes.len() {
                std::ptr::copy_nonoverlapping(
                    bytes[first..].as_ptr(),
                    base,
                    bytes.len() - first,
                );
            }
        }
    }

    /// Marks `ticket` as filled and advances the committed prefix as far as
    /// contiguous published ranges allow.
    ///
    /// Ranges published out of order wait in `Advancer::pending` until the
    /// gap before them is published. `committed_records` grows by the number
    /// of ranges folded here.
    pub(crate) fn publish(&self, ticket: &ReserveTicket) {
        let mut adv = self.advance.lock().unwrap();
        adv.pending.insert(ticket.start, ticket.end);
        // `committed_addr` is only stored while `advance` is held, so a
        // relaxed load under the lock observes the latest value.
        let mut addr = self.committed_addr.load(Ordering::Relaxed);
        let start_addr = addr;
        let mut folded = 0u64;
        while let Some(end) = adv.pending.remove(&addr) {
            addr = end;
            folded += 1;
        }
        if addr != start_addr {
            adv.committed_count += folded;
            // Release pairs with the flusher's Acquire load. Every folded
            // range was filled before its producer released this lock, so
            // those bytes are visible to the flusher.
            self.committed_addr.store(addr, Ordering::Release);
            self.committed_records
                .store(adv.committed_count, Ordering::Release);
        }
    }

    /// Hands the committed-but-unflushed bytes `[flush_cursor, committed_addr)`
    /// to `sink` in address order, then advances `flush_cursor`. `sink` is
    /// called once, or twice when the span wraps. Returns the number of bytes
    /// handed over (0 when nothing is pending).
    ///
    /// Intended for a single flusher. Nothing here prevents two concurrent
    /// callers from copying the same span. Once one of them advances the
    /// cursor, producers may overwrite bytes the other is still reading.
    pub(crate) fn copy_committed_prefix(&self, sink: &mut impl FnMut(&[u8])) -> u64 {
        let committed = self.committed_addr.load(Ordering::Acquire);
        let from = self.flush_cursor.load(Ordering::Acquire);
        if committed <= from {
            return 0;
        }
        let span = committed - from;
        // `fill` never writes beyond `flush_cursor + capacity`, so the span
        // fits in the ring. It is checked because the reads below depend on it.
        assert!(span <= self.capacity, "committed span exceeds ring capacity");
        let total = span as usize;
        let cap = self.buf.len();
        let off = (from & self.mask) as usize;
        let first = total.min(cap - off);
        let base = self.base_ptr();
        // SAFETY: The head slice is valid to read:
        // - `off < cap` and `first <= cap - off`, so the range is in bounds.
        // - `base` carries whole-buffer provenance.
        // - These slots hold published bytes (Acquire on `committed_addr`).
        // - Producers will not rewrite them until `flush_cursor` moves past
        //   them, which happens only after `sink` returns below.
        let head = unsafe { std::slice::from_raw_parts(base.add(off), first) };
        sink(head);
        if first < total {
            // SAFETY: `total - first <= cap` (span checked above). These slots
            // lie in the same committed, not-yet-flushed window as `head`.
            let wrapped = unsafe { std::slice::from_raw_parts(base, total - first) };
            sink(wrapped);
        }
        // Release lets producers (Acquire in `reserve_space_ready`) reuse
        // these slots only after the copies above have finished.
        self.flush_cursor.store(committed, Ordering::Release);
        total as u64
    }

    /// Rewinds `tail`, `committed_addr` and `flush_cursor` to 0 after every
    /// reservation has been published and flushed. `committed_records` is
    /// deliberately left unchanged.
    ///
    /// Only the `advance` lock is taken. Callers must ensure no producer or
    /// flusher runs concurrently. The drained-state preconditions are
    /// checked only under `debug_assertions`.
    pub(crate) fn reset_after_drain(&self) {
        let adv = self.advance.lock().unwrap();
        debug_assert!(adv.pending.is_empty(), "unpublished intents at reset");
        let committed = self.committed_addr.load(Ordering::Relaxed);
        debug_assert_eq!(
            self.tail.load(Ordering::Relaxed),
            committed,
            "tail not published"
        );
        debug_assert_eq!(
            self.flush_cursor.load(Ordering::Relaxed),
            committed,
            "flusher not caught up"
        );
        self.flush_cursor.store(0, Ordering::Release);
        self.committed_addr.store(0, Ordering::Release);
        self.tail.store(0, Ordering::Release);
        drop(adv);
    }

    /// End of the contiguous published prefix.
    #[inline]
    pub(crate) fn committed_addr(&self) -> u64 {
        self.committed_addr.load(Ordering::Acquire)
    }

    /// Number of records folded into the committed prefix.
    #[inline]
    pub(crate) fn committed_records(&self) -> u64 {
        self.committed_records.load(Ordering::Acquire)
    }

    /// Address up to which bytes have been handed to a flush sink.
    #[inline]
    pub(crate) fn flush_cursor(&self) -> u64 {
        self.flush_cursor.load(Ordering::Acquire)
    }

    /// Next unreserved address.
    #[inline]
    pub(crate) fn tail(&self) -> u64 {
        self.tail.load(Ordering::Acquire)
    }

    /// Reserves, fills and publishes `bytes` in one call.
    ///
    /// Does not wait for space. It panics (via `reserve` or `fill`) for an
    /// empty record or when the flusher has not drained enough. It therefore
    /// suits single-threaded use, or rings sized to hold everything in flight.
    pub(crate) fn append(&self, bytes: &[u8]) -> ReserveTicket {
        let t = self.reserve(bytes.len() as u64);
        self.fill(&t, bytes);
        self.publish(&t);
        t
    }
}
// Unit and integration tests for the std (non-loom) build of `WalRing`.
#[cfg(all(test, not(loom)))]
mod tests {
use super::*;
use std::sync::Arc;
// Builds a record consisting of `len` copies of `tag`.
fn rec(tag: u8, len: usize) -> Vec<u8> {
vec![tag; len]
}
// In-order appends advance the committed record count after each call. A
// single drain returns the exact concatenation of the records, and a second
// drain hands nothing to the sink.
#[test]
fn dense_advance_and_byte_identical() {
let ring = WalRing::with_capacity(4096);
let records = [rec(1, 10), rec(2, 20), rec(3, 5), rec(4, 33)];
let mut expected = Vec::new();
for (i, r) in records.iter().enumerate() {
ring.append(r);
assert_eq!(ring.committed_records(), (i + 1) as u64);
expected.extend_from_slice(r);
}
assert_eq!(ring.committed_addr(), expected.len() as u64);
let mut flushed = Vec::new();
let copied = ring.copy_committed_prefix(&mut |s| flushed.extend_from_slice(s));
assert_eq!(copied, expected.len() as u64);
assert_eq!(flushed, expected, "flushed stream must equal record concat");
assert_eq!(
ring.copy_committed_prefix(&mut |_| panic!("nothing to copy")),
0
);
}
// Publishing r2 and r3 before r1 must leave the committed prefix at 0.
// Publishing r1 then folds all three, and the drained bytes follow address
// order (r1, r2, r3), not publish order.
#[test]
fn out_of_order_publish_holds_prefix() {
let ring = WalRing::with_capacity(4096);
let r1 = rec(0xA1, 8);
let r2 = rec(0xB2, 16);
let r3 = rec(0xC3, 4);
let t1 = ring.reserve(r1.len() as u64);
let t2 = ring.reserve(r2.len() as u64);
let t3 = ring.reserve(r3.len() as u64);
ring.fill(&t2, &r2);
ring.publish(&t2);
assert_eq!(
ring.committed_addr(),
0,
"byte 0 interval missing => stalls"
);
assert_eq!(ring.committed_records(), 0);
ring.fill(&t3, &r3);
ring.publish(&t3);
assert_eq!(ring.committed_addr(), 0, "still missing the [0,8) interval");
ring.fill(&t1, &r1);
ring.publish(&t1);
assert_eq!(ring.committed_addr(), 28, "all published => prefix folds");
assert_eq!(ring.committed_records(), 3);
let mut flushed = Vec::new();
ring.copy_committed_prefix(&mut |s| flushed.extend_from_slice(s));
let mut expected = r1.clone();
expected.extend_from_slice(&r2);
expected.extend_from_slice(&r3);
assert_eq!(flushed, expected, "file order == byte order (r1++r2++r3)");
}
// After 48 bytes are appended and drained, a 32-byte record reserved at
// address 48 of a 64-byte ring straddles the end of the buffer. The drained
// bytes must reassemble it in order.
#[test]
fn wrapped_copy_equals_linear() {
let ring = WalRing::with_capacity(64); assert_eq!(ring.capacity(), 64);
let pre = rec(0x11, 48);
ring.append(&pre);
let mut sink = Vec::new();
ring.copy_committed_prefix(&mut |s| sink.extend_from_slice(s));
assert_eq!(sink, pre);
let wrapping = (0u8..32).collect::<Vec<u8>>();
let t = ring.append(&wrapping);
assert!(
(t.start & ring.mask) + 32 > ring.capacity,
"test must exercise the wrap: start_off={}",
t.start & ring.mask
);
let mut flushed = Vec::new();
ring.copy_committed_prefix(&mut |s| flushed.extend_from_slice(s));
assert_eq!(flushed, wrapping, "wrapped record reassembles in order");
}
// PRODUCERS threads `append` fixed-size records whose first 4 bytes carry a
// unique little-endian u32 tag. A spinning flusher drains until `done` is set
// and the committed prefix is fully flushed. The output must contain every
// tag exactly once.
// The 1 MiB ring exceeds PRODUCERS * PER * REC_LEN (192,000 bytes). Because
// `append` does not wait for space, that headroom is what prevents overrun.
#[test]
fn concurrent_producers_single_flusher() {
use std::sync::atomic::{AtomicBool, Ordering as O};
use std::thread;
const PRODUCERS: usize = 4;
const PER: usize = 2000;
const REC_LEN: usize = 24;
let ring = Arc::new(WalRing::with_capacity(1 << 20)); let done = Arc::new(AtomicBool::new(false));
let flusher = {
let ring = Arc::clone(&ring);
let done = Arc::clone(&done);
thread::spawn(move || {
let mut out: Vec<u8> = Vec::new();
loop {
let n = ring.copy_committed_prefix(&mut |s| out.extend_from_slice(s));
if n == 0 {
// Exit only once producers are finished AND everything committed
// has been flushed, so late publications are not dropped.
if done.load(O::Acquire) && ring.committed_addr() == ring.flush_cursor() {
break;
}
std::hint::spin_loop();
}
}
out
})
};
let mut handles = Vec::new();
for p in 0..PRODUCERS {
let ring = Arc::clone(&ring);
handles.push(thread::spawn(move || {
for i in 0..PER {
let tag = (p * PER + i) as u32;
let mut r = vec![0u8; REC_LEN];
r[..4].copy_from_slice(&tag.to_le_bytes());
ring.append(&r);
}
}));
}
for h in handles {
h.join().unwrap();
}
done.store(true, O::Release);
let out = flusher.join().unwrap();
let total = PRODUCERS * PER;
assert_eq!(
out.len(),
total * REC_LEN,
"every record flushed exactly once"
);
assert_eq!(ring.committed_records(), total as u64);
let mut seen = vec![false; total];
for chunk in out.chunks_exact(REC_LEN) {
let tag = u32::from_le_bytes(chunk[..4].try_into().unwrap()) as usize;
assert!(!seen[tag], "duplicate record {tag}");
seen[tag] = true;
}
assert!(seen.iter().all(|&b| b), "no record may be lost");
}
// Writes the same encoded insert records twice:
// - once directly through `WalWriter` (a.wal);
// - once via a 256-byte ring, drained after every append (b.wal).
// Then checks that the two files are byte-identical and that replaying b.wal
// returns the inputs in order.
// NOTE(review): a record straddling the ring's wrap point reaches
// `append_encoded` as two slices. Byte-identity therefore presumes
// `append_encoded` writes its input verbatim; confirm against WalWriter.
#[test]
fn ring_to_walwriter_byte_identical_and_replays() {
use crate::journal::codec::encode_insert_record;
use crate::journal::reader::replay;
use crate::journal::wal_op::WalOp;
use crate::journal::writer::WalWriter;
let tree_id = 7u64;
let inputs: Vec<(u64, Vec<u8>, Vec<u8>)> = (0..64u64)
.map(|i| {
let key = format!("bucket/{:02}/object-{i}", i % 4).into_bytes();
let value = vec![(i & 0xff) as u8; (i as usize % 37) + 1];
(i + 1, key, value) })
.collect();
let records: Vec<Vec<u8>> = inputs
.iter()
.map(|(seq, k, v)| {
let mut buf = Vec::new();
encode_insert_record(&mut buf, *seq, tree_id, k, v);
buf
})
.collect();
let dir = tempfile::tempdir().unwrap();
let path_a = dir.path().join("a.wal");
{
let mut w = WalWriter::open_or_create(&path_a, tree_id).unwrap();
for r in &records {
w.append_encoded(r).unwrap();
}
w.flush().unwrap();
}
let path_b = dir.path().join("b.wal");
{
let ring = WalRing::with_capacity(256);
let mut w = WalWriter::open_or_create(&path_b, tree_id).unwrap();
for r in &records {
assert!(r.len() as u64 <= ring.capacity());
// Draining after each append keeps the ring empty, so `append` (which
// does not wait for space) always fits.
ring.append(r);
ring.copy_committed_prefix(&mut |s| w.append_encoded(s).unwrap());
}
w.flush().unwrap();
}
assert_eq!(
std::fs::read(&path_a).unwrap(),
std::fs::read(&path_b).unwrap(),
"ring-produced WAL must be byte-identical to the direct path"
);
let mut got: Vec<(u64, Vec<u8>, Vec<u8>)> = Vec::new();
replay(&path_b, |op, seq, _| {
if let WalOp::Insert {
key,
value,
tree_id: tid,
..
} = op
{
assert_eq!(*tid, tree_id);
got.push((seq, key.clone(), value.clone()));
}
Ok(())
})
.unwrap();
let expected: Vec<(u64, Vec<u8>, Vec<u8>)> = inputs
.iter()
.map(|(s, k, v)| (*s, k.clone(), v.clone()))
.collect();
assert_eq!(got, expected, "replay must round-trip records in order");
}
// Producers use the explicit reserve / spin-until-ready / fill / publish
// sequence against a 4096-byte ring. A background thread drains into a shared
// `WalWriter`. Replay must yield every seq in 1..=total exactly once.
#[test]
fn background_flusher_concurrent_producers_replay() {
use crate::journal::codec::encode_insert_record;
use crate::journal::reader::replay;
use crate::journal::wal_op::WalOp;
use crate::journal::writer::WalWriter;
use std::sync::atomic::{AtomicBool, Ordering as O};
use std::sync::Mutex;
use std::thread;
const PRODUCERS: usize = 4;
const PER: usize = 250;
let tree_id = 9u64;
let ring = Arc::new(WalRing::with_capacity(4096));
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("bg.wal");
let writer = Arc::new(Mutex::new(
WalWriter::open_or_create(&path, tree_id).unwrap(),
));
let stop = Arc::new(AtomicBool::new(false));
let flusher = {
let ring = Arc::clone(&ring);
let writer = Arc::clone(&writer);
let stop = Arc::clone(&stop);
thread::spawn(move || loop {
let n = ring.copy_committed_prefix(&mut |s| {
writer.lock().unwrap().append_encoded(s).unwrap();
});
if n == 0 {
// Exit only after `stop` is set and the committed prefix is fully
// flushed, so records published just before `stop` are not lost.
if stop.load(O::Acquire) && ring.flush_cursor() == ring.committed_addr() {
break;
}
std::hint::spin_loop();
}
})
};
let mut handles = Vec::new();
for p in 0..PRODUCERS {
let ring = Arc::clone(&ring);
handles.push(thread::spawn(move || {
for i in 0..PER {
let seq = (p * PER + i + 1) as u64;
let key = format!("p{p}/obj-{i}").into_bytes();
let value = vec![(seq & 0xff) as u8; (i % 29) + 1];
let mut rec = Vec::new();
encode_insert_record(&mut rec, seq, tree_id, &key, &value);
assert!(rec.len() as u64 <= ring.capacity());
let t = ring.reserve(rec.len() as u64);
// Wait until the flusher has drained enough that this ticket cannot
// overwrite unflushed bytes.
while !ring.reserve_space_ready(&t) {
std::hint::spin_loop();
}
ring.fill(&t, &rec);
ring.publish(&t);
}
}));
}
for h in handles {
h.join().unwrap();
}
stop.store(true, O::Release);
flusher.join().unwrap();
writer.lock().unwrap().flush().unwrap();
let total = PRODUCERS * PER;
assert_eq!(ring.committed_records(), total as u64);
let mut seqs = std::collections::BTreeSet::new();
replay(&path, |op, seq, _| {
if let WalOp::Insert { tree_id: tid, .. } = op {
assert_eq!(*tid, tree_id);
assert!(seqs.insert(seq), "duplicate seq {seq} on replay");
}
Ok(())
})
.unwrap();
assert_eq!(seqs.len(), total, "every record must replay exactly once");
assert_eq!(*seqs.iter().next().unwrap(), 1);
assert_eq!(*seqs.iter().next_back().unwrap(), total as u64);
}
// After a full drain, `reset_after_drain` rewinds tail, committed address and
// flush cursor to 0. It keeps the cumulative record count, and the next
// append starts again at address 0.
#[test]
fn reset_after_drain_keeps_record_count() {
let ring = WalRing::with_capacity(256);
for i in 0..3u8 {
ring.append(&[i + 1; 20]);
}
let mut sink = Vec::new();
ring.copy_committed_prefix(&mut |s| sink.extend_from_slice(s));
assert_eq!(ring.committed_records(), 3);
assert_eq!(
(ring.tail(), ring.committed_addr(), ring.flush_cursor()),
(60, 60, 60)
);
ring.reset_after_drain();
assert_eq!(
(ring.tail(), ring.committed_addr(), ring.flush_cursor()),
(0, 0, 0)
);
assert_eq!(
ring.committed_records(),
3,
"record count survives truncate reset"
);
let t = ring.append(&[9u8; 10]);
assert_eq!(t.start, 0, "appends resume from byte 0 after reset");
assert_eq!(ring.committed_records(), 4);
let mut sink2 = Vec::new();
ring.copy_committed_prefix(&mut |s| sink2.extend_from_slice(s));
assert_eq!(sink2, vec![9u8; 10]);
}
}
// Loom model-checking tests (built only with `--cfg loom`). In each model the
// main thread drains the ring while producers append concurrently. Every
// byte the drain observes must belong to a published record; the ring's
// initial 0 bytes must never appear. After joining, the two drains together
// must account for every record.
// NOTE(review): `buf` is a `std::cell::UnsafeCell`, not loom's, so accesses
// to the ring bytes are presumably not modelled by loom. Only the atomics and
// the Mutex are explored.
#[cfg(loom)]
mod loom_tests {
use super::*;
use loom::sync::Arc;
use loom::thread;
// Two producers each append a 4-byte record (tags 1 and 2) against one
// concurrent drain.
#[test]
fn gap_safety_two_producers_one_flusher() {
loom::model(|| {
let ring = Arc::new(WalRing::with_capacity(64));
let w1 = {
let ring = Arc::clone(&ring);
thread::spawn(move || ring.append(&[1u8; 4]))
};
let w2 = {
let ring = Arc::clone(&ring);
thread::spawn(move || ring.append(&[2u8; 4]))
};
// Concurrent drain: may see none, one or both records.
let mut drained: Vec<u8> = Vec::new();
ring.copy_committed_prefix(&mut |s| drained.extend_from_slice(s));
for &b in &drained {
assert!(b == 1 || b == 2, "torn/gap byte {b} observed by flusher");
}
w1.join().unwrap();
w2.join().unwrap();
// Final drain after both producers finished picks up the remainder.
let mut rest: Vec<u8> = Vec::new();
ring.copy_committed_prefix(&mut |s| rest.extend_from_slice(s));
for &b in &rest {
assert!(b == 1 || b == 2, "torn/gap byte {b} in final drain");
}
assert_eq!(drained.len() + rest.len(), 8, "both records must drain");
assert_eq!(ring.committed_records(), 2);
});
}
// Same property with three producers (tags 1..=3), which allows more
// out-of-order publish interleavings.
#[test]
fn gap_safety_three_producers_one_flusher() {
loom::model(|| {
let ring = Arc::new(WalRing::with_capacity(64));
let mut workers = Vec::new();
for tag in 1u8..=3 {
let ring = Arc::clone(&ring);
workers.push(thread::spawn(move || ring.append(&[tag; 4])));
}
let mut drained: Vec<u8> = Vec::new();
ring.copy_committed_prefix(&mut |s| drained.extend_from_slice(s));
for &b in &drained {
assert!((1..=3).contains(&b), "torn/gap byte {b}");
}
for w in workers {
w.join().unwrap();
}
let mut rest: Vec<u8> = Vec::new();
ring.copy_committed_prefix(&mut |s| rest.extend_from_slice(s));
for &b in &rest {
assert!((1..=3).contains(&b), "torn/gap byte {b} in final drain");
}
assert_eq!(drained.len() + rest.len(), 12, "all three must drain");
assert_eq!(ring.committed_records(), 3);
});
}
}