1use super::{
35 durability::{DurabilityConfig, DurabilityMode},
36 entry::{Lsn, WalEntry, WalOp},
37 writer::WalWriter,
38};
39use crate::StorageError;
40
41pub const DEFAULT_CHANNEL_CAPACITY: usize = 10_000;
43
44pub const DEFAULT_FLUSH_INTERVAL_MS: u64 = 50;
46
47pub const DEFAULT_FLUSH_COUNT: usize = 1000;
49
50#[derive(Debug, Clone)]
52pub struct PersistenceConfig {
53 pub channel_capacity: usize,
55 pub flush_interval_ms: u64,
57 pub flush_count: usize,
59 pub durability_mode: DurabilityMode,
61}
62
63impl Default for PersistenceConfig {
64 fn default() -> Self {
65 Self {
66 channel_capacity: DEFAULT_CHANNEL_CAPACITY,
67 flush_interval_ms: DEFAULT_FLUSH_INTERVAL_MS,
68 flush_count: DEFAULT_FLUSH_COUNT,
69 durability_mode: DurabilityMode::Lazy,
70 }
71 }
72}
73
74impl PersistenceConfig {
75 pub fn from_durability_config(config: &DurabilityConfig) -> Self {
77 Self {
78 channel_capacity: DEFAULT_CHANNEL_CAPACITY,
79 flush_interval_ms: config.wal_flush_interval_ms,
80 flush_count: config.wal_flush_batch_size,
81 durability_mode: config.mode,
82 }
83 }
84
85 pub fn volatile() -> Self {
87 Self {
88 channel_capacity: DEFAULT_CHANNEL_CAPACITY,
89 flush_interval_ms: 0,
90 flush_count: usize::MAX,
91 durability_mode: DurabilityMode::Volatile,
92 }
93 }
94
95 pub fn lazy() -> Self {
97 Self {
98 channel_capacity: DEFAULT_CHANNEL_CAPACITY,
99 flush_interval_ms: DEFAULT_FLUSH_INTERVAL_MS,
100 flush_count: DEFAULT_FLUSH_COUNT,
101 durability_mode: DurabilityMode::Lazy,
102 }
103 }
104
105 pub fn durable() -> Self {
107 Self {
108 channel_capacity: DEFAULT_CHANNEL_CAPACITY,
109 flush_interval_ms: 0,
110 flush_count: 1,
111 durability_mode: DurabilityMode::Durable,
112 }
113 }
114
115 pub fn paranoid() -> Self {
117 Self {
118 channel_capacity: DEFAULT_CHANNEL_CAPACITY,
119 flush_interval_ms: 0,
120 flush_count: 1,
121 durability_mode: DurabilityMode::Paranoid,
122 }
123 }
124
125 pub fn writes_wal(&self) -> bool {
127 self.durability_mode.writes_wal()
128 }
129
130 pub fn sync_on_commit(&self) -> bool {
132 self.durability_mode.sync_on_commit()
133 }
134
135 pub fn sync_on_every_op(&self) -> bool {
137 self.durability_mode.sync_on_every_op()
138 }
139}
140
141#[derive(Debug)]
143pub enum WalMessage {
144 Entry(WalEntry),
146 Flush,
148 FlushAndNotify(FlushNotifier),
150 Shutdown,
152}
153
154#[cfg(not(target_arch = "wasm32"))]
159mod native {
160 use std::{
161 fs::OpenOptions,
162 io::{BufWriter, Seek, Write},
163 path::Path,
164 sync::{
165 mpsc::{self, RecvTimeoutError, SyncSender},
166 Arc,
167 },
168 thread::{self, JoinHandle},
169 time::{Duration, Instant},
170 };
171
172 use parking_lot::Mutex;
173
174 use super::*;
175
176 #[derive(Clone)]
178 pub struct FlushNotifier {
179 completed: Arc<(parking_lot::Mutex<bool>, parking_lot::Condvar)>,
180 }
181
182 impl std::fmt::Debug for FlushNotifier {
183 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
184 f.debug_struct("FlushNotifier").finish()
185 }
186 }
187
188 impl FlushNotifier {
189 pub fn new() -> Self {
190 Self {
191 completed: Arc::new((parking_lot::Mutex::new(false), parking_lot::Condvar::new())),
192 }
193 }
194
195 pub fn notify(&self) {
197 let (lock, cvar) = &*self.completed;
198 let mut completed = lock.lock();
199 *completed = true;
200 cvar.notify_all();
201 }
202
203 pub fn wait(&self) {
205 let (lock, cvar) = &*self.completed;
206 let mut completed = lock.lock();
207 while !*completed {
208 cvar.wait(&mut completed);
209 }
210 }
211
212 pub fn wait_timeout(&self, timeout: Duration) -> bool {
214 let (lock, cvar) = &*self.completed;
215 let mut completed = lock.lock();
216 while !*completed {
217 let result = cvar.wait_for(&mut completed, timeout);
218 if result.timed_out() {
219 return false;
220 }
221 }
222 true
223 }
224 }
225
226 impl Default for FlushNotifier {
227 fn default() -> Self {
228 Self::new()
229 }
230 }
231
232 #[derive(Debug, Clone, Default)]
234 pub struct PersistenceStats {
235 pub entries_sent: u64,
237 pub entries_written: u64,
239 pub batches_written: u64,
241 pub bytes_written: u64,
243 pub time_flushes: u64,
245 pub count_flushes: u64,
247 pub explicit_flushes: u64,
249 pub volatile_discards: u64,
251 pub commit_syncs: u64,
253 pub op_syncs: u64,
255 pub avg_flush_latency_us: u64,
257 pub max_flush_latency_us: u64,
259 pub pending_entries: u64,
261 total_flush_latency_us: u64,
263 flush_latency_samples: u64,
265 }
266
267 impl PersistenceStats {
268 pub fn record_flush_latency(&mut self, duration: Duration) {
270 let latency_us = duration.as_micros() as u64;
271
272 if latency_us > self.max_flush_latency_us {
274 self.max_flush_latency_us = latency_us;
275 }
276
277 self.total_flush_latency_us += latency_us;
279 self.flush_latency_samples += 1;
280 self.avg_flush_latency_us = self.total_flush_latency_us / self.flush_latency_samples;
281 }
282 }
283
284 pub struct PersistenceEngine {
286 sender: SyncSender<WalMessage>,
288 handle: Option<JoinHandle<()>>,
290 stats: Arc<Mutex<PersistenceStats>>,
292 next_lsn: Arc<Mutex<Lsn>>,
294 shutdown: Arc<Mutex<bool>>,
296 config: PersistenceConfig,
298 }
299
300 impl std::fmt::Debug for PersistenceEngine {
301 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
302 f.debug_struct("PersistenceEngine")
303 .field("next_lsn", &*self.next_lsn.lock())
304 .field("shutdown", &*self.shutdown.lock())
305 .field("stats", &*self.stats.lock())
306 .finish_non_exhaustive()
307 }
308 }
309
310 impl PersistenceEngine {
311 pub fn new<P: AsRef<Path>>(
313 path: P,
314 config: PersistenceConfig,
315 ) -> Result<Self, StorageError> {
316 let file = OpenOptions::new()
317 .create(true)
318 .append(true)
319 .open(path.as_ref())
320 .map_err(|e| StorageError::IoError(e.to_string()))?;
321
322 let writer = BufWriter::new(file);
323 Self::with_writer(writer, config)
324 }
325
326 pub fn with_writer<W: Write + Seek + Send + 'static>(
328 writer: W,
329 config: PersistenceConfig,
330 ) -> Result<Self, StorageError> {
331 let (sender, receiver) = mpsc::sync_channel(config.channel_capacity);
332 let stats = Arc::new(Mutex::new(PersistenceStats::default()));
333 let next_lsn = Arc::new(Mutex::new(1u64));
334 let shutdown = Arc::new(Mutex::new(false));
335
336 let stats_clone = stats.clone();
337 let flush_interval = Duration::from_millis(config.flush_interval_ms);
338 let flush_count = config.flush_count;
339
340 let handle = thread::spawn(move || {
341 Self::writer_loop(writer, receiver, stats_clone, flush_interval, flush_count);
342 });
343
344 Ok(Self { sender, handle: Some(handle), stats, next_lsn, shutdown, config })
345 }
346
347 fn writer_loop<W: Write + Seek>(
349 writer: W,
350 receiver: mpsc::Receiver<WalMessage>,
351 stats: Arc<Mutex<PersistenceStats>>,
352 flush_interval: Duration,
353 flush_count: usize,
354 ) {
355 let mut wal_writer = match WalWriter::create(writer) {
356 Ok(w) => w,
357 Err(e) => {
358 log::error!("Failed to create WAL writer: {}", e);
359 return;
360 }
361 };
362
363 let batch_capacity = flush_count.min(DEFAULT_FLUSH_COUNT);
365 let mut batch: Vec<WalEntry> = Vec::with_capacity(batch_capacity);
366 let mut last_flush = Instant::now();
367 let mut pending_notifiers: Vec<FlushNotifier> = Vec::new();
368
369 loop {
370 let elapsed = last_flush.elapsed();
372 let timeout = flush_interval.saturating_sub(elapsed);
373
374 match receiver.recv_timeout(timeout) {
375 Ok(WalMessage::Entry(entry)) => {
376 batch.push(entry);
377
378 if batch.len() >= flush_count {
380 Self::flush_batch(
381 &mut wal_writer,
382 &mut batch,
383 &stats,
384 &mut pending_notifiers,
385 true,
386 );
387 last_flush = Instant::now();
388 }
389 }
390 Ok(WalMessage::Flush) => {
391 Self::flush_batch(
392 &mut wal_writer,
393 &mut batch,
394 &stats,
395 &mut pending_notifiers,
396 false,
397 );
398 last_flush = Instant::now();
399 stats.lock().explicit_flushes += 1;
400 }
401 Ok(WalMessage::FlushAndNotify(notifier)) => {
402 pending_notifiers.push(notifier);
403 Self::flush_batch(
404 &mut wal_writer,
405 &mut batch,
406 &stats,
407 &mut pending_notifiers,
408 false,
409 );
410 last_flush = Instant::now();
411 stats.lock().explicit_flushes += 1;
412 }
413 Ok(WalMessage::Shutdown) => {
414 Self::flush_batch(
416 &mut wal_writer,
417 &mut batch,
418 &stats,
419 &mut pending_notifiers,
420 false,
421 );
422 break;
423 }
424 Err(RecvTimeoutError::Timeout) => {
425 if !batch.is_empty() {
427 Self::flush_batch(
428 &mut wal_writer,
429 &mut batch,
430 &stats,
431 &mut pending_notifiers,
432 false,
433 );
434 stats.lock().time_flushes += 1;
435 }
436 last_flush = Instant::now();
437 }
438 Err(RecvTimeoutError::Disconnected) => {
439 Self::flush_batch(
441 &mut wal_writer,
442 &mut batch,
443 &stats,
444 &mut pending_notifiers,
445 false,
446 );
447 break;
448 }
449 }
450 }
451
452 log::debug!("WAL writer thread shutting down");
453 }
454
455 fn flush_batch<W: Write + Seek>(
457 writer: &mut WalWriter<W>,
458 batch: &mut Vec<WalEntry>,
459 stats: &Arc<Mutex<PersistenceStats>>,
460 pending_notifiers: &mut Vec<FlushNotifier>,
461 is_count_flush: bool,
462 ) {
463 if batch.is_empty() {
464 for notifier in pending_notifiers.drain(..) {
466 notifier.notify();
467 }
468 return;
469 }
470
471 let mut bytes_written = 0u64;
472 let entries_count = batch.len() as u64;
473
474 let flush_start = Instant::now();
476
477 for entry in batch.drain(..) {
478 bytes_written += 32; if let Err(e) = writer.append(&entry) {
481 log::error!("Failed to write WAL entry: {}", e);
482 }
484 }
485
486 if let Err(e) = writer.flush() {
487 log::error!("Failed to flush WAL: {}", e);
488 }
489
490 let flush_duration = flush_start.elapsed();
492
493 {
495 let mut stats = stats.lock();
496 stats.entries_written += entries_count;
497 stats.batches_written += 1;
498 stats.bytes_written += bytes_written;
499 if is_count_flush {
500 stats.count_flushes += 1;
501 }
502 stats.record_flush_latency(flush_duration);
503 }
504
505 for notifier in pending_notifiers.drain(..) {
507 notifier.notify();
508 }
509 }
510
511 pub fn send(&self, op: WalOp) -> Result<Lsn, StorageError> {
518 let lsn = {
519 let mut next_lsn = self.next_lsn.lock();
520 let lsn = *next_lsn;
521 *next_lsn += 1;
522 lsn
523 };
524
525 if !self.config.writes_wal() {
527 self.stats.lock().volatile_discards += 1;
528 return Ok(lsn);
529 }
530
531 let timestamp_ms = current_timestamp_ms();
532 let entry = WalEntry::new(lsn, timestamp_ms, op);
533
534 self.sender
535 .send(WalMessage::Entry(entry))
536 .map_err(|_| StorageError::IoError("WAL writer thread terminated".to_string()))?;
537
538 self.stats.lock().entries_sent += 1;
539
540 Ok(lsn)
541 }
542
543 pub fn send_entry(&self, entry: WalEntry) -> Result<Lsn, StorageError> {
547 let lsn = entry.lsn;
548
549 {
551 let mut next_lsn = self.next_lsn.lock();
552 if entry.lsn >= *next_lsn {
553 *next_lsn = entry.lsn + 1;
554 }
555 }
556
557 if !self.config.writes_wal() {
559 self.stats.lock().volatile_discards += 1;
560 return Ok(lsn);
561 }
562
563 self.sender
564 .send(WalMessage::Entry(entry))
565 .map_err(|_| StorageError::IoError("WAL writer thread terminated".to_string()))?;
566
567 self.stats.lock().entries_sent += 1;
568
569 Ok(lsn)
570 }
571
572 pub fn flush(&self) -> Result<(), StorageError> {
577 self.sender
578 .send(WalMessage::Flush)
579 .map_err(|_| StorageError::IoError("WAL writer thread terminated".to_string()))
580 }
581
582 pub fn sync(&self) -> Result<(), StorageError> {
586 let notifier = FlushNotifier::new();
587 self.sender
588 .send(WalMessage::FlushAndNotify(notifier.clone()))
589 .map_err(|_| StorageError::IoError("WAL writer thread terminated".to_string()))?;
590 notifier.wait();
591 Ok(())
592 }
593
594 pub fn sync_timeout(&self, timeout: Duration) -> Result<bool, StorageError> {
598 let notifier = FlushNotifier::new();
599 self.sender
600 .send(WalMessage::FlushAndNotify(notifier.clone()))
601 .map_err(|_| StorageError::IoError("WAL writer thread terminated".to_string()))?;
602 Ok(notifier.wait_timeout(timeout))
603 }
604
605 pub fn next_lsn(&self) -> Lsn {
607 *self.next_lsn.lock()
608 }
609
610 pub fn stats(&self) -> PersistenceStats {
612 self.stats.lock().clone()
613 }
614
615 pub fn durability_mode(&self) -> DurabilityMode {
617 self.config.durability_mode
618 }
619
620 pub fn config(&self) -> &PersistenceConfig {
622 &self.config
623 }
624
625 pub fn shutdown(&mut self) -> Result<(), StorageError> {
629 {
630 let mut shutdown = self.shutdown.lock();
631 if *shutdown {
632 return Ok(());
633 }
634 *shutdown = true;
635 }
636
637 let _ = self.sender.send(WalMessage::Shutdown);
639
640 if let Some(handle) = self.handle.take() {
642 handle
643 .join()
644 .map_err(|_| StorageError::IoError("WAL writer thread panicked".to_string()))?;
645 }
646
647 Ok(())
648 }
649
650 pub fn is_shutdown(&self) -> bool {
652 *self.shutdown.lock()
653 }
654 }
655
656 impl Drop for PersistenceEngine {
657 fn drop(&mut self) {
658 if let Err(e) = self.shutdown() {
659 log::error!("Error during PersistenceEngine shutdown: {}", e);
660 }
661 }
662 }
663
664 fn current_timestamp_ms() -> u64 {
666 use instant::SystemTime;
667 SystemTime::now()
668 .duration_since(instant::SystemTime::UNIX_EPOCH)
669 .map(|d| d.as_millis() as u64)
670 .unwrap_or(0)
671 }
672}
673
674#[cfg(target_arch = "wasm32")]
679mod wasm {
680 use std::sync::{Arc, Mutex};
681
682 use super::*;
683
684 #[derive(Debug, Clone, Default)]
686 pub struct FlushNotifier;
687
688 impl FlushNotifier {
689 pub fn new() -> Self {
690 Self
691 }
692
693 pub fn notify(&self) {}
694 pub fn wait(&self) {}
695 }
696
697 #[derive(Debug, Clone, Default)]
699 pub struct PersistenceStats {
700 pub entries_sent: u64,
702 pub entries_written: u64,
704 pub batches_written: u64,
706 pub bytes_written: u64,
708 pub time_flushes: u64,
710 pub count_flushes: u64,
712 pub explicit_flushes: u64,
714 pub volatile_discards: u64,
716 pub commit_syncs: u64,
718 pub op_syncs: u64,
720 pub avg_flush_latency_us: u64,
722 pub max_flush_latency_us: u64,
724 pub pending_entries: u64,
726 }
727
728 pub struct PersistenceEngine {
734 buffer: Arc<Mutex<Vec<WalEntry>>>,
736 stats: Arc<Mutex<PersistenceStats>>,
738 next_lsn: Arc<Mutex<Lsn>>,
740 config: PersistenceConfig,
742 }
743
744 impl std::fmt::Debug for PersistenceEngine {
745 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
746 f.debug_struct("PersistenceEngine")
747 .field("next_lsn", &*self.next_lsn.lock().unwrap())
748 .field("buffer_len", &self.buffer.lock().unwrap().len())
749 .field("config", &self.config)
750 .finish_non_exhaustive()
751 }
752 }
753
754 impl PersistenceEngine {
755 pub fn new(config: PersistenceConfig) -> Result<Self, StorageError> {
757 Ok(Self {
758 buffer: Arc::new(Mutex::new(Vec::with_capacity(config.flush_count))),
759 stats: Arc::new(Mutex::new(PersistenceStats::default())),
760 next_lsn: Arc::new(Mutex::new(1u64)),
761 config,
762 })
763 }
764
765 pub fn send(&self, op: WalOp) -> Result<Lsn, StorageError> {
769 let lsn = {
770 let mut next_lsn = self.next_lsn.lock().unwrap();
771 let lsn = *next_lsn;
772 *next_lsn += 1;
773 lsn
774 };
775
776 if !self.config.writes_wal() {
778 self.stats.lock().unwrap().volatile_discards += 1;
779 return Ok(lsn);
780 }
781
782 let timestamp_ms = current_timestamp_ms();
783 let entry = WalEntry::new(lsn, timestamp_ms, op);
784
785 {
786 let mut buffer = self.buffer.lock().unwrap();
787 buffer.push(entry);
788 }
789
790 self.stats.lock().unwrap().entries_sent += 1;
791
792 Ok(lsn)
793 }
794
795 pub fn send_entry(&self, entry: WalEntry) -> Result<Lsn, StorageError> {
799 let lsn = entry.lsn;
800
801 {
802 let mut next_lsn = self.next_lsn.lock().unwrap();
803 if entry.lsn >= *next_lsn {
804 *next_lsn = entry.lsn + 1;
805 }
806 }
807
808 if !self.config.writes_wal() {
810 self.stats.lock().unwrap().volatile_discards += 1;
811 return Ok(lsn);
812 }
813
814 {
815 let mut buffer = self.buffer.lock().unwrap();
816 buffer.push(entry);
817 }
818
819 self.stats.lock().unwrap().entries_sent += 1;
820
821 Ok(lsn)
822 }
823
824 pub fn flush(&self) -> Result<(), StorageError> {
826 log::debug!("WASM flush requested (buffered only)");
829 Ok(())
830 }
831
832 pub fn sync(&self) -> Result<(), StorageError> {
834 self.flush()
835 }
836
837 pub fn next_lsn(&self) -> Lsn {
839 *self.next_lsn.lock().unwrap()
840 }
841
842 pub fn stats(&self) -> PersistenceStats {
844 self.stats.lock().unwrap().clone()
845 }
846
847 pub fn durability_mode(&self) -> DurabilityMode {
849 self.config.durability_mode
850 }
851
852 pub fn config(&self) -> &PersistenceConfig {
854 &self.config
855 }
856
857 pub fn buffered_entries(&self) -> Vec<WalEntry> {
859 self.buffer.lock().unwrap().clone()
860 }
861
862 pub fn clear_buffer(&self) -> Vec<WalEntry> {
864 let mut buffer = self.buffer.lock().unwrap();
865 std::mem::take(&mut *buffer)
866 }
867
868 pub fn shutdown(&mut self) -> Result<(), StorageError> {
870 Ok(())
871 }
872
873 pub fn is_shutdown(&self) -> bool {
875 false
876 }
877 }
878
879 fn current_timestamp_ms() -> u64 {
881 use instant::SystemTime;
882 SystemTime::now()
883 .duration_since(instant::SystemTime::UNIX_EPOCH)
884 .map(|d| d.as_millis() as u64)
885 .unwrap_or(0)
886 }
887}
888
889#[cfg(not(target_arch = "wasm32"))]
894pub use native::{FlushNotifier, PersistenceEngine, PersistenceStats};
895#[cfg(target_arch = "wasm32")]
896pub use wasm::{FlushNotifier, PersistenceEngine, PersistenceStats};
897
898#[cfg(test)]
899mod tests {
900 use std::{io::Cursor, time::Duration};
901
902 use vibesql_types::SqlValue;
903
904 use super::*;
905
906 #[test]
907 fn test_persistence_engine_create() {
908 let buf = Vec::new();
909 let cursor = Cursor::new(buf);
910
911 let engine = PersistenceEngine::with_writer(cursor, PersistenceConfig::default()).unwrap();
912
913 assert_eq!(engine.next_lsn(), 1);
914 assert!(!engine.is_shutdown());
915 }
916
917 #[test]
918 fn test_send_entry() {
919 let buf = Vec::new();
920 let cursor = Cursor::new(buf);
921
922 let engine = PersistenceEngine::with_writer(cursor, PersistenceConfig::default()).unwrap();
923
924 let lsn = engine
925 .send(WalOp::Insert { table_id: 1, row_id: 100, values: vec![SqlValue::Integer(42)] })
926 .unwrap();
927
928 assert_eq!(lsn, 1);
929 assert_eq!(engine.next_lsn(), 2);
930
931 let stats = engine.stats();
932 assert_eq!(stats.entries_sent, 1);
933 }
934
935 #[test]
936 fn test_send_multiple_entries() {
937 let buf = Vec::new();
938 let cursor = Cursor::new(buf);
939
940 let engine = PersistenceEngine::with_writer(cursor, PersistenceConfig::default()).unwrap();
941
942 for i in 1..=10 {
943 let lsn = engine
944 .send(WalOp::Insert {
945 table_id: 1,
946 row_id: i as u64,
947 values: vec![SqlValue::Integer(i)],
948 })
949 .unwrap();
950 assert_eq!(lsn, i as u64);
951 }
952
953 assert_eq!(engine.next_lsn(), 11);
954 assert_eq!(engine.stats().entries_sent, 10);
955 }
956
957 #[test]
958 fn test_sync_flushes_entries() {
959 let buf = Vec::new();
960 let cursor = Cursor::new(buf);
961
962 let mut engine =
963 PersistenceEngine::with_writer(cursor, PersistenceConfig::default()).unwrap();
964
965 for i in 1..=5 {
967 engine
968 .send(WalOp::Insert {
969 table_id: 1,
970 row_id: i as u64,
971 values: vec![SqlValue::Integer(i)],
972 })
973 .unwrap();
974 }
975
976 engine.sync().unwrap();
978
979 let stats = engine.stats();
980 assert_eq!(stats.entries_sent, 5);
981 assert!(stats.entries_written >= 5);
982
983 engine.shutdown().unwrap();
984 }
985
986 #[test]
987 fn test_shutdown_flushes_pending() {
988 let buf = Vec::new();
989 let cursor = Cursor::new(buf);
990
991 let mut engine =
992 PersistenceEngine::with_writer(cursor, PersistenceConfig::default()).unwrap();
993
994 for i in 1..=3 {
996 engine
997 .send(WalOp::Insert {
998 table_id: 1,
999 row_id: i as u64,
1000 values: vec![SqlValue::Integer(i)],
1001 })
1002 .unwrap();
1003 }
1004
1005 engine.shutdown().unwrap();
1007
1008 assert!(engine.is_shutdown());
1009 }
1010
1011 #[test]
1012 fn test_config_defaults() {
1013 let config = PersistenceConfig::default();
1014 assert_eq!(config.channel_capacity, DEFAULT_CHANNEL_CAPACITY);
1015 assert_eq!(config.flush_interval_ms, DEFAULT_FLUSH_INTERVAL_MS);
1016 assert_eq!(config.flush_count, DEFAULT_FLUSH_COUNT);
1017 }
1018
1019 #[test]
1020 fn test_flush_non_blocking() {
1021 let buf = Vec::new();
1022 let cursor = Cursor::new(buf);
1023
1024 let engine = PersistenceEngine::with_writer(cursor, PersistenceConfig::default()).unwrap();
1025
1026 engine
1027 .send(WalOp::Insert { table_id: 1, row_id: 1, values: vec![SqlValue::Integer(1)] })
1028 .unwrap();
1029
1030 engine.flush().unwrap();
1032 }
1033
1034 #[cfg(not(target_arch = "wasm32"))]
1035 #[test]
1036 fn test_sync_with_timeout() {
1037 let buf = Vec::new();
1038 let cursor = Cursor::new(buf);
1039
1040 let mut engine =
1041 PersistenceEngine::with_writer(cursor, PersistenceConfig::default()).unwrap();
1042
1043 engine
1044 .send(WalOp::Insert { table_id: 1, row_id: 1, values: vec![SqlValue::Integer(1)] })
1045 .unwrap();
1046
1047 let completed = engine.sync_timeout(Duration::from_secs(5)).unwrap();
1049 assert!(completed);
1050
1051 engine.shutdown().unwrap();
1052 }
1053
1054 #[test]
1055 fn test_count_based_flush() {
1056 let buf = Vec::new();
1057 let cursor = Cursor::new(buf);
1058
1059 let config = PersistenceConfig { flush_count: 5, ..Default::default() };
1061
1062 let mut engine = PersistenceEngine::with_writer(cursor, config).unwrap();
1063
1064 for i in 1..=10 {
1066 engine
1067 .send(WalOp::Insert {
1068 table_id: 1,
1069 row_id: i as u64,
1070 values: vec![SqlValue::Integer(i)],
1071 })
1072 .unwrap();
1073 }
1074
1075 std::thread::sleep(Duration::from_millis(100));
1077
1078 let stats = engine.stats();
1079 assert!(stats.count_flushes >= 1);
1081
1082 engine.shutdown().unwrap();
1083 }
1084
1085 #[test]
1086 fn test_flush_latency_tracking() {
1087 let buf = Vec::new();
1088 let cursor = Cursor::new(buf);
1089
1090 let mut engine =
1091 PersistenceEngine::with_writer(cursor, PersistenceConfig::default()).unwrap();
1092
1093 for i in 1..=5 {
1095 engine
1096 .send(WalOp::Insert {
1097 table_id: 1,
1098 row_id: i as u64,
1099 values: vec![SqlValue::Integer(i)],
1100 })
1101 .unwrap();
1102 }
1103
1104 engine.sync().unwrap();
1106
1107 let stats = engine.stats();
1108
1109 assert!(
1112 stats.avg_flush_latency_us > 0 || stats.max_flush_latency_us > 0,
1113 "Flush latency should be recorded after sync"
1114 );
1115
1116 assert!(
1118 stats.max_flush_latency_us >= stats.avg_flush_latency_us,
1119 "Max latency should be >= avg latency"
1120 );
1121
1122 engine.shutdown().unwrap();
1123 }
1124
1125 #[test]
1126 fn test_flush_latency_max_tracking() {
1127 let buf = Vec::new();
1128 let cursor = Cursor::new(buf);
1129
1130 let config = PersistenceConfig { flush_count: 2, ..Default::default() };
1132
1133 let mut engine = PersistenceEngine::with_writer(cursor, config).unwrap();
1134
1135 for i in 1..=10 {
1137 engine
1138 .send(WalOp::Insert {
1139 table_id: 1,
1140 row_id: i as u64,
1141 values: vec![SqlValue::Integer(i)],
1142 })
1143 .unwrap();
1144 }
1145
1146 std::thread::sleep(Duration::from_millis(100));
1148
1149 engine.sync().unwrap();
1151
1152 let stats = engine.stats();
1153
1154 assert!(stats.batches_written >= 1, "At least one batch should have been written");
1156
1157 assert!(
1161 stats.max_flush_latency_us >= stats.avg_flush_latency_us,
1162 "Max latency ({}) should be >= avg latency ({})",
1163 stats.max_flush_latency_us,
1164 stats.avg_flush_latency_us
1165 );
1166
1167 engine.shutdown().unwrap();
1168 }
1169
1170 #[test]
1171 fn test_volatile_mode_discards_entries() {
1172 let buf = Vec::new();
1173 let cursor = Cursor::new(buf);
1174
1175 let config = PersistenceConfig::volatile();
1177 let mut engine = PersistenceEngine::with_writer(cursor, config).unwrap();
1178
1179 for i in 1..=5 {
1181 let lsn = engine
1182 .send(WalOp::Insert {
1183 table_id: 1,
1184 row_id: i as u64,
1185 values: vec![SqlValue::Integer(i)],
1186 })
1187 .unwrap();
1188 assert_eq!(lsn, i as u64);
1190 }
1191
1192 let stats = engine.stats();
1194 assert_eq!(stats.volatile_discards, 5);
1195 assert_eq!(stats.entries_sent, 0);
1196
1197 assert_eq!(engine.durability_mode(), DurabilityMode::Volatile);
1199 assert!(!engine.config().writes_wal());
1200
1201 engine.shutdown().unwrap();
1202 }
1203
1204 #[test]
1205 fn test_lazy_mode_sends_entries() {
1206 let buf = Vec::new();
1207 let cursor = Cursor::new(buf);
1208
1209 let config = PersistenceConfig::lazy();
1211 let mut engine = PersistenceEngine::with_writer(cursor, config).unwrap();
1212
1213 for i in 1..=3 {
1215 engine
1216 .send(WalOp::Insert {
1217 table_id: 1,
1218 row_id: i as u64,
1219 values: vec![SqlValue::Integer(i)],
1220 })
1221 .unwrap();
1222 }
1223
1224 let stats = engine.stats();
1226 assert_eq!(stats.entries_sent, 3);
1227 assert_eq!(stats.volatile_discards, 0);
1228
1229 assert_eq!(engine.durability_mode(), DurabilityMode::Lazy);
1231 assert!(engine.config().writes_wal());
1232 assert!(!engine.config().sync_on_commit());
1233
1234 engine.shutdown().unwrap();
1235 }
1236
1237 #[test]
1238 fn test_config_presets() {
1239 let volatile = PersistenceConfig::volatile();
1241 assert_eq!(volatile.durability_mode, DurabilityMode::Volatile);
1242 assert!(!volatile.writes_wal());
1243
1244 let lazy = PersistenceConfig::lazy();
1246 assert_eq!(lazy.durability_mode, DurabilityMode::Lazy);
1247 assert!(lazy.writes_wal());
1248 assert!(!lazy.sync_on_commit());
1249
1250 let durable = PersistenceConfig::durable();
1252 assert_eq!(durable.durability_mode, DurabilityMode::Durable);
1253 assert!(durable.writes_wal());
1254 assert!(durable.sync_on_commit());
1255 assert!(!durable.sync_on_every_op());
1256
1257 let paranoid = PersistenceConfig::paranoid();
1259 assert_eq!(paranoid.durability_mode, DurabilityMode::Paranoid);
1260 assert!(paranoid.writes_wal());
1261 assert!(paranoid.sync_on_commit());
1262 assert!(paranoid.sync_on_every_op());
1263 }
1264
1265 #[test]
1266 fn test_config_from_durability_config() {
1267 use crate::wal::durability::DurabilityConfig;
1268
1269 let dur_config = DurabilityConfig::durable();
1270 let config = PersistenceConfig::from_durability_config(&dur_config);
1271
1272 assert_eq!(config.durability_mode, DurabilityMode::Durable);
1273 assert_eq!(config.flush_interval_ms, dur_config.wal_flush_interval_ms);
1274 assert_eq!(config.flush_count, dur_config.wal_flush_batch_size);
1275 }
1276
1277 #[test]
1278 fn test_durability_mode_getter() {
1279 for (mode, config) in [
1281 (DurabilityMode::Volatile, PersistenceConfig::volatile()),
1282 (DurabilityMode::Lazy, PersistenceConfig::lazy()),
1283 (DurabilityMode::Durable, PersistenceConfig::durable()),
1284 (DurabilityMode::Paranoid, PersistenceConfig::paranoid()),
1285 ] {
1286 let buf: Vec<u8> = Vec::new();
1287 let cursor = Cursor::new(buf);
1288 let mut engine = PersistenceEngine::with_writer(cursor, config).unwrap();
1289 assert_eq!(engine.durability_mode(), mode);
1290 engine.shutdown().unwrap();
1291 }
1292 }
1293}