vibesql_storage/wal/
engine.rs

1// ============================================================================
2// Persistence Engine
3// ============================================================================
4//
5// Async persistence engine that receives WAL entries from the main thread
6// and writes them to disk in batches using a dedicated background thread.
7//
8// ## Architecture
9//
10// ```text
11// ┌─────────────────┐     bounded channel      ┌──────────────────┐
12// │  Main Thread    │ ──────────────────────▶  │  WAL Writer      │
13// │  (DB ops)       │   WalEntry messages      │  Thread          │
14// └─────────────────┘                          └────────┬─────────┘
15//                                                       │
16//                                                       ▼
17//                                              ┌──────────────────┐
18//                                              │  WAL File        │
19//                                              └──────────────────┘
20// ```
21//
22// ## Batching Strategy
23//
24// Entries are batched before writing to reduce fsync overhead:
25// - Time-based: flush every N ms (default 50ms)
26// - Count-based: flush every M entries (default 1000)
27// - Whichever threshold is reached first triggers a flush
28//
29// ## WASM Compatibility
30//
31// WASM builds don't have threads, so the engine uses a buffered
32// no-op implementation that stores entries in memory.
33
34use super::{
35    durability::{DurabilityConfig, DurabilityMode},
36    entry::{Lsn, WalEntry, WalOp},
37    writer::WalWriter,
38};
39use crate::StorageError;
40
41/// Default channel capacity (number of entries)
42pub const DEFAULT_CHANNEL_CAPACITY: usize = 10_000;
43
44/// Default time-based flush threshold in milliseconds
45pub const DEFAULT_FLUSH_INTERVAL_MS: u64 = 50;
46
47/// Default count-based flush threshold
48pub const DEFAULT_FLUSH_COUNT: usize = 1000;
49
50/// Configuration for the persistence engine
51#[derive(Debug, Clone)]
52pub struct PersistenceConfig {
53    /// Maximum number of entries in the channel before backpressure
54    pub channel_capacity: usize,
55    /// Time-based flush interval in milliseconds
56    pub flush_interval_ms: u64,
57    /// Count-based flush threshold
58    pub flush_count: usize,
59    /// Durability mode controlling WAL behavior
60    pub durability_mode: DurabilityMode,
61}
62
63impl Default for PersistenceConfig {
64    fn default() -> Self {
65        Self {
66            channel_capacity: DEFAULT_CHANNEL_CAPACITY,
67            flush_interval_ms: DEFAULT_FLUSH_INTERVAL_MS,
68            flush_count: DEFAULT_FLUSH_COUNT,
69            durability_mode: DurabilityMode::Lazy,
70        }
71    }
72}
73
74impl PersistenceConfig {
75    /// Create configuration from a DurabilityConfig
76    pub fn from_durability_config(config: &DurabilityConfig) -> Self {
77        Self {
78            channel_capacity: DEFAULT_CHANNEL_CAPACITY,
79            flush_interval_ms: config.wal_flush_interval_ms,
80            flush_count: config.wal_flush_batch_size,
81            durability_mode: config.mode,
82        }
83    }
84
85    /// Create configuration for volatile mode (no persistence)
86    pub fn volatile() -> Self {
87        Self {
88            channel_capacity: DEFAULT_CHANNEL_CAPACITY,
89            flush_interval_ms: 0,
90            flush_count: usize::MAX,
91            durability_mode: DurabilityMode::Volatile,
92        }
93    }
94
95    /// Create configuration for lazy mode (batched writes)
96    pub fn lazy() -> Self {
97        Self {
98            channel_capacity: DEFAULT_CHANNEL_CAPACITY,
99            flush_interval_ms: DEFAULT_FLUSH_INTERVAL_MS,
100            flush_count: DEFAULT_FLUSH_COUNT,
101            durability_mode: DurabilityMode::Lazy,
102        }
103    }
104
105    /// Create configuration for durable mode (sync on commit)
106    pub fn durable() -> Self {
107        Self {
108            channel_capacity: DEFAULT_CHANNEL_CAPACITY,
109            flush_interval_ms: 0,
110            flush_count: 1,
111            durability_mode: DurabilityMode::Durable,
112        }
113    }
114
115    /// Create configuration for paranoid mode (sync on every op)
116    pub fn paranoid() -> Self {
117        Self {
118            channel_capacity: DEFAULT_CHANNEL_CAPACITY,
119            flush_interval_ms: 0,
120            flush_count: 1,
121            durability_mode: DurabilityMode::Paranoid,
122        }
123    }
124
125    /// Returns true if this configuration writes to WAL
126    pub fn writes_wal(&self) -> bool {
127        self.durability_mode.writes_wal()
128    }
129
130    /// Returns true if this configuration syncs on commit
131    pub fn sync_on_commit(&self) -> bool {
132        self.durability_mode.sync_on_commit()
133    }
134
135    /// Returns true if this configuration syncs on every operation
136    pub fn sync_on_every_op(&self) -> bool {
137        self.durability_mode.sync_on_every_op()
138    }
139}
140
141/// Message types sent to the WAL writer thread
142#[derive(Debug)]
143pub enum WalMessage {
144    /// A WAL entry to persist
145    Entry(WalEntry),
146    /// Force an immediate flush
147    Flush,
148    /// Flush and send completion notification
149    FlushAndNotify(FlushNotifier),
150    /// Shutdown the writer thread
151    Shutdown,
152}
153
154// ============================================================================
155// Native (non-WASM) Implementation
156// ============================================================================
157
158#[cfg(not(target_arch = "wasm32"))]
159mod native {
160    use std::{
161        fs::OpenOptions,
162        io::{BufWriter, Seek, Write},
163        path::Path,
164        sync::{
165            mpsc::{self, RecvTimeoutError, SyncSender},
166            Arc,
167        },
168        thread::{self, JoinHandle},
169        time::{Duration, Instant},
170    };
171
172    use parking_lot::Mutex;
173
174    use super::*;
175
176    /// Flush completion notifier using a condition variable
177    #[derive(Clone)]
178    pub struct FlushNotifier {
179        completed: Arc<(parking_lot::Mutex<bool>, parking_lot::Condvar)>,
180    }
181
182    impl std::fmt::Debug for FlushNotifier {
183        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
184            f.debug_struct("FlushNotifier").finish()
185        }
186    }
187
188    impl FlushNotifier {
189        pub fn new() -> Self {
190            Self {
191                completed: Arc::new((parking_lot::Mutex::new(false), parking_lot::Condvar::new())),
192            }
193        }
194
195        /// Signal that the flush is complete
196        pub fn notify(&self) {
197            let (lock, cvar) = &*self.completed;
198            let mut completed = lock.lock();
199            *completed = true;
200            cvar.notify_all();
201        }
202
203        /// Wait for the flush to complete
204        pub fn wait(&self) {
205            let (lock, cvar) = &*self.completed;
206            let mut completed = lock.lock();
207            while !*completed {
208                cvar.wait(&mut completed);
209            }
210        }
211
212        /// Wait for the flush to complete with a timeout
213        pub fn wait_timeout(&self, timeout: Duration) -> bool {
214            let (lock, cvar) = &*self.completed;
215            let mut completed = lock.lock();
216            while !*completed {
217                let result = cvar.wait_for(&mut completed, timeout);
218                if result.timed_out() {
219                    return false;
220                }
221            }
222            true
223        }
224    }
225
226    impl Default for FlushNotifier {
227        fn default() -> Self {
228            Self::new()
229        }
230    }
231
232    /// Statistics about the persistence engine
233    #[derive(Debug, Clone, Default)]
234    pub struct PersistenceStats {
235        /// Total entries sent to the engine
236        pub entries_sent: u64,
237        /// Total entries written to disk
238        pub entries_written: u64,
239        /// Total batches written
240        pub batches_written: u64,
241        /// Total bytes written
242        pub bytes_written: u64,
243        /// Number of time-based flushes
244        pub time_flushes: u64,
245        /// Number of count-based flushes
246        pub count_flushes: u64,
247        /// Number of explicit flushes
248        pub explicit_flushes: u64,
249        /// Number of entries discarded in volatile mode
250        pub volatile_discards: u64,
251        /// Number of sync-on-commit flushes (Durable/Paranoid mode)
252        pub commit_syncs: u64,
253        /// Number of sync-on-op flushes (Paranoid mode only)
254        pub op_syncs: u64,
255        /// Average flush latency in microseconds
256        pub avg_flush_latency_us: u64,
257        /// Maximum flush latency in microseconds
258        pub max_flush_latency_us: u64,
259        /// Number of pending entries in channel (snapshot)
260        pub pending_entries: u64,
261        /// Total flush latency in microseconds (used for avg calculation)
262        total_flush_latency_us: u64,
263        /// Number of flush latency samples (used for avg calculation)
264        flush_latency_samples: u64,
265    }
266
267    impl PersistenceStats {
268        /// Record a flush latency measurement
269        pub fn record_flush_latency(&mut self, duration: Duration) {
270            let latency_us = duration.as_micros() as u64;
271
272            // Update max
273            if latency_us > self.max_flush_latency_us {
274                self.max_flush_latency_us = latency_us;
275            }
276
277            // Update running average
278            self.total_flush_latency_us += latency_us;
279            self.flush_latency_samples += 1;
280            self.avg_flush_latency_us = self.total_flush_latency_us / self.flush_latency_samples;
281        }
282    }
283
284    /// Persistence engine that manages async WAL writing
285    pub struct PersistenceEngine {
286        /// Sender for WAL messages
287        sender: SyncSender<WalMessage>,
288        /// Handle to the writer thread
289        handle: Option<JoinHandle<()>>,
290        /// Shared statistics
291        stats: Arc<Mutex<PersistenceStats>>,
292        /// Next LSN to assign
293        next_lsn: Arc<Mutex<Lsn>>,
294        /// Whether the engine has been shut down
295        shutdown: Arc<Mutex<bool>>,
296        /// Configuration (stored for durability mode checks)
297        config: PersistenceConfig,
298    }
299
300    impl std::fmt::Debug for PersistenceEngine {
301        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
302            f.debug_struct("PersistenceEngine")
303                .field("next_lsn", &*self.next_lsn.lock())
304                .field("shutdown", &*self.shutdown.lock())
305                .field("stats", &*self.stats.lock())
306                .finish_non_exhaustive()
307        }
308    }
309
310    impl PersistenceEngine {
311        /// Create a new persistence engine writing to a file
312        pub fn new<P: AsRef<Path>>(
313            path: P,
314            config: PersistenceConfig,
315        ) -> Result<Self, StorageError> {
316            let file = OpenOptions::new()
317                .create(true)
318                .append(true)
319                .open(path.as_ref())
320                .map_err(|e| StorageError::IoError(e.to_string()))?;
321
322            let writer = BufWriter::new(file);
323            Self::with_writer(writer, config)
324        }
325
326        /// Create a new persistence engine with a custom writer
327        pub fn with_writer<W: Write + Seek + Send + 'static>(
328            writer: W,
329            config: PersistenceConfig,
330        ) -> Result<Self, StorageError> {
331            let (sender, receiver) = mpsc::sync_channel(config.channel_capacity);
332            let stats = Arc::new(Mutex::new(PersistenceStats::default()));
333            let next_lsn = Arc::new(Mutex::new(1u64));
334            let shutdown = Arc::new(Mutex::new(false));
335
336            let stats_clone = stats.clone();
337            let flush_interval = Duration::from_millis(config.flush_interval_ms);
338            let flush_count = config.flush_count;
339
340            let handle = thread::spawn(move || {
341                Self::writer_loop(writer, receiver, stats_clone, flush_interval, flush_count);
342            });
343
344            Ok(Self { sender, handle: Some(handle), stats, next_lsn, shutdown, config })
345        }
346
347        /// The main writer loop running in the background thread
348        fn writer_loop<W: Write + Seek>(
349            writer: W,
350            receiver: mpsc::Receiver<WalMessage>,
351            stats: Arc<Mutex<PersistenceStats>>,
352            flush_interval: Duration,
353            flush_count: usize,
354        ) {
355            let mut wal_writer = match WalWriter::create(writer) {
356                Ok(w) => w,
357                Err(e) => {
358                    log::error!("Failed to create WAL writer: {}", e);
359                    return;
360                }
361            };
362
363            // Cap batch capacity at a reasonable size to prevent allocation overflow
364            let batch_capacity = flush_count.min(DEFAULT_FLUSH_COUNT);
365            let mut batch: Vec<WalEntry> = Vec::with_capacity(batch_capacity);
366            let mut last_flush = Instant::now();
367            let mut pending_notifiers: Vec<FlushNotifier> = Vec::new();
368
369            loop {
370                // Calculate timeout until next time-based flush
371                let elapsed = last_flush.elapsed();
372                let timeout = flush_interval.saturating_sub(elapsed);
373
374                match receiver.recv_timeout(timeout) {
375                    Ok(WalMessage::Entry(entry)) => {
376                        batch.push(entry);
377
378                        // Count-based flush
379                        if batch.len() >= flush_count {
380                            Self::flush_batch(
381                                &mut wal_writer,
382                                &mut batch,
383                                &stats,
384                                &mut pending_notifiers,
385                                true,
386                            );
387                            last_flush = Instant::now();
388                        }
389                    }
390                    Ok(WalMessage::Flush) => {
391                        Self::flush_batch(
392                            &mut wal_writer,
393                            &mut batch,
394                            &stats,
395                            &mut pending_notifiers,
396                            false,
397                        );
398                        last_flush = Instant::now();
399                        stats.lock().explicit_flushes += 1;
400                    }
401                    Ok(WalMessage::FlushAndNotify(notifier)) => {
402                        pending_notifiers.push(notifier);
403                        Self::flush_batch(
404                            &mut wal_writer,
405                            &mut batch,
406                            &stats,
407                            &mut pending_notifiers,
408                            false,
409                        );
410                        last_flush = Instant::now();
411                        stats.lock().explicit_flushes += 1;
412                    }
413                    Ok(WalMessage::Shutdown) => {
414                        // Flush remaining entries before shutdown
415                        Self::flush_batch(
416                            &mut wal_writer,
417                            &mut batch,
418                            &stats,
419                            &mut pending_notifiers,
420                            false,
421                        );
422                        break;
423                    }
424                    Err(RecvTimeoutError::Timeout) => {
425                        // Time-based flush
426                        if !batch.is_empty() {
427                            Self::flush_batch(
428                                &mut wal_writer,
429                                &mut batch,
430                                &stats,
431                                &mut pending_notifiers,
432                                false,
433                            );
434                            stats.lock().time_flushes += 1;
435                        }
436                        last_flush = Instant::now();
437                    }
438                    Err(RecvTimeoutError::Disconnected) => {
439                        // Channel closed, flush and exit
440                        Self::flush_batch(
441                            &mut wal_writer,
442                            &mut batch,
443                            &stats,
444                            &mut pending_notifiers,
445                            false,
446                        );
447                        break;
448                    }
449                }
450            }
451
452            log::debug!("WAL writer thread shutting down");
453        }
454
455        /// Flush a batch of entries to disk
456        fn flush_batch<W: Write + Seek>(
457            writer: &mut WalWriter<W>,
458            batch: &mut Vec<WalEntry>,
459            stats: &Arc<Mutex<PersistenceStats>>,
460            pending_notifiers: &mut Vec<FlushNotifier>,
461            is_count_flush: bool,
462        ) {
463            if batch.is_empty() {
464                // Still notify waiters even if batch is empty
465                for notifier in pending_notifiers.drain(..) {
466                    notifier.notify();
467                }
468                return;
469            }
470
471            let mut bytes_written = 0u64;
472            let entries_count = batch.len() as u64;
473
474            // Start timing the flush operation
475            let flush_start = Instant::now();
476
477            for entry in batch.drain(..) {
478                // Estimate bytes (actual size varies with entry content)
479                bytes_written += 32; // Approximate overhead per entry
480                if let Err(e) = writer.append(&entry) {
481                    log::error!("Failed to write WAL entry: {}", e);
482                    // Continue trying to write other entries
483                }
484            }
485
486            if let Err(e) = writer.flush() {
487                log::error!("Failed to flush WAL: {}", e);
488            }
489
490            // Measure flush latency
491            let flush_duration = flush_start.elapsed();
492
493            // Update stats
494            {
495                let mut stats = stats.lock();
496                stats.entries_written += entries_count;
497                stats.batches_written += 1;
498                stats.bytes_written += bytes_written;
499                if is_count_flush {
500                    stats.count_flushes += 1;
501                }
502                stats.record_flush_latency(flush_duration);
503            }
504
505            // Notify all waiters
506            for notifier in pending_notifiers.drain(..) {
507                notifier.notify();
508            }
509        }
510
511        /// Send a WAL operation to be persisted
512        ///
513        /// This method is non-blocking unless the channel is full (backpressure).
514        /// Returns the LSN assigned to the entry.
515        ///
516        /// In volatile mode, entries are not sent to the WAL - just an LSN is returned.
517        pub fn send(&self, op: WalOp) -> Result<Lsn, StorageError> {
518            let lsn = {
519                let mut next_lsn = self.next_lsn.lock();
520                let lsn = *next_lsn;
521                *next_lsn += 1;
522                lsn
523            };
524
525            // In volatile mode, skip WAL entirely
526            if !self.config.writes_wal() {
527                self.stats.lock().volatile_discards += 1;
528                return Ok(lsn);
529            }
530
531            let timestamp_ms = current_timestamp_ms();
532            let entry = WalEntry::new(lsn, timestamp_ms, op);
533
534            self.sender
535                .send(WalMessage::Entry(entry))
536                .map_err(|_| StorageError::IoError("WAL writer thread terminated".to_string()))?;
537
538            self.stats.lock().entries_sent += 1;
539
540            Ok(lsn)
541        }
542
543        /// Send a pre-built WAL entry to be persisted
544        ///
545        /// In volatile mode, entries are not sent to the WAL.
546        pub fn send_entry(&self, entry: WalEntry) -> Result<Lsn, StorageError> {
547            let lsn = entry.lsn;
548
549            // Update next_lsn if needed
550            {
551                let mut next_lsn = self.next_lsn.lock();
552                if entry.lsn >= *next_lsn {
553                    *next_lsn = entry.lsn + 1;
554                }
555            }
556
557            // In volatile mode, skip WAL entirely
558            if !self.config.writes_wal() {
559                self.stats.lock().volatile_discards += 1;
560                return Ok(lsn);
561            }
562
563            self.sender
564                .send(WalMessage::Entry(entry))
565                .map_err(|_| StorageError::IoError("WAL writer thread terminated".to_string()))?;
566
567            self.stats.lock().entries_sent += 1;
568
569            Ok(lsn)
570        }
571
572        /// Trigger an immediate flush of pending entries
573        ///
574        /// This is non-blocking - it signals the writer thread to flush
575        /// but doesn't wait for completion.
576        pub fn flush(&self) -> Result<(), StorageError> {
577            self.sender
578                .send(WalMessage::Flush)
579                .map_err(|_| StorageError::IoError("WAL writer thread terminated".to_string()))
580        }
581
582        /// Flush pending entries and wait for completion
583        ///
584        /// This blocks until all pending entries have been written to disk.
585        pub fn sync(&self) -> Result<(), StorageError> {
586            let notifier = FlushNotifier::new();
587            self.sender
588                .send(WalMessage::FlushAndNotify(notifier.clone()))
589                .map_err(|_| StorageError::IoError("WAL writer thread terminated".to_string()))?;
590            notifier.wait();
591            Ok(())
592        }
593
594        /// Flush pending entries and wait for completion with timeout
595        ///
596        /// Returns true if the flush completed within the timeout, false otherwise.
597        pub fn sync_timeout(&self, timeout: Duration) -> Result<bool, StorageError> {
598            let notifier = FlushNotifier::new();
599            self.sender
600                .send(WalMessage::FlushAndNotify(notifier.clone()))
601                .map_err(|_| StorageError::IoError("WAL writer thread terminated".to_string()))?;
602            Ok(notifier.wait_timeout(timeout))
603        }
604
605        /// Get the next LSN that will be assigned
606        pub fn next_lsn(&self) -> Lsn {
607            *self.next_lsn.lock()
608        }
609
610        /// Get current statistics
611        pub fn stats(&self) -> PersistenceStats {
612            self.stats.lock().clone()
613        }
614
615        /// Get the current durability mode
616        pub fn durability_mode(&self) -> DurabilityMode {
617            self.config.durability_mode
618        }
619
620        /// Get the current configuration
621        pub fn config(&self) -> &PersistenceConfig {
622            &self.config
623        }
624
625        /// Shutdown the persistence engine gracefully
626        ///
627        /// This flushes all pending entries and waits for the writer thread to terminate.
628        pub fn shutdown(&mut self) -> Result<(), StorageError> {
629            {
630                let mut shutdown = self.shutdown.lock();
631                if *shutdown {
632                    return Ok(());
633                }
634                *shutdown = true;
635            }
636
637            // Signal shutdown
638            let _ = self.sender.send(WalMessage::Shutdown);
639
640            // Wait for thread to finish
641            if let Some(handle) = self.handle.take() {
642                handle
643                    .join()
644                    .map_err(|_| StorageError::IoError("WAL writer thread panicked".to_string()))?;
645            }
646
647            Ok(())
648        }
649
650        /// Check if the engine has been shut down
651        pub fn is_shutdown(&self) -> bool {
652            *self.shutdown.lock()
653        }
654    }
655
656    impl Drop for PersistenceEngine {
657        fn drop(&mut self) {
658            if let Err(e) = self.shutdown() {
659                log::error!("Error during PersistenceEngine shutdown: {}", e);
660            }
661        }
662    }
663
664    /// Get current timestamp in milliseconds since epoch
665    fn current_timestamp_ms() -> u64 {
666        use instant::SystemTime;
667        SystemTime::now()
668            .duration_since(instant::SystemTime::UNIX_EPOCH)
669            .map(|d| d.as_millis() as u64)
670            .unwrap_or(0)
671    }
672}
673
674// ============================================================================
675// WASM (no-op/buffered) Implementation
676// ============================================================================
677
678#[cfg(target_arch = "wasm32")]
679mod wasm {
680    use std::sync::{Arc, Mutex};
681
682    use super::*;
683
684    /// Flush completion notifier (no-op for WASM)
685    #[derive(Debug, Clone, Default)]
686    pub struct FlushNotifier;
687
688    impl FlushNotifier {
689        pub fn new() -> Self {
690            Self
691        }
692
693        pub fn notify(&self) {}
694        pub fn wait(&self) {}
695    }
696
697    /// Statistics about the persistence engine (WASM version)
698    #[derive(Debug, Clone, Default)]
699    pub struct PersistenceStats {
700        /// Total entries sent to the engine
701        pub entries_sent: u64,
702        /// Total entries written to disk
703        pub entries_written: u64,
704        /// Total batches written
705        pub batches_written: u64,
706        /// Total bytes written
707        pub bytes_written: u64,
708        /// Number of time-based flushes
709        pub time_flushes: u64,
710        /// Number of count-based flushes
711        pub count_flushes: u64,
712        /// Number of explicit flushes
713        pub explicit_flushes: u64,
714        /// Number of entries discarded in volatile mode
715        pub volatile_discards: u64,
716        /// Number of sync-on-commit flushes (Durable/Paranoid mode)
717        pub commit_syncs: u64,
718        /// Number of sync-on-op flushes (Paranoid mode only)
719        pub op_syncs: u64,
720        /// Average flush latency in microseconds (always 0 for WASM)
721        pub avg_flush_latency_us: u64,
722        /// Maximum flush latency in microseconds (always 0 for WASM)
723        pub max_flush_latency_us: u64,
724        /// Number of pending entries in channel (snapshot)
725        pub pending_entries: u64,
726    }
727
728    /// Persistence engine for WASM (buffered, no background thread)
729    ///
730    /// In WASM, we can't spawn threads, so this implementation buffers
731    /// entries in memory. In a real browser environment, you would
732    /// periodically flush to IndexedDB or the Origin Private File System.
733    pub struct PersistenceEngine {
734        /// Buffered entries
735        buffer: Arc<Mutex<Vec<WalEntry>>>,
736        /// Statistics
737        stats: Arc<Mutex<PersistenceStats>>,
738        /// Next LSN to assign
739        next_lsn: Arc<Mutex<Lsn>>,
740        /// Configuration
741        config: PersistenceConfig,
742    }
743
744    impl std::fmt::Debug for PersistenceEngine {
745        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
746            f.debug_struct("PersistenceEngine")
747                .field("next_lsn", &*self.next_lsn.lock().unwrap())
748                .field("buffer_len", &self.buffer.lock().unwrap().len())
749                .field("config", &self.config)
750                .finish_non_exhaustive()
751        }
752    }
753
754    impl PersistenceEngine {
755        /// Create a new WASM persistence engine
756        pub fn new(config: PersistenceConfig) -> Result<Self, StorageError> {
757            Ok(Self {
758                buffer: Arc::new(Mutex::new(Vec::with_capacity(config.flush_count))),
759                stats: Arc::new(Mutex::new(PersistenceStats::default())),
760                next_lsn: Arc::new(Mutex::new(1u64)),
761                config,
762            })
763        }
764
765        /// Send a WAL operation to be persisted
766        ///
767        /// In volatile mode, entries are not buffered.
768        pub fn send(&self, op: WalOp) -> Result<Lsn, StorageError> {
769            let lsn = {
770                let mut next_lsn = self.next_lsn.lock().unwrap();
771                let lsn = *next_lsn;
772                *next_lsn += 1;
773                lsn
774            };
775
776            // In volatile mode, skip buffering
777            if !self.config.writes_wal() {
778                self.stats.lock().unwrap().volatile_discards += 1;
779                return Ok(lsn);
780            }
781
782            let timestamp_ms = current_timestamp_ms();
783            let entry = WalEntry::new(lsn, timestamp_ms, op);
784
785            {
786                let mut buffer = self.buffer.lock().unwrap();
787                buffer.push(entry);
788            }
789
790            self.stats.lock().unwrap().entries_sent += 1;
791
792            Ok(lsn)
793        }
794
795        /// Send a pre-built WAL entry to be persisted
796        ///
797        /// In volatile mode, entries are not buffered.
798        pub fn send_entry(&self, entry: WalEntry) -> Result<Lsn, StorageError> {
799            let lsn = entry.lsn;
800
801            {
802                let mut next_lsn = self.next_lsn.lock().unwrap();
803                if entry.lsn >= *next_lsn {
804                    *next_lsn = entry.lsn + 1;
805                }
806            }
807
808            // In volatile mode, skip buffering
809            if !self.config.writes_wal() {
810                self.stats.lock().unwrap().volatile_discards += 1;
811                return Ok(lsn);
812            }
813
814            {
815                let mut buffer = self.buffer.lock().unwrap();
816                buffer.push(entry);
817            }
818
819            self.stats.lock().unwrap().entries_sent += 1;
820
821            Ok(lsn)
822        }
823
824        /// Trigger a flush (no-op in WASM - would need async to IndexedDB)
825        pub fn flush(&self) -> Result<(), StorageError> {
826            // In a real implementation, this would schedule an async write
827            // to IndexedDB or OPFS
828            log::debug!("WASM flush requested (buffered only)");
829            Ok(())
830        }
831
832        /// Sync (no-op in WASM)
833        pub fn sync(&self) -> Result<(), StorageError> {
834            self.flush()
835        }
836
837        /// Get the next LSN that will be assigned
838        pub fn next_lsn(&self) -> Lsn {
839            *self.next_lsn.lock().unwrap()
840        }
841
842        /// Get current statistics
843        pub fn stats(&self) -> PersistenceStats {
844            self.stats.lock().unwrap().clone()
845        }
846
847        /// Get the current durability mode
848        pub fn durability_mode(&self) -> DurabilityMode {
849            self.config.durability_mode
850        }
851
852        /// Get the current configuration
853        pub fn config(&self) -> &PersistenceConfig {
854            &self.config
855        }
856
857        /// Get buffered entries (WASM-specific)
858        pub fn buffered_entries(&self) -> Vec<WalEntry> {
859            self.buffer.lock().unwrap().clone()
860        }
861
862        /// Clear buffered entries (WASM-specific)
863        pub fn clear_buffer(&self) -> Vec<WalEntry> {
864            let mut buffer = self.buffer.lock().unwrap();
865            std::mem::take(&mut *buffer)
866        }
867
868        /// Shutdown (no-op in WASM)
869        pub fn shutdown(&mut self) -> Result<(), StorageError> {
870            Ok(())
871        }
872
873        /// Check if the engine has been shut down (always false for WASM)
874        pub fn is_shutdown(&self) -> bool {
875            false
876        }
877    }
878
879    /// Get current timestamp in milliseconds since epoch
880    fn current_timestamp_ms() -> u64 {
881        use instant::SystemTime;
882        SystemTime::now()
883            .duration_since(instant::SystemTime::UNIX_EPOCH)
884            .map(|d| d.as_millis() as u64)
885            .unwrap_or(0)
886    }
887}
888
889// ============================================================================
890// Public API (platform-agnostic)
891// ============================================================================
892
893#[cfg(not(target_arch = "wasm32"))]
894pub use native::{FlushNotifier, PersistenceEngine, PersistenceStats};
895#[cfg(target_arch = "wasm32")]
896pub use wasm::{FlushNotifier, PersistenceEngine, PersistenceStats};
897
898#[cfg(test)]
899mod tests {
900    use std::{io::Cursor, time::Duration};
901
902    use vibesql_types::SqlValue;
903
904    use super::*;
905
906    #[test]
907    fn test_persistence_engine_create() {
908        let buf = Vec::new();
909        let cursor = Cursor::new(buf);
910
911        let engine = PersistenceEngine::with_writer(cursor, PersistenceConfig::default()).unwrap();
912
913        assert_eq!(engine.next_lsn(), 1);
914        assert!(!engine.is_shutdown());
915    }
916
917    #[test]
918    fn test_send_entry() {
919        let buf = Vec::new();
920        let cursor = Cursor::new(buf);
921
922        let engine = PersistenceEngine::with_writer(cursor, PersistenceConfig::default()).unwrap();
923
924        let lsn = engine
925            .send(WalOp::Insert { table_id: 1, row_id: 100, values: vec![SqlValue::Integer(42)] })
926            .unwrap();
927
928        assert_eq!(lsn, 1);
929        assert_eq!(engine.next_lsn(), 2);
930
931        let stats = engine.stats();
932        assert_eq!(stats.entries_sent, 1);
933    }
934
935    #[test]
936    fn test_send_multiple_entries() {
937        let buf = Vec::new();
938        let cursor = Cursor::new(buf);
939
940        let engine = PersistenceEngine::with_writer(cursor, PersistenceConfig::default()).unwrap();
941
942        for i in 1..=10 {
943            let lsn = engine
944                .send(WalOp::Insert {
945                    table_id: 1,
946                    row_id: i as u64,
947                    values: vec![SqlValue::Integer(i)],
948                })
949                .unwrap();
950            assert_eq!(lsn, i as u64);
951        }
952
953        assert_eq!(engine.next_lsn(), 11);
954        assert_eq!(engine.stats().entries_sent, 10);
955    }
956
957    #[test]
958    fn test_sync_flushes_entries() {
959        let buf = Vec::new();
960        let cursor = Cursor::new(buf);
961
962        let mut engine =
963            PersistenceEngine::with_writer(cursor, PersistenceConfig::default()).unwrap();
964
965        // Send some entries
966        for i in 1..=5 {
967            engine
968                .send(WalOp::Insert {
969                    table_id: 1,
970                    row_id: i as u64,
971                    values: vec![SqlValue::Integer(i)],
972                })
973                .unwrap();
974        }
975
976        // Sync should flush all entries
977        engine.sync().unwrap();
978
979        let stats = engine.stats();
980        assert_eq!(stats.entries_sent, 5);
981        assert!(stats.entries_written >= 5);
982
983        engine.shutdown().unwrap();
984    }
985
986    #[test]
987    fn test_shutdown_flushes_pending() {
988        let buf = Vec::new();
989        let cursor = Cursor::new(buf);
990
991        let mut engine =
992            PersistenceEngine::with_writer(cursor, PersistenceConfig::default()).unwrap();
993
994        // Send entries without explicit sync
995        for i in 1..=3 {
996            engine
997                .send(WalOp::Insert {
998                    table_id: 1,
999                    row_id: i as u64,
1000                    values: vec![SqlValue::Integer(i)],
1001                })
1002                .unwrap();
1003        }
1004
1005        // Shutdown should flush pending entries
1006        engine.shutdown().unwrap();
1007
1008        assert!(engine.is_shutdown());
1009    }
1010
1011    #[test]
1012    fn test_config_defaults() {
1013        let config = PersistenceConfig::default();
1014        assert_eq!(config.channel_capacity, DEFAULT_CHANNEL_CAPACITY);
1015        assert_eq!(config.flush_interval_ms, DEFAULT_FLUSH_INTERVAL_MS);
1016        assert_eq!(config.flush_count, DEFAULT_FLUSH_COUNT);
1017    }
1018
1019    #[test]
1020    fn test_flush_non_blocking() {
1021        let buf = Vec::new();
1022        let cursor = Cursor::new(buf);
1023
1024        let engine = PersistenceEngine::with_writer(cursor, PersistenceConfig::default()).unwrap();
1025
1026        engine
1027            .send(WalOp::Insert { table_id: 1, row_id: 1, values: vec![SqlValue::Integer(1)] })
1028            .unwrap();
1029
1030        // Flush should return immediately
1031        engine.flush().unwrap();
1032    }
1033
1034    #[cfg(not(target_arch = "wasm32"))]
1035    #[test]
1036    fn test_sync_with_timeout() {
1037        let buf = Vec::new();
1038        let cursor = Cursor::new(buf);
1039
1040        let mut engine =
1041            PersistenceEngine::with_writer(cursor, PersistenceConfig::default()).unwrap();
1042
1043        engine
1044            .send(WalOp::Insert { table_id: 1, row_id: 1, values: vec![SqlValue::Integer(1)] })
1045            .unwrap();
1046
1047        // Should complete within timeout
1048        let completed = engine.sync_timeout(Duration::from_secs(5)).unwrap();
1049        assert!(completed);
1050
1051        engine.shutdown().unwrap();
1052    }
1053
1054    #[test]
1055    fn test_count_based_flush() {
1056        let buf = Vec::new();
1057        let cursor = Cursor::new(buf);
1058
1059        // Set a low flush count to trigger count-based flush
1060        let config = PersistenceConfig { flush_count: 5, ..Default::default() };
1061
1062        let mut engine = PersistenceEngine::with_writer(cursor, config).unwrap();
1063
1064        // Send more than flush_count entries
1065        for i in 1..=10 {
1066            engine
1067                .send(WalOp::Insert {
1068                    table_id: 1,
1069                    row_id: i as u64,
1070                    values: vec![SqlValue::Integer(i)],
1071                })
1072                .unwrap();
1073        }
1074
1075        // Give the writer thread time to process
1076        std::thread::sleep(Duration::from_millis(100));
1077
1078        let stats = engine.stats();
1079        // Should have triggered at least one count-based flush
1080        assert!(stats.count_flushes >= 1);
1081
1082        engine.shutdown().unwrap();
1083    }
1084
1085    #[test]
1086    fn test_flush_latency_tracking() {
1087        let buf = Vec::new();
1088        let cursor = Cursor::new(buf);
1089
1090        let mut engine =
1091            PersistenceEngine::with_writer(cursor, PersistenceConfig::default()).unwrap();
1092
1093        // Send some entries
1094        for i in 1..=5 {
1095            engine
1096                .send(WalOp::Insert {
1097                    table_id: 1,
1098                    row_id: i as u64,
1099                    values: vec![SqlValue::Integer(i)],
1100                })
1101                .unwrap();
1102        }
1103
1104        // Sync to trigger a flush
1105        engine.sync().unwrap();
1106
1107        let stats = engine.stats();
1108
1109        // Latency should be recorded (at least one sample)
1110        // avg and max should be > 0 since we did at least one flush
1111        assert!(
1112            stats.avg_flush_latency_us > 0 || stats.max_flush_latency_us > 0,
1113            "Flush latency should be recorded after sync"
1114        );
1115
1116        // Max should be >= avg
1117        assert!(
1118            stats.max_flush_latency_us >= stats.avg_flush_latency_us,
1119            "Max latency should be >= avg latency"
1120        );
1121
1122        engine.shutdown().unwrap();
1123    }
1124
1125    #[test]
1126    fn test_flush_latency_max_tracking() {
1127        let buf = Vec::new();
1128        let cursor = Cursor::new(buf);
1129
1130        // Use low flush count to trigger multiple flushes
1131        let config = PersistenceConfig { flush_count: 2, ..Default::default() };
1132
1133        let mut engine = PersistenceEngine::with_writer(cursor, config).unwrap();
1134
1135        // Send enough entries to trigger multiple flushes
1136        for i in 1..=10 {
1137            engine
1138                .send(WalOp::Insert {
1139                    table_id: 1,
1140                    row_id: i as u64,
1141                    values: vec![SqlValue::Integer(i)],
1142                })
1143                .unwrap();
1144        }
1145
1146        // Give writer thread time to process
1147        std::thread::sleep(Duration::from_millis(100));
1148
1149        // Final sync to ensure all entries flushed
1150        engine.sync().unwrap();
1151
1152        let stats = engine.stats();
1153
1154        // Multiple flushes should have occurred
1155        assert!(stats.batches_written >= 1, "At least one batch should have been written");
1156
1157        // Latency metrics should be populated
1158        // Note: On very fast systems, latency could be 0 microseconds
1159        // so we just verify the tracking doesn't break
1160        assert!(
1161            stats.max_flush_latency_us >= stats.avg_flush_latency_us,
1162            "Max latency ({}) should be >= avg latency ({})",
1163            stats.max_flush_latency_us,
1164            stats.avg_flush_latency_us
1165        );
1166
1167        engine.shutdown().unwrap();
1168    }
1169
1170    #[test]
1171    fn test_volatile_mode_discards_entries() {
1172        let buf = Vec::new();
1173        let cursor = Cursor::new(buf);
1174
1175        // Create engine in volatile mode
1176        let config = PersistenceConfig::volatile();
1177        let mut engine = PersistenceEngine::with_writer(cursor, config).unwrap();
1178
1179        // Send some entries
1180        for i in 1..=5 {
1181            let lsn = engine
1182                .send(WalOp::Insert {
1183                    table_id: 1,
1184                    row_id: i as u64,
1185                    values: vec![SqlValue::Integer(i)],
1186                })
1187                .unwrap();
1188            // LSNs should still be assigned
1189            assert_eq!(lsn, i as u64);
1190        }
1191
1192        // Check stats - entries should be discarded, not sent
1193        let stats = engine.stats();
1194        assert_eq!(stats.volatile_discards, 5);
1195        assert_eq!(stats.entries_sent, 0);
1196
1197        // Verify mode
1198        assert_eq!(engine.durability_mode(), DurabilityMode::Volatile);
1199        assert!(!engine.config().writes_wal());
1200
1201        engine.shutdown().unwrap();
1202    }
1203
1204    #[test]
1205    fn test_lazy_mode_sends_entries() {
1206        let buf = Vec::new();
1207        let cursor = Cursor::new(buf);
1208
1209        // Create engine in lazy mode (default)
1210        let config = PersistenceConfig::lazy();
1211        let mut engine = PersistenceEngine::with_writer(cursor, config).unwrap();
1212
1213        // Send some entries
1214        for i in 1..=3 {
1215            engine
1216                .send(WalOp::Insert {
1217                    table_id: 1,
1218                    row_id: i as u64,
1219                    values: vec![SqlValue::Integer(i)],
1220                })
1221                .unwrap();
1222        }
1223
1224        // Check stats - entries should be sent
1225        let stats = engine.stats();
1226        assert_eq!(stats.entries_sent, 3);
1227        assert_eq!(stats.volatile_discards, 0);
1228
1229        // Verify mode
1230        assert_eq!(engine.durability_mode(), DurabilityMode::Lazy);
1231        assert!(engine.config().writes_wal());
1232        assert!(!engine.config().sync_on_commit());
1233
1234        engine.shutdown().unwrap();
1235    }
1236
1237    #[test]
1238    fn test_config_presets() {
1239        // Volatile mode
1240        let volatile = PersistenceConfig::volatile();
1241        assert_eq!(volatile.durability_mode, DurabilityMode::Volatile);
1242        assert!(!volatile.writes_wal());
1243
1244        // Lazy mode
1245        let lazy = PersistenceConfig::lazy();
1246        assert_eq!(lazy.durability_mode, DurabilityMode::Lazy);
1247        assert!(lazy.writes_wal());
1248        assert!(!lazy.sync_on_commit());
1249
1250        // Durable mode
1251        let durable = PersistenceConfig::durable();
1252        assert_eq!(durable.durability_mode, DurabilityMode::Durable);
1253        assert!(durable.writes_wal());
1254        assert!(durable.sync_on_commit());
1255        assert!(!durable.sync_on_every_op());
1256
1257        // Paranoid mode
1258        let paranoid = PersistenceConfig::paranoid();
1259        assert_eq!(paranoid.durability_mode, DurabilityMode::Paranoid);
1260        assert!(paranoid.writes_wal());
1261        assert!(paranoid.sync_on_commit());
1262        assert!(paranoid.sync_on_every_op());
1263    }
1264
1265    #[test]
1266    fn test_config_from_durability_config() {
1267        use crate::wal::durability::DurabilityConfig;
1268
1269        let dur_config = DurabilityConfig::durable();
1270        let config = PersistenceConfig::from_durability_config(&dur_config);
1271
1272        assert_eq!(config.durability_mode, DurabilityMode::Durable);
1273        assert_eq!(config.flush_interval_ms, dur_config.wal_flush_interval_ms);
1274        assert_eq!(config.flush_count, dur_config.wal_flush_batch_size);
1275    }
1276
1277    #[test]
1278    fn test_durability_mode_getter() {
1279        // Test each mode
1280        for (mode, config) in [
1281            (DurabilityMode::Volatile, PersistenceConfig::volatile()),
1282            (DurabilityMode::Lazy, PersistenceConfig::lazy()),
1283            (DurabilityMode::Durable, PersistenceConfig::durable()),
1284            (DurabilityMode::Paranoid, PersistenceConfig::paranoid()),
1285        ] {
1286            let buf: Vec<u8> = Vec::new();
1287            let cursor = Cursor::new(buf);
1288            let mut engine = PersistenceEngine::with_writer(cursor, config).unwrap();
1289            assert_eq!(engine.durability_mode(), mode);
1290            engine.shutdown().unwrap();
1291        }
1292    }
1293}