use crate::core::record::Record;
use crate::error::FeoxError;
use crate::stats::Statistics;
use crate::storage::free_space::FreeSpaceManager;
use crate::storage::io::DiskIO;
use crate::storage::write_buffer::{WriteBuffer, WriteEntry};
use parking_lot::RwLock;
use std::sync::Arc;
use std::thread;
/// Builds a [`WriteBuffer`] backed by a freshly created temporary file, for use by the tests.
///
/// Returns the buffer together with the shared [`Statistics`] handle it was constructed with,
/// so callers can inspect the counters after driving the buffer.
fn create_test_write_buffer() -> (WriteBuffer, Arc<Statistics>) {
    // Reopen the temp file to obtain a separate handle that is handed over to `DiskIO`.
    let backing = tempfile::NamedTempFile::new().unwrap();
    let handle = backing.reopen().unwrap();

    // `DiskIO` is constructed through a different entry point depending on the platform.
    #[cfg(unix)]
    let io = DiskIO::new(Arc::new(handle), false).unwrap();
    #[cfg(not(unix))]
    let io = DiskIO::new_from_file(handle).unwrap();

    // Free-space manager initialised with 1024^3 (units as defined by `initialize`).
    let mut manager = FreeSpaceManager::new();
    manager.initialize(1024 * 1024 * 1024).unwrap();
    let free_space = Arc::new(RwLock::new(manager));

    let stats = Arc::new(Statistics::new());
    let buffer = WriteBuffer::new(
        Arc::new(RwLock::new(io)),
        free_space,
        Arc::clone(&stats),
        2,
    );
    (buffer, stats)
}
/// A freshly constructed buffer reports zero buffered writes.
#[test]
fn test_write_buffer_creation() {
    use std::sync::atomic::Ordering;

    // Keep the buffer alive for the duration of the check.
    let (_buffer, stats) = create_test_write_buffer();

    let buffered = stats.writes_buffered.load(Ordering::Relaxed);
    assert_eq!(buffered, 0);
}
/// A single insert submitted through `add_write` is counted as exactly one buffered write.
#[test]
fn test_add_write_operation() {
    use std::sync::atomic::Ordering;

    let (buffer, stats) = create_test_write_buffer();

    let key = b"key".to_vec();
    let value = b"value".to_vec();
    let record = Arc::new(Record::new(key, value, 100));

    let outcome = buffer.add_write(crate::constants::Operation::Insert, record, 0);
    outcome.unwrap();

    let buffered = stats.writes_buffered.load(Ordering::Relaxed);
    assert_eq!(buffered, 1);
}
/// Once shutdown has been initiated and completed, further writes are rejected with
/// `FeoxError::ShuttingDown`.
#[test]
fn test_write_buffer_shutdown() {
    let (mut buffer, _stats) = create_test_write_buffer();

    // Use half the available CPUs, but never fewer than one.
    let worker_count = std::cmp::max(num_cpus::get() / 2, 1);
    buffer.start_workers(worker_count);

    // Queue a handful of inserts before shutting down.
    for idx in 0..10 {
        let key = format!("key_{}", idx).into_bytes();
        let value = format!("value_{}", idx).into_bytes();
        let record = Arc::new(Record::new(key, value, 100 + idx as u64));
        let outcome = buffer.add_write(crate::constants::Operation::Insert, record, 0);
        outcome.unwrap();
    }

    buffer.initiate_shutdown();
    buffer.complete_shutdown();

    // A write attempted after shutdown must fail with the dedicated error variant.
    let late_record = Arc::new(Record::new(b"after".to_vec(), b"shutdown".to_vec(), 200));
    let result = buffer.add_write(crate::constants::Operation::Insert, late_record, 0);
    assert!(result.is_err());
    assert!(matches!(result, Err(FeoxError::ShuttingDown)));
}
/// Forcing a flush after buffering several inserts leaves the flush counter above zero.
#[test]
fn test_force_flush() {
    use std::sync::atomic::Ordering;

    let (mut buffer, stats) = create_test_write_buffer();

    // Use half the available CPUs, but never fewer than one.
    let worker_count = std::cmp::max(num_cpus::get() / 2, 1);
    buffer.start_workers(worker_count);

    for idx in 0..5 {
        let key = format!("key_{}", idx).into_bytes();
        let value = format!("value_{}", idx).into_bytes();
        let record = Arc::new(Record::new(key, value, 100 + idx as u64));
        let outcome = buffer.add_write(crate::constants::Operation::Insert, record, 0);
        outcome.unwrap();
    }

    buffer.force_flush().unwrap();

    let flushes = stats.flush_count.load(Ordering::Relaxed);
    assert!(flushes > 0);

    buffer.complete_shutdown();
}
/// Ten threads each submit one hundred inserts through a shared buffer; afterwards the
/// buffered-writes counter must read exactly 1000.
#[test]
fn test_concurrent_writes() {
    use std::sync::atomic::Ordering;

    let (mut buffer, stats) = create_test_write_buffer();
    buffer.start_workers(4);

    // Workers are started while the buffer is still uniquely owned; share it afterwards.
    let shared = Arc::new(buffer);

    // Spawn every writer thread up front, collecting the join handles.
    let handles: Vec<_> = (0..10)
        .map(|t| {
            let writer = Arc::clone(&shared);
            thread::spawn(move || {
                for i in 0..100 {
                    let key = format!("thread{}:key{}", t, i).into_bytes();
                    let value = format!("value_{}_{}", t, i).into_bytes();
                    let record = Arc::new(Record::new(key, value, 1000 + (t * 100 + i) as u64));
                    writer
                        .add_write(crate::constants::Operation::Insert, record, 0)
                        .unwrap();
                }
            })
        })
        .collect();

    // Wait for all writers; a panic inside any thread fails the test here.
    handles
        .into_iter()
        .for_each(|handle| handle.join().unwrap());

    let buffered = stats.writes_buffered.load(Ordering::Relaxed);
    assert_eq!(buffered, 1000);
}
/// A delete submitted for a record whose `sector` has been set is counted as a buffered write.
#[test]
fn test_delete_operation() {
    use std::sync::atomic::Ordering;

    let (buffer, stats) = create_test_write_buffer();

    // Give the record a non-zero sector value before queueing the delete.
    let target = Record::new(b"delete_key".to_vec(), b"value".to_vec(), 100);
    target.sector.store(12345, Ordering::Release);

    let outcome = buffer.add_write(crate::constants::Operation::Delete, Arc::new(target), 5);
    outcome.unwrap();

    let buffered = stats.writes_buffered.load(Ordering::Relaxed);
    assert!(buffered > 0);
}
/// An update submitted through `add_write` is counted as a buffered write.
#[test]
fn test_update_operation() {
    use std::sync::atomic::Ordering;

    let (buffer, stats) = create_test_write_buffer();

    let key = b"update_key".to_vec();
    let value = b"new_value".to_vec();
    let record = Arc::new(Record::new(key, value, 200));

    let outcome = buffer.add_write(crate::constants::Operation::Update, record, 9);
    outcome.unwrap();

    let buffered = stats.writes_buffered.load(Ordering::Relaxed);
    assert!(buffered > 0);
}
/// Buffers 20,000 inserts carrying 2048-byte values while workers are running, then forces a
/// flush and checks that at least one flush was counted.
///
/// NOTE(review): because `force_flush` is called explicitly, the final assertion does not by
/// itself show that a full buffer triggered a flush on its own.
#[test]
fn test_write_buffer_full_trigger() {
    use std::sync::atomic::Ordering;

    let (mut buffer, stats) = create_test_write_buffer();

    // Use half the available CPUs, but never fewer than one.
    let worker_count = std::cmp::max(num_cpus::get() / 2, 1);
    buffer.start_workers(worker_count);

    for idx in 0..20000 {
        let key = format!("key_{}", idx).into_bytes();
        let payload = vec![0u8; 2048];
        let record = Arc::new(Record::new(key, payload, 100 + idx as u64));
        let outcome = buffer.add_write(crate::constants::Operation::Insert, record, 0);
        outcome.unwrap();
    }

    buffer.force_flush().unwrap();

    let flushes = stats.flush_count.load(Ordering::Relaxed);
    assert!(flushes > 0);

    buffer.complete_shutdown();
}
/// A hand-built `WriteEntry` exposes its atomic `work_status` and `retry_count` fields:
/// both start at the values they were constructed with and reflect later stores.
#[test]
fn test_write_entry_fields() {
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::Instant;

    let record = Arc::new(Record::new(b"key".to_vec(), b"value".to_vec(), 100));
    let entry = WriteEntry {
        op: crate::constants::Operation::Insert,
        record: Arc::clone(&record),
        old_value_len: 0,
        work_status: AtomicU32::new(0),
        retry_count: AtomicU32::new(0),
        timestamp: Instant::now(),
    };

    // Initial values are the ones passed at construction.
    let initial_status = entry.work_status.load(Ordering::Relaxed);
    let initial_retries = entry.retry_count.load(Ordering::Relaxed);
    assert_eq!(initial_status, 0);
    assert_eq!(initial_retries, 0);

    // Stores through the shared reference are visible to subsequent loads.
    entry.work_status.store(1, Ordering::Release);
    entry.retry_count.store(3, Ordering::Release);

    let updated_status = entry.work_status.load(Ordering::Relaxed);
    let updated_retries = entry.retry_count.load(Ordering::Relaxed);
    assert_eq!(updated_status, 1);
    assert_eq!(updated_retries, 3);
}