#[cfg(test)]
pub mod test {
use std::sync::atomic::Ordering;
use crate::{ flush_behaviour::WriteMode, flush_buffer::BufferError, log_structured_store::{FOUR_KB_PAGE, LogStructuredStore}};
#[test]
fn api_test() {
let path = "test_store/log.db";
let log =
LogStructuredStore::open_with_behavior(path, WriteMode::TailLocalizedWrites).unwrap();
let res = match log.reserve_space(4096) {
Ok(r) => Ok(r),
Err(e) => Err(e),
};
if res.is_ok() {
let res = res.unwrap();
let successful_cas = true;
if successful_cas {
let _ = log.write_payload(b"LENGTH_OF_4KB", res);
log.flush_cur_buffer();
} else {
let _ = log.write_payload(b"FAILED_FLUSH", res);
}
}
log.check_async_cque();
}
#[test]
fn test_out_of_order_completion() {
let path = "test_store/ooo_log.db";
let log =
LogStructuredStore::open_with_behavior(path, WriteMode::TailLocalizedWrites).unwrap();
log.mark_slot_complete(3);
assert_eq!(
log.hi_stable.load(Ordering::Acquire),
0,
"hi_stable should stay at 0"
);
assert_eq!(log.get_island_count(), 1, "Should have 1 island (slot 3)");
log.mark_slot_complete(1);
assert_eq!(
log.hi_stable.load(Ordering::Acquire),
1,
"hi_stable should advance to 1"
);
assert_eq!(
log.get_island_count(),
1,
"Should still have 1 island (slot 3)"
);
log.mark_slot_complete(2);
assert_eq!(
log.hi_stable.load(Ordering::Acquire),
3,
"hi_stable should advance to 3"
);
assert_eq!(log.get_island_count(), 0, "Should have no islands");
let _ = std::fs::remove_file(path);
}
#[test]
fn test_large_gap() {
let path = "test_store/gap_log.db";
let log =
LogStructuredStore::open_with_behavior(path, WriteMode::TailLocalizedWrites).unwrap();
log.mark_slot_complete(100);
assert_eq!(log.hi_stable.load(Ordering::Acquire), 0);
assert_eq!(
log.get_island_count(),
1,
"Should only have island at slot 100"
);
log.mark_slot_complete(1);
assert_eq!(log.hi_stable.load(Ordering::Acquire), 1);
assert_eq!(
log.get_island_count(),
1,
"Should still only have island at slot 100"
);
log.mark_slot_complete(2);
assert_eq!(log.hi_stable.load(Ordering::Acquire), 2);
assert_eq!(
log.get_island_count(),
1,
"Should ONLY have island at slot 100"
);
let _ = std::fs::remove_file(path);
}
fn test_multiple_islands() {
let path = "test_store/multi_island_log.db";
let log =
LogStructuredStore::open_with_behavior(path, WriteMode::TailLocalizedWrites).unwrap();
log.mark_slot_complete(5);
log.mark_slot_complete(10);
log.mark_slot_complete(15);
assert_eq!(log.hi_stable.load(Ordering::Acquire), 0);
assert_eq!(log.get_island_count(), 3, "Should have exactly 3 islands");
for slot in 1..=5 {
log.mark_slot_complete(slot);
}
assert_eq!(log.hi_stable.load(Ordering::Acquire), 5);
assert_eq!(log.get_island_count(), 2, "Should have 2 islands remaining");
let _ = std::fs::remove_file(path);
}
#[test]
fn test_async_flushes_and_hi_stable() {
let path = "test_store/async_flush.db";
let _ = std::fs::create_dir_all("test_store");
let _ = std::fs::remove_file(path);
let log =
LogStructuredStore::open_with_behavior(path, WriteMode::TailLocalizedWrites).unwrap();
for slot in 1..=3 {
log.mark_slot_complete(slot);
}
let mut records: Vec<(i32, u64, usize, bool)> = Vec::new(); for i in 0..8 {
let payload = vec![i as u8; 1024];
loop {
match log.reserve_space(1024) {
Ok(res) => {
let offset = res.offset;
log.write_payload(&payload, res).unwrap();
let slot = 4 + (i / 4) as u64; records.push((i as i32, slot, offset, false));
break;
}
Err(BufferError::InsufficientSpace) => {
let current = unsafe {
log.buffer
.current_buffer
.load(Ordering::Acquire)
.as_ref()
.unwrap()
};
let _ = log
.buffer
.put(current, Err(BufferError::InsufficientSpace), &[]);
log.check_async_cque();
}
Err(e) => panic!("unexpected error: {e:?}"),
}
}
}
let current = log.get_cur_buffer();
let _ = log
.buffer
.put(current, Err(BufferError::InsufficientSpace), &[]);
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
let max_slot = records.iter().map(|r| r.1).max().unwrap();
loop {
log.check_async_cque();
let stable = log.hi_stable.load(Ordering::Acquire);
if stable >= max_slot {
break;
}
if std::time::Instant::now() > deadline {
panic!(
"timed out waiting for flushes — hi_stable={}, max_slot={}, islands={}",
stable,
max_slot,
log.get_island_count()
);
}
}
let final_stable = log.hi_stable.load(Ordering::Acquire);
assert!(
final_stable >= max_slot,
"hi_stable {} should be >= max_slot {}",
final_stable,
max_slot
);
for record in &mut records {
record.3 = true;
}
for (i, _, _, flushed) in &records {
assert!(*flushed, "Record {} should be flushed", i);
}
let _ = std::fs::remove_file(path);
}
#[test]
fn test_read_multiple_slots_from_disk() {
use std::os::unix::fs::FileExt;
let path = "test_store/read_multi_slots.db";
let _ = std::fs::create_dir_all("test_store");
let _ = std::fs::remove_file(path);
let log =
LogStructuredStore::open_with_behavior(path, WriteMode::TailLocalizedWrites).unwrap();
let mut records = Vec::new();
for i in 0..8 {
let payload = vec![i as u8; 1024];
loop {
match log.reserve_space(1024) {
Ok(res) => {
let offset = res.offset;
log.write_payload(&payload, res).unwrap();
let slot = 4 + (i / 4) as u64; records.push((i, slot, offset));
break;
}
Err(BufferError::InsufficientSpace) => {
let sealed = unsafe {
log.buffer
.current_buffer
.load(Ordering::Acquire)
.as_ref()
.unwrap()
};
let _ = log
.buffer
.put(sealed, Err(BufferError::InsufficientSpace), &[]);
sealed.set_flush_in_progress();
log.flusher.submit_buffer_and_wait(sealed).unwrap();
log.buffer.reset_buffer(sealed);
}
Err(e) => panic!("unexpected error: {e:?}"),
}
}
}
let sealed = log.get_cur_buffer();
let _ = log
.buffer
.put(sealed, Err(BufferError::InsufficientSpace), &[]);
sealed.set_flush_in_progress();
log.flusher.submit_buffer_and_wait(sealed).unwrap();
log.buffer.reset_buffer(sealed);
let file = std::fs::File::open(path).unwrap();
for (i, slot, offset) in &records {
let byte_offset = slot * FOUR_KB_PAGE as u64 + *offset as u64;
let mut read_back = vec![0u8; 1024];
file.read_at(&mut read_back, byte_offset).unwrap();
let expected = vec![*i as u8; 1024];
assert_eq!(
read_back, expected,
"Failed to read correct data at slot {}, offset {}: payload mismatch for i={}",
slot, offset, i
);
}
let _ = std::fs::remove_file(path);
}
}