use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Condvar, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use super::concurrent::{ConcurrentWal, ConcurrentWalConfig};
use super::flush_coordinator::{FlushCoordinator, FlushCoordinatorConfig, FlushStats};
use super::group_commit::GroupCommitCoordinator;
use super::{LSN, WalOperation};
use crate::core::error::{Error, Result, StorageError};
use crate::storage::wal::DurabilityMode;
#[derive(Clone)]
pub struct ConcurrentWalSystemConfig {
    /// Directory where WAL segment files are created.
    pub wal_dir: PathBuf,
    /// Number of in-memory append stripes (`with_num_stripes` rounds this
    /// up to a power of two).
    pub num_stripes: usize,
    /// Per-stripe buffer capacity.
    pub stripe_capacity: usize,
    /// Maximum size of one on-disk WAL segment, in bytes.
    pub segment_size: usize,
    /// Number of segments to retain (presumably older segments are pruned
    /// by the flush coordinator — confirm in `FlushCoordinator`).
    pub segments_to_retain: usize,
    /// Background flush interval in milliseconds.
    pub flush_interval_ms: u64,
    /// When appends become durable (synchronous, async, group commit, ...).
    pub durability_mode: DurabilityMode,
    /// Size of the flush coordinator's write buffer, in bytes.
    pub write_buffer_size: usize,
    /// Optional cipher for WAL contents; `None` writes plaintext.
    pub wal_cipher: Option<Arc<dyn crate::encryption::cipher::Cipher>>,
}
impl std::fmt::Debug for ConcurrentWalSystemConfig {
    /// Manual `Debug`: the cipher is a trait object, so only its algorithm
    /// name (if present) is printed, never key material.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let cipher_name = self.wal_cipher.as_ref().map(|c| c.algorithm_name());
        let mut dbg = f.debug_struct("ConcurrentWalSystemConfig");
        dbg.field("wal_dir", &self.wal_dir);
        dbg.field("num_stripes", &self.num_stripes);
        dbg.field("stripe_capacity", &self.stripe_capacity);
        dbg.field("segment_size", &self.segment_size);
        dbg.field("segments_to_retain", &self.segments_to_retain);
        dbg.field("flush_interval_ms", &self.flush_interval_ms);
        dbg.field("durability_mode", &self.durability_mode);
        dbg.field("write_buffer_size", &self.write_buffer_size);
        dbg.field("wal_cipher", &cipher_name);
        dbg.finish()
    }
}
impl Default for ConcurrentWalSystemConfig {
    /// General-purpose defaults: 64 MiB segments, 16 stripes, synchronous
    /// durability, 64 KiB write buffer, no encryption.
    fn default() -> Self {
        const KIB: usize = 1024;
        const MIB: usize = 1024 * KIB;
        Self {
            wal_dir: PathBuf::from("data/wal"),
            num_stripes: 16,
            stripe_capacity: 1024,
            segment_size: 64 * MIB,
            segments_to_retain: 10,
            flush_interval_ms: 10,
            durability_mode: DurabilityMode::Synchronous,
            write_buffer_size: 64 * KIB,
            wal_cipher: None,
        }
    }
}
impl ConcurrentWalSystemConfig {
    /// Create a config rooted at `wal_dir`, keeping every other default.
    pub fn new(wal_dir: impl Into<PathBuf>) -> Self {
        let mut config = Self::default();
        config.wal_dir = wal_dir.into();
        config
    }
    /// Builder: select the durability mode.
    pub fn with_durability_mode(mut self, mode: DurabilityMode) -> Self {
        self.durability_mode = mode;
        self
    }
    /// Builder: set the stripe count, rounded up to the next power of two
    /// (so stripe selection can use a bitmask).
    pub fn with_num_stripes(mut self, num_stripes: usize) -> Self {
        self.num_stripes = num_stripes.next_power_of_two();
        self
    }
    /// Builder: set the background flush interval in milliseconds.
    pub fn with_flush_interval_ms(mut self, ms: u64) -> Self {
        self.flush_interval_ms = ms;
        self
    }
}
/// One-shot wakeup channel between committers and the flush thread.
///
/// `notify` sets a boolean flag under the mutex and signals the condvar;
/// `wait_timeout` consumes the flag. Poisoned-lock errors are deliberately
/// ignored (`unwrap_or_else(|e| e.into_inner())`): a panicked peer must not
/// wedge the flush path.
struct FlushNotifier {
    /// `true` when a notification is pending and not yet consumed.
    lock: Mutex<bool>,
    condvar: Condvar,
}
impl FlushNotifier {
    fn new() -> Self {
        Self {
            lock: Mutex::new(false),
            condvar: Condvar::new(),
        }
    }
    /// Record a pending notification and wake one waiter.
    fn notify(&self) {
        let mut guard = self.lock.lock().unwrap_or_else(|e| e.into_inner());
        *guard = true;
        self.condvar.notify_one();
    }
    /// Wait until notified or `duration` elapses. Returns `true` when a
    /// notification was consumed. A spurious condvar wakeup returns `false`
    /// early, which is harmless for the flush loop (it just flushes sooner).
    fn wait_timeout(&self, duration: Duration) -> bool {
        let mut guard = self.lock.lock().unwrap_or_else(|e| e.into_inner());
        // Fast path: a notification arrived before we started waiting.
        if *guard {
            *guard = false;
            return true;
        }
        let (mut guard, _timeout_result) = self
            .condvar
            .wait_timeout(guard, duration)
            .unwrap_or_else(|e| e.into_inner());
        // BUG FIX: the previous code returned `*guard && !timed_out()`, so a
        // notification that raced with the timeout was reported as a timeout
        // AND the flag was cleared — silently dropping the signal. The flag
        // alone is authoritative: if it is set, `notify` ran.
        let was_signaled = *guard;
        *guard = false;
        was_signaled
    }
}
/// Consecutive flush failures at which a one-time CRITICAL warning is
/// printed (the comparison is `==`, so the warning fires exactly once per
/// failure streak; the counter resets on the next successful flush).
const FLUSH_ERROR_WARNING_THRESHOLD: u64 = 3;
/// State owned by the background flush thread, bundled so `flush_loop`
/// can move it into the thread in one piece.
struct BackgroundFlusher {
    /// Striped in-memory WAL to drain each cycle.
    wal: Arc<ConcurrentWal>,
    /// Persists drained entries to on-disk segments.
    coordinator: Arc<FlushCoordinator>,
    /// Set by `ConcurrentWalSystem::shutdown` to stop the loop.
    shutdown: Arc<AtomicBool>,
    /// Wakes the loop before its interval (e.g. a group-commit batch filled).
    flush_notifier: Arc<FlushNotifier>,
    /// Present in GroupCommit/AsyncBatched modes; waiters are released after
    /// each flush cycle via `mark_flushed`.
    group_commit: Option<Arc<GroupCommitCoordinator>>,
    /// Consecutive-failure counter shared with the owning system handle.
    error_counter: Arc<AtomicU64>,
    /// Maximum sleep between flush cycles.
    interval: Duration,
    /// Whether background flushes fsync (set `true` only for GroupCommit
    /// mode by `ConcurrentWalSystem::new`).
    sync_on_flush: bool,
}
impl BackgroundFlusher {
    /// Thread body: flush, then sleep until notified or the interval
    /// elapses; once shutdown is signaled, run one final synced flush.
    fn run(&self) {
        while !self.shutdown.load(Ordering::Relaxed) {
            self.perform_flush_cycle();
            self.flush_notifier.wait_timeout(self.interval);
        }
        self.perform_final_flush();
    }
    /// One cycle: drain everything buffered and hand it to the coordinator.
    /// When nothing was drained but a group-commit batch is waiting, report
    /// a successful empty flush so those committers are released.
    fn perform_flush_cycle(&self) {
        let drained = self.wal.drain_all();
        if !drained.is_empty() {
            let outcome = self
                .coordinator
                .flush(drained, self.sync_on_flush)
                .map(|_| ());
            self.handle_flush_result(outcome);
            return;
        }
        // Only consulted when nothing was drained (mirrors the short-circuit
        // in the original condition).
        let waiters_pending = self.group_commit.as_ref().is_some_and(|gc| {
            gc.current_batch_size()
                .expect("GroupCommitCoordinator lock poisoned - flush thread cannot continue")
                > 0
        });
        if waiters_pending {
            self.handle_flush_result(Ok(()));
        }
    }
    /// Final flush after shutdown: always fsync so nothing drained is lost.
    fn perform_final_flush(&self) {
        let drained = self.wal.drain_all();
        if drained.is_empty() {
            return;
        }
        let outcome = self.coordinator.flush(drained, true).map(|_| ());
        self.handle_flush_result(outcome);
    }
    /// Track the consecutive-error streak and propagate the outcome to any
    /// group-commit waiters.
    fn handle_flush_result(&self, result: Result<()>) {
        if let Err(e) = result {
            let streak = self.error_counter.fetch_add(1, Ordering::Relaxed) + 1;
            if streak == FLUSH_ERROR_WARNING_THRESHOLD {
                // Escalate exactly once, when the streak hits the threshold.
                eprintln!(
                    "CRITICAL: WAL flush failed {} consecutive times. Data durability may be compromised. Last error: {}",
                    streak, e
                );
            } else {
                eprintln!("WAL flush error: {}", e);
            }
            if let Some(ref gc) = self.group_commit {
                gc.mark_flushed(Err(crate::core::error::Error::other(e.to_string())))
                    .expect("GroupCommitCoordinator lock poisoned - flush thread cannot continue");
            }
            return;
        }
        self.error_counter.store(0, Ordering::Relaxed);
        if let Some(ref gc) = self.group_commit {
            gc.mark_flushed(Ok(()))
                .expect("GroupCommitCoordinator lock poisoned - flush thread cannot continue");
        }
    }
}
/// WAL facade combining striped in-memory appends (`ConcurrentWal`) with a
/// `FlushCoordinator` that persists drained entries — either inline
/// (Synchronous mode) or via a background flush thread (other modes).
pub struct ConcurrentWalSystem {
    /// Striped in-memory WAL buffers.
    wal: Arc<ConcurrentWal>,
    /// Writes drained entries into on-disk segments.
    coordinator: Arc<FlushCoordinator>,
    /// Background flusher; `None` in Synchronous mode.
    flush_thread: Option<JoinHandle<()>>,
    /// Tells the flush thread to exit its loop.
    shutdown_signal: Arc<AtomicBool>,
    /// Wakes the flush thread ahead of its interval.
    flush_notifier: Arc<FlushNotifier>,
    /// Durability policy this system was built with.
    durability_mode: DurabilityMode,
    /// Present for GroupCommit/AsyncBatched modes.
    group_commit: Option<Arc<GroupCommitCoordinator>>,
    /// Consecutive background-flush failures; 0 when healthy.
    consecutive_flush_errors: Arc<AtomicU64>,
}
impl ConcurrentWalSystem {
/// Build the WAL system: construct the striped in-memory WAL and the flush
/// coordinator, then — for modes with deferred durability — spawn the
/// background flush thread.
///
/// # Errors
/// Propagates failures from `ConcurrentWal::new` / `FlushCoordinator::new`
/// (e.g. the WAL directory cannot be created or opened).
pub fn new(config: ConcurrentWalSystemConfig) -> Result<Self> {
    let wal_config = ConcurrentWalConfig {
        wal_dir: config.wal_dir.clone(),
        num_stripes: config.num_stripes,
        stripe_capacity: config.stripe_capacity,
        segment_size: config.segment_size,
        segments_to_retain: config.segments_to_retain,
    };
    let coordinator_config = FlushCoordinatorConfig {
        wal_dir: config.wal_dir,
        segment_size: config.segment_size,
        segments_to_retain: config.segments_to_retain,
        flush_interval_ms: config.flush_interval_ms,
        // Coordinator-level fsync for Synchronous and GroupCommit;
        // Async/AsyncBatched trade durability for throughput.
        sync_on_flush: matches!(
            config.durability_mode,
            DurabilityMode::Synchronous | DurabilityMode::GroupCommit { .. }
        ),
        write_buffer_size: config.write_buffer_size,
        wal_cipher: config.wal_cipher.clone(),
    };
    let wal = Arc::new(ConcurrentWal::new(wal_config)?);
    let coordinator = Arc::new(FlushCoordinator::new(coordinator_config)?);
    let shutdown_signal = Arc::new(AtomicBool::new(false));
    // Commit batching exists only for GroupCommit and AsyncBatched modes.
    // Note the argument order: GroupCommitCoordinator::new(delay, batch).
    let group_commit = match config.durability_mode {
        DurabilityMode::GroupCommit {
            max_batch_size,
            max_delay_ms,
        } => Some(Arc::new(GroupCommitCoordinator::new(
            max_delay_ms,
            max_batch_size,
        ))),
        DurabilityMode::AsyncBatched {
            max_batch_size,
            max_delay_ms,
            ..
        } => Some(Arc::new(GroupCommitCoordinator::new(
            max_delay_ms,
            max_batch_size,
        ))),
        _ => None,
    };
    let flush_notifier = Arc::new(FlushNotifier::new());
    let consecutive_flush_errors = Arc::new(AtomicU64::new(0));
    // Only deferred-durability modes need the background flusher;
    // Synchronous flushes inline on every append.
    let flush_thread = if matches!(
        config.durability_mode,
        DurabilityMode::Async { .. }
            | DurabilityMode::GroupCommit { .. }
            | DurabilityMode::AsyncBatched { .. }
    ) {
        let wal_clone = Arc::clone(&wal);
        let coordinator_clone = Arc::clone(&coordinator);
        let shutdown_clone = Arc::clone(&shutdown_signal);
        let flush_notifier_clone = Arc::clone(&flush_notifier);
        let group_commit_clone = group_commit.clone();
        let error_counter_clone = Arc::clone(&consecutive_flush_errors);
        let flush_interval = Duration::from_millis(config.flush_interval_ms);
        // Background cycles fsync only in GroupCommit mode (committers
        // block on durability there); Async/AsyncBatched skip the fsync.
        let sync_on_flush =
            matches!(config.durability_mode, DurabilityMode::GroupCommit { .. });
        Some(thread::spawn(move || {
            Self::flush_loop(
                wal_clone,
                coordinator_clone,
                shutdown_clone,
                flush_notifier_clone,
                group_commit_clone,
                error_counter_clone,
                flush_interval,
                sync_on_flush,
            );
        }))
    } else {
        None
    };
    Ok(Self {
        wal,
        coordinator,
        flush_thread,
        shutdown_signal,
        flush_notifier,
        durability_mode: config.durability_mode,
        group_commit,
        consecutive_flush_errors,
    })
}
/// Entry point of the background flush thread: bundle the shared state
/// into a `BackgroundFlusher` and run it until shutdown is signaled.
#[allow(clippy::too_many_arguments)]
fn flush_loop(
    wal: Arc<ConcurrentWal>,
    coordinator: Arc<FlushCoordinator>,
    shutdown: Arc<AtomicBool>,
    flush_notifier: Arc<FlushNotifier>,
    group_commit: Option<Arc<GroupCommitCoordinator>>,
    error_counter: Arc<AtomicU64>,
    interval: Duration,
    sync_on_flush: bool,
) {
    BackgroundFlusher {
        wal,
        coordinator,
        shutdown,
        flush_notifier,
        group_commit,
        error_counter,
        interval,
        sync_on_flush,
    }
    .run();
}
/// Append one operation according to the configured durability mode:
/// block until fsynced in `Synchronous` mode, otherwise enqueue for the
/// background flusher.
pub fn append(&self, operation: WalOperation) -> Result<LSN> {
    if matches!(self.durability_mode, DurabilityMode::Synchronous) {
        self.append_sync(operation)
    } else {
        // Async, GroupCommit and AsyncBatched all defer durability.
        self.append_async(operation)
    }
}
/// Append without waiting for durability; the entry becomes durable on a
/// later flush (background cycle, explicit `flush`, or `shutdown`).
pub fn append_async(&self, operation: WalOperation) -> Result<LSN> {
    self.wal.append_async(operation)
}
pub fn append_sync(&self, operation: WalOperation) -> Result<LSN> {
let (lsn, handle) = self.wal.append_with_handle(operation)?;
let entries = self.wal.drain_all();
if !entries.is_empty() {
self.coordinator.flush(entries, true)?;
}
handle.wait().map_err(|e| {
Error::Storage(StorageError::WalError {
reason: format!("WAL flush failed: {}", e),
})
})?;
Ok(lsn)
}
/// Append a batch of operations, returning one LSN per operation.
///
/// In `Synchronous` mode this returns only after the batch is flushed with
/// fsync; only the final handle is awaited (presumably flush completion is
/// ordered by LSN — this mirrors the original behavior). Other modes
/// enqueue the batch and return immediately.
pub fn append_batch(&self, operations: Vec<WalOperation>) -> Result<Vec<LSN>> {
    if operations.is_empty() {
        return Ok(Vec::new());
    }
    if !matches!(self.durability_mode, DurabilityMode::Synchronous) {
        // Async / GroupCommit / AsyncBatched: deferred durability.
        return self.wal.append_batch(operations);
    }
    let (lsns, handles) = self.wal.append_batch_with_handles(operations)?;
    let pending = self.wal.drain_all();
    if !pending.is_empty() {
        self.coordinator.flush(pending, true).map_err(|e| {
            Error::Storage(StorageError::WalError {
                reason: format!("Failed to flush batch after drain: {}", e),
            })
        })?;
    }
    if let Some(last_handle) = handles.into_iter().last() {
        last_handle.wait().map_err(|e| {
            Error::Storage(StorageError::WalError {
                reason: format!("WAL flush failed: {}", e),
            })
        })?;
    }
    Ok(lsns)
}
/// Drain and flush all pending entries now. The flush is fsynced in every
/// mode except plain `Async`. Returns default stats when nothing was
/// buffered.
pub fn flush(&self) -> Result<FlushStats> {
    let pending = self.wal.drain_all();
    if pending.is_empty() {
        return Ok(FlushStats::default());
    }
    let sync = !matches!(self.durability_mode, DurabilityMode::Async { .. });
    self.coordinator.flush(pending, sync)
}
/// Commit barrier.
///
/// - `Synchronous`: drain and fsync inline; returns `None`.
/// - `Async`: no-op (durability is deferred to the flush thread).
/// - `GroupCommit`/`AsyncBatched`: register with the group-commit
///   coordinator and return the commit epoch, waking the flush thread
///   early when the batch says so.
pub fn commit(&self) -> Result<Option<u64>> {
    match self.durability_mode {
        DurabilityMode::Synchronous => {
            let pending = self.wal.drain_all();
            if !pending.is_empty() {
                self.coordinator.flush(pending, true)?;
            }
            Ok(None)
        }
        DurabilityMode::Async { .. } => Ok(None),
        DurabilityMode::GroupCommit { .. } | DurabilityMode::AsyncBatched { .. } => {
            match self.group_commit {
                Some(ref gc) => {
                    let (epoch, should_trigger) = gc.register_transaction()?;
                    if should_trigger {
                        // Batch hit its trigger point — don't wait for the
                        // flush interval.
                        self.flush_notifier.notify();
                    }
                    Ok(Some(epoch))
                }
                // Defensive fallback: `new` always builds a coordinator for
                // these modes, but degrade to an inline synced flush if it
                // is somehow absent.
                None => {
                    let pending = self.wal.drain_all();
                    if !pending.is_empty() {
                        self.coordinator.flush(pending, true)?;
                    }
                    Ok(None)
                }
            }
        }
    }
}
/// The group-commit coordinator, when the durability mode uses one.
pub fn group_commit_coordinator(&self) -> Option<&Arc<GroupCommitCoordinator>> {
    self.group_commit.as_ref()
}
/// Current LSN counter. A fresh system reports `LSN(1)`, which is also the
/// LSN the first append receives — i.e. the next LSN to be assigned (see
/// the tests below).
pub fn current_lsn(&self) -> LSN {
    self.wal.current_lsn()
}
/// Total operations appended since startup (durable or not).
pub fn total_appends(&self) -> u64 {
    self.wal.total_appends()
}
/// Total entries the flush coordinator has written out.
pub fn total_flushed(&self) -> u64 {
    self.coordinator.total_entries_flushed()
}
/// Durability mode this system was configured with.
pub fn durability_mode(&self) -> DurabilityMode {
    self.durability_mode
}
/// Length of the current background-flush failure streak (0 = healthy;
/// the flusher resets it on the next successful flush).
pub fn consecutive_flush_errors(&self) -> u64 {
    self.consecutive_flush_errors.load(Ordering::Relaxed)
}
/// `true` while no flush-failure streak is in progress.
pub fn is_healthy(&self) -> bool {
    self.consecutive_flush_errors() == 0
}
/// Directory containing the on-disk WAL segments.
pub fn wal_dir(&self) -> &std::path::Path {
    self.coordinator.wal_dir()
}
/// Read persisted WAL entries starting at `start_lsn` from the segment
/// files (entries still buffered in memory are presumably not visible —
/// confirm `wal_reader` semantics).
pub fn read_from(&self, start_lsn: LSN) -> Result<Vec<super::WalEntry>> {
    crate::storage::wal_reader::read_wal_entries(self.wal_dir(), start_lsn)
}
/// Graceful shutdown: stop the WAL, signal and wake the flush thread, then
/// join it — its final cycle drains and fsyncs whatever remains. Safe to
/// call more than once (the join handle is `take`n), which makes the call
/// from `Drop` a near no-op after an explicit shutdown.
pub fn shutdown(&mut self) {
    // Quiesce the WAL first (presumably blocks new appends so none race
    // with the final flush — confirm in `ConcurrentWal`).
    self.wal.shutdown_graceful();
    self.shutdown_signal.store(true, Ordering::Relaxed);
    // Wake the flusher immediately instead of waiting out its interval.
    self.flush_notifier.notify();
    if let Some(handle) = self.flush_thread.take() {
        let _ = handle.join();
    }
}
}
impl Drop for ConcurrentWalSystem {
    /// Ensure the flush thread is stopped and remaining entries flushed
    /// even when the system is dropped without an explicit `shutdown`.
    fn drop(&mut self) {
        self.shutdown();
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::GLOBAL_INTERNER;
use crate::core::id::NodeId;
use crate::core::property::PropertyMap;
use crate::core::temporal::time;
use tempfile::tempdir;
/// Build a minimal `CreateNode` operation with a deterministic id/label.
fn create_test_operation(id: u64) -> WalOperation {
    WalOperation::CreateNode {
        node_id: NodeId::new(id).unwrap(),
        label: GLOBAL_INTERNER.intern(format!("Node{}", id)).unwrap(),
        properties: PropertyMap::new(),
        valid_from: time::now(),
    }
}
// A fresh system has no appends and its LSN counter starts at 1.
#[test]
fn test_concurrent_wal_system_creation() {
    let dir = tempdir().unwrap();
    let config = ConcurrentWalSystemConfig::new(dir.path());
    let wal = ConcurrentWalSystem::new(config).unwrap();
    assert_eq!(wal.total_appends(), 0);
    assert_eq!(wal.current_lsn(), LSN(1));
}
// Synchronous append returns the assigned LSN and counts the append.
#[test]
fn test_append_sync_mode() {
    let dir = tempdir().unwrap();
    let config = ConcurrentWalSystemConfig::new(dir.path())
        .with_durability_mode(DurabilityMode::Synchronous);
    let wal = ConcurrentWalSystem::new(config).unwrap();
    let lsn = wal.append(create_test_operation(1)).unwrap();
    assert_eq!(lsn, LSN(1));
    assert_eq!(wal.total_appends(), 1);
}
// Sync mode keeps working when appends far exceed a tiny stripe capacity.
#[test]
fn test_append_sync_mode_handles_more_than_stripe_capacity() {
    let dir = tempdir().unwrap();
    let mut config = ConcurrentWalSystemConfig::new(dir.path())
        .with_durability_mode(DurabilityMode::Synchronous);
    config.stripe_capacity = 8;
    let wal = ConcurrentWalSystem::new(config).unwrap();
    for i in 1..=64 {
        let lsn = wal.append(create_test_operation(i)).unwrap();
        assert_eq!(lsn, LSN(i));
    }
    assert_eq!(wal.total_appends(), 64);
    assert_eq!(wal.total_flushed(), 64);
}
// Async mode: appends return immediately; an explicit flush (plus polling)
// makes them durable long before the deliberately huge background interval.
#[test]
fn test_append_async_mode() {
    let dir = tempdir().unwrap();
    let config = ConcurrentWalSystemConfig::new(dir.path())
        .with_flush_interval_ms(10_000)
        .with_durability_mode(DurabilityMode::Async {
            flush_interval_ms: 10_000,
        });
    let mut wal = ConcurrentWalSystem::new(config).unwrap();
    for i in 1..=10 {
        let lsn = wal.append(create_test_operation(i)).unwrap();
        assert_eq!(lsn, LSN(i));
    }
    assert_eq!(wal.total_appends(), 10);
    wal.flush().unwrap();
    let start = std::time::Instant::now();
    let timeout = Duration::from_secs(5);
    while wal.total_flushed() < 10 {
        if start.elapsed() > timeout {
            break;
        }
        std::thread::sleep(Duration::from_millis(10));
    }
    assert_eq!(wal.total_flushed(), 10, "All 10 entries should be flushed");
    wal.shutdown();
    assert_eq!(wal.total_flushed(), 10, "All 10 entries should be flushed");
}
// Appends from multiple threads are all counted (no lost updates).
#[test]
fn test_concurrent_appends() {
    use std::sync::Arc;
    use std::thread;
    let dir = tempdir().unwrap();
    let config = ConcurrentWalSystemConfig::new(dir.path())
        .with_durability_mode(DurabilityMode::Async {
            flush_interval_ms: 100,
        })
        .with_num_stripes(4);
    let wal = Arc::new(ConcurrentWalSystem::new(config).unwrap());
    let num_threads = 4;
    let ops_per_thread = 100;
    let handles: Vec<_> = (0..num_threads)
        .map(|t| {
            let wal = Arc::clone(&wal);
            thread::spawn(move || {
                for i in 0..ops_per_thread {
                    // Ids are disjoint per thread: t*100+1 ..= t*100+100.
                    let id = (t * ops_per_thread + i + 1) as u64;
                    wal.append_async(create_test_operation(id)).unwrap();
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    assert_eq!(wal.total_appends(), (num_threads * ops_per_thread) as u64);
}
// Explicit flush reports and persists exactly the drained entries.
#[test]
fn test_flush_persists_entries() {
    let dir = tempdir().unwrap();
    let config = ConcurrentWalSystemConfig::new(dir.path())
        .with_durability_mode(DurabilityMode::Synchronous);
    let mut wal = ConcurrentWalSystem::new(config).unwrap();
    for i in 1..=5 {
        wal.append_async(create_test_operation(i)).unwrap();
    }
    let stats = wal.flush().unwrap();
    assert_eq!(stats.entries_flushed, 5);
    assert_eq!(wal.total_flushed(), 5);
    wal.shutdown();
}
// Group-commit mode: the background flusher picks appends up within the
// flush interval (polled with a generous timeout to avoid flakiness).
#[test]
fn test_group_commit_mode() {
    let dir = tempdir().unwrap();
    let config = ConcurrentWalSystemConfig::new(dir.path())
        .with_durability_mode(DurabilityMode::GroupCommit {
            max_batch_size: 10,
            max_delay_ms: 10,
        })
        .with_flush_interval_ms(5);
    let mut wal = ConcurrentWalSystem::new(config).unwrap();
    for i in 1..=5 {
        wal.append(create_test_operation(i)).unwrap();
    }
    let start = std::time::Instant::now();
    let timeout = Duration::from_secs(5);
    let mut flushed = false;
    while start.elapsed() < timeout {
        if wal.total_flushed() >= 1 {
            flushed = true;
            break;
        }
        std::thread::sleep(Duration::from_millis(5));
    }
    assert!(
        flushed,
        "Expected at least 1 entry to be flushed within {}ms, but got {} flushed",
        timeout.as_millis(),
        wal.total_flushed()
    );
    wal.shutdown();
}
// Shutdown drains and flushes anything still buffered.
#[test]
fn test_shutdown_flushes_remaining() {
    let dir = tempdir().unwrap();
    let config = ConcurrentWalSystemConfig::new(dir.path()).with_durability_mode(
        DurabilityMode::Async {
            flush_interval_ms: 100,
        },
    );
    let mut wal = ConcurrentWalSystem::new(config).unwrap();
    for i in 1..=5 {
        wal.append_async(create_test_operation(i)).unwrap();
    }
    wal.shutdown();
    assert_eq!(wal.total_flushed(), 5);
}
// Batch append in async mode assigns consecutive LSNs starting at 1.
#[test]
fn test_append_batch_async() {
    let dir = tempdir().unwrap();
    let config = ConcurrentWalSystemConfig::new(dir.path()).with_durability_mode(
        DurabilityMode::Async {
            flush_interval_ms: 10_000,
        },
    );
    let wal = ConcurrentWalSystem::new(config).unwrap();
    let ops = vec![
        create_test_operation(1),
        create_test_operation(2),
        create_test_operation(3),
    ];
    let lsns = wal.append_batch(ops).unwrap();
    assert_eq!(lsns.len(), 3);
    assert_eq!(lsns[0], LSN(1));
    assert_eq!(lsns[1], LSN(2));
    assert_eq!(lsns[2], LSN(3));
    assert_eq!(wal.total_appends(), 3);
}
// Batch append in synchronous mode: same LSN assignment, flushed inline.
#[test]
fn test_append_batch_sync() {
    let dir = tempdir().unwrap();
    let config = ConcurrentWalSystemConfig::new(dir.path())
        .with_durability_mode(DurabilityMode::Synchronous);
    let wal = ConcurrentWalSystem::new(config).unwrap();
    let ops = vec![create_test_operation(1), create_test_operation(2)];
    let lsns = wal.append_batch(ops).unwrap();
    assert_eq!(lsns.len(), 2);
    assert_eq!(lsns[0], LSN(1));
    assert_eq!(lsns[1], LSN(2));
    assert_eq!(wal.total_appends(), 2);
}
// Empty batch is a no-op that returns an empty LSN list.
#[test]
fn test_append_batch_empty() {
    let dir = tempdir().unwrap();
    let config = ConcurrentWalSystemConfig::new(dir.path());
    let wal = ConcurrentWalSystem::new(config).unwrap();
    let lsns = wal.append_batch(vec![]).unwrap();
    assert_eq!(lsns.len(), 0);
    assert_eq!(wal.total_appends(), 0);
}
// Large batch keeps LSNs dense: 1..=100.
#[test]
fn test_append_batch_large() {
    let dir = tempdir().unwrap();
    let config = ConcurrentWalSystemConfig::new(dir.path()).with_durability_mode(
        DurabilityMode::Async {
            flush_interval_ms: 10_000,
        },
    );
    let wal = ConcurrentWalSystem::new(config).unwrap();
    let ops: Vec<_> = (1..=100).map(create_test_operation).collect();
    let lsns = wal.append_batch(ops).unwrap();
    assert_eq!(lsns.len(), 100);
    assert_eq!(lsns[0], LSN(1));
    assert_eq!(lsns[99], LSN(100));
    assert_eq!(wal.total_appends(), 100);
}
// Synchronous appends must be durable the moment the call returns — no
// waiting on a background flush.
#[test]
fn test_append_sync_persistence_guarantee() {
    let dir = tempdir().unwrap();
    let config = ConcurrentWalSystemConfig::new(dir.path())
        .with_durability_mode(DurabilityMode::Synchronous);
    let wal = ConcurrentWalSystem::new(config).unwrap();
    assert_eq!(wal.total_flushed(), 0);
    let lsn = wal.append_sync(create_test_operation(1)).unwrap();
    assert_eq!(
        wal.total_flushed(),
        1,
        "Should be flushed immediately after return"
    );
    assert_eq!(lsn, LSN(1));
    let ops = vec![create_test_operation(2), create_test_operation(3)];
    let lsns = wal.append_batch(ops).unwrap();
    assert_eq!(
        wal.total_flushed(),
        3,
        "Batch should be flushed immediately"
    );
    assert_eq!(lsns.len(), 2);
}
}