Skip to main content

sqlitegraph/backend/native/v2/wal/
bulk_ingest.rs

//! Bulk Ingest Mode for the V2 WAL System
//!
//! This module provides a bulk ingest mode for large-scale data loading operations.
//! Bulk ingest mode optimizes initial-load scenarios by:
//! - Batching WAL writes to reduce I/O overhead
//! - Deferring expensive maintenance operations
//! - Using checkpoints at natural bulk boundaries
//! - Relying on WAL replay to keep crash recovery consistent
//!
//! # Usage
//!
//! `BulkIngestExt` must be in scope to call `begin_bulk_ingest`.
//!
//! ```rust,ignore
//! let manager = V2WALManager::create(config)?;
//! let bulk_guard = manager.begin_bulk_ingest(BulkIngestConfig::default())?;
//!
//! // Perform bulk operations...
//! for record in bulk_records {
//!     manager.write_record(record)?;
//! }
//!
//! // Flush pending writes, exit bulk mode and (by default) checkpoint.
//! // `complete()` reports errors; simply dropping the guard does the same work
//! // but discards any error.
//! bulk_guard.complete()?;
//! ```
24
25use super::{V2WALManager, WALManagerMetrics};
26use crate::backend::native::NativeResult;
27
/// Tuning knobs for a bulk ingest session.
///
/// Start from [`BulkIngestConfig::default`] and override individual fields with
/// struct-update syntax, e.g.
/// `BulkIngestConfig { max_records_per_batch: 50_000, ..Default::default() }`.
#[derive(Debug, Clone)]
pub struct BulkIngestConfig {
    /// Upper bound on the size of one bulk batch, in bytes (default: 10 MiB).
    pub max_batch_size_bytes: usize,

    /// Buffer flush timeout while bulk mode is active, in milliseconds
    /// (default: 5000, i.e. 5 seconds).
    pub flush_timeout_ms: u64,

    /// When `true`, finishing the bulk session also forces a checkpoint
    /// (default: `true`).
    pub force_checkpoint_on_exit: bool,

    /// How many records may be batched before an automatic flush (default: 10 000).
    pub max_records_per_batch: usize,
}

44impl Default for BulkIngestConfig {
45    fn default() -> Self {
46        Self {
47            max_batch_size_bytes: 10 * 1024 * 1024, // 10MB
48            flush_timeout_ms: 5_000,                // 5 seconds
49            force_checkpoint_on_exit: true,
50            max_records_per_batch: 10_000,
51        }
52    }
53}
54
55/// RAII guard for bulk ingest mode
56///
57/// Automatically exits bulk ingest mode and flushes pending changes when dropped.
58/// This ensures crash recovery consistency even if the bulk operation is interrupted.
59pub struct BulkIngestGuard<'a> {
60    manager: &'a V2WALManager,
61    config: BulkIngestConfig,
62    records_written: u64,
63    start_metrics: WALManagerMetrics,
64}
65
66impl<'a> BulkIngestGuard<'a> {
67    /// Create a new bulk ingest guard
68    pub(crate) fn new(manager: &'a V2WALManager, config: BulkIngestConfig) -> NativeResult<Self> {
69        let start_metrics = manager.get_metrics();
70
71        // Enable bulk mode optimizations in the writer
72        manager.enable_bulk_mode(&config)?;
73
74        Ok(Self {
75            manager,
76            config,
77            records_written: 0,
78            start_metrics,
79        })
80    }
81
82    /// Get the number of records written during this bulk session
83    pub fn records_written(&self) -> u64 {
84        self.records_written
85    }
86
87    /// Get the start metrics when bulk mode began
88    pub fn start_metrics(&self) -> &WALManagerMetrics {
89        &self.start_metrics
90    }
91
92    /// Force flush of pending bulk writes
93    pub fn flush(&mut self) -> NativeResult<()> {
94        self.manager.flush()?;
95        Ok(())
96    }
97
98    /// Complete bulk ingest manually (also happens on drop)
99    pub fn complete(mut self) -> NativeResult<()> {
100        self.finish_bulk_session()
101    }
102
103    /// Internal method to complete the bulk session
104    fn finish_bulk_session(&mut self) -> NativeResult<()> {
105        // Flush any remaining buffered writes
106        self.manager.flush()?;
107
108        // Disable bulk mode optimizations
109        self.manager.disable_bulk_mode()?;
110
111        // Force checkpoint if configured
112        if self.config.force_checkpoint_on_exit {
113            self.manager.force_checkpoint()?;
114        }
115
116        Ok(())
117    }
118}
119
120impl<'a> Drop for BulkIngestGuard<'a> {
121    fn drop(&mut self) {
122        // Ensure bulk session is completed even if panic occurs
123        let _ = self.finish_bulk_session();
124    }
125}
126
127/// Extension trait for V2WALManager to support bulk ingest
128pub trait BulkIngestExt {
129    /// Begin bulk ingest mode with the given configuration
130    ///
131    /// Returns a RAII guard that automatically exits bulk mode when dropped.
132    /// While bulk mode is active:
133    /// - WAL writes are batched more aggressively
134    /// - Flush operations are deferred to reduce I/O overhead
135    /// - Checkpoint boundaries are optimized for bulk operations
136    /// - All crash recovery semantics are preserved
137    fn begin_bulk_ingest(&self, config: BulkIngestConfig) -> NativeResult<BulkIngestGuard<'_>>;
138
139    /// Check if bulk ingest mode is currently active
140    fn is_bulk_ingest_active(&self) -> bool;
141
142    /// Get bulk ingest performance metrics
143    fn get_bulk_metrics(&self) -> BulkIngestMetrics;
144}
145
/// Performance counters for bulk ingest sessions.
#[derive(Debug, Clone)]
pub struct BulkIngestMetrics {
    /// How many bulk ingest sessions have completed.
    pub sessions_completed: u64,

    /// Total number of records processed while in bulk mode.
    pub total_bulk_records: u64,

    /// Mean batch size observed during bulk operations.
    pub avg_batch_size: f64,

    /// Cumulative time spent in bulk mode, in milliseconds.
    pub total_bulk_time_ms: u64,

    /// Speed-up relative to non-bulk ingest (1.0 means no difference).
    pub performance_improvement_ratio: f64,
}

165impl BulkIngestExt for V2WALManager {
166    fn begin_bulk_ingest(&self, config: BulkIngestConfig) -> NativeResult<BulkIngestGuard<'_>> {
167        BulkIngestGuard::new(self, config)
168    }
169
170    fn is_bulk_ingest_active(&self) -> bool {
171        // Call the manager method directly
172        V2WALManager::is_bulk_mode_active(self)
173    }
174
175    fn get_bulk_metrics(&self) -> BulkIngestMetrics {
176        // For now, return default metrics - will be enhanced when we track bulk stats
177        BulkIngestMetrics {
178            sessions_completed: 0,
179            total_bulk_records: 0,
180            avg_batch_size: 0.0,
181            total_bulk_time_ms: 0,
182            performance_improvement_ratio: 1.0,
183        }
184    }
185}