use crate::config::WalBatchConfig;
use parking_lot::Mutex;
use std::io::{self, Write};
/// Mutable batching state; `WalBatcher` keeps one instance behind its `pending` mutex.
struct PendingBatch {
/// Bytes of every buffered entry, appended back to back in submission order.
/// Nothing in this file inserts separators or length prefixes between entries.
buffer: Vec<u8>,
/// Number of `submit` calls whose data currently sits in `buffer`;
/// reset to zero whenever `flush` drains the buffer.
entry_count: usize,
}
/// Collects byte entries in memory and writes them to a caller-supplied
/// writer in a single `write_all` + `flush` once enough have accumulated.
///
/// All methods take `&self`; the mutable state lives behind `pending`.
pub struct WalBatcher {
/// Batching settings. This file reads two fields: `enabled` (batching on/off)
/// and `max_batch_size` (entry count at which `submit` triggers a flush).
config: WalBatchConfig,
/// Entries accepted by `submit` but not yet handed to a writer.
pending: Mutex<PendingBatch>,
}
impl WalBatcher {
    /// Creates a batcher with an empty pending batch and the given settings.
    #[must_use]
    pub fn new(config: WalBatchConfig) -> Self {
        let empty = PendingBatch {
            entry_count: 0,
            buffer: Vec::new(),
        };
        Self {
            config,
            pending: Mutex::new(empty),
        }
    }

    /// Accepts one entry.
    ///
    /// With batching disabled the entry is written to `writer` and flushed
    /// immediately. With batching enabled the entry is appended to the pending
    /// batch, and the whole batch is flushed to `writer` once the number of
    /// pending entries reaches `config.max_batch_size`.
    ///
    /// # Errors
    ///
    /// Returns any I/O error reported by `writer` while writing or flushing.
    pub fn submit(&self, data: &[u8], writer: &mut impl Write) -> io::Result<()> {
        if self.config.enabled {
            let mut guard = self.pending.lock();
            guard.buffer.extend_from_slice(data);
            guard.entry_count += 1;
            let threshold_reached = guard.entry_count >= self.config.max_batch_size;
            // `flush` acquires the same mutex, so the guard is released first.
            drop(guard);

            if threshold_reached {
                self.flush(writer)
            } else {
                Ok(())
            }
        } else {
            write_and_flush(writer, data)
        }
    }

    /// Writes every pending entry to `writer` and flushes it.
    ///
    /// Does nothing (and does not touch `writer`) when no entries are pending.
    ///
    /// NOTE: the batch is removed from the pending state *before* the write is
    /// attempted, so the lock is not held during I/O. As a consequence, if the
    /// write or flush fails, those bytes are no longer held by the batcher.
    ///
    /// # Errors
    ///
    /// Returns any I/O error reported by `writer` while writing or flushing.
    pub fn flush(&self, writer: &mut impl Write) -> io::Result<()> {
        let mut guard = self.pending.lock();
        if guard.entry_count == 0 {
            return Ok(());
        }
        guard.entry_count = 0;
        let drained = std::mem::take(&mut guard.buffer);
        // Release the lock so other threads can keep submitting during I/O.
        drop(guard);

        writer.write_all(&drained)?;
        writer.flush()
    }

    /// Reports whether batching is turned on in the configuration.
    #[must_use]
    pub const fn is_enabled(&self) -> bool {
        self.config.enabled
    }

    /// Number of entries currently waiting to be flushed.
    #[must_use]
    pub fn pending_count(&self) -> usize {
        let guard = self.pending.lock();
        guard.entry_count
    }

    /// Total size in bytes of the entries currently waiting to be flushed.
    #[must_use]
    pub fn pending_bytes(&self) -> usize {
        let guard = self.pending.lock();
        guard.buffer.len()
    }
}
/// Writes all of `data` to `writer`, then flushes it.
///
/// The flush is skipped if the write fails; the first error encountered is
/// returned.
fn write_and_flush(writer: &mut impl Write, data: &[u8]) -> io::Result<()> {
    writer.write_all(data).and_then(|()| writer.flush())
}