mod common;
use common::{TestEnv, current_wal_dir};
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Duration;
use walrus_rust::{FsyncSchedule, ReadConsistency, Walrus, disable_fd_backend, enable_fd_backend};
fn setup_test_env() -> TestEnv {
TestEnv::new()
}
fn cleanup_test_env() {
let _ = std::fs::remove_dir_all(current_wal_dir());
}
#[test]
fn test_batch_write_basic() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap();
let entries: Vec<&[u8]> = vec![b"entry1", b"entry2", b"entry3"];
wal.batch_append_for_topic("test_topic", &entries).unwrap();
let e1 = wal.read_next("test_topic", true).unwrap().unwrap();
assert_eq!(e1.data, b"entry1");
let e2 = wal.read_next("test_topic", true).unwrap().unwrap();
assert_eq!(e2.data, b"entry2");
let e3 = wal.read_next("test_topic", true).unwrap().unwrap();
assert_eq!(e3.data, b"entry3");
assert!(wal.read_next("test_topic", true).unwrap().is_none());
cleanup_test_env();
}
#[test]
fn test_batch_write_atomicity_with_reader() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap();
wal.append_for_topic("test_topic", b"before").unwrap();
let e = wal.read_next("test_topic", true).unwrap().unwrap();
assert_eq!(e.data, b"before");
let entries: Vec<&[u8]> = vec![b"batch1", b"batch2", b"batch3"];
wal.batch_append_for_topic("test_topic", &entries).unwrap();
let e1 = wal.read_next("test_topic", true).unwrap().unwrap();
assert_eq!(e1.data, b"batch1");
let e2 = wal.read_next("test_topic", true).unwrap().unwrap();
assert_eq!(e2.data, b"batch2");
let e3 = wal.read_next("test_topic", true).unwrap().unwrap();
assert_eq!(e3.data, b"batch3");
cleanup_test_env();
}
#[test]
fn test_batch_size_limit_enforcement() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap();
let one_gb = vec![0u8; 1024 * 1024 * 1024];
let entries: Vec<&[u8]> = vec![
&one_gb, &one_gb, &one_gb, &one_gb, &one_gb, &one_gb, &one_gb, &one_gb, &one_gb, &one_gb,
&one_gb,
];
let result = wal.batch_append_for_topic("test_topic", &entries);
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput);
assert!(err.to_string().contains("10GB limit"));
assert!(wal.read_next("test_topic", true).unwrap().is_none());
cleanup_test_env();
}
#[test]
fn test_concurrent_batch_writes_rejected() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Arc::new(
Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap(),
);
let barrier = Arc::new(Barrier::new(2));
let success_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let would_block_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let mut handles = vec![];
for i in 0..2 {
let wal_clone = wal.clone();
let barrier_clone = barrier.clone();
let success = success_count.clone();
let blocked = would_block_count.clone();
let handle = thread::spawn(move || {
let large_entry = vec![0u8; 10 * 1024 * 1024];
let entries: Vec<&[u8]> = (0..100).map(|_| large_entry.as_slice()).collect();
barrier_clone.wait();
let result = wal_clone.batch_append_for_topic("test_topic", &entries);
match result {
Ok(_) => {
success.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
blocked.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
}
Err(e) => panic!("Unexpected error: {}", e),
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
let successes = success_count.load(std::sync::atomic::Ordering::SeqCst);
let blocks = would_block_count.load(std::sync::atomic::Ordering::SeqCst);
assert_eq!(successes, 1, "Expected exactly 1 successful batch write");
assert_eq!(blocks, 1, "Expected exactly 1 blocked batch write");
cleanup_test_env();
}
#[test]
fn test_regular_write_blocked_during_batch() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Arc::new(
Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap(),
);
let barrier = Arc::new(Barrier::new(2));
let batch_started = Arc::new(std::sync::atomic::AtomicBool::new(false));
let write_blocked = Arc::new(std::sync::atomic::AtomicBool::new(false));
let wal_clone = wal.clone();
let barrier_clone = barrier.clone();
let batch_flag = batch_started.clone();
let batch_handle = thread::spawn(move || {
let large_entry = vec![0u8; 50 * 1024 * 1024];
let entries: Vec<&[u8]> = (0..50).map(|_| large_entry.as_slice()).collect();
barrier_clone.wait();
batch_flag.store(true, std::sync::atomic::Ordering::SeqCst);
wal_clone
.batch_append_for_topic("test_topic", &entries)
.unwrap();
});
let wal_clone = wal.clone();
let barrier_clone = barrier.clone();
let batch_flag = batch_started.clone();
let blocked_flag = write_blocked.clone();
let write_handle = thread::spawn(move || {
barrier_clone.wait();
while !batch_flag.load(std::sync::atomic::Ordering::SeqCst) {
thread::sleep(Duration::from_millis(1));
}
thread::sleep(Duration::from_millis(10));
let result = wal_clone.append_for_topic("test_topic", b"regular_entry");
if let Err(e) = result {
if e.kind() == std::io::ErrorKind::WouldBlock {
blocked_flag.store(true, std::sync::atomic::Ordering::SeqCst);
}
}
});
batch_handle.join().unwrap();
write_handle.join().unwrap();
assert!(
write_blocked.load(std::sync::atomic::Ordering::SeqCst),
"Regular write should have been blocked during batch"
);
cleanup_test_env();
}
#[test]
fn test_empty_batch() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap();
let entries: Vec<&[u8]> = vec![];
wal.batch_append_for_topic("test_topic", &entries).unwrap();
assert!(wal.read_next("test_topic", true).unwrap().is_none());
cleanup_test_env();
}
#[test]
fn test_batch_spans_multiple_blocks() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap();
let large_entry = vec![0xAB; 5 * 1024 * 1024];
let entries: Vec<&[u8]> = (0..100).map(|_| large_entry.as_slice()).collect();
wal.batch_append_for_topic("test_topic", &entries).unwrap();
for i in 0..100 {
let entry = wal.read_next("test_topic", true).unwrap().unwrap();
assert_eq!(entry.data.len(), 5 * 1024 * 1024);
assert_eq!(entry.data[0], 0xAB);
}
assert!(wal.read_next("test_topic", true).unwrap().is_none());
cleanup_test_env();
}
#[test]
fn test_batch_with_varying_entry_sizes() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap();
let e1 = vec![1u8; 100];
let e2 = vec![2u8; 1024 * 1024];
let e3 = vec![3u8; 10 * 1024 * 1024];
let e4 = vec![4u8; 500];
let e5 = vec![5u8; 50 * 1024 * 1024];
let entries: Vec<&[u8]> = vec![&e1, &e2, &e3, &e4, &e5];
wal.batch_append_for_topic("test_topic", &entries).unwrap();
let r1 = wal.read_next("test_topic", true).unwrap().unwrap();
assert_eq!(r1.data.len(), 100);
assert_eq!(r1.data[0], 1);
let r2 = wal.read_next("test_topic", true).unwrap().unwrap();
assert_eq!(r2.data.len(), 1024 * 1024);
assert_eq!(r2.data[0], 2);
let r3 = wal.read_next("test_topic", true).unwrap().unwrap();
assert_eq!(r3.data.len(), 10 * 1024 * 1024);
assert_eq!(r3.data[0], 3);
let r4 = wal.read_next("test_topic", true).unwrap().unwrap();
assert_eq!(r4.data.len(), 500);
assert_eq!(r4.data[0], 4);
let r5 = wal.read_next("test_topic", true).unwrap().unwrap();
assert_eq!(r5.data.len(), 50 * 1024 * 1024);
assert_eq!(r5.data[0], 5);
cleanup_test_env();
}
#[test]
fn test_chaos_interleaved_batch_and_regular_writes() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Arc::new(
Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap(),
);
let num_threads = 10;
let mut handles = vec![];
for thread_id in 0..num_threads {
let wal_clone = wal.clone();
let handle = thread::spawn(move || {
for i in 0..20 {
if i % 3 == 0 {
let entries: Vec<&[u8]> = vec![b"batch1", b"batch2", b"batch3"];
let _ = wal_clone.batch_append_for_topic("chaos_topic", &entries);
} else {
let data = format!("regular_t{}_i{}", thread_id, i);
let _ = wal_clone.append_for_topic("chaos_topic", data.as_bytes());
}
thread::sleep(Duration::from_micros(100));
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
let mut count = 0;
while let Some(entry) = wal.read_next("chaos_topic", true).unwrap() {
assert!(!entry.data.is_empty());
count += 1;
}
assert!(count > 0, "Expected some entries to be written");
cleanup_test_env();
}
#[test]
fn test_chaos_multiple_topics_concurrent_batches() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Arc::new(
Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap(),
);
let num_topics = 3;
let batches_per_topic = 6;
let mut handles = vec![];
for topic_id in 0..num_topics {
let wal_clone = wal.clone();
let handle = thread::spawn(move || {
let topic_name = format!("topic_{}", topic_id);
for batch_num in 0..batches_per_topic {
let entry_data = format!("t{}_b{}_e", topic_id, batch_num);
let e1 = entry_data.clone() + "1";
let e2 = entry_data.clone() + "2";
let e3 = entry_data.clone() + "3";
let entries: Vec<&[u8]> = vec![e1.as_bytes(), e2.as_bytes(), e3.as_bytes()];
wal_clone
.batch_append_for_topic(&topic_name, &entries)
.unwrap();
thread::sleep(Duration::from_millis(5));
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
for topic_id in 0..num_topics {
let topic_name = format!("topic_{}", topic_id);
let mut count = 0;
while let Some(entry) = wal.read_next(&topic_name, true).unwrap() {
let data_str = String::from_utf8_lossy(&entry.data);
assert!(data_str.starts_with(&format!("t{}_", topic_id)));
count += 1;
}
assert_eq!(
count,
batches_per_topic * 3,
"Topic {} should have {} entries",
topic_id,
batches_per_topic * 3
);
}
cleanup_test_env();
}
#[test]
fn test_chaos_batch_write_with_concurrent_readers() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Arc::new(
Walrus::with_consistency_and_schedule(
ReadConsistency::AtLeastOnce { persist_every: 10 },
FsyncSchedule::NoFsync,
)
.unwrap(),
);
let stop_flag = Arc::new(std::sync::atomic::AtomicBool::new(false));
let mut handles = vec![];
for writer_id in 0..3 {
let wal_clone = wal.clone();
let stop = stop_flag.clone();
let handle = thread::spawn(move || {
let mut batch_count = 0;
while !stop.load(std::sync::atomic::Ordering::Relaxed) {
let entry_data = format!("writer_{}_batch_{}", writer_id, batch_count);
let entries: Vec<&[u8]> = vec![
entry_data.as_bytes(),
entry_data.as_bytes(),
entry_data.as_bytes(),
];
let _ = wal_clone.batch_append_for_topic("chaos_rw_topic", &entries);
batch_count += 1;
thread::sleep(Duration::from_millis(10));
}
});
handles.push(handle);
}
for _ in 0..5 {
let wal_clone = wal.clone();
let stop = stop_flag.clone();
let handle = thread::spawn(move || {
let mut read_count = 0;
while !stop.load(std::sync::atomic::Ordering::Relaxed) {
if let Ok(Some(entry)) = wal_clone.read_next("chaos_rw_topic", true) {
assert!(!entry.data.is_empty());
read_count += 1;
} else {
thread::sleep(Duration::from_millis(5));
}
}
});
handles.push(handle);
}
thread::sleep(Duration::from_secs(3));
stop_flag.store(true, std::sync::atomic::Ordering::Relaxed);
for handle in handles {
handle.join().unwrap();
}
cleanup_test_env();
}
#[test]
fn test_chaos_batch_write_crash_recovery() {
let _guard = setup_test_env();
enable_fd_backend();
let test_key = "crash_recovery_test";
{
let wal = Walrus::with_consistency_and_schedule_for_key(
test_key,
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::SyncEach,
)
.unwrap();
let entries: Vec<&[u8]> = vec![b"before_crash_1", b"before_crash_2", b"before_crash_3"];
wal.batch_append_for_topic("crash_topic", &entries).unwrap();
drop(wal);
thread::sleep(Duration::from_millis(50));
}
{
let wal = Walrus::with_consistency_and_schedule_for_key(
test_key,
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::SyncEach,
)
.unwrap();
let e1 = wal.read_next("crash_topic", true).unwrap().unwrap();
assert_eq!(e1.data, b"before_crash_1");
let e2 = wal.read_next("crash_topic", true).unwrap().unwrap();
assert_eq!(e2.data, b"before_crash_2");
let e3 = wal.read_next("crash_topic", true).unwrap().unwrap();
assert_eq!(e3.data, b"before_crash_3");
let entries2: Vec<&[u8]> = vec![b"after_crash_1", b"after_crash_2"];
wal.batch_append_for_topic("crash_topic", &entries2)
.unwrap();
let e4 = wal.read_next("crash_topic", true).unwrap().unwrap();
assert_eq!(e4.data, b"after_crash_1");
let e5 = wal.read_next("crash_topic", true).unwrap().unwrap();
assert_eq!(e5.data, b"after_crash_2");
}
cleanup_test_env();
}
#[test]
fn test_chaos_alternating_tiny_and_huge_batches() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap();
for round in 0..10 {
let tiny: Vec<&[u8]> = vec![b"t"];
wal.batch_append_for_topic("alternating", &tiny).unwrap();
let huge_entry = vec![0xAB; 10 * 1024 * 1024];
let huge: Vec<&[u8]> = (0..5).map(|_| huge_entry.as_slice()).collect();
wal.batch_append_for_topic("alternating", &huge).unwrap();
}
for round in 0..10 {
let tiny_entry = wal.read_next("alternating", true).unwrap().unwrap();
assert_eq!(tiny_entry.data, b"t");
for _ in 0..5 {
let huge_entry = wal.read_next("alternating", true).unwrap().unwrap();
assert_eq!(huge_entry.data.len(), 10 * 1024 * 1024);
assert_eq!(huge_entry.data[0], 0xAB);
}
}
cleanup_test_env();
}
#[test]
fn test_chaos_batch_writes_force_multiple_block_rotations() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap();
let entry = vec![0xEE; 5 * 1024 * 1024];
let entries: Vec<&[u8]> = (0..100).map(|_| entry.as_slice()).collect();
wal.batch_append_for_topic("rotation_topic", &entries)
.unwrap();
for _ in 0..100 {
let e = wal.read_next("rotation_topic", true).unwrap().unwrap();
assert_eq!(e.data.len(), 5 * 1024 * 1024);
assert_eq!(e.data[0], 0xEE);
}
cleanup_test_env();
}
#[test]
fn test_chaos_readers_at_different_positions_during_batch() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap();
for i in 0..5 {
let data = format!("initial_{}", i);
wal.append_for_topic("atomic_test", data.as_bytes())
.unwrap();
}
for _ in 0..3 {
wal.read_next("atomic_test", true).unwrap();
}
let batch: Vec<&[u8]> = vec![b"batch_0", b"batch_1", b"batch_2", b"batch_3"];
wal.batch_append_for_topic("atomic_test", &batch).unwrap();
let mut entries = Vec::new();
while let Some(entry) = wal.read_next("atomic_test", true).unwrap() {
entries.push(entry.data);
}
assert_eq!(entries.len(), 6);
assert_eq!(entries[0], b"initial_3");
assert_eq!(entries[1], b"initial_4");
assert_eq!(entries[2], b"batch_0");
assert_eq!(entries[3], b"batch_1");
assert_eq!(entries[4], b"batch_2");
assert_eq!(entries[5], b"batch_3");
cleanup_test_env();
}
#[test]
fn test_chaos_many_topics_racing_batch_and_regular() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Arc::new(
Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap(),
);
let num_topics = 20;
let mut handles = vec![];
for topic_id in 0..num_topics {
let wal_clone = wal.clone();
let h1 = thread::spawn(move || {
let topic = format!("race_topic_{}", topic_id);
for batch_num in 0..20 {
let data = format!("t{}_b{}", topic_id, batch_num);
let entries: Vec<&[u8]> = vec![data.as_bytes(), data.as_bytes(), data.as_bytes()];
let _ = wal_clone.batch_append_for_topic(&topic, &entries);
thread::sleep(Duration::from_micros(100));
}
});
let wal_clone = wal.clone();
let h2 = thread::spawn(move || {
let topic = format!("race_topic_{}", topic_id);
for entry_num in 0..20 {
let data = format!("t{}_r{}", topic_id, entry_num);
let _ = wal_clone.append_for_topic(&topic, data.as_bytes());
thread::sleep(Duration::from_micros(100));
}
});
handles.push(h1);
handles.push(h2);
}
for handle in handles {
handle.join().unwrap();
}
for topic_id in 0..num_topics {
let topic = format!("race_topic_{}", topic_id);
let mut count = 0;
while wal.read_next(&topic, true).unwrap().is_some() {
count += 1;
}
assert!(count > 0, "Topic {} should have entries", topic_id);
}
cleanup_test_env();
}
#[test]
fn test_chaos_sequential_batches_with_crashes() {
let _guard = setup_test_env();
enable_fd_backend();
let test_key = "sequential_crashes_test";
for cycle in 0..5 {
let wal = Walrus::with_consistency_and_schedule_for_key(
test_key,
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::SyncEach,
)
.unwrap();
let data = format!("cycle_{}", cycle);
let entries: Vec<&[u8]> = vec![data.as_bytes(), data.as_bytes()];
wal.batch_append_for_topic("crash_cycles", &entries)
.unwrap();
drop(wal);
thread::sleep(Duration::from_millis(50));
}
let wal = Walrus::with_consistency_and_schedule_for_key(
test_key,
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::SyncEach,
)
.unwrap();
for cycle in 0..5 {
let expected = format!("cycle_{}", cycle);
for _ in 0..2 {
let entry = wal.read_next("crash_cycles", true).unwrap().unwrap();
assert_eq!(entry.data, expected.as_bytes());
}
}
assert!(wal.read_next("crash_cycles", true).unwrap().is_none());
cleanup_test_env();
}
#[test]
fn test_chaos_batch_with_exactly_block_size_entries() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap();
let metadata_overhead = 64;
let exact_size = (10 * 1024 * 1024 - metadata_overhead) as usize;
let entries_data: Vec<Vec<u8>> = (0..5).map(|i| vec![i as u8; exact_size]).collect();
let entries: Vec<&[u8]> = entries_data.iter().map(|v| v.as_slice()).collect();
wal.batch_append_for_topic("exact_topic", &entries).unwrap();
for i in 0..5 {
let entry = wal.read_next("exact_topic", true).unwrap().unwrap();
assert_eq!(entry.data.len(), exact_size);
assert_eq!(entry.data[0], i as u8);
}
cleanup_test_env();
}
#[test]
fn test_chaos_hammering_same_topic_with_batches() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Arc::new(
Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap(),
);
let num_threads = 20;
let barrier = Arc::new(Barrier::new(num_threads));
let mut handles = vec![];
for thread_id in 0..num_threads {
let wal_clone = wal.clone();
let barrier_clone = barrier.clone();
let handle = thread::spawn(move || {
barrier_clone.wait();
let mut success_count = 0;
let mut blocked_count = 0;
for _ in 0..10 {
let data = format!("thread_{}", thread_id);
let entries: Vec<&[u8]> = vec![data.as_bytes(), data.as_bytes()];
match wal_clone.batch_append_for_topic("hammered_topic", &entries) {
Ok(_) => success_count += 1,
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => blocked_count += 1,
Err(e) => panic!("Unexpected error: {}", e),
}
thread::sleep(Duration::from_micros(50));
}
(success_count, blocked_count)
});
handles.push(handle);
}
let mut total_success = 0;
let mut total_blocked = 0;
for handle in handles {
let (success, blocked) = handle.join().unwrap();
total_success += success;
total_blocked += blocked;
}
assert!(total_success > 0, "Expected some successful batch writes");
assert!(total_blocked > 0, "Expected some blocked batch writes");
assert_eq!(total_success + total_blocked, num_threads * 10);
let mut count = 0;
while wal.read_next("hammered_topic", true).unwrap().is_some() {
count += 1;
}
assert_eq!(count, total_success * 2);
cleanup_test_env();
}
#[test]
fn test_chaos_zero_length_entries_in_batch() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap();
let entries: Vec<&[u8]> = vec![b"", b"normal", b"", b"", b"another", b""];
wal.batch_append_for_topic("zero_topic", &entries).unwrap();
assert_eq!(
wal.read_next("zero_topic", true).unwrap().unwrap().data,
b""
);
assert_eq!(
wal.read_next("zero_topic", true).unwrap().unwrap().data,
b"normal"
);
assert_eq!(
wal.read_next("zero_topic", true).unwrap().unwrap().data,
b""
);
assert_eq!(
wal.read_next("zero_topic", true).unwrap().unwrap().data,
b""
);
assert_eq!(
wal.read_next("zero_topic", true).unwrap().unwrap().data,
b"another"
);
assert_eq!(
wal.read_next("zero_topic", true).unwrap().unwrap().data,
b""
);
cleanup_test_env();
}
#[test]
fn test_chaos_batch_interspersed_with_frequent_fsync() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::SyncEach,
)
.unwrap();
for i in 0..10 {
let data = format!("batch_{}", i);
let entries: Vec<&[u8]> = vec![data.as_bytes(); 5];
wal.batch_append_for_topic("fsync_topic", &entries).unwrap();
}
let mut count = 0;
while wal.read_next("fsync_topic", true).unwrap().is_some() {
count += 1;
}
assert_eq!(count, 50);
cleanup_test_env();
}
#[test]
fn test_batch_single_entry() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap();
let entries: Vec<&[u8]> = vec![b"single_entry"];
wal.batch_append_for_topic("test_topic", &entries).unwrap();
let entry = wal.read_next("test_topic", true).unwrap().unwrap();
assert_eq!(entry.data, b"single_entry");
cleanup_test_env();
}
#[test]
fn test_batch_exactly_at_block_boundary() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap();
let metadata_overhead = 64;
let entry_size = (10 * 1024 * 1024 - metadata_overhead * 2) / 2;
let e1 = vec![0xAA; entry_size];
let e2 = vec![0xBB; entry_size];
let entries: Vec<&[u8]> = vec![&e1, &e2];
wal.batch_append_for_topic("test_topic", &entries).unwrap();
let r1 = wal.read_next("test_topic", true).unwrap().unwrap();
assert_eq!(r1.data[0], 0xAA);
let r2 = wal.read_next("test_topic", true).unwrap().unwrap();
assert_eq!(r2.data[0], 0xBB);
cleanup_test_env();
}
#[test]
fn test_batch_then_regular_write() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap();
let entries: Vec<&[u8]> = vec![b"batch1", b"batch2"];
wal.batch_append_for_topic("test_topic", &entries).unwrap();
wal.append_for_topic("test_topic", b"regular").unwrap();
assert_eq!(
wal.read_next("test_topic", true).unwrap().unwrap().data,
b"batch1"
);
assert_eq!(
wal.read_next("test_topic", true).unwrap().unwrap().data,
b"batch2"
);
assert_eq!(
wal.read_next("test_topic", true).unwrap().unwrap().data,
b"regular"
);
cleanup_test_env();
}
#[test]
fn test_multiple_sequential_batches() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap();
let total_batches = 4;
for batch_num in 0..total_batches {
let e1 = format!("batch_{}_entry_1", batch_num);
let e2 = format!("batch_{}_entry_2", batch_num);
let e3 = format!("batch_{}_entry_3", batch_num);
let entries: Vec<&[u8]> = vec![e1.as_bytes(), e2.as_bytes(), e3.as_bytes()];
wal.batch_append_for_topic("test_topic", &entries).unwrap();
}
for batch_num in 0..total_batches {
for entry_num in 1..=3 {
let entry = wal.read_next("test_topic", true).unwrap().unwrap();
let expected = format!("batch_{}_entry_{}", batch_num, entry_num);
assert_eq!(entry.data, expected.as_bytes());
}
}
cleanup_test_env();
}
#[test]
fn test_integrity_batch_write_sequential_numbers() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap();
let batch_size = 64;
let entries_data: Vec<Vec<u8>> = (0..batch_size)
.map(|i| {
let mut data = vec![];
data.extend_from_slice(&(i as u64).to_le_bytes());
data.extend_from_slice(&format!("entry_{}", i).as_bytes());
data
})
.collect();
let entries: Vec<&[u8]> = entries_data.iter().map(|v| v.as_slice()).collect();
wal.batch_append_for_topic("integrity_seq", &entries)
.unwrap();
for i in 0..batch_size {
let entry = wal.read_next("integrity_seq", true).unwrap().unwrap();
let num = u64::from_le_bytes([
entry.data[0],
entry.data[1],
entry.data[2],
entry.data[3],
entry.data[4],
entry.data[5],
entry.data[6],
entry.data[7],
]);
assert_eq!(num, i as u64, "Numeric prefix mismatch at entry {}", i);
let text = &entry.data[8..];
let expected = format!("entry_{}", i);
assert_eq!(text, expected.as_bytes(), "Text mismatch at entry {}", i);
}
assert!(wal.read_next("integrity_seq", true).unwrap().is_none());
cleanup_test_env();
}
#[test]
fn test_integrity_batch_write_random_patterns() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap();
let batch_size = 50;
let entries_data: Vec<Vec<u8>> = (0..batch_size)
.map(|i| {
let size = 1000 + (i * 137) % 5000;
let mut data = vec![0u8; size];
for (j, byte) in data.iter_mut().enumerate() {
*byte = ((i + j) % 256) as u8;
}
data
})
.collect();
let entries: Vec<&[u8]> = entries_data.iter().map(|v| v.as_slice()).collect();
wal.batch_append_for_topic("integrity_random", &entries)
.unwrap();
for i in 0..batch_size {
let entry = wal.read_next("integrity_random", true).unwrap().unwrap();
let expected_size = 1000 + (i * 137) % 5000;
assert_eq!(
entry.data.len(),
expected_size,
"Size mismatch at entry {}",
i
);
for (j, &byte) in entry.data.iter().enumerate() {
let expected_byte = ((i + j) % 256) as u8;
assert_eq!(
byte, expected_byte,
"Byte mismatch at entry {} offset {}",
i, j
);
}
}
assert!(wal.read_next("integrity_random", true).unwrap().is_none());
cleanup_test_env();
}
#[test]
fn test_integrity_batch_write_large_entries_exact_match() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap();
let batch_size = 10;
let entry_size = 5 * 1024 * 1024;
let entries_data: Vec<Vec<u8>> = (0..batch_size)
.map(|i| {
let mut data = vec![0u8; entry_size];
let pattern = (i as u8).wrapping_mul(17).wrapping_add(37);
for (j, byte) in data.iter_mut().enumerate() {
*byte = pattern.wrapping_add((j % 256) as u8);
}
data[0..8].copy_from_slice(&(i as u64).to_le_bytes());
let len = data.len();
data[len - 8..len].copy_from_slice(&(i as u64).to_le_bytes());
data
})
.collect();
let entries: Vec<&[u8]> = entries_data.iter().map(|v| v.as_slice()).collect();
wal.batch_append_for_topic("integrity_large", &entries)
.unwrap();
for i in 0..batch_size {
let entry = wal.read_next("integrity_large", true).unwrap().unwrap();
assert_eq!(entry.data.len(), entry_size, "Size mismatch at entry {}", i);
let start_idx = u64::from_le_bytes([
entry.data[0],
entry.data[1],
entry.data[2],
entry.data[3],
entry.data[4],
entry.data[5],
entry.data[6],
entry.data[7],
]);
assert_eq!(start_idx, i as u64, "Start marker mismatch at entry {}", i);
let len = entry.data.len();
let end_idx = u64::from_le_bytes([
entry.data[len - 8],
entry.data[len - 7],
entry.data[len - 6],
entry.data[len - 5],
entry.data[len - 4],
entry.data[len - 3],
entry.data[len - 2],
entry.data[len - 1],
]);
assert_eq!(end_idx, i as u64, "End marker mismatch at entry {}", i);
let pattern = (i as u8).wrapping_mul(17).wrapping_add(37);
for j in 8..len - 8 {
let expected = pattern.wrapping_add((j % 256) as u8);
assert_eq!(
entry.data[j], expected,
"Pattern mismatch at entry {} offset {}",
i, j
);
}
}
assert!(wal.read_next("integrity_large", true).unwrap().is_none());
cleanup_test_env();
}
#[test]
fn test_integrity_batch_spanning_blocks_exact_data() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap();
let batch_size = 20;
let entry_size = 3 * 1024 * 1024;
let entries_data: Vec<Vec<u8>> = (0..batch_size)
.map(|i| {
let mut data = vec![0u8; entry_size];
let seed = (i as u32).wrapping_mul(0x9e3779b9);
for chunk_idx in 0..entry_size / 4 {
let value = seed.wrapping_add(chunk_idx as u32);
let offset = chunk_idx * 4;
data[offset..offset + 4].copy_from_slice(&value.to_le_bytes());
}
data
})
.collect();
let entries: Vec<&[u8]> = entries_data.iter().map(|v| v.as_slice()).collect();
wal.batch_append_for_topic("integrity_spanning", &entries)
.unwrap();
for i in 0..batch_size {
let entry = wal.read_next("integrity_spanning", true).unwrap().unwrap();
assert_eq!(entry.data.len(), entry_size, "Size mismatch at entry {}", i);
let seed = (i as u32).wrapping_mul(0x9e3779b9);
for chunk_idx in 0..entry_size / 4 {
let offset = chunk_idx * 4;
let value = u32::from_le_bytes([
entry.data[offset],
entry.data[offset + 1],
entry.data[offset + 2],
entry.data[offset + 3],
]);
let expected = seed.wrapping_add(chunk_idx as u32);
assert_eq!(
value, expected,
"Data corruption at entry {} offset {}",
i, offset
);
}
}
assert!(wal.read_next("integrity_spanning", true).unwrap().is_none());
cleanup_test_env();
}
#[test]
fn test_integrity_multiple_batches_sequential() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap();
let num_batches = 6;
let entries_per_batch = 5;
for batch_id in 0..num_batches {
let entries_data: Vec<Vec<u8>> = (0..entries_per_batch)
.map(|entry_id| {
let mut data = Vec::new();
data.extend_from_slice(&(batch_id as u32).to_le_bytes());
data.extend_from_slice(&(entry_id as u32).to_le_bytes());
data.extend_from_slice(format!("batch_{}_entry_{}", batch_id, entry_id).as_bytes());
data
})
.collect();
let entries: Vec<&[u8]> = entries_data.iter().map(|v| v.as_slice()).collect();
wal.batch_append_for_topic("integrity_multi", &entries)
.unwrap();
}
for batch_id in 0..num_batches {
for entry_id in 0..entries_per_batch {
let entry = wal.read_next("integrity_multi", true).unwrap().unwrap();
let batch_id_read =
u32::from_le_bytes([entry.data[0], entry.data[1], entry.data[2], entry.data[3]]);
let entry_id_read =
u32::from_le_bytes([entry.data[4], entry.data[5], entry.data[6], entry.data[7]]);
assert_eq!(batch_id_read, batch_id, "Batch ID mismatch");
assert_eq!(entry_id_read, entry_id, "Entry ID mismatch");
let text = &entry.data[8..];
let expected = format!("batch_{}_entry_{}", batch_id, entry_id);
assert_eq!(text, expected.as_bytes(), "Text mismatch");
}
}
assert!(wal.read_next("integrity_multi", true).unwrap().is_none());
cleanup_test_env();
}
#[test]
fn test_integrity_batch_after_crash_recovery() {
let _guard = setup_test_env();
enable_fd_backend();
let test_key = "integrity_crash_test";
{
let wal = Walrus::with_consistency_and_schedule_for_key(
test_key,
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::SyncEach,
)
.unwrap();
let entries_data: Vec<Vec<u8>> = (0..12)
.map(|i| {
let mut data = vec![0u8; 4096];
data[0..8].copy_from_slice(&(i as u64).to_le_bytes());
data[8..16].copy_from_slice(&(0xDEADBEEFCAFEBABE_u64).to_le_bytes());
for j in 16..4096 {
data[j] = ((i + j) % 256) as u8;
}
data
})
.collect();
let entries: Vec<&[u8]> = entries_data.iter().map(|v| v.as_slice()).collect();
wal.batch_append_for_topic("integrity_crash", &entries)
.unwrap();
drop(wal);
thread::sleep(Duration::from_millis(50));
}
{
let wal = Walrus::with_consistency_and_schedule_for_key(
test_key,
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::SyncEach,
)
.unwrap();
for i in 0..12 {
let entry = wal.read_next("integrity_crash", true).unwrap().unwrap();
assert_eq!(
entry.data.len(),
4096,
"Size mismatch at entry {} after recovery",
i
);
let idx = u64::from_le_bytes([
entry.data[0],
entry.data[1],
entry.data[2],
entry.data[3],
entry.data[4],
entry.data[5],
entry.data[6],
entry.data[7],
]);
assert_eq!(
idx, i as u64,
"Index mismatch at entry {} after recovery",
i
);
let magic = u64::from_le_bytes([
entry.data[8],
entry.data[9],
entry.data[10],
entry.data[11],
entry.data[12],
entry.data[13],
entry.data[14],
entry.data[15],
]);
assert_eq!(
magic, 0xDEADBEEFCAFEBABE_u64,
"Magic mismatch at entry {} after recovery",
i
);
for j in 16..4096 {
let expected = ((i + j) % 256) as u8;
assert_eq!(
entry.data[j], expected,
"Data corruption at entry {} offset {} after recovery",
i, j
);
}
}
assert!(wal.read_next("integrity_crash", true).unwrap().is_none());
}
cleanup_test_env();
}
#[test]
#[ignore]
fn test_stress_large_batch_1000_entries() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap();
test_println!("[stress] Creating 1000 entries of 1MB each...");
let entry = vec![0xCD; 1024 * 1024];
let entries: Vec<&[u8]> = (0..1000).map(|_| entry.as_slice()).collect();
test_println!("[stress] Writing batch of 1GB...");
let start = std::time::Instant::now();
wal.batch_append_for_topic("stress_topic", &entries)
.unwrap();
let duration = start.elapsed();
test_println!(
"[stress] Batch write of 1000x1MB entries took: {:?}",
duration
);
test_println!("[stress] Verifying all 1000 entries...");
for i in 0..1000 {
if i % 100 == 0 {
test_println!("[stress] Verified {} entries...", i);
}
let e = wal.read_next("stress_topic", true).unwrap().unwrap();
assert_eq!(e.data.len(), 1024 * 1024);
assert_eq!(e.data[0], 0xCD);
}
test_println!("[stress] All 1000 entries verified successfully!");
cleanup_test_env();
}
#[test]
#[ignore]
fn test_stress_many_small_batches() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap();
test_println!("[stress] Writing 10000 batches of 10 entries each...");
let start = std::time::Instant::now();
for i in 0..10000 {
if i % 1000 == 0 {
test_println!("[stress] Written {} batches...", i);
}
let data = format!("batch_{}", i);
let entries: Vec<&[u8]> = (0..10).map(|_| data.as_bytes()).collect();
wal.batch_append_for_topic("stress_topic", &entries)
.unwrap();
}
let duration = start.elapsed();
test_println!("[stress] 10000 batches of 10 entries took: {:?}", duration);
test_println!("[stress] Reading and verifying 100000 entries...");
let mut count = 0;
while wal.read_next("stress_topic", true).unwrap().is_some() {
count += 1;
if count % 10000 == 0 {
test_println!("[stress] Read {} entries...", count);
}
}
assert_eq!(count, 100000);
test_println!("[stress] All 100000 entries verified successfully!");
cleanup_test_env();
}
#[test]
fn test_rollback_data_becomes_invisible_to_readers() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Arc::new(
Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap(),
);
wal.append_for_topic("rollback_test", b"initial").unwrap();
let initial = wal.read_next("rollback_test", true).unwrap().unwrap();
assert_eq!(initial.data, b"initial");
let barrier = Arc::new(Barrier::new(2));
let mut handles = vec![];
for i in 0..2 {
let wal_clone = wal.clone();
let barrier_clone = barrier.clone();
let handle = thread::spawn(move || {
let entry = vec![i as u8; 5 * 1024 * 1024];
let entries: Vec<&[u8]> = (0..3).map(|_| entry.as_slice()).collect();
barrier_clone.wait();
wal_clone.batch_append_for_topic("rollback_test", &entries)
});
handles.push(handle);
}
let mut successes = 0;
let mut rollbacks = 0;
for handle in handles {
match handle.join().unwrap() {
Ok(_) => successes += 1,
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => rollbacks += 1,
Err(e) => {
test_println!(
"Batch write error (expected in resource-constrained tests): {:?}",
e
);
rollbacks += 1;
}
}
}
assert!(successes <= 1, "At most one batch should succeed");
assert!(rollbacks >= 1, "At least one batch should be blocked/fail");
let mut count = 0;
while wal.read_next("rollback_test", true).unwrap().is_some() {
count += 1;
}
if successes == 1 {
assert_eq!(
count, 3,
"Should read exactly 3 entries from the successful batch"
);
} else {
assert_eq!(count, 0, "Should read no entries if no batch succeeded");
}
cleanup_test_env();
}
#[test]
fn test_rollback_allows_data_overwrite() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap();
wal.append_for_topic("overwrite_test", b"before_batch")
.unwrap();
let entry = wal.read_next("overwrite_test", true).unwrap().unwrap();
assert_eq!(entry.data, b"before_batch");
let entries: Vec<&[u8]> = vec![b"batch1", b"batch2"];
wal.batch_append_for_topic("overwrite_test", &entries)
.unwrap();
assert_eq!(
wal.read_next("overwrite_test", true).unwrap().unwrap().data,
b"batch1"
);
assert_eq!(
wal.read_next("overwrite_test", true).unwrap().unwrap().data,
b"batch2"
);
wal.append_for_topic("overwrite_test", b"after_batch")
.unwrap();
assert_eq!(
wal.read_next("overwrite_test", true).unwrap().unwrap().data,
b"after_batch"
);
cleanup_test_env();
}
#[test]
fn test_rollback_block_state_consistency() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Arc::new(
Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap(),
);
let num_threads = 2;
let barrier = Arc::new(Barrier::new(num_threads));
let mut handles = vec![];
for i in 0..num_threads {
let wal_clone = wal.clone();
let barrier_clone = barrier.clone();
let handle = thread::spawn(move || {
let entry = vec![i as u8; 512 * 1024];
let entries: Vec<&[u8]> = (0..3).map(|_| entry.as_slice()).collect();
barrier_clone.wait();
wal_clone.batch_append_for_topic("block_state_test", &entries)
});
handles.push(handle);
}
let mut successes = 0;
let mut failures = 0;
for handle in handles {
match handle.join().unwrap() {
Ok(_) => successes += 1,
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => failures += 1,
Err(e) => {
test_println!(
"Batch write error (expected in resource-constrained tests): {:?}",
e
);
failures += 1;
}
}
}
assert!(successes <= 1, "At most one batch should succeed");
assert!(failures >= 1, "At least one batch should fail/be blocked");
let test_batch: Vec<&[u8]> = vec![b"consistency_check"];
wal.batch_append_for_topic("block_state_test", &test_batch)
.unwrap();
let mut count = 0;
while wal.read_next("block_state_test", true).unwrap().is_some() {
count += 1;
}
assert!(
count > 0,
"Should have some readable entries after rollbacks"
);
cleanup_test_env();
}
#[test]
fn test_rollback_preserves_existing_data() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap();
for i in 0..5 {
let data = format!("stable_entry_{}", i);
wal.append_for_topic("rollback_preserve", data.as_bytes())
.unwrap();
}
let mut read_entries = Vec::new();
for i in 0..5 {
let entry = wal.read_next("rollback_preserve", true).unwrap().unwrap();
read_entries.push(entry.data);
}
assert_eq!(read_entries.len(), 5);
let entry_data = vec![0xFF; 5 * 1024 * 1024];
let entries: Vec<&[u8]> = vec![entry_data.as_slice(); 2];
let batch_result = wal.batch_append_for_topic("rollback_preserve_batch", &entries);
match batch_result {
Ok(_) => {
test_println!("Batch write succeeded");
let mut batch_count = 0;
while wal
.read_next("rollback_preserve_batch", true)
.unwrap()
.is_some()
{
batch_count += 1;
}
assert_eq!(
batch_count, 2,
"Should read 2 entries from successful batch"
);
}
Err(e) => {
test_println!(
"Batch write failed (expected in resource-constrained tests): {:?}",
e
);
assert!(
wal.read_next("rollback_preserve_batch", true)
.unwrap()
.is_none()
);
}
}
assert!(wal.read_next("rollback_preserve", true).unwrap().is_none());
wal.append_for_topic("rollback_preserve", b"after_rollbacks")
.unwrap();
let entry = wal.read_next("rollback_preserve", true).unwrap().unwrap();
assert_eq!(entry.data, b"after_rollbacks");
cleanup_test_env();
}
#[test]
fn test_rollback_file_state_tracking() {
let _guard = setup_test_env();
enable_fd_backend();
let wal = Walrus::with_consistency_and_schedule(
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::NoFsync,
)
.unwrap();
for batch_num in 0..15 {
if batch_num % 5 == 0 {
let large_entry = vec![batch_num as u8; 15 * 1024 * 1024];
let entries: Vec<&[u8]> = vec![large_entry.as_slice(); 3];
let _ = wal.batch_append_for_topic("file_state_test", &entries);
} else {
let small_data = format!("batch_{}", batch_num);
let entries: Vec<&[u8]> = vec![small_data.as_bytes(); 10];
let _ = wal.batch_append_for_topic("file_state_test", &entries);
}
}
let final_batch: Vec<&[u8]> = vec![b"final_entry"];
wal.batch_append_for_topic("file_state_test", &final_batch)
.unwrap();
let mut found_final = false;
let mut total_count = 0;
while let Some(entry) = wal.read_next("file_state_test", true).unwrap() {
if entry.data == b"final_entry" {
found_final = true;
}
total_count += 1;
}
assert!(
found_final,
"Should find the final entry after all operations"
);
assert!(total_count > 0, "Should have read some entries");
cleanup_test_env();
}