use crate::error::EngineError;
use crate::wal::WalWriter;
use std::sync::{Arc, Condvar, Mutex};
use std::thread::JoinHandle;
use std::time::Duration;
/// Number of consecutive WAL sync failures after which `sync_thread_loop`
/// records a message in `WalSyncState::poisoned` and stops running.
pub const MAX_SYNC_FAILURES: u32 = 5;
/// Mutable WAL state that lives behind the mutex of the
/// `Arc<(Mutex<WalSyncState>, Condvar)>` handed to `sync_thread_loop` and
/// `shutdown_sync_thread`.
pub(crate) struct WalSyncState {
/// Writer whose `sync()` is called by the sync loop and by the final drain
/// in `shutdown_sync_thread`.
pub(crate) wal_writer: WalWriter,
/// Bytes considered not yet synced. A non-zero value triggers a sync; it is
/// reset to 0 after a successful sync. The code in this file never
/// increments it outside of tests — presumably appenders add the size of
/// each append (as the tests do); confirm against callers.
pub(crate) buffered_bytes: usize,
/// Set to `true` by `shutdown_sync_thread` to ask the sync loop to exit.
pub(crate) shutdown: bool,
/// Count of consecutive failed syncs; incremented on each failure and reset
/// to 0 on success.
pub(crate) sync_error_count: u32,
/// `Some(message)` once `sync_error_count` has reached `MAX_SYNC_FAILURES`;
/// `None` otherwise. The sync loop exits right after setting it.
pub(crate) poisoned: Option<String>,
}
/// Background loop that periodically syncs the WAL until shutdown or poisoning.
///
/// Each iteration takes the state lock, waits on the condvar for at most
/// `interval` (or until notified), and calls `wal_writer.sync()` if
/// `buffered_bytes` is non-zero:
///
/// * On success, `buffered_bytes` and `sync_error_count` are reset to 0 and
///   all condvar waiters are notified.
/// * On failure, `sync_error_count` is incremented and the error is logged to
///   stderr. Once the count reaches [`MAX_SYNC_FAILURES`], `poisoned` is set,
///   waiters are notified, and the loop exits.
///
/// The loop also exits after the first pass that observes `shutdown`. If the
/// sync in that pass fails, `buffered_bytes` stays non-zero and is left for
/// the caller to deal with.
///
/// The state mutex is held while `sync()` runs.
///
/// # Panics
///
/// Panics if the state mutex is poisoned.
pub(crate) fn sync_thread_loop(wal_state: Arc<(Mutex<WalSyncState>, Condvar)>, interval: Duration) {
    let (lock, cvar) = &*wal_state;
    loop {
        let mut state = lock.lock().unwrap();

        // If shutdown was requested before we got here, its notification has
        // already been delivered to nobody. Waiting now would stall the exit
        // for a full `interval`, so go straight to the drain-and-exit path.
        if !state.shutdown {
            state = cvar.wait_timeout(state, interval).unwrap().0;
        }

        if state.buffered_bytes > 0 {
            match state.wal_writer.sync() {
                Ok(()) => {
                    state.buffered_bytes = 0;
                    state.sync_error_count = 0;
                    cvar.notify_all();
                }
                Err(e) => {
                    state.sync_error_count += 1;
                    eprintln!(
                        "WAL sync error ({}/{}): {}",
                        state.sync_error_count, MAX_SYNC_FAILURES, e
                    );
                    if state.sync_error_count >= MAX_SYNC_FAILURES {
                        state.poisoned = Some(format!(
                            "WAL sync failed {} consecutive times, last error: {}",
                            state.sync_error_count, e
                        ));
                        // Wake anyone blocked on the condvar so they can see
                        // the poisoned state instead of waiting forever.
                        cvar.notify_all();
                        break;
                    }
                }
            }
        }

        // One sync attempt has been made for any pending bytes; on shutdown
        // that is all this thread does.
        if state.shutdown {
            break;
        }
    }
}
/// Stops the background sync thread and syncs whatever it left behind.
///
/// Sets `shutdown`, wakes all condvar waiters, and joins the thread taken out
/// of `sync_thread` (leaving `None` in its place). If `buffered_bytes` is
/// still non-zero afterwards, one final `wal_writer.sync()` is attempted.
///
/// A poisoned state mutex is recovered rather than unwrapped. A sync thread
/// that panicked while holding the guard poisons the mutex, and unwrapping
/// here would turn the logged-and-tolerated thread panic into a second panic
/// that skips the final sync.
///
/// # Errors
///
/// Returns the error from the final `wal_writer.sync()` if it fails; in that
/// case `buffered_bytes` is left unchanged.
pub(crate) fn shutdown_sync_thread(
    wal_state: &Arc<(Mutex<WalSyncState>, Condvar)>,
    sync_thread: &mut Option<JoinHandle<()>>,
) -> Result<(), EngineError> {
    let (lock, cvar) = &**wal_state;

    {
        let mut state = lock.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
        state.shutdown = true;
        cvar.notify_all();
    }

    // The lock must be released before joining: the sync thread needs it to
    // observe `shutdown` and exit.
    if let Some(handle) = sync_thread.take() {
        if handle.join().is_err() {
            eprintln!("WAL sync thread panicked during shutdown");
        }
    }

    {
        let mut state = lock.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
        if state.buffered_bytes > 0 {
            state.wal_writer.sync()?;
            state.buffered_bytes = 0;
        }
    }

    Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::WalOp;
use crate::wal::WalReader;
use std::collections::BTreeMap;
use tempfile::TempDir;
fn make_test_node(id: u64, key: &str) -> WalOp {
use crate::types::*;
let mut props = BTreeMap::new();
props.insert("name".to_string(), PropValue::String(key.to_string()));
WalOp::UpsertNode(NodeRecord {
id,
label_ids: NodeLabelSet::single(1).unwrap(),
key: key.to_string(),
props,
created_at: 1000 * id as i64,
updated_at: 1000 * id as i64 + 1,
weight: 0.5,
dense_vector: None,
sparse_vector: None,
last_write_seq: 0,
})
}
/// A single append is picked up and synced by the background thread, and the
/// record is readable after shutdown.
#[test]
fn test_sync_thread_basic_operation() {
    let dir = TempDir::new().unwrap();
    let writer = WalWriter::open_generation(dir.path(), 0).unwrap();
    let state = WalSyncState {
        wal_writer: writer,
        buffered_bytes: 0,
        shutdown: false,
        sync_error_count: 0,
        poisoned: None,
    };
    let wal_state = Arc::new((Mutex::new(state), Condvar::new()));
    let wal_state_clone = Arc::clone(&wal_state);
    let handle = std::thread::spawn(move || {
        sync_thread_loop(wal_state_clone, Duration::from_millis(5));
    });

    {
        let (lock, cvar) = &*wal_state;
        let mut s = lock.lock().unwrap();
        let bytes = s.wal_writer.append(&make_test_node(1, "test"), 1).unwrap();
        s.buffered_bytes += bytes;
        cvar.notify_all();

        // Wait for the sync thread's own notification rather than sleeping
        // for a fixed time, so the test does not depend on scheduler timing.
        // The timeout only bounds a genuine failure.
        let (s, _) = cvar
            .wait_timeout_while(s, Duration::from_secs(5), |st| st.buffered_bytes > 0)
            .unwrap();
        assert_eq!(s.buffered_bytes, 0);
    }

    shutdown_sync_thread(&wal_state, &mut Some(handle)).unwrap();

    let reader = WalReader::new(dir.path());
    let ops = reader.read_all().unwrap();
    assert_eq!(ops.len(), 1);
}
/// `append_batch` reports a non-zero byte count and every op in the batch is
/// readable after a sync.
#[test]
fn test_append_batch_returns_size() {
    let dir = TempDir::new().unwrap();
    let mut writer = WalWriter::open_generation(dir.path(), 0).unwrap();
    let ops = vec![
        (1u64, make_test_node(1, "a")),
        (2u64, make_test_node(2, "b")),
    ];
    let expected_ops = ops.len();

    let total = writer.append_batch(&ops).unwrap();
    assert!(total > 0);
    writer.sync().unwrap();
    drop(writer);

    // The batch holds two ops, so exactly two must be read back (the
    // previous expectation of 4 did not match what was written).
    let reader = WalReader::new(dir.path());
    assert_eq!(reader.read_all().unwrap().len(), expected_ops);
}
/// Shutdown must sync data that was already buffered when the sync thread
/// started, leaving `buffered_bytes` at zero and the record readable.
#[test]
fn test_shutdown_with_pending_buffered_data() {
    let dir = TempDir::new().unwrap();

    // Seed the writer with one record that has been appended and flushed but
    // not synced, and account for it in `buffered_bytes`.
    let mut writer = WalWriter::open_generation(dir.path(), 0).unwrap();
    let pending = writer.append(&make_test_node(1, "pending"), 1).unwrap();
    writer.flush().unwrap();

    let wal_state = Arc::new((
        Mutex::new(WalSyncState {
            wal_writer: writer,
            buffered_bytes: pending,
            shutdown: false,
            sync_error_count: 0,
            poisoned: None,
        }),
        Condvar::new(),
    ));

    let worker_state = Arc::clone(&wal_state);
    let mut sync_thread = Some(std::thread::spawn(move || {
        sync_thread_loop(worker_state, Duration::from_millis(50))
    }));

    std::thread::sleep(Duration::from_millis(10));
    shutdown_sync_thread(&wal_state, &mut sync_thread).unwrap();

    let remaining = wal_state.0.lock().unwrap().buffered_bytes;
    assert_eq!(remaining, 0, "shutdown should drain buffered data");

    let reader = WalReader::new(dir.path());
    let recovered = reader.read_all().unwrap();
    assert_eq!(recovered.len(), 1);
}
/// A poison message written under the lock is visible to a later lock holder.
#[test]
fn test_poisoned_state_is_visible() {
    let dir = TempDir::new().unwrap();
    let wal_state = Arc::new((
        Mutex::new(WalSyncState {
            wal_writer: WalWriter::open_generation(dir.path(), 0).unwrap(),
            buffered_bytes: 0,
            shutdown: false,
            sync_error_count: 0,
            poisoned: None,
        }),
        Condvar::new(),
    ));
    let (lock, cvar) = &*wal_state;

    // Write the poisoned marker in its own scope so the guard is released
    // before the state is read back.
    {
        let mut guard = lock.lock().unwrap();
        guard.poisoned = Some(String::from("WAL sync failed 5 consecutive times"));
        guard.shutdown = true;
        cvar.notify_all();
    }

    let guard = lock.lock().unwrap();
    let message = guard.poisoned.as_deref();
    assert!(message.is_some());
    assert!(message.unwrap().contains("5 consecutive"));
}
/// Three appends, each drained by the sync thread before the next one is
/// issued, all end up readable and leave no buffered bytes or sync errors.
#[test]
fn test_multiple_sync_cycles_drain_all() {
    let dir = TempDir::new().unwrap();
    let writer = WalWriter::open_generation(dir.path(), 0).unwrap();
    let state = WalSyncState {
        wal_writer: writer,
        buffered_bytes: 0,
        shutdown: false,
        sync_error_count: 0,
        poisoned: None,
    };
    let wal_state = Arc::new((Mutex::new(state), Condvar::new()));
    let wal_state_clone = Arc::clone(&wal_state);
    let handle = std::thread::spawn(move || {
        sync_thread_loop(wal_state_clone, Duration::from_millis(5));
    });

    for i in 0..3 {
        let (lock, cvar) = &*wal_state;
        let mut s = lock.lock().unwrap();
        let bytes = s
            .wal_writer
            .append(&make_test_node(i + 1, &format!("n{}", i)), i + 1)
            .unwrap();
        s.buffered_bytes += bytes;
        cvar.notify_all();

        // Block until the sync thread reports this cycle as drained instead
        // of sleeping for a fixed time; this keeps the cycles separate
        // without depending on scheduler timing.
        let (s, _) = cvar
            .wait_timeout_while(s, Duration::from_secs(5), |st| st.buffered_bytes > 0)
            .unwrap();
        assert_eq!(s.buffered_bytes, 0);
    }

    {
        let (lock, _) = &*wal_state;
        let s = lock.lock().unwrap();
        assert_eq!(s.buffered_bytes, 0);
        assert_eq!(s.sync_error_count, 0);
    }

    shutdown_sync_thread(&wal_state, &mut Some(handle)).unwrap();

    let reader = WalReader::new(dir.path());
    let ops = reader.read_all().unwrap();
    assert_eq!(ops.len(), 3);
}
/// Shutting down with nothing buffered succeeds and leaves an empty WAL.
#[test]
fn test_sync_thread_shutdown_with_zero_buffered() {
    let dir = TempDir::new().unwrap();
    let wal_state = Arc::new((
        Mutex::new(WalSyncState {
            wal_writer: WalWriter::open_generation(dir.path(), 0).unwrap(),
            buffered_bytes: 0,
            shutdown: false,
            sync_error_count: 0,
            poisoned: None,
        }),
        Condvar::new(),
    ));

    let worker_state = Arc::clone(&wal_state);
    let mut sync_thread = Some(std::thread::spawn(move || {
        sync_thread_loop(worker_state, Duration::from_millis(5))
    }));

    // No appends happen; shutdown is requested right away.
    shutdown_sync_thread(&wal_state, &mut sync_thread).unwrap();

    let reader = WalReader::new(dir.path());
    let recovered = reader.read_all().unwrap();
    assert!(recovered.is_empty());
}
}