use std::cell::Cell;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use super::lsn_allocator::LsnAllocator;
use super::ring_buffer::{CompletionHandle, PendingEntry};
use super::stripe::{StripeMetrics, WalStripe};
use super::{LSN, WalOperation};
use crate::core::error::{Error, Result, StorageError};
/// Default number of WAL stripes; a power of two so stripe selection can
/// use a bitmask (`hash & (n - 1)`) instead of a modulo.
pub const DEFAULT_NUM_STRIPES: usize = 16;
/// Default capacity handed to each stripe (see `WalStripe::with_capacity`).
pub const DEFAULT_STRIPE_CAPACITY: usize = 1024;
/// Configuration for [`ConcurrentWal`].
#[derive(Debug, Clone)]
pub struct ConcurrentWalConfig {
    /// Directory where WAL data is rooted (see `ConcurrentWal::wal_dir`).
    pub wal_dir: PathBuf,
    /// Requested stripe count; rounded up to the next power of two when the
    /// WAL is constructed.
    pub num_stripes: usize,
    /// Capacity passed to each `WalStripe` at construction.
    pub stripe_capacity: usize,
    /// Segment size; presumably in bytes (default is 64 * 1024 * 1024) —
    /// TODO confirm against the segment writer.
    pub segment_size: usize,
    /// How many segments to retain; consumer not visible in this file.
    pub segments_to_retain: usize,
}
impl Default for ConcurrentWalConfig {
    /// Default configuration: WAL rooted at `data/wal`, default stripe
    /// count/capacity, 64 MiB segments, 10 segments retained.
    fn default() -> Self {
        // 64 MiB per segment.
        let segment_size = 64 * 1024 * 1024;
        Self {
            wal_dir: PathBuf::from("data/wal"),
            num_stripes: DEFAULT_NUM_STRIPES,
            stripe_capacity: DEFAULT_STRIPE_CAPACITY,
            segments_to_retain: 10,
            segment_size,
        }
    }
}
impl ConcurrentWalConfig {
    /// Creates a config rooted at `wal_dir`, taking defaults for every other
    /// field.
    pub fn new(wal_dir: impl Into<PathBuf>) -> Self {
        Self {
            wal_dir: wal_dir.into(),
            ..Default::default()
        }
    }

    /// Sets the stripe count, rounded up to the next power of two so that
    /// mask-based stripe selection stays valid.
    pub fn with_num_stripes(mut self, num_stripes: usize) -> Self {
        self.num_stripes = num_stripes.next_power_of_two();
        self
    }

    /// Sets the per-stripe capacity.
    pub fn with_stripe_capacity(mut self, capacity: usize) -> Self {
        self.stripe_capacity = capacity;
        self
    }

    /// Sets the segment size.
    pub fn with_segment_size(mut self, size: usize) -> Self {
        self.segment_size = size;
        self
    }

    /// Sets how many segments to retain. Added for consistency: every other
    /// public field already has a builder method.
    pub fn with_segments_to_retain(mut self, count: usize) -> Self {
        self.segments_to_retain = count;
        self
    }
}
thread_local! {
    // Cached hash of this thread's `ThreadId`, computed lazily once per
    // thread and reused by `ConcurrentWal::get_stripe` so stripe selection
    // is a single mask operation after the first call.
    static THREAD_ID_HASH: Cell<Option<u64>> = const { Cell::new(None) };
}
/// Write-ahead log with per-thread stripe affinity for concurrent appends.
pub struct ConcurrentWal {
    // Construction-time configuration, exposed via `config()`.
    config: ConcurrentWalConfig,
    // Monotonic LSN source shared by all append paths.
    lsn_allocator: LsnAllocator,
    // One ring-buffered stripe per slot; length == `num_stripes`.
    stripes: Vec<WalStripe>,
    // Always a power of two (rounded up in `new`).
    num_stripes: usize,
    // `num_stripes - 1`, used to map a thread-id hash onto a stripe index.
    stripe_mask: usize,
    // Count of successful appends (relaxed increments).
    total_appends: AtomicU64,
    // Set by `shutdown_graceful`; rejects new appends once true.
    shutdown_requested: AtomicBool,
    // Number of append/batch calls currently in flight (see ActiveBatchGuard).
    active_batches: AtomicUsize,
}
/// RAII guard counting one in-flight append/batch operation.
///
/// Constructing the guard increments the shared counter; dropping it (on any
/// exit path, including early returns and errors) decrements it again, so
/// the counter always equals the number of operations currently in progress.
struct ActiveBatchGuard<'a>(&'a AtomicUsize);

impl<'a> ActiveBatchGuard<'a> {
    /// Registers one active operation on `counter` and returns the guard
    /// whose drop will deregister it.
    fn new(counter: &'a AtomicUsize) -> Self {
        let guard = Self(counter);
        guard.0.fetch_add(1, Ordering::SeqCst);
        guard
    }
}

impl Drop for ActiveBatchGuard<'_> {
    fn drop(&mut self) {
        // Mirrors the increment performed in `new`.
        self.0.fetch_sub(1, Ordering::SeqCst);
    }
}
impl ConcurrentWal {
    /// Builds a concurrent WAL from `config`.
    ///
    /// The stripe count is rounded up to the next power of two so stripe
    /// selection can use `hash & stripe_mask` instead of a modulo.
    pub fn new(config: ConcurrentWalConfig) -> Result<Self> {
        let num_stripes = config.num_stripes.next_power_of_two();
        let stripe_mask = num_stripes - 1;
        let stripes: Vec<WalStripe> = (0..num_stripes)
            .map(|id| WalStripe::with_capacity(id, config.stripe_capacity))
            .collect();
        Ok(Self {
            config,
            lsn_allocator: LsnAllocator::new(),
            stripes,
            num_stripes,
            stripe_mask,
            total_appends: AtomicU64::new(0),
            shutdown_requested: AtomicBool::new(false),
            active_batches: AtomicUsize::new(0),
        })
    }

    /// Convenience constructor: default configuration rooted at `wal_dir`.
    pub fn with_defaults(wal_dir: impl Into<PathBuf>) -> Result<Self> {
        Self::new(ConcurrentWalConfig::new(wal_dir))
    }

    /// Current LSN as reported by the allocator.
    #[inline]
    pub fn current_lsn(&self) -> LSN {
        self.lsn_allocator.current()
    }

    /// Number of stripes (always a power of two).
    #[inline]
    pub fn num_stripes(&self) -> usize {
        self.num_stripes
    }

    /// Total successful appends since construction (relaxed load; may lag
    /// concurrent writers slightly).
    #[inline]
    pub fn total_appends(&self) -> u64 {
        self.total_appends.load(Ordering::Relaxed)
    }

    /// Returns the stripe assigned to the calling thread.
    ///
    /// The thread id is hashed once and cached in a thread-local, so for a
    /// given WAL instance a thread always maps to the same stripe; this
    /// affinity keeps concurrent appends on disjoint stripes.
    #[inline]
    fn get_stripe(&self) -> &WalStripe {
        let hash = THREAD_ID_HASH.with(|id| {
            if let Some(existing) = id.get() {
                existing
            } else {
                let thread_id = std::thread::current().id();
                let h = {
                    use std::hash::{Hash, Hasher};
                    let mut hasher = std::collections::hash_map::DefaultHasher::new();
                    thread_id.hash(&mut hasher);
                    hasher.finish()
                };
                id.set(Some(h));
                h
            }
        });
        let stripe_id = (hash as usize) & self.stripe_mask;
        &self.stripes[stripe_id]
    }

    /// Stripe by id, or `None` when out of range.
    #[inline]
    pub fn stripe(&self, id: usize) -> Option<&WalStripe> {
        self.stripes.get(id)
    }

    /// Errors with `WalError` once a graceful shutdown has been requested.
    #[inline]
    fn check_not_shutting_down(&self) -> Result<()> {
        if self.shutdown_requested.load(Ordering::SeqCst) {
            return Err(Error::Storage(StorageError::WalError {
                reason: "WAL is shutting down".to_string(),
            }));
        }
        Ok(())
    }

    /// Appends `operation` without waiting for it to be flushed.
    ///
    /// Allocates an LSN, serializes the entry, and enqueues it on the
    /// calling thread's stripe. Returns the allocated LSN.
    ///
    /// # Errors
    /// Fails if the WAL is shutting down, the entry exceeds the maximum
    /// size, or the stripe's buffer has been closed.
    pub fn append_async(&self, operation: WalOperation) -> Result<LSN> {
        self.check_not_shutting_down()?;
        // Guard keeps `active_batches` accurate on every exit path.
        let _guard = ActiveBatchGuard::new(&self.active_batches);
        let lsn = self.lsn_allocator.allocate();
        let data = self.serialize_entry(lsn, &operation)?;
        let stripe = self.get_stripe();
        match stripe.append_blocking(lsn, data) {
            Ok(()) => {
                self.total_appends.fetch_add(1, Ordering::Relaxed);
                Ok(lsn)
            }
            Err(_entry) => Err(Error::Storage(StorageError::WalError {
                reason: "WAL buffer closed".to_string(),
            })),
        }
    }

    /// Appends `operation` and blocks until its completion handle resolves
    /// (i.e. until the entry has been flushed or the flush fails).
    ///
    /// # Errors
    /// Fails if the WAL is shutting down, the entry is oversized, the stripe
    /// rejects the append (backpressure), or the flush itself fails.
    pub fn append_sync(&self, operation: WalOperation) -> Result<LSN> {
        self.check_not_shutting_down()?;
        let _guard = ActiveBatchGuard::new(&self.active_batches);
        let lsn = self.lsn_allocator.allocate();
        let data = self.serialize_entry(lsn, &operation)?;
        let stripe = self.get_stripe();
        match stripe.append_sync(lsn, data) {
            Ok(handle) => {
                self.total_appends.fetch_add(1, Ordering::Relaxed);
                handle.wait().map_err(|e| {
                    Error::Storage(StorageError::WalError {
                        reason: format!("WAL flush failed: {}", e),
                    })
                })?;
                Ok(lsn)
            }
            Err(_entry) => Err(Error::Storage(StorageError::WalError {
                reason: "WAL buffer full - backpressure".to_string(),
            })),
        }
    }

    /// Appends `operation` and returns a completion handle the caller can
    /// wait on, instead of blocking here as `append_sync` does.
    pub fn append_with_handle(&self, operation: WalOperation) -> Result<(LSN, CompletionHandle)> {
        self.check_not_shutting_down()?;
        let _guard = ActiveBatchGuard::new(&self.active_batches);
        let lsn = self.lsn_allocator.allocate();
        let data = self.serialize_entry(lsn, &operation)?;
        let stripe = self.get_stripe();
        match stripe.append_sync_blocking(lsn, data) {
            Ok(handle) => {
                self.total_appends.fetch_add(1, Ordering::Relaxed);
                Ok((lsn, handle))
            }
            Err(_entry) => Err(Error::Storage(StorageError::WalError {
                reason: "WAL buffer closed".to_string(),
            })),
        }
    }

    /// Appends a batch of operations, returning one LSN per operation.
    ///
    /// LSNs are allocated as a single consecutive range, so interleaved
    /// single appends cannot land inside the batch's range.
    ///
    /// # Errors
    /// Fails on shutdown, on an oversized entry, or if the stripe buffer is
    /// closed mid-batch; in the failure case earlier entries of the batch
    /// may already have been enqueued.
    pub fn append_batch(&self, operations: Vec<WalOperation>) -> Result<Vec<LSN>> {
        self.check_not_shutting_down()?;
        let _guard = ActiveBatchGuard::new(&self.active_batches);
        if operations.is_empty() {
            return Ok(Vec::new());
        }
        let count = operations.len() as u64;
        let (first_lsn, _last_lsn) = self.lsn_allocator.allocate_batch(count);
        // The stripe is derived from a per-thread cached hash, so it is
        // loop-invariant: resolve it once instead of per operation.
        let stripe = self.get_stripe();
        let mut lsns = Vec::with_capacity(operations.len());
        for (idx, operation) in operations.into_iter().enumerate() {
            let lsn = LSN(first_lsn.0 + idx as u64);
            lsns.push(lsn);
            let data = self.serialize_entry(lsn, &operation)?;
            match stripe.append_blocking(lsn, data) {
                Ok(()) => {
                    self.total_appends.fetch_add(1, Ordering::Relaxed);
                }
                Err(_entry) => {
                    return Err(Error::Storage(StorageError::WalError {
                        reason: "WAL buffer closed".to_string(),
                    }));
                }
            }
        }
        Ok(lsns)
    }

    /// Like [`Self::append_batch`], but also returns a completion handle per
    /// operation so the caller can wait on durability individually.
    pub fn append_batch_with_handles(
        &self,
        operations: Vec<WalOperation>,
    ) -> Result<(Vec<LSN>, Vec<CompletionHandle>)> {
        self.check_not_shutting_down()?;
        let _guard = ActiveBatchGuard::new(&self.active_batches);
        if operations.is_empty() {
            return Ok((Vec::new(), Vec::new()));
        }
        let count = operations.len() as u64;
        let (first_lsn, _last_lsn) = self.lsn_allocator.allocate_batch(count);
        // Same hoist as in `append_batch`: the thread's stripe cannot change
        // within the loop.
        let stripe = self.get_stripe();
        let mut lsns = Vec::with_capacity(operations.len());
        let mut handles = Vec::with_capacity(operations.len());
        for (idx, operation) in operations.into_iter().enumerate() {
            let lsn = LSN(first_lsn.0 + idx as u64);
            lsns.push(lsn);
            let data = self.serialize_entry(lsn, &operation)?;
            match stripe.append_sync_blocking(lsn, data) {
                Ok(handle) => {
                    self.total_appends.fetch_add(1, Ordering::Relaxed);
                    handles.push(handle);
                }
                Err(_entry) => {
                    return Err(Error::Storage(StorageError::WalError {
                        reason: "WAL buffer closed".to_string(),
                    }));
                }
            }
        }
        Ok((lsns, handles))
    }

    /// Serializes one entry into a fresh buffer sized from the capacity
    /// estimate.
    ///
    /// # Errors
    /// Returns `CapacityExceeded` when the estimated size is above
    /// `MAX_WAL_ENTRY_SIZE` (checked before allocating the buffer).
    fn serialize_entry(&self, lsn: LSN, operation: &WalOperation) -> Result<Vec<u8>> {
        let estimated_capacity = super::estimate_entry_capacity(operation);
        if estimated_capacity > super::entry::MAX_WAL_ENTRY_SIZE {
            return Err(Error::Storage(StorageError::CapacityExceeded {
                resource: "WAL entry size".to_string(),
                current: estimated_capacity,
                limit: super::entry::MAX_WAL_ENTRY_SIZE,
            }));
        }
        let mut buffer = Vec::with_capacity(estimated_capacity);
        let timestamp = crate::core::temporal::time::now();
        super::serialization::serialize_operation_into(lsn, timestamp, operation, &mut buffer)?;
        Ok(buffer)
    }

    /// Drains every stripe and returns all pending entries sorted by LSN.
    pub fn drain_all(&self) -> Vec<PendingEntry> {
        // Pre-size from the per-stripe pending counts to avoid regrowth.
        let total_pending: usize = self.stripes.iter().map(|s| s.pending_count()).sum();
        let mut all_entries = Vec::with_capacity(total_pending);
        for stripe in &self.stripes {
            all_entries.extend(stripe.drain());
        }
        all_entries.sort_by_key(|e| e.lsn);
        all_entries
    }

    /// Drains a single stripe; unknown ids yield an empty vec.
    pub fn drain_stripe(&self, stripe_id: usize) -> Vec<PendingEntry> {
        self.stripes
            .get(stripe_id)
            .map(|s| s.drain())
            .unwrap_or_default()
    }

    /// Per-stripe metrics snapshot.
    pub fn stripe_metrics(&self) -> Vec<StripeMetrics> {
        self.stripes.iter().map(|s| s.metrics()).collect()
    }

    /// Closes every stripe immediately, without waiting for in-flight work.
    pub fn close(&self) {
        for stripe in &self.stripes {
            stripe.close();
        }
    }

    /// Requests shutdown, waits for in-flight appends to finish, then closes.
    ///
    /// New appends are rejected as soon as the flag is set; this then
    /// spin-waits (yielding after 100 spins) until `active_batches` drains
    /// to zero before closing the stripes.
    pub fn shutdown_graceful(&self) {
        self.shutdown_requested.store(true, Ordering::SeqCst);
        let mut spins = 0;
        while self.active_batches.load(Ordering::SeqCst) > 0 {
            if spins < 100 {
                std::hint::spin_loop();
            } else {
                std::thread::yield_now();
            }
            spins += 1;
        }
        self.close();
    }

    /// Whether the WAL is closed. Stripes are only ever closed together, so
    /// the first stripe is representative; a stripe-less WAL counts as closed.
    pub fn is_closed(&self) -> bool {
        self.stripes.first().map(|s| s.is_closed()).unwrap_or(true)
    }

    /// Resets the allocator so the next allocated LSN is `lsn` (used by
    /// recovery to continue after replay).
    pub fn set_next_lsn(&self, lsn: LSN) {
        self.lsn_allocator.set_next(lsn);
    }

    /// WAL directory from the configuration.
    pub fn wal_dir(&self) -> &Path {
        &self.config.wal_dir
    }

    /// The configuration this WAL was built with.
    pub fn config(&self) -> &ConcurrentWalConfig {
        &self.config
    }
}
/// Point-in-time metrics snapshot for a [`ConcurrentWal`].
#[derive(Debug, Clone)]
pub struct ConcurrentWalMetrics {
    /// Successful appends since the WAL was created.
    pub total_appends: u64,
    /// LSN at the time of the snapshot.
    pub current_lsn: LSN,
    /// One metrics entry per stripe.
    pub stripes: Vec<StripeMetrics>,
    /// Sum of `pending_count` across all stripes.
    pub total_pending: usize,
}
impl ConcurrentWal {
    /// Collects a consistent-enough snapshot of WAL-wide metrics: per-stripe
    /// metrics plus the aggregated pending count, append total, and current
    /// LSN. Values are read without coordination, so concurrent writers may
    /// make the snapshot slightly stale.
    pub fn metrics(&self) -> ConcurrentWalMetrics {
        let stripes = self.stripe_metrics();
        let total_pending = stripes.iter().map(|s| s.pending_count).sum();
        ConcurrentWalMetrics {
            total_appends: self.total_appends(),
            current_lsn: self.current_lsn(),
            total_pending,
            stripes,
        }
    }
}
#[cfg(test)]
mod tests {
    // Unit tests for the concurrent WAL: construction, stripe sizing,
    // LSN allocation, batching, draining, metrics, and shutdown behavior.
    use super::*;
    use crate::GLOBAL_INTERNER;
    use crate::core::id::NodeId;
    use crate::core::property::PropertyMap;
    use crate::core::temporal::time;
    use std::sync::Arc;
    use std::thread;
    use tempfile::tempdir;

    // Minimal CreateNode operation used as a fixture by most tests.
    fn test_operation() -> WalOperation {
        WalOperation::CreateNode {
            node_id: NodeId::new(1).unwrap(),
            label: GLOBAL_INTERNER.intern("Test").unwrap(),
            properties: PropertyMap::new(),
            valid_from: time::now(),
        }
    }

    // A fresh WAL has default stripe count, LSN 1, and zero appends.
    #[test]
    fn test_concurrent_wal_creation() {
        let dir = tempdir().unwrap();
        let config = ConcurrentWalConfig::new(dir.path());
        let wal = ConcurrentWal::new(config).unwrap();
        assert_eq!(wal.num_stripes(), DEFAULT_NUM_STRIPES);
        assert_eq!(wal.current_lsn(), LSN(1));
        assert_eq!(wal.total_appends(), 0);
    }

    // Power-of-two stripe counts are kept as-is.
    #[test]
    fn test_concurrent_wal_custom_stripes() {
        let dir = tempdir().unwrap();
        let config = ConcurrentWalConfig::new(dir.path()).with_num_stripes(8);
        let wal = ConcurrentWal::new(config).unwrap();
        assert_eq!(wal.num_stripes(), 8);
    }

    // Non-power-of-two stripe counts round up (10 -> 16).
    #[test]
    fn test_concurrent_wal_stripe_rounding() {
        let dir = tempdir().unwrap();
        let config = ConcurrentWalConfig::new(dir.path()).with_num_stripes(10);
        let wal = ConcurrentWal::new(config).unwrap();
        assert_eq!(wal.num_stripes(), 16);
    }

    // Sequential async appends get consecutive LSNs starting at 1.
    #[test]
    fn test_append_async_allocates_lsn() {
        let dir = tempdir().unwrap();
        let config = ConcurrentWalConfig::new(dir.path());
        let wal = ConcurrentWal::new(config).unwrap();
        let lsn1 = wal.append_async(test_operation()).unwrap();
        let lsn2 = wal.append_async(test_operation()).unwrap();
        let lsn3 = wal.append_async(test_operation()).unwrap();
        assert_eq!(lsn1, LSN(1));
        assert_eq!(lsn2, LSN(2));
        assert_eq!(lsn3, LSN(3));
        assert_eq!(wal.total_appends(), 3);
    }

    // append_with_handle returns an incomplete handle until a flush occurs.
    #[test]
    fn test_append_with_handle() {
        let dir = tempdir().unwrap();
        let config = ConcurrentWalConfig::new(dir.path());
        let wal = ConcurrentWal::new(config).unwrap();
        let (lsn, handle) = wal.append_with_handle(test_operation()).unwrap();
        assert_eq!(lsn, LSN(1));
        assert!(!handle.is_complete());
    }

    // drain_all merges entries from all stripes in strictly increasing LSN order,
    // even when appends came from multiple threads.
    #[test]
    fn test_drain_all_sorted_by_lsn() {
        let dir = tempdir().unwrap();
        let config = ConcurrentWalConfig::new(dir.path()).with_num_stripes(4);
        let wal = ConcurrentWal::new(config).unwrap();
        let wal = Arc::new(wal);
        let handles: Vec<_> = (0..4)
            .map(|_| {
                let wal = Arc::clone(&wal);
                thread::spawn(move || {
                    for _ in 0..10 {
                        wal.append_async(test_operation()).unwrap();
                    }
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }
        let entries = wal.drain_all();
        assert_eq!(entries.len(), 40);
        for i in 1..entries.len() {
            assert!(entries[i].lsn > entries[i - 1].lsn);
        }
    }

    // A single thread always maps to the same stripe (thread-local hash cache).
    #[test]
    fn test_stripe_affinity() {
        let dir = tempdir().unwrap();
        let config = ConcurrentWalConfig::new(dir.path()).with_num_stripes(16);
        let wal = ConcurrentWal::new(config).unwrap();
        let stripe1 = wal.get_stripe().id();
        let stripe2 = wal.get_stripe().id();
        let stripe3 = wal.get_stripe().id();
        assert_eq!(stripe1, stripe2);
        assert_eq!(stripe2, stripe3);
    }

    // Concurrent appends from many threads never lose counts or LSNs.
    #[test]
    fn test_concurrent_appends() {
        let dir = tempdir().unwrap();
        let config = ConcurrentWalConfig::new(dir.path()).with_num_stripes(8);
        let wal = Arc::new(ConcurrentWal::new(config).unwrap());
        let num_threads = 8;
        let appends_per_thread = 100;
        let handles: Vec<_> = (0..num_threads)
            .map(|_| {
                let wal = Arc::clone(&wal);
                thread::spawn(move || {
                    for _ in 0..appends_per_thread {
                        wal.append_async(test_operation()).unwrap();
                    }
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }
        assert_eq!(
            wal.total_appends(),
            (num_threads * appends_per_thread) as u64
        );
        assert_eq!(
            wal.current_lsn(),
            LSN((num_threads * appends_per_thread + 1) as u64)
        );
    }

    // Closing the stripes makes subsequent appends fail.
    #[test]
    fn test_close_prevents_appends() {
        let dir = tempdir().unwrap();
        let config = ConcurrentWalConfig::new(dir.path());
        let wal = ConcurrentWal::new(config).unwrap();
        wal.close();
        assert!(wal.is_closed());
        let result = wal.append_async(test_operation());
        assert!(result.is_err());
    }

    // metrics() aggregates appends, LSN, stripe count, and pending totals.
    #[test]
    fn test_metrics() {
        let dir = tempdir().unwrap();
        let config = ConcurrentWalConfig::new(dir.path()).with_num_stripes(4);
        let wal = ConcurrentWal::new(config).unwrap();
        for _ in 0..10 {
            wal.append_async(test_operation()).unwrap();
        }
        let metrics = wal.metrics();
        assert_eq!(metrics.total_appends, 10);
        assert_eq!(metrics.current_lsn, LSN(11));
        assert_eq!(metrics.stripes.len(), 4);
        assert_eq!(metrics.total_pending, 10);
    }

    // set_next_lsn repositions the allocator (recovery path).
    #[test]
    fn test_set_next_lsn() {
        let dir = tempdir().unwrap();
        let config = ConcurrentWalConfig::new(dir.path());
        let wal = ConcurrentWal::new(config).unwrap();
        wal.set_next_lsn(LSN(1000));
        assert_eq!(wal.current_lsn(), LSN(1000));
        let lsn = wal.append_async(test_operation()).unwrap();
        assert_eq!(lsn, LSN(1000));
    }

    // Exactly one stripe holds the single appended entry.
    #[test]
    fn test_drain_stripe() {
        let dir = tempdir().unwrap();
        let config = ConcurrentWalConfig::new(dir.path()).with_num_stripes(4);
        let wal = ConcurrentWal::new(config).unwrap();
        wal.append_async(test_operation()).unwrap();
        let total: usize = (0..4).map(|i| wal.drain_stripe(i).len()).sum();
        assert_eq!(total, 1);
    }

    // Drained entries can notify their completion handles manually.
    #[test]
    fn test_completion_notification_via_drain() {
        let dir = tempdir().unwrap();
        let config = ConcurrentWalConfig::new(dir.path());
        let wal = ConcurrentWal::new(config).unwrap();
        let (_lsn, handle) = wal.append_with_handle(test_operation()).unwrap();
        assert!(!handle.is_complete());
        let entries = wal.drain_all();
        for entry in &entries {
            entry.notify_completion();
        }
        assert!(handle.is_complete());
    }

    // Batch appends allocate one consecutive LSN range.
    #[test]
    fn test_append_batch_allocates_consecutive_lsns() {
        let dir = tempdir().unwrap();
        let config = ConcurrentWalConfig::new(dir.path());
        let wal = ConcurrentWal::new(config).unwrap();
        let ops = vec![test_operation(), test_operation(), test_operation()];
        let lsns = wal.append_batch(ops).unwrap();
        assert_eq!(lsns.len(), 3);
        assert_eq!(lsns[0], LSN(1));
        assert_eq!(lsns[1], LSN(2));
        assert_eq!(lsns[2], LSN(3));
        assert_eq!(wal.total_appends(), 3);
    }

    // An empty batch is a no-op (no LSNs consumed, no appends counted).
    #[test]
    fn test_append_batch_empty_operations() {
        let dir = tempdir().unwrap();
        let config = ConcurrentWalConfig::new(dir.path());
        let wal = ConcurrentWal::new(config).unwrap();
        let ops: Vec<WalOperation> = vec![];
        let lsns = wal.append_batch(ops).unwrap();
        assert_eq!(lsns.len(), 0);
        assert_eq!(wal.total_appends(), 0);
    }

    // A one-element batch behaves like a single append.
    #[test]
    fn test_append_batch_single_operation() {
        let dir = tempdir().unwrap();
        let config = ConcurrentWalConfig::new(dir.path());
        let wal = ConcurrentWal::new(config).unwrap();
        let ops = vec![test_operation()];
        let lsns = wal.append_batch(ops).unwrap();
        assert_eq!(lsns.len(), 1);
        assert_eq!(lsns[0], LSN(1));
        assert_eq!(wal.total_appends(), 1);
    }

    // Large batches keep LSNs dense from first to last.
    #[test]
    fn test_append_batch_many_operations() {
        let dir = tempdir().unwrap();
        let config = ConcurrentWalConfig::new(dir.path());
        let wal = ConcurrentWal::new(config).unwrap();
        let now = time::now();
        let ops: Vec<WalOperation> = (0..100)
            .map(|i| WalOperation::CreateNode {
                node_id: NodeId::new(i + 1).unwrap(),
                label: GLOBAL_INTERNER.intern(format!("Node{}", i)).unwrap(),
                properties: PropertyMap::new(),
                valid_from: now,
            })
            .collect();
        let lsns = wal.append_batch(ops).unwrap();
        assert_eq!(lsns.len(), 100);
        assert_eq!(lsns[0], LSN(1));
        assert_eq!(lsns[99], LSN(100));
        assert_eq!(wal.total_appends(), 100);
    }

    // Drained entries carry the same LSNs the batch returned.
    #[test]
    fn test_append_batch_with_drain() {
        let dir = tempdir().unwrap();
        let config = ConcurrentWalConfig::new(dir.path());
        let wal = ConcurrentWal::new(config).unwrap();
        let ops = vec![test_operation(), test_operation()];
        let lsns = wal.append_batch(ops).unwrap();
        let entries = wal.drain_all();
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].lsn, lsns[0]);
        assert_eq!(entries[1].lsn, lsns[1]);
    }

    // Single and batch appends share one LSN sequence without gaps.
    #[test]
    fn test_append_batch_interleaved_with_single() {
        let dir = tempdir().unwrap();
        let config = ConcurrentWalConfig::new(dir.path());
        let wal = ConcurrentWal::new(config).unwrap();
        let lsn1 = wal.append_async(test_operation()).unwrap();
        let batch_lsns = wal
            .append_batch(vec![test_operation(), test_operation()])
            .unwrap();
        let lsn4 = wal.append_async(test_operation()).unwrap();
        assert_eq!(lsn1, LSN(1));
        assert_eq!(batch_lsns[0], LSN(2));
        assert_eq!(batch_lsns[1], LSN(3));
        assert_eq!(lsn4, LSN(4));
        assert_eq!(wal.total_appends(), 4);
    }

    // Accessors expose the directory, config, and stripe lookup bounds.
    #[test]
    fn test_concurrent_wal_accessors() {
        let dir = tempdir().unwrap();
        let config = ConcurrentWalConfig::new(dir.path());
        let wal = ConcurrentWal::new(config.clone()).unwrap();
        assert_eq!(wal.wal_dir(), dir.path());
        assert_eq!(wal.config().num_stripes, config.num_stripes);
        assert!(wal.stripe(0).is_some());
        assert!(wal.stripe(1000).is_none());
    }
}
#[cfg(test)]
mod sentry_tests {
    // Edge-case tests around the maximum WAL entry size and the per-thread
    // stripe-hash cache interacting with WALs of different stripe counts.
    use super::*;
    use crate::GLOBAL_INTERNER;
    use crate::core::id::NodeId;
    use crate::core::property::PropertyMapBuilder;
    use crate::core::temporal::time;
    use crate::storage::wal::entry::MAX_WAL_ENTRY_SIZE;
    use tempfile::tempdir;

    // An entry estimated at exactly MAX_WAL_ENTRY_SIZE must be accepted
    // (the limit check is `>`, not `>=`).
    #[test]
    fn test_append_entry_exactly_max_size_succeeds() {
        let dir = tempdir().unwrap();
        let config = ConcurrentWalConfig::new(dir.path()).with_segment_size(MAX_WAL_ENTRY_SIZE * 2);
        let wal = ConcurrentWal::new(config).unwrap();
        // NOTE(review): 63 bytes is the assumed fixed serialization overhead
        // for this operation shape; the assert_eq below pins it.
        let overhead = 63;
        let target_val_len = MAX_WAL_ENTRY_SIZE - overhead;
        let big_string = "x".repeat(target_val_len);
        let properties = PropertyMapBuilder::new().insert("k", big_string).build();
        let op = WalOperation::CreateNode {
            node_id: NodeId::new(1).unwrap(),
            label: GLOBAL_INTERNER.intern("Test").unwrap(),
            properties,
            valid_from: time::now(),
        };
        let estimated = crate::storage::wal::estimate_entry_capacity(&op);
        assert_eq!(
            estimated, MAX_WAL_ENTRY_SIZE,
            "Entry size calculation incorrect"
        );
        let result = wal.append_async(op);
        assert!(
            result.is_ok(),
            "Failed to append entry of exactly MAX_WAL_ENTRY_SIZE: {:?}",
            result.err()
        );
    }

    // One byte over the limit must be rejected with CapacityExceeded,
    // reporting both the offending size and the limit.
    #[test]
    fn test_append_entry_exceeding_max_size_fails() {
        let dir = tempdir().unwrap();
        let config = ConcurrentWalConfig::new(dir.path()).with_segment_size(MAX_WAL_ENTRY_SIZE * 2);
        let wal = ConcurrentWal::new(config).unwrap();
        // Same assumed overhead as the success case, plus one byte.
        let overhead = 63;
        let target_val_len = MAX_WAL_ENTRY_SIZE - overhead + 1;
        let big_string = "x".repeat(target_val_len);
        let properties = PropertyMapBuilder::new().insert("k", big_string).build();
        let op = WalOperation::CreateNode {
            node_id: NodeId::new(1).unwrap(),
            label: GLOBAL_INTERNER.intern("Test").unwrap(),
            properties,
            valid_from: time::now(),
        };
        let estimated = crate::storage::wal::estimate_entry_capacity(&op);
        assert_eq!(
            estimated,
            MAX_WAL_ENTRY_SIZE + 1,
            "Entry size calculation incorrect"
        );
        let result = wal.append_async(op);
        assert!(result.is_err(), "Should have rejected oversized entry");
        match result {
            Err(Error::Storage(StorageError::CapacityExceeded { current, limit, .. })) => {
                assert_eq!(current, MAX_WAL_ENTRY_SIZE + 1);
                assert_eq!(limit, MAX_WAL_ENTRY_SIZE);
            }
            _ => panic!("Expected CapacityExceeded error, got {:?}", result),
        }
    }

    // The thread-local stripe hash is cached per thread but masked per WAL,
    // so one thread using WALs with different stripe counts must stay safe
    // (no out-of-bounds stripe index).
    #[test]
    fn test_thread_local_switching_between_sizes() {
        use std::thread;
        let handles: Vec<_> = (0..10)
            .map(|_| {
                thread::spawn(|| {
                    let dir_large = tempdir().unwrap();
                    let config_large =
                        ConcurrentWalConfig::new(dir_large.path()).with_num_stripes(32);
                    let wal_large = ConcurrentWal::new(config_large).unwrap();
                    let op = WalOperation::CreateNode {
                        node_id: NodeId::new(1).unwrap(),
                        label: GLOBAL_INTERNER.intern("Test").unwrap(),
                        properties: PropertyMapBuilder::new().build(),
                        valid_from: time::now(),
                    };
                    wal_large.append_async(op.clone()).unwrap();
                    let dir_small = tempdir().unwrap();
                    let config_small =
                        ConcurrentWalConfig::new(dir_small.path()).with_num_stripes(4);
                    let wal_small = ConcurrentWal::new(config_small).unwrap();
                    wal_small.append_async(op).unwrap();
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }
    }
}