use super::bulk_ingest::BulkIngestConfig;
use crate::backend::native::v2::wal::record::V2WALSerializer;
use crate::backend::native::v2::wal::{V2WALConfig, V2WALHeader, V2WALRecord, lsn};
use crate::backend::native::{NativeBackendError, NativeResult};
use parking_lot::Mutex;
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Append-only writer for the V2 write-ahead log.
///
/// All mutable state is wrapped in `Arc<Mutex<…>>` so one writer can be
/// shared across threads; each lock guards an independent concern so they
/// can be taken (and released) separately.
pub struct V2WALWriter {
/// Writer configuration (path, buffer sizing, commit timeouts).
config: V2WALConfig,
/// Buffered handle to the WAL file, opened in append mode.
file: Arc<Mutex<BufWriter<File>>>,
/// In-memory copy of the WAL header; holds the current LSN counter.
header: Arc<Mutex<V2WALHeader>>,
/// Staging buffer of serialized records, flushed to `file` in batches.
write_buffer: Arc<Mutex<WriteBuffer>>,
/// State for batching records into group commits.
group_commit: Arc<Mutex<GroupCommitState>>,
/// Running performance counters exposed via `get_metrics`.
metrics: Arc<Mutex<WriterMetrics>>,
/// Records grouped by cluster key. NOTE(review): entries are only ever
/// inserted in this file, never drained — confirm a consumer exists
/// elsewhere or this map grows without bound.
cluster_groups: Arc<Mutex<HashMap<i64, Vec<V2WALRecord>>>>,
/// Bulk-ingest mode toggle plus the saved tuning values to restore.
bulk_mode: Arc<Mutex<BulkModeState>>,
}
/// Staging area that accumulates serialized records between flushes.
#[derive(Debug)]
struct WriteBuffer {
/// Serialized record bytes awaiting a flush to the file.
buffer: Vec<u8>,
/// Per-record bookkeeping for the bytes in `buffer`; cleared on flush.
records: Vec<BufferedRecord>,
/// Flush once `buffer` reaches this many bytes.
max_size: usize,
/// Flush once this much time has passed since `last_flush`.
flush_timeout: Duration,
/// When the buffer was last flushed.
last_flush: Instant,
}
/// A WAL record held in memory together with its assigned LSN.
#[derive(Debug, Clone)]
struct BufferedRecord {
/// The record itself.
record: V2WALRecord,
/// Log sequence number assigned when the record was accepted.
lsn: u64,
/// When the record entered the buffer.
timestamp: Instant,
/// NOTE(review): always set to `true` in this file and never read —
/// confirm commit tracking happens elsewhere or the flag is vestigial.
committed: bool,
}
/// Accumulates records destined for a single batched ("group") commit.
#[derive(Debug)]
struct GroupCommitState {
/// Records waiting to be committed as one batch.
pending_records: Vec<BufferedRecord>,
/// Commit once this many records are pending.
max_batch_size: usize,
/// Commit once this much time has passed since `last_commit`.
timeout: Duration,
/// When the last group commit happened.
last_commit: Instant,
/// NOTE(review): never read or written anywhere in this file — confirm
/// it is used elsewhere or consider removing it.
active_transactions: u32,
}
/// Tracks whether bulk-ingest tuning is active and what to restore after.
#[derive(Debug)]
struct BulkModeState {
/// True while bulk-mode tuning overrides are applied.
active: bool,
/// Config saved on enable so disable can restore normal tuning.
original_config: Option<V2WALConfig>,
/// NOTE(review): reset to 0 on enable but never incremented in this
/// file — confirm the counter is maintained elsewhere.
records_written: u64,
/// When the current bulk session started.
session_start: Instant,
/// The bulk config applied for this session.
bulk_config: Option<BulkIngestConfig>,
}
/// Point-in-time performance counters for the WAL writer.
///
/// The `write_latency_*` fields are exponentially weighted moving-average
/// estimates (see `update_latency_metrics`), not true percentiles.
#[derive(Debug, Default)]
pub struct WriterMetrics {
/// Total records accepted by the writer.
pub records_written: u64,
/// Total serialized bytes accepted.
pub bytes_written: u64,
/// Number of buffer flushes performed.
pub flush_count: u64,
/// Running mean of records per flush.
pub avg_records_per_flush: f64,
/// Number of group commits performed.
pub group_commit_count: u64,
/// Running mean of records per group commit.
pub avg_group_commit_size: f64,
/// EWMA write-latency estimate, microseconds.
pub write_latency_p50: u64,
/// Scaled EWMA latency estimate (sample × 95/50), microseconds.
pub write_latency_p95: u64,
/// Scaled EWMA latency estimate (sample × 99/50), microseconds.
pub write_latency_p99: u64,
/// Staging-buffer fill level at the last write, percent.
pub buffer_utilization: f64,
}
impl V2WALWriter {
/// Creates a brand-new WAL file at `config.wal_path`.
///
/// Any existing file at that path is truncated; a fresh header is written
/// first, then the file is reopened in append mode for record writes.
///
/// # Errors
/// Fails if `config` does not validate or any file I/O fails.
pub fn create(config: V2WALConfig) -> NativeResult<Self> {
config.validate()?;
let header = V2WALHeader::new();
// Pass 1: truncate the file and write the raw header image.
{
let mut file = std::fs::OpenOptions::new()
.create(true)
.write(true)
.truncate(true) .open(&config.wal_path)
.map_err(NativeBackendError::Io)?;
// SAFETY: views `header` as its raw bytes for the on-disk image. This
// is sound only if V2WALHeader has a stable layout (e.g. repr(C)) and
// no padding bytes — padding would be uninitialized memory.
// NOTE(review): confirm the header type's layout guarantees; `open`
// reads it back the same way.
let header_bytes = unsafe {
std::slice::from_raw_parts(
&header as *const V2WALHeader as *const u8,
std::mem::size_of::<V2WALHeader>(),
)
};
file.write_all(header_bytes)
.map_err(NativeBackendError::Io)?;
file.flush().map_err(NativeBackendError::Io)?;
}
// Pass 2: reopen in append mode so record writes land past the header.
let file = std::fs::OpenOptions::new()
.create(true)
.write(true)
.append(true)
.open(&config.wal_path)
.map_err(NativeBackendError::Io)?;
let write_buffer = WriteBuffer {
buffer: Vec::with_capacity(config.buffer_size),
records: Vec::new(),
max_size: config.buffer_size,
flush_timeout: Duration::from_millis(config.group_commit_timeout_ms),
last_flush: Instant::now(),
};
let group_commit = GroupCommitState {
pending_records: Vec::new(),
max_batch_size: config.max_group_commit_size,
timeout: Duration::from_millis(config.group_commit_timeout_ms),
last_commit: Instant::now(),
active_transactions: 0,
};
let bulk_mode = BulkModeState {
active: false,
original_config: None,
records_written: 0,
session_start: Instant::now(),
bulk_config: None,
};
Ok(Self {
config,
file: Arc::new(Mutex::new(BufWriter::new(file))),
header: Arc::new(Mutex::new(header)),
write_buffer: Arc::new(Mutex::new(write_buffer)),
group_commit: Arc::new(Mutex::new(group_commit)),
metrics: Arc::new(Mutex::new(WriterMetrics::default())),
cluster_groups: Arc::new(Mutex::new(HashMap::new())),
bulk_mode: Arc::new(Mutex::new(bulk_mode)),
})
}
/// Opens an existing WAL file, or starts a fresh one if none exists.
///
/// When the file exists, the header is read back from its first bytes;
/// otherwise a new header is used. NOTE(review): the header contents
/// (magic/version/LSN) are not validated here, and no record recovery is
/// performed — confirm that happens elsewhere before this writer appends.
pub fn open(config: V2WALConfig) -> NativeResult<Self> {
config.validate()?;
let header = if config.wal_path.exists() {
// Reads the whole file just to get the header prefix. NOTE(review):
// for large WALs, consider reading only size_of::<V2WALHeader>() bytes.
let header_bytes = std::fs::read(&config.wal_path)
.map_err(NativeBackendError::Io)?;
if header_bytes.len() < std::mem::size_of::<V2WALHeader>() {
return Err(NativeBackendError::InvalidHeader {
field: "wal_file".to_string(),
reason: format!(
"WAL file too small: expected at least {} bytes, got {}",
std::mem::size_of::<V2WALHeader>(),
header_bytes.len()
),
});
}
// SAFETY: copies size_of::<V2WALHeader>() bytes into a header value;
// the length was checked above and `read_unaligned` tolerates the
// Vec's alignment. Sound only if every bit pattern is a valid
// V2WALHeader (plain-old-data layout) — mirrors the raw write in
// `create`. NOTE(review): confirm the layout guarantee.
unsafe {
std::ptr::read_unaligned(header_bytes.as_ptr() as *const V2WALHeader)
}
} else {
V2WALHeader::new()
};
// Append mode: new records always go to the end of the log.
let file = std::fs::OpenOptions::new()
.create(true)
.write(true)
.append(true)
.open(&config.wal_path)
.map_err(NativeBackendError::Io)?;
let write_buffer = WriteBuffer {
buffer: Vec::with_capacity(config.buffer_size),
records: Vec::new(),
max_size: config.buffer_size,
flush_timeout: Duration::from_millis(config.group_commit_timeout_ms),
last_flush: Instant::now(),
};
let group_commit = GroupCommitState {
pending_records: Vec::new(),
max_batch_size: config.max_group_commit_size,
timeout: Duration::from_millis(config.group_commit_timeout_ms),
last_commit: Instant::now(),
active_transactions: 0,
};
let bulk_mode = BulkModeState {
active: false,
original_config: None,
records_written: 0,
session_start: Instant::now(),
bulk_config: None,
};
Ok(Self {
config,
file: Arc::new(Mutex::new(BufWriter::new(file))),
header: Arc::new(Mutex::new(header)),
write_buffer: Arc::new(Mutex::new(write_buffer)),
group_commit: Arc::new(Mutex::new(group_commit)),
metrics: Arc::new(Mutex::new(WriterMetrics::default())),
cluster_groups: Arc::new(Mutex::new(HashMap::new())),
bulk_mode: Arc::new(Mutex::new(bulk_mode)),
})
}
/// Writes a single record to the WAL and returns its assigned LSN.
///
/// The record is serialized into the staging buffer; the buffer is flushed
/// to the file once it reaches `max_size` bytes or the flush timeout has
/// elapsed since the last flush.
///
/// # Errors
/// Fails if serialization or a triggered flush fails.
pub fn write_record(&self, record: V2WALRecord) -> NativeResult<u64> {
    let start_time = Instant::now();
    // Reserve the next LSN under the header lock so concurrent writers get
    // unique, monotonically increasing sequence numbers.
    let lsn = {
        let mut header = self.header.lock();
        let current_lsn = header.current_lsn;
        header.current_lsn = lsn::next(current_lsn);
        current_lsn
    };
    // Track the record under its cluster key (if any) for later grouping.
    if let Some(cluster_key) = record.cluster_key() {
        self.cluster_groups
            .lock()
            .entry(cluster_key)
            .or_default()
            .push(record.clone());
    }
    {
        let mut write_buffer = self.write_buffer.lock();
        // Serialize BEFORE touching the buffer bookkeeping so a
        // serialization error cannot leave `records` and `buffer` out of
        // sync (the original pushed the record first). Moving `record` into
        // the bookkeeping entry also avoids a clone.
        let serialized = V2WALSerializer::serialize(&record)?;
        write_buffer.records.push(BufferedRecord {
            record,
            lsn,
            timestamp: Instant::now(),
            committed: true,
        });
        write_buffer.buffer.extend_from_slice(&serialized);
        {
            let mut metrics = self.metrics.lock();
            metrics.records_written += 1;
            metrics.bytes_written += serialized.len() as u64;
            metrics.buffer_utilization =
                (write_buffer.buffer.len() as f64 / write_buffer.max_size as f64) * 100.0;
        }
        // BUG FIX: the time-based trigger previously compared against
        // `start_time.elapsed()` — the age of *this call* (always ~0 when
        // checked), so the timeout never fired. It must measure the time
        // since the last flush.
        let should_flush = write_buffer.buffer.len() >= write_buffer.max_size
            || write_buffer.last_flush.elapsed() >= write_buffer.flush_timeout;
        if should_flush {
            // Release the buffer lock first; flush_buffer re-acquires it.
            drop(write_buffer);
            self.flush_buffer()?;
        }
    }
    let write_latency = start_time.elapsed().as_micros() as u64;
    self.update_latency_metrics(write_latency);
    Ok(lsn)
}
/// Assigns LSNs to `records` and queues them for group commit.
///
/// The pending batch is committed once it reaches `max_batch_size` records
/// or the group-commit timeout has elapsed since the last commit.
///
/// # Errors
/// Fails if a triggered group commit fails.
pub fn write_records_batch(&self, records: Vec<V2WALRecord>) -> NativeResult<Vec<u64>> {
    // Reserve a contiguous run of LSNs under a single header lock.
    let mut lsns = Vec::with_capacity(records.len());
    {
        let mut header = self.header.lock();
        for _ in &records {
            lsns.push(header.current_lsn);
            header.current_lsn = lsn::next(header.current_lsn);
        }
    }
    {
        let mut group_commit = self.group_commit.lock();
        for (record, &lsn) in records.into_iter().zip(&lsns) {
            group_commit.pending_records.push(BufferedRecord {
                record,
                lsn,
                timestamp: Instant::now(),
                committed: true,
            });
        }
        // BUG FIX: the time-based trigger previously used the age of this
        // call (always ~0 when checked), so the timeout never fired, and
        // `last_commit` was never refreshed anywhere. Measure from the last
        // commit and update the stamp when a commit is triggered.
        let should_commit = group_commit.pending_records.len() >= group_commit.max_batch_size
            || group_commit.last_commit.elapsed() >= group_commit.timeout;
        if should_commit {
            let records_to_commit = std::mem::take(&mut group_commit.pending_records);
            group_commit.last_commit = Instant::now();
            // Release the lock before doing file I/O in commit_group_batch.
            drop(group_commit);
            self.commit_group_batch(records_to_commit)?;
        }
    }
    Ok(lsns)
}
/// Flushes the staged record bytes to the WAL file.
///
/// No-op when the staging buffer is empty. The buffer lock is released
/// before any file I/O so writers are not blocked behind the disk.
///
/// # Errors
/// Fails if writing to or flushing the file fails.
pub fn flush_buffer(&self) -> NativeResult<()> {
    // Stage 1: swap the staged bytes out under the buffer lock.
    let (payload, flushed_records) = {
        let mut staging = self.write_buffer.lock();
        if staging.buffer.is_empty() {
            return Ok(());
        }
        let payload = std::mem::take(&mut staging.buffer);
        let flushed_records = staging.records.len();
        staging.records.clear();
        staging.last_flush = Instant::now();
        (payload, flushed_records)
    };
    // Stage 2: write and flush with only the file lock held.
    {
        let mut wal_file = self.file.lock();
        wal_file
            .write_all(&payload)
            .map_err(NativeBackendError::Io)?;
        wal_file.flush().map_err(NativeBackendError::Io)?;
    }
    // Stage 3: fold this flush into the running average.
    let mut metrics = self.metrics.lock();
    metrics.flush_count += 1;
    let prior_total = metrics.avg_records_per_flush * (metrics.flush_count - 1) as f64;
    metrics.avg_records_per_flush =
        (prior_total + flushed_records as f64) / metrics.flush_count as f64;
    Ok(())
}
/// Serializes and writes a batch of records as one group commit.
///
/// All records are serialized up front so the file lock is acquired exactly
/// once and the batch reaches the file contiguously. Previously the lock
/// was re-acquired per record, letting other writers interleave bytes in
/// the middle of a batch, and a serialization failure partway through left
/// a partial batch already written; now a serialization failure writes
/// nothing at all.
///
/// # Errors
/// Fails if serialization or file I/O fails.
fn commit_group_batch(&self, records: Vec<BufferedRecord>) -> NativeResult<()> {
    // Serialize the whole batch before taking the file lock.
    let mut payload = Vec::new();
    for buffered_record in &records {
        let serialized = V2WALSerializer::serialize(&buffered_record.record)?;
        payload.extend_from_slice(&serialized);
    }
    let total_bytes = payload.len();
    {
        let mut file = self.file.lock();
        file.write_all(&payload)
            .map_err(NativeBackendError::Io)?;
        file.flush().map_err(NativeBackendError::Io)?;
    }
    {
        let mut metrics = self.metrics.lock();
        metrics.group_commit_count += 1;
        // Running mean of batch sizes across all group commits.
        metrics.avg_group_commit_size = ((metrics.avg_group_commit_size
            * (metrics.group_commit_count - 1) as f64)
            + records.len() as f64)
            / metrics.group_commit_count as f64;
        metrics.records_written += records.len() as u64;
        metrics.bytes_written += total_bytes as u64;
    }
    Ok(())
}
/// Folds one latency sample (microseconds) into the EWMA estimates.
///
/// These are exponentially weighted moving averages, not true percentiles:
/// p95/p99 are scaled projections of the sample (× 95/50, × 99/50).
fn update_latency_metrics(&self, latency_us: u64) {
    let mut metrics = self.metrics.lock();
    // Smoothing factor: each new sample contributes 10% of the estimate.
    const ALPHA: f64 = 0.1;
    // BUG FIX: the old-value weight was `100.0 - ALPHA` (= 99.9), which made
    // every estimate grow without bound on each sample. An EWMA weights the
    // previous value by `1.0 - ALPHA` so the weights sum to 1.
    metrics.write_latency_p50 =
        ((1.0 - ALPHA) * metrics.write_latency_p50 as f64 + ALPHA * latency_us as f64) as u64;
    metrics.write_latency_p95 = ((1.0 - ALPHA) * metrics.write_latency_p95 as f64
        + ALPHA * (latency_us * 95 / 50) as f64) as u64;
    metrics.write_latency_p99 = ((1.0 - ALPHA) * metrics.write_latency_p99 as f64
        + ALPHA * (latency_us * 99 / 50) as f64) as u64;
}
/// Returns a point-in-time copy of the writer's performance counters.
pub fn get_metrics(&self) -> WriterMetrics {
    let snapshot = self.metrics.lock();
    // Every field of WriterMetrics is `Copy`, so functional-update syntax
    // copies them all without needing a `Clone` impl.
    WriterMetrics { ..*snapshot }
}
/// Flushes buffered records and fsyncs the WAL file to stable storage.
///
/// # Errors
/// Fails if the flush or the `sync_all` call fails.
pub fn sync(&self) -> NativeResult<()> {
    self.flush_buffer()?;
    // The lock guard lives only for this statement; sync_all forces the OS
    // to persist everything written so far.
    self.file
        .lock()
        .get_ref()
        .sync_all()
        .map_err(NativeBackendError::Io)?;
    Ok(())
}
/// Returns a copy of the current in-memory WAL header.
pub fn get_header(&self) -> V2WALHeader {
    let guard = self.header.lock();
    *guard
}
/// Switches the writer into bulk-ingest tuning.
///
/// Saves the current configuration so `disable_bulk_mode` can restore it,
/// then applies the bulk batch sizes and timeouts to the staging buffer and
/// group-commit state. Calling this while already active is a no-op.
pub fn enable_bulk_mode(&self, config: &BulkIngestConfig) -> NativeResult<()> {
    let mut state = self.bulk_mode.lock();
    // Idempotent: a second enable while active changes nothing.
    if state.active {
        return Ok(());
    }
    state.original_config = Some(self.config.clone());
    state.bulk_config = Some(config.clone());
    state.records_written = 0;
    state.session_start = Instant::now();
    state.active = true;
    // Retune the staging buffer for large batches.
    let mut staging = self.write_buffer.lock();
    staging.max_size = config.max_batch_size_bytes;
    staging.flush_timeout = Duration::from_millis(config.flush_timeout_ms);
    drop(staging);
    // Retune group commit likewise.
    let mut commits = self.group_commit.lock();
    commits.max_batch_size = config.max_records_per_batch;
    commits.timeout = Duration::from_millis(config.flush_timeout_ms);
    Ok(())
}
/// Leaves bulk-ingest mode, flushing buffered data and restoring the
/// tuning values saved by `enable_bulk_mode`.
///
/// No-op if bulk mode is not active.
pub fn disable_bulk_mode(&self) -> NativeResult<()> {
let mut bulk_mode = self.bulk_mode.lock();
if !bulk_mode.active {
return Ok(()); }
// Release the lock before flushing so file I/O is not done while holding
// the bulk-mode lock. NOTE(review): another thread could toggle bulk mode
// in this window — confirm callers serialize enable/disable externally.
drop(bulk_mode);
self.flush_buffer()?;
bulk_mode = self.bulk_mode.lock();
// Restore the pre-bulk tuning; `take` leaves `original_config` empty.
if let Some(original_config) = bulk_mode.original_config.take() {
{
let mut write_buffer = self.write_buffer.lock();
write_buffer.max_size = original_config.buffer_size;
write_buffer.flush_timeout =
Duration::from_millis(original_config.group_commit_timeout_ms);
}
{
let mut group_commit = self.group_commit.lock();
group_commit.max_batch_size = original_config.max_group_commit_size;
group_commit.timeout =
Duration::from_millis(original_config.group_commit_timeout_ms);
}
}
bulk_mode.active = false;
bulk_mode.bulk_config = None;
Ok(())
}
/// Reports whether bulk-ingest mode is currently enabled.
pub fn is_bulk_mode_active(&self) -> bool {
    let state = self.bulk_mode.lock();
    state.active
}
/// Returns `(active, records_written, session_duration)` for bulk mode.
pub fn get_bulk_stats(&self) -> (bool, u64, Duration) {
    let state = self.bulk_mode.lock();
    let session_duration = state.session_start.elapsed();
    (state.active, state.records_written, session_duration)
}
/// Drains every buffered and pending record, then fsyncs the file.
///
/// # Errors
/// Fails if any flush, group commit, or sync step fails.
pub fn shutdown(&self) -> NativeResult<()> {
    self.flush_buffer()?;
    // Take whatever is still waiting on a group commit; taking an empty
    // vec is harmless and leaves the state unchanged.
    let pending = {
        let mut commits = self.group_commit.lock();
        std::mem::take(&mut commits.pending_records)
    };
    if !pending.is_empty() {
        self.commit_group_batch(pending)?;
    }
    self.sync()
}
/// Appends an `AllocateContiguous` record for `region` under `txn_id` and
/// returns its LSN.
pub fn log_allocate_contiguous(
&self,
txn_id: u64,
region: crate::backend::native::v2::wal::ContiguousRegion,
) -> NativeResult<u64> {
use crate::backend::native::v2::storage::free_space::Region;
// NOTE(review): this Region is constructed and immediately discarded. It
// may be intended as a validity check on the region parameters (if `new`
// or `with_clusters` panic/validate), otherwise it is dead code — confirm
// before removing.
let _fs_region = Region::new(region.start_offset, region.total_size)
.with_clusters(region.cluster_count, region.stride);
let record = V2WALRecord::AllocateContiguous {
txn_id,
region,
timestamp: self.current_timestamp(),
};
self.write_record(record)
}
/// Appends a `CommitContiguous` record for `region` under `txn_id` and
/// returns its LSN.
pub fn log_commit_contiguous(
    &self,
    txn_id: u64,
    region: crate::backend::native::v2::wal::ContiguousRegion,
) -> NativeResult<u64> {
    self.write_record(V2WALRecord::CommitContiguous { txn_id, region })
}
/// Appends a `RollbackContiguous` record for `region` and returns its LSN.
pub fn log_rollback_contiguous(
    &self,
    region: crate::backend::native::v2::wal::ContiguousRegion,
) -> NativeResult<u64> {
    self.write_record(V2WALRecord::RollbackContiguous { region })
}
/// Current wall-clock time as whole seconds since the Unix epoch; yields 0
/// if the system clock reads earlier than the epoch.
fn current_timestamp(&self) -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs())
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
// Creating a writer against a fresh temp path should succeed.
#[test]
fn test_v2_wal_writer_create() {
let temp_dir = tempdir().unwrap();
let config = V2WALConfig {
wal_path: temp_dir.path().join("test.wal"),
..Default::default()
};
let writer = V2WALWriter::create(config);
assert!(writer.is_ok());
}
// A single write should return a positive LSN and bump the counters.
#[test]
fn test_write_single_record() {
let temp_dir = tempdir().unwrap();
let config = V2WALConfig {
wal_path: temp_dir.path().join("test.wal"),
..Default::default()
};
let writer = V2WALWriter::create(config).unwrap();
let record = V2WALRecord::NodeInsert {
node_id: 42,
slot_offset: 1024,
node_data: vec![1, 2, 3, 4, 5],
};
let lsn = writer.write_record(record).unwrap();
assert!(lsn >= 1);
let metrics = writer.get_metrics();
assert_eq!(metrics.records_written, 1);
assert!(metrics.bytes_written > 0);
}
// Batch writes should assign strictly increasing LSNs; metrics are only
// updated by the group commit, so shutdown() is needed to drain it.
#[test]
fn test_write_records_batch() {
let temp_dir = tempdir().unwrap();
let config = V2WALConfig {
wal_path: temp_dir.path().join("test.wal"),
..Default::default()
};
let writer = V2WALWriter::create(config).unwrap();
let records = vec![
V2WALRecord::NodeInsert {
node_id: 1,
slot_offset: 1024,
node_data: vec![1, 2, 3],
},
V2WALRecord::NodeInsert {
node_id: 2,
slot_offset: 2048,
node_data: vec![4, 5, 6],
},
];
let lsns = writer.write_records_batch(records).unwrap();
assert_eq!(lsns.len(), 2);
assert!(lsns[1] > lsns[0]);
writer.shutdown().unwrap();
let metrics = writer.get_metrics();
assert_eq!(metrics.records_written, 2);
}
// Explicit flush + sync should record at least one flush in the metrics.
#[test]
fn test_flush_and_sync() {
let temp_dir = tempdir().unwrap();
let config = V2WALConfig {
wal_path: temp_dir.path().join("test.wal"),
..Default::default()
};
let writer = V2WALWriter::create(config).unwrap();
let record = V2WALRecord::NodeInsert {
node_id: 42,
slot_offset: 1024,
node_data: vec![1, 2, 3],
};
writer.write_record(record).unwrap();
writer.flush_buffer().unwrap();
writer.sync().unwrap();
let metrics = writer.get_metrics();
assert!(metrics.flush_count > 0);
}
// shutdown() alone should drain the buffer (implicit flush) before sync.
#[test]
fn test_writer_shutdown() {
let temp_dir = tempdir().unwrap();
let config = V2WALConfig {
wal_path: temp_dir.path().join("test.wal"),
..Default::default()
};
let writer = V2WALWriter::create(config).unwrap();
let record = V2WALRecord::NodeInsert {
node_id: 42,
slot_offset: 1024,
node_data: vec![1, 2, 3],
};
writer.write_record(record).unwrap();
writer.shutdown().unwrap();
let metrics = writer.get_metrics();
assert!(metrics.flush_count > 0);
}
}