eventuali_core/performance/
wal_optimization.rs1use std::sync::Arc;
7use std::time::{Duration, Instant};
8use rusqlite::Connection as SqliteConnection;
9use crate::error::EventualiError;
10
11#[derive(Debug, Clone)]
13pub struct WalConfig {
14 pub synchronous_mode: WalSynchronousMode,
15 pub journal_mode: WalJournalMode,
16 pub checkpoint_interval: u64,
17 pub checkpoint_size_mb: u64,
18 pub wal_autocheckpoint: u32,
19 pub cache_size_kb: i32,
20 pub temp_store: TempStoreMode,
21 pub mmap_size_mb: u64,
22 pub page_size: u32,
23 pub auto_vacuum: AutoVacuumMode,
24}
25
/// Argument for SQLite's `PRAGMA synchronous`, rendered by the optimizer as
/// `OFF`, `NORMAL`, `FULL` or `EXTRA`.
///
/// The enum is fieldless, so it is `Copy` and comparable with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WalSynchronousMode {
    /// Rendered as `OFF`.
    Off,
    /// Rendered as `NORMAL`.
    Normal,
    /// Rendered as `FULL`.
    Full,
    /// Rendered as `EXTRA`.
    Extra,
}
34
/// Argument for SQLite's `PRAGMA journal_mode`, rendered by the optimizer as the
/// upper-case variant name (`DELETE`, `TRUNCATE`, `PERSIST`, `MEMORY`, `WAL`, `OFF`).
///
/// The enum is fieldless, so it is `Copy` and comparable with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WalJournalMode {
    /// Rendered as `DELETE`.
    Delete,
    /// Rendered as `TRUNCATE`.
    Truncate,
    /// Rendered as `PERSIST`.
    Persist,
    /// Rendered as `MEMORY`.
    Memory,
    /// Rendered as `WAL`.
    Wal,
    /// Rendered as `OFF`.
    Off,
}
45
/// Argument for SQLite's `PRAGMA temp_store`, rendered by the optimizer as
/// `DEFAULT`, `FILE` or `MEMORY`.
///
/// The enum is fieldless, so it is `Copy` and comparable with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TempStoreMode {
    /// Rendered as `DEFAULT`.
    Default,
    /// Rendered as `FILE`.
    File,
    /// Rendered as `MEMORY`.
    Memory,
}
53
/// Argument for SQLite's `PRAGMA auto_vacuum`, rendered by the optimizer as
/// `NONE`, `FULL` or `INCREMENTAL`.
///
/// The enum is fieldless, so it is `Copy` and comparable with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AutoVacuumMode {
    /// Rendered as `NONE`.
    None,
    /// Rendered as `FULL`.
    Full,
    /// Rendered as `INCREMENTAL`.
    Incremental,
}
61
62impl Default for WalConfig {
63 fn default() -> Self {
64 Self {
65 synchronous_mode: WalSynchronousMode::Normal,
66 journal_mode: WalJournalMode::Wal,
67 checkpoint_interval: 1000,
68 checkpoint_size_mb: 100,
69 wal_autocheckpoint: 1000,
70 cache_size_kb: -2000, temp_store: TempStoreMode::Memory,
72 mmap_size_mb: 256,
73 page_size: 4096,
74 auto_vacuum: AutoVacuumMode::Incremental,
75 }
76 }
77}
78
79impl WalConfig {
80 pub fn high_performance() -> Self {
82 Self {
83 synchronous_mode: WalSynchronousMode::Normal,
84 journal_mode: WalJournalMode::Wal,
85 checkpoint_interval: 2000,
86 checkpoint_size_mb: 200,
87 wal_autocheckpoint: 2000,
88 cache_size_kb: -8000, temp_store: TempStoreMode::Memory,
90 mmap_size_mb: 1024, page_size: 4096,
92 auto_vacuum: AutoVacuumMode::Incremental,
93 }
94 }
95
96 pub fn memory_optimized() -> Self {
98 Self {
99 synchronous_mode: WalSynchronousMode::Normal,
100 journal_mode: WalJournalMode::Wal,
101 checkpoint_interval: 500,
102 checkpoint_size_mb: 50,
103 wal_autocheckpoint: 500,
104 cache_size_kb: -1000, temp_store: TempStoreMode::Memory,
106 mmap_size_mb: 64, page_size: 4096,
108 auto_vacuum: AutoVacuumMode::Incremental,
109 }
110 }
111
112 pub fn safety_first() -> Self {
114 Self {
115 synchronous_mode: WalSynchronousMode::Full,
116 journal_mode: WalJournalMode::Wal,
117 checkpoint_interval: 100,
118 checkpoint_size_mb: 20,
119 wal_autocheckpoint: 100,
120 cache_size_kb: -2000, temp_store: TempStoreMode::File,
122 mmap_size_mb: 128,
123 page_size: 4096,
124 auto_vacuum: AutoVacuumMode::Full,
125 }
126 }
127}
128
/// Snapshot of the counters tracked by `WalOptimizer`.
///
/// Within this file only `total_checkpoints`, `avg_checkpoint_time_ms` and
/// `cache_hit_rate` are ever written after construction; the remaining counters keep
/// whatever value they were created with.
#[derive(Debug, Clone)]
pub struct WalStats {
    /// Number of checkpoints completed through `WalOptimizer::checkpoint`.
    pub total_checkpoints: u64,
    /// Running mean duration of those checkpoints, in milliseconds.
    pub avg_checkpoint_time_ms: f64,
    /// WAL file size in KiB.
    pub wal_file_size_kb: u64,
    /// Count of pages written.
    pub pages_written: u64,
    /// Count of pages read.
    pub pages_read: u64,
    /// Count of cache hits.
    pub cache_hits: u64,
    /// Count of cache misses.
    pub cache_misses: u64,
    /// `cache_hits / (cache_hits + cache_misses)`, recomputed by `WalOptimizer::get_stats`
    /// whenever at least one hit or miss has been recorded.
    pub cache_hit_rate: f64,
}
141
142impl Default for WalStats {
143 fn default() -> Self {
144 Self {
145 total_checkpoints: 0,
146 avg_checkpoint_time_ms: 0.0,
147 wal_file_size_kb: 0,
148 pages_written: 0,
149 pages_read: 0,
150 cache_hits: 0,
151 cache_misses: 0,
152 cache_hit_rate: 0.0,
153 }
154 }
155}
156
157pub struct WalOptimizer {
159 config: WalConfig,
160 stats: Arc<std::sync::Mutex<WalStats>>,
161 last_checkpoint: Option<Instant>,
162}
163
164impl WalOptimizer {
165 pub fn new(config: WalConfig) -> Self {
167 Self {
168 config,
169 stats: Arc::new(std::sync::Mutex::new(WalStats::default())),
170 last_checkpoint: None,
171 }
172 }
173
174 pub fn optimize_connection(&self, conn: &SqliteConnection) -> Result<(), EventualiError> {
176 let journal_mode = self.journal_mode_to_string(&self.config.journal_mode);
178 conn.execute(&format!("PRAGMA journal_mode = {journal_mode}"), [])
179 .map_err(|e| EventualiError::Configuration(format!("Failed to set journal mode: {e}")))?;
180
181 let sync_mode = self.sync_mode_to_string(&self.config.synchronous_mode);
183 conn.execute(&format!("PRAGMA synchronous = {sync_mode}"), [])
184 .map_err(|e| EventualiError::Configuration(format!("Failed to set synchronous mode: {e}")))?;
185
186 conn.execute(&format!("PRAGMA cache_size = {}", self.config.cache_size_kb), [])
188 .map_err(|e| EventualiError::Configuration(format!("Failed to set cache size: {e}")))?;
189
190 let temp_store = self.temp_store_to_string(&self.config.temp_store);
192 conn.execute(&format!("PRAGMA temp_store = {temp_store}"), [])
193 .map_err(|e| EventualiError::Configuration(format!("Failed to set temp store: {e}")))?;
194
195 conn.execute(&format!("PRAGMA mmap_size = {}", self.config.mmap_size_mb * 1024 * 1024), [])
197 .map_err(|e| EventualiError::Configuration(format!("Failed to set mmap size: {e}")))?;
198
199 conn.execute(&format!("PRAGMA page_size = {}", self.config.page_size), [])
201 .map_err(|e| EventualiError::Configuration(format!("Failed to set page size: {e}")))?;
202
203 let auto_vacuum = self.auto_vacuum_to_string(&self.config.auto_vacuum);
205 conn.execute(&format!("PRAGMA auto_vacuum = {auto_vacuum}"), [])
206 .map_err(|e| EventualiError::Configuration(format!("Failed to set auto vacuum: {e}")))?;
207
208 conn.execute(&format!("PRAGMA wal_autocheckpoint = {}", self.config.wal_autocheckpoint), [])
210 .map_err(|e| EventualiError::Configuration(format!("Failed to set WAL autocheckpoint: {e}")))?;
211
212 Ok(())
213 }
214
215 pub fn checkpoint(&mut self, conn: &SqliteConnection) -> Result<(), EventualiError> {
217 let start_time = Instant::now();
218
219 conn.execute("PRAGMA wal_checkpoint(TRUNCATE)", [])
220 .map_err(|e| EventualiError::Configuration(format!("Failed to checkpoint WAL: {e}")))?;
221
222 let checkpoint_time = start_time.elapsed();
223 self.last_checkpoint = Some(Instant::now());
224
225 if let Ok(mut stats) = self.stats.lock() {
227 stats.total_checkpoints += 1;
228 let total_checkpoints = stats.total_checkpoints as f64;
229 stats.avg_checkpoint_time_ms = (stats.avg_checkpoint_time_ms * (total_checkpoints - 1.0) +
230 checkpoint_time.as_millis() as f64) / total_checkpoints;
231 }
232
233 Ok(())
234 }
235
236 pub fn get_stats(&self, conn: &SqliteConnection) -> Result<WalStats, EventualiError> {
238 let mut stats = if let Ok(stats) = self.stats.lock() {
239 stats.clone()
240 } else {
241 WalStats::default()
242 };
243
244 if let Ok(_stmt) = conn.prepare("PRAGMA wal_checkpoint") {
246 }
248
249 if stats.cache_hits + stats.cache_misses > 0 {
251 stats.cache_hit_rate = stats.cache_hits as f64 / (stats.cache_hits + stats.cache_misses) as f64;
252 }
253
254 Ok(stats)
255 }
256
257 pub fn needs_checkpoint(&self) -> bool {
259 if let Some(last) = self.last_checkpoint {
260 last.elapsed() > Duration::from_millis(self.config.checkpoint_interval)
261 } else {
262 true
263 }
264 }
265
266 pub fn get_config(&self) -> &WalConfig {
268 &self.config
269 }
270
271 fn journal_mode_to_string(&self, mode: &WalJournalMode) -> &'static str {
273 match mode {
274 WalJournalMode::Delete => "DELETE",
275 WalJournalMode::Truncate => "TRUNCATE",
276 WalJournalMode::Persist => "PERSIST",
277 WalJournalMode::Memory => "MEMORY",
278 WalJournalMode::Wal => "WAL",
279 WalJournalMode::Off => "OFF",
280 }
281 }
282
283 fn sync_mode_to_string(&self, mode: &WalSynchronousMode) -> &'static str {
284 match mode {
285 WalSynchronousMode::Off => "OFF",
286 WalSynchronousMode::Normal => "NORMAL",
287 WalSynchronousMode::Full => "FULL",
288 WalSynchronousMode::Extra => "EXTRA",
289 }
290 }
291
292 fn temp_store_to_string(&self, mode: &TempStoreMode) -> &'static str {
293 match mode {
294 TempStoreMode::Default => "DEFAULT",
295 TempStoreMode::File => "FILE",
296 TempStoreMode::Memory => "MEMORY",
297 }
298 }
299
300 fn auto_vacuum_to_string(&self, mode: &AutoVacuumMode) -> &'static str {
301 match mode {
302 AutoVacuumMode::None => "NONE",
303 AutoVacuumMode::Full => "FULL",
304 AutoVacuumMode::Incremental => "INCREMENTAL",
305 }
306 }
307}
308
309pub async fn benchmark_wal_configurations(
311 database_path: String,
312 configs: Vec<(String, WalConfig)>,
313 num_operations: usize,
314) -> Result<Vec<(String, f64, WalStats)>, EventualiError> {
315 use std::time::Instant;
316
317 let mut results = Vec::new();
318
319 for (config_name, config) in configs {
320 let start_time = Instant::now();
321
322 let conn = if database_path == ":memory:" {
324 SqliteConnection::open_in_memory()
325 } else {
326 SqliteConnection::open(&database_path)
327 }.map_err(|e| EventualiError::Configuration(format!("Failed to open database: {e}")))?;
328
329 let mut optimizer = WalOptimizer::new(config.clone());
330 optimizer.optimize_connection(&conn)?;
331
332 conn.execute(
334 "CREATE TABLE IF NOT EXISTS test_events (
335 id INTEGER PRIMARY KEY,
336 data TEXT,
337 timestamp INTEGER
338 )", []
339 ).map_err(|e| EventualiError::Configuration(format!("Failed to create test table: {e}")))?;
340
341 for i in 0..num_operations {
343 conn.execute(
344 "INSERT INTO test_events (data, timestamp) VALUES (?, ?)",
345 [&format!("test_data_{i}"), &format!("{i}")]
346 ).map_err(|e| EventualiError::Configuration(format!("Failed to insert test data: {e}")))?;
347
348 if i.is_multiple_of(100) && optimizer.needs_checkpoint() {
350 optimizer.checkpoint(&conn)?;
351 }
352 }
353
354 optimizer.checkpoint(&conn)?;
356
357 let total_time = start_time.elapsed();
358 let ops_per_sec = num_operations as f64 / total_time.as_secs_f64();
359 let stats = optimizer.get_stats(&conn)?;
360
361 results.push((config_name, ops_per_sec, stats));
362 }
363
364 Ok(results)
365}
366
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration selects NORMAL synchronous mode and WAL journaling.
    #[test]
    fn test_wal_config_creation() {
        let WalConfig {
            synchronous_mode,
            journal_mode,
            ..
        } = WalConfig::default();

        assert!(matches!(synchronous_mode, WalSynchronousMode::Normal));
        assert!(matches!(journal_mode, WalJournalMode::Wal));
    }

    /// An optimizer keeps the configuration it was constructed with.
    #[test]
    fn test_wal_optimizer_creation() {
        let optimizer = WalOptimizer::new(WalConfig::high_performance());
        assert_eq!(optimizer.config.cache_size_kb, -8000);
    }

    /// Applying the default configuration to an in-memory database succeeds.
    #[test]
    fn test_connection_optimization() {
        let conn = SqliteConnection::open_in_memory().unwrap();
        let optimizer = WalOptimizer::new(WalConfig::default());

        assert!(optimizer.optimize_connection(&conn).is_ok());
    }
}