use super::{V2WALManager, WALManagerMetrics};
use crate::backend::native::NativeResult;
/// Settings for a bulk-ingest session.
///
/// The whole struct is handed to `V2WALManager::enable_bulk_mode` when a
/// `BulkIngestGuard` is created.
/// NOTE(review): how the size, timeout and record limits are enforced is not
/// visible in this file — confirm against the manager implementation.
#[derive(Debug, Clone)]
pub struct BulkIngestConfig {
/// Batch size limit in bytes (presumably an upper bound per batch — TODO confirm).
pub max_batch_size_bytes: usize,
/// Flush timeout in milliseconds (presumably the longest a batch may wait
/// before being flushed — TODO confirm).
pub flush_timeout_ms: u64,
/// When `true`, the guard forces a checkpoint on the manager after bulk mode
/// has been disabled at the end of the session.
pub force_checkpoint_on_exit: bool,
/// Record-count limit per batch (presumably an upper bound — TODO confirm).
pub max_records_per_batch: usize,
}
impl Default for BulkIngestConfig {
    /// Builds the stock configuration: 10 MiB batches, a 5 second flush
    /// timeout, a forced checkpoint on exit and 10 000 records per batch.
    fn default() -> Self {
        const BYTES_PER_MIB: usize = 1024 * 1024;

        let max_batch_size_bytes = 10 * BYTES_PER_MIB;
        let flush_timeout_ms = 5_000;
        let max_records_per_batch = 10_000;

        Self {
            max_batch_size_bytes,
            flush_timeout_ms,
            force_checkpoint_on_exit: true,
            max_records_per_batch,
        }
    }
}
/// Scoped handle for a bulk-ingest session on a [`V2WALManager`].
///
/// Creating the guard enables bulk mode on the manager. The session is closed
/// either explicitly through [`BulkIngestGuard::complete`], which reports
/// errors, or implicitly when the guard is dropped, which discards them.
/// The shutdown sequence runs at most once successfully per guard.
pub struct BulkIngestGuard<'a> {
    /// Manager whose bulk mode this guard controls.
    manager: &'a V2WALManager,
    /// Settings the session was opened with.
    config: BulkIngestConfig,
    /// Record counter exposed via `records_written()`.
    /// NOTE(review): nothing in this block increments it, so it stays at 0.
    records_written: u64,
    /// Manager metrics captured just before bulk mode was enabled.
    start_metrics: WALManagerMetrics,
    /// Set once the shutdown sequence has fully succeeded, so that `Drop`
    /// does not flush, disable and checkpoint a second time.
    finished: bool,
}

impl<'a> BulkIngestGuard<'a> {
    /// Opens a bulk-ingest session on `manager`.
    ///
    /// # Errors
    /// Returns the error from `enable_bulk_mode` if bulk mode cannot be
    /// enabled; no guard is created in that case.
    pub(crate) fn new(manager: &'a V2WALManager, config: BulkIngestConfig) -> NativeResult<Self> {
        // Snapshot before enabling so the baseline reflects the pre-bulk state.
        let start_metrics = manager.get_metrics();
        manager.enable_bulk_mode(&config)?;
        Ok(Self {
            manager,
            config,
            records_written: 0,
            start_metrics,
            finished: false,
        })
    }

    /// Returns the guard's record counter.
    pub fn records_written(&self) -> u64 {
        self.records_written
    }

    /// Returns the manager metrics captured when the session was opened.
    pub fn start_metrics(&self) -> &WALManagerMetrics {
        &self.start_metrics
    }

    /// Flushes the underlying manager without ending the session.
    ///
    /// # Errors
    /// Propagates any error reported by the manager's `flush`.
    pub fn flush(&mut self) -> NativeResult<()> {
        self.manager.flush()?;
        Ok(())
    }

    /// Ends the session and reports any error from the shutdown sequence.
    ///
    /// On success the guard's `Drop` becomes a no-op. On failure `Drop`
    /// still makes one more best-effort attempt.
    ///
    /// # Errors
    /// Propagates the first error from flushing, disabling bulk mode, or the
    /// optional forced checkpoint.
    pub fn complete(mut self) -> NativeResult<()> {
        self.finish_bulk_session()
    }

    /// Runs the shutdown sequence: flush, disable bulk mode, then checkpoint
    /// if `force_checkpoint_on_exit` is set. Does nothing once it has
    /// succeeded.
    fn finish_bulk_session(&mut self) -> NativeResult<()> {
        if self.finished {
            return Ok(());
        }
        self.manager.flush()?;
        self.manager.disable_bulk_mode()?;
        if self.config.force_checkpoint_on_exit {
            self.manager.force_checkpoint()?;
        }
        // Mark only after every step succeeded so a failed attempt is retried on drop.
        self.finished = true;
        Ok(())
    }
}

impl<'a> Drop for BulkIngestGuard<'a> {
    /// Best-effort shutdown for guards that were not completed successfully.
    fn drop(&mut self) {
        // `drop` cannot return an error; callers who need it should use `complete`.
        let _ = self.finish_bulk_session();
    }
}
/// Extension trait that exposes bulk-ingest operations on a WAL manager.
///
/// In this file it is implemented for `V2WALManager`.
pub trait BulkIngestExt {
/// Starts a bulk-ingest session configured by `config` and returns the guard
/// that scopes it; the guard borrows `self` for its whole lifetime.
///
/// # Errors
/// Returns an error if the session cannot be started.
fn begin_bulk_ingest(&self, config: BulkIngestConfig) -> NativeResult<BulkIngestGuard<'_>>;
/// Reports whether a bulk-ingest session is currently active.
fn is_bulk_ingest_active(&self) -> bool;
/// Returns a snapshot of bulk-ingest metrics.
fn get_bulk_metrics(&self) -> BulkIngestMetrics;
}
/// Aggregate statistics about bulk-ingest activity.
///
/// NOTE(review): the only producer visible in this file
/// (`V2WALManager::get_bulk_metrics`) fills every field with a fixed
/// placeholder, so the field meanings below are read from the names and
/// should be confirmed once real collection exists.
#[derive(Debug, Clone)]
pub struct BulkIngestMetrics {
/// Number of bulk sessions that have finished (presumed — TODO confirm).
pub sessions_completed: u64,
/// Total records written across bulk sessions (presumed — TODO confirm).
pub total_bulk_records: u64,
/// Average batch size; the unit (records or bytes) is not shown here — TODO confirm.
pub avg_batch_size: f64,
/// Total time spent in bulk mode, in milliseconds (presumed — TODO confirm).
pub total_bulk_time_ms: u64,
/// Performance ratio of bulk versus normal ingest (presumed); the visible
/// placeholder value is 1.0.
pub performance_improvement_ratio: f64,
}
impl BulkIngestExt for V2WALManager {
    /// Opens a bulk-ingest session on this manager by constructing a
    /// [`BulkIngestGuard`].
    ///
    /// # Errors
    /// Propagates any error from [`BulkIngestGuard::new`].
    fn begin_bulk_ingest(&self, config: BulkIngestConfig) -> NativeResult<BulkIngestGuard<'_>> {
        let guard = BulkIngestGuard::new(self, config)?;
        Ok(guard)
    }

    /// Reports the manager's own bulk-mode flag.
    fn is_bulk_ingest_active(&self) -> bool {
        let active = V2WALManager::is_bulk_mode_active(self);
        active
    }

    /// Returns placeholder metrics: every counter is zero and the improvement
    /// ratio is 1.0. No real statistics are gathered here.
    fn get_bulk_metrics(&self) -> BulkIngestMetrics {
        let sessions_completed = 0;
        let total_bulk_records = 0;
        let total_bulk_time_ms = 0;
        let avg_batch_size = 0.0;
        let performance_improvement_ratio = 1.0;

        BulkIngestMetrics {
            sessions_completed,
            total_bulk_records,
            avg_batch_size,
            total_bulk_time_ms,
            performance_improvement_ratio,
        }
    }
}