sqlitegraph/backend/native/v2/wal/bulk_ingest.rs
1//! Bulk Ingest Mode for V2 WAL System
2//!
3//! This module provides bulk ingest optimization for large-scale data loading operations.
4//! Bulk ingest mode optimizes initial load scenarios by:
5//! - Batching WAL writes to reduce I/O overhead
6//! - Deferring expensive maintenance operations
7//! - Using checkpoints at natural bulk boundaries
8//! - Ensuring crash-recovery consistency through WAL replay
9//!
10//! # Usage
11//!
//! ```rust,ignore
//! // `begin_bulk_ingest` comes from the `BulkIngestExt` trait, which must be in scope.
//! let manager = V2WALManager::create(config)?;
14//! let bulk_guard = manager.begin_bulk_ingest(BulkIngestConfig::default())?;
15//!
16//! // Perform bulk operations...
17//! for record in bulk_records {
18//! manager.write_record(record)?;
19//! }
20//!
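//! // Dropping the guard also exits bulk mode and flushes, but any error is discarded;
//! // call `complete()` to exit explicitly and observe flush/checkpoint errors.
//! bulk_guard.complete()?;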
23//! ```
24
25use super::{V2WALManager, WALManagerMetrics};
26use crate::backend::native::NativeResult;
27
/// Configuration for bulk ingest mode.
///
/// The whole configuration is handed to the manager when bulk mode is enabled.
/// [`force_checkpoint_on_exit`](Self::force_checkpoint_on_exit) is additionally consulted
/// by [`BulkIngestGuard`] when the bulk session finishes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BulkIngestConfig {
    /// Maximum batch size for bulk operations, in bytes (default: 10 MiB).
    pub max_batch_size_bytes: usize,

    /// Buffer flush timeout during bulk mode, in milliseconds (default: 5000, i.e. 5 seconds).
    pub flush_timeout_ms: u64,

    /// Whether to force a checkpoint when exiting bulk mode (default: `true`).
    pub force_checkpoint_on_exit: bool,

    /// Maximum number of records to batch before an automatic flush (default: 10 000).
    pub max_records_per_batch: usize,
}
43
44impl Default for BulkIngestConfig {
45 fn default() -> Self {
46 Self {
47 max_batch_size_bytes: 10 * 1024 * 1024, // 10MB
48 flush_timeout_ms: 5_000, // 5 seconds
49 force_checkpoint_on_exit: true,
50 max_records_per_batch: 10_000,
51 }
52 }
53}
54
55/// RAII guard for bulk ingest mode
56///
57/// Automatically exits bulk ingest mode and flushes pending changes when dropped.
58/// This ensures crash recovery consistency even if the bulk operation is interrupted.
59pub struct BulkIngestGuard<'a> {
60 manager: &'a V2WALManager,
61 config: BulkIngestConfig,
62 records_written: u64,
63 start_metrics: WALManagerMetrics,
64}
65
66impl<'a> BulkIngestGuard<'a> {
67 /// Create a new bulk ingest guard
68 pub(crate) fn new(manager: &'a V2WALManager, config: BulkIngestConfig) -> NativeResult<Self> {
69 let start_metrics = manager.get_metrics();
70
71 // Enable bulk mode optimizations in the writer
72 manager.enable_bulk_mode(&config)?;
73
74 Ok(Self {
75 manager,
76 config,
77 records_written: 0,
78 start_metrics,
79 })
80 }
81
82 /// Get the number of records written during this bulk session
83 pub fn records_written(&self) -> u64 {
84 self.records_written
85 }
86
87 /// Get the start metrics when bulk mode began
88 pub fn start_metrics(&self) -> &WALManagerMetrics {
89 &self.start_metrics
90 }
91
92 /// Force flush of pending bulk writes
93 pub fn flush(&mut self) -> NativeResult<()> {
94 self.manager.flush()?;
95 Ok(())
96 }
97
98 /// Complete bulk ingest manually (also happens on drop)
99 pub fn complete(mut self) -> NativeResult<()> {
100 self.finish_bulk_session()
101 }
102
103 /// Internal method to complete the bulk session
104 fn finish_bulk_session(&mut self) -> NativeResult<()> {
105 // Flush any remaining buffered writes
106 self.manager.flush()?;
107
108 // Disable bulk mode optimizations
109 self.manager.disable_bulk_mode()?;
110
111 // Force checkpoint if configured
112 if self.config.force_checkpoint_on_exit {
113 self.manager.force_checkpoint()?;
114 }
115
116 Ok(())
117 }
118}
119
120impl<'a> Drop for BulkIngestGuard<'a> {
121 fn drop(&mut self) {
122 // Ensure bulk session is completed even if panic occurs
123 let _ = self.finish_bulk_session();
124 }
125}
126
127/// Extension trait for V2WALManager to support bulk ingest
128pub trait BulkIngestExt {
129 /// Begin bulk ingest mode with the given configuration
130 ///
131 /// Returns a RAII guard that automatically exits bulk mode when dropped.
132 /// While bulk mode is active:
133 /// - WAL writes are batched more aggressively
134 /// - Flush operations are deferred to reduce I/O overhead
135 /// - Checkpoint boundaries are optimized for bulk operations
136 /// - All crash recovery semantics are preserved
137 fn begin_bulk_ingest(&self, config: BulkIngestConfig) -> NativeResult<BulkIngestGuard<'_>>;
138
139 /// Check if bulk ingest mode is currently active
140 fn is_bulk_ingest_active(&self) -> bool;
141
142 /// Get bulk ingest performance metrics
143 fn get_bulk_metrics(&self) -> BulkIngestMetrics;
144}
145
/// Aggregate statistics describing bulk ingest activity.
#[derive(Clone, Debug)]
pub struct BulkIngestMetrics {
    /// How many bulk ingest sessions have finished.
    pub sessions_completed: u64,

    /// Total number of records processed while in bulk mode.
    pub total_bulk_records: u64,

    /// Mean batch size observed during bulk operations.
    pub avg_batch_size: f64,

    /// Cumulative time spent in bulk mode, in milliseconds.
    pub total_bulk_time_ms: u64,

    /// Speed-up relative to non-bulk operation (1.0 means no difference).
    pub performance_improvement_ratio: f64,
}
164
165impl BulkIngestExt for V2WALManager {
166 fn begin_bulk_ingest(&self, config: BulkIngestConfig) -> NativeResult<BulkIngestGuard<'_>> {
167 BulkIngestGuard::new(self, config)
168 }
169
170 fn is_bulk_ingest_active(&self) -> bool {
171 // Call the manager method directly
172 V2WALManager::is_bulk_mode_active(self)
173 }
174
175 fn get_bulk_metrics(&self) -> BulkIngestMetrics {
176 // For now, return default metrics - will be enhanced when we track bulk stats
177 BulkIngestMetrics {
178 sessions_completed: 0,
179 total_bulk_records: 0,
180 avg_batch_size: 0.0,
181 total_bulk_time_ms: 0,
182 performance_improvement_ratio: 1.0,
183 }
184 }
185}