Skip to main content

moltendb_core/engine/storage/
disk.rs

1// ─── disk.rs ─────────────────────────────────────────────────────────────────
2// This file implements two concrete StorageBackend implementations that write
3// the database log to a real file on disk:
4//
5//   • AsyncDiskStorage  — high-throughput, non-blocking writes via an MPSC
6//                         channel + background Tokio task. Writes are buffered
7//                         in memory and flushed to disk every 50 ms. Ideal for
8//                         analytics / high-write workloads where losing the last
9//                         50 ms of data on a crash is acceptable.
10//
11//   • SyncDiskStorage   — every write blocks until the OS confirms the data is
12//                         on disk (flush after every entry). Zero data loss on
13//                         crash, but much lower throughput. Enabled by setting
14//                         the WRITE_MODE=sync environment variable.
15//
16// Both implementations share the same binary snapshot + streaming log replay
17// logic for fast startup (see "Snapshot helpers" section below).
18// ─────────────────────────────────────────────────────────────────────────────
19
20// StorageBackend is the trait defined in mod.rs that both structs implement.
21use super::StorageBackend;
22// LogEntry is the unit of data written to the log (cmd, collection, key, value).
23// DbError is our custom error enum.
24use crate::engine::types::{DbError, LogEntry};
25// Standard library file I/O types.
26use std::ops::ControlFlow;
27use std::fs::{File, OpenOptions};
28use std::path::Path;
29use std::time::SystemTime;
30// BufRead lets us iterate a file line-by-line without loading it all into RAM.
31// BufWriter batches small writes into larger OS-level write calls for efficiency.
32use std::io::{BufRead, BufReader, BufWriter, Write};
33// Arc = thread-safe reference-counted pointer (shared ownership across threads).
34// Mutex = mutual exclusion lock (only one thread can hold it at a time).
35use std::sync::{Arc, Mutex};
36// Tokio's async MPSC channel: multiple producers, single consumer.
37// Used to send log lines from the write path to the background flush task.
38use tokio::sync::mpsc;
39use tokio::task::JoinHandle;
40
41// ─── Snapshot helpers ────────────────────────────────────────────────────────
42//
43// A "snapshot" is a compact binary file that captures the entire current state
44// of the database at a point in time. On the next startup we load the snapshot
45// first (fast binary deserialization) and then only replay the log lines that
46// were written AFTER the snapshot was taken — instead of replaying the entire
47// log from the beginning. This dramatically reduces startup time for large DBs.
48//
49// Snapshot file format (binary, little-endian):
50//   [8 bytes]  magic header: "MOLTSNAP"
51//   [8 bytes]  seq: number of log lines captured in this snapshot
52//   [8 bytes]  count: number of LogEntry records that follow
53//   for each entry:
//     [8 bytes]  len: byte length of the JSON-encoded entry
//     [len bytes] JSON-encoded LogEntry (serde_json; bincode is not used)
56// ─────────────────────────────────────────────────────────────────────────────
57
58/// Returns the path of the binary snapshot file for a given log file path.
59/// Convention: `my_database.log` → `my_database.log.snapshot.bin`
60pub(super) fn snapshot_path(log_path: &str) -> String {
61    format!("{}.snapshot.bin", log_path)
62}
63
64pub(super) fn write_snapshot(log_path: &str, entries: &[LogEntry], seq: u64) -> Result<(), DbError> {
65    let path = snapshot_path(log_path);
66    // Write to a temp file first so the swap is atomic.
67    let tmp = format!("{}.tmp", path);
68
69    let file = OpenOptions::new()
70        .create(true)   // create if it doesn't exist
71        .write(true)
72        .truncate(true) // overwrite any existing content
73        .open(&tmp)?;
74    let mut w = BufWriter::new(file);
75
76    // Magic header so we can detect corrupt/wrong files on load.
77    w.write_all(b"MOLTSNAP")?;
78    // Sequence number: how many log lines are already captured here.
79    w.write_all(&seq.to_le_bytes())?;
80
81    // Number of entries, so the reader can pre-allocate a Vec.
82    let count = entries.len() as u64;
83    w.write_all(&count.to_le_bytes())?;
84
85    // Each entry is length-prefixed so the reader knows how many bytes to read.
86    for entry in entries {
87        // We use JSON for snapshots as well for now to avoid bincode issues with dynamic Value
88        let encoded = serde_json::to_vec(entry).map_err(|_| DbError::WriteError)?;
89        let len = encoded.len() as u64;
90        w.write_all(&len.to_le_bytes())?;
91        w.write_all(&encoded)?;
92    }
93
94    // Flush the BufWriter to ensure all bytes reach the OS buffer.
95    w.flush()?;
96    // Drop the writer to release the file handle before renaming (required on Windows).
97    drop(w);
98
99    // Before renaming the new snapshot, move the old one to the backup folder.
100    if Path::new(&path).exists() {
101        let log_dir = Path::new(log_path).parent().unwrap_or_else(|| Path::new("."));
102        let backup_dir = log_dir.join("backup");
103        
104        // Ensure backup directory exists
105        std::fs::create_dir_all(&backup_dir)?;
106
107        let now = SystemTime::now()
108            .duration_since(SystemTime::UNIX_EPOCH)
109            .map(|d| d.as_secs())
110            .unwrap_or(0);
111        
112        let filename = Path::new(&path).file_name()
113            .and_then(|n| n.to_str())
114            .unwrap_or("snapshot.bin");
115        
116        let backup_path = backup_dir.join(format!("{}.{}.bak", filename, now));
117        
118        // Move current snapshot to backup
119        let _ = std::fs::rename(&path, &backup_path);
120    }
121
122    // Atomic rename: replaces the old snapshot file in one OS operation.
123    std::fs::rename(&tmp, &path)?;
124    Ok(())
125}
126
127pub(super) fn write_compacted_log_no_tx(path: &str, entries: &[LogEntry]) -> Result<(), DbError> {
128    let temp_file = OpenOptions::new()
129        .create(true)
130        .write(true)
131        .truncate(true) // start fresh — we're rewriting the whole log
132        .open(path)?;
133    let mut temp_writer = BufWriter::new(temp_file);
134
135    // Write each entry as a JSON line, same format as the live log.
136    for entry in entries {
137        writeln!(temp_writer, "{}", serde_json::to_string(&entry)?)?;
138    }
139
140    temp_writer.flush()?;
141    Ok(())
142}
143
144pub(super) fn write_compacted_log(path: &str, entries: &[LogEntry]) -> Result<(), DbError> {
145    let temp_file = OpenOptions::new()
146        .create(true)
147        .write(true)
148        .truncate(true) // start fresh — we're rewriting the whole log
149        .open(path)?;
150    let mut temp_writer = BufWriter::new(temp_file);
151
152    // Write each entry as a JSON line, same format as the live log.
153    for entry in entries {
154        // We write each entry in its own transaction in the compacted log.
155        // This ensures they are replayed correctly even if followed by other log entries.
156        let tx_id = format!("compact-{}", entry.key);
157        
158        let begin = LogEntry {
159            cmd: "TX_BEGIN".to_string(),
160            collection: entry.collection.clone(),
161            key: tx_id.clone(),
162            value: serde_json::Value::Null,
163            _t: entry._t,
164        };
165        writeln!(temp_writer, "{}", serde_json::to_string(&begin)?)?;
166        
167        writeln!(temp_writer, "{}", serde_json::to_string(&entry)?)?;
168        
169        let commit = LogEntry {
170            cmd: "TX_COMMIT".to_string(),
171            collection: entry.collection.clone(),
172            key: tx_id,
173            value: serde_json::Value::Null,
174            _t: entry._t,
175        };
176        writeln!(temp_writer, "{}", serde_json::to_string(&commit)?)?;
177    }
178
179    temp_writer.flush()?;
180    Ok(())
181}
182
183/// Try to load a previously written binary snapshot.
184/// Returns `Some((entries, seq))` on success, or `None` if:
185///   - the snapshot file doesn't exist (first run)
186///   - the magic header doesn't match (corrupt file)
187///   - any read fails (truncated file, wrong format)
188pub(super) fn load_snapshot(log_path: &str) -> Option<(Vec<LogEntry>, u64)> {
189    let path = snapshot_path(log_path);
190    if !Path::new(&path).exists() {
191        return None;
192    }
193    tracing::info!("🔍 Attempting to load snapshot from {}", path);
194    // If the file doesn't exist, open() returns Err and we return None.
195    let mut file = File::open(&path).ok()?;
196
197    use std::io::Read;
198
199    // Validate the magic header — if it doesn't match, the file is not ours.
200    let mut magic = [0u8; 8];
201    file.read_exact(&mut magic).ok()?;
202    if &magic != b"MOLTSNAP" {
203        tracing::warn!("❌ Invalid snapshot magic header");
204        return None; // Not a valid snapshot file
205    }
206
207    // Read the sequence number (how many log lines to skip on replay).
208    let mut seq_bytes = [0u8; 8];
209    file.read_exact(&mut seq_bytes).ok()?;
210    let seq = u64::from_le_bytes(seq_bytes);
211
212    // Read the entry count so we can pre-allocate the Vec.
213    let mut count_bytes = [0u8; 8];
214    file.read_exact(&mut count_bytes).ok()?;
215    let count = u64::from_le_bytes(count_bytes) as usize;
216
217    tracing::info!("📂 Snapshot header: seq={}, count={}", seq, count);
218
219    let mut entries = Vec::with_capacity(count);
220    for i in 0..count {
221        // Read the length prefix for this entry.
222        let mut len_bytes = [0u8; 8];
223        if let Err(e) = file.read_exact(&mut len_bytes) {
224             tracing::error!("❌ Failed to read entry {} length: {}", i, e);
225             return None;
226        }
227        let len = u64::from_le_bytes(len_bytes) as usize;
228
229        // Read exactly `len` bytes and deserialize with JSON.
230        let mut buf = vec![0u8; len];
231        if let Err(e) = file.read_exact(&mut buf) {
232             tracing::error!("❌ Failed to read entry {} data: {}", i, e);
233             return None;
234        }
235
236        // If the entry is all zeros or empty, it might be a partial write
237        if len > 0 && buf.iter().all(|&b| b == 0) {
238            tracing::error!("❌ Entry {} data is all zeros. Snapshot might be corrupt.", i);
239            return None;
240        }
241
242        // If deserialization fails (e.g. schema changed), return None so we
243        // fall back to full log replay instead of crashing.
244        let entry: LogEntry = match serde_json::from_slice(&buf) {
245            Ok(e) => e,
246            Err(err) => {
247                let sample = if buf.len() > 20 { &buf[..20] } else { &buf };
248                tracing::error!(
249                    "❌ Failed to deserialize entry {} (len {}): {}. Sample: {:?}. This usually happens if the snapshot was created with an older version of MoltenDB or is corrupt. Falling back to log replay.",
250                    i, len, err, sample
251                );
252                return None;
253            }
254        };
255        entries.push(entry);
256    }
257
258    Some((entries, seq))
259}
260
261// ─── Streaming log reader ─────────────────────────────────────────────────────
262//
263// The log file is a plain text file where each line is a JSON-encoded LogEntry.
264// Instead of reading the whole file into a Vec<String> and then parsing, we
265// stream it line-by-line so only one entry is in memory at a time.
266// ─────────────────────────────────────────────────────────────────────────────
267
268/// Open the log file at `path` and call `f` for each successfully parsed
269/// `LogEntry`, skipping the first `skip_lines` lines (those are already
270/// covered by a loaded snapshot and don't need to be replayed again).
271///
272/// Lines that fail to parse (e.g. partial writes from a crash) are silently
273/// skipped — the database will simply not see those entries, which is safe
274/// because the in-memory state is rebuilt from what we can read.
275pub fn stream_log_entries<F>(path: &str, skip_lines: u64, mut f: F) -> Result<ControlFlow<(), ()>, DbError>
276where
277    F: FnMut(LogEntry, u32) -> ControlFlow<(), ()>, // closure called once per valid entry + raw byte length
278{
279    // If the file doesn't exist yet (first run), just do nothing.
280    if let Ok(file) = File::open(path) {
281        // BufReader wraps the file with an internal buffer so we don't make
282        // one syscall per byte — it reads in chunks and serves lines from RAM.
283        let reader = BufReader::new(file);
284        for (i, line) in reader.lines().enumerate() {
285            // Skip lines already captured in the snapshot.
286            if (i as u64) < skip_lines {
287                continue;
288            }
289            // Ignore lines that fail to read (e.g. I/O error mid-line).
290            if let Ok(json_str) = line {
291                let length = json_str.len() as u32;
292                // Ignore lines that fail to parse (e.g. partial write on crash).
293                if let Ok(entry) = serde_json::from_str::<LogEntry>(&json_str) {
294                    if let ControlFlow::Break(_) = f(entry, length) {
295                        return Ok(ControlFlow::Break(()));
296                    }
297                }
298            }
299        }
300    }
301    Ok(ControlFlow::Continue(()))
302}
303
304/// Count the total number of lines in the log file.
305/// This is used when writing a snapshot to record the current sequence number
306/// (i.e. "the snapshot covers the first N lines of the log").
307pub(super) fn count_log_lines(path: &str) -> u64 {
308    if let Ok(file) = File::open(path) {
309        // .lines() is lazy — it reads one line at a time, so this doesn't
310        // load the whole file into memory.
311        BufReader::new(file).lines().count() as u64
312    } else {
313        0 // File doesn't exist yet
314    }
315}
316
317// ─── read_log (still needed by EncryptedStorage wrapper) ─────────────────────
318//
319// EncryptedStorage wraps another StorageBackend and decrypts entries before
320// they can be applied to state. Because decryption must happen before we can
321// call apply_entry(), EncryptedStorage uses read_log() (which returns a full
322// Vec) rather than stream_log_into() (which applies entries on the fly).
323// ─────────────────────────────────────────────────────────────────────────────
324
325/// Read all log entries from disk into a Vec<LogEntry>.
326/// This is a convenience wrapper around stream_log_entries that collects
327/// everything into a Vec. Used by EncryptedStorage.
328pub fn read_log_from_disk(path: &str) -> Result<Vec<LogEntry>, DbError> {
329    let mut entries = Vec::new();
330    // skip_lines = 0 means read from the very beginning (no snapshot skip here,
331    // because EncryptedStorage handles its own snapshot logic via read_log).
332    let _ = stream_log_entries(path, 0, |e, _| {
333        entries.push(e);
334        ControlFlow::Continue(())
335    })?;
336    Ok(entries)
337}
338
339// ─── Compacted log writer ─────────────────────────────────────────────────────
340//
341// Compaction rewrites the log file to contain only the current state of the
342// database — removing all superseded INSERT entries and all DELETE tombstones.
343// This keeps the log file from growing unboundedly over time.
344// ─────────────────────────────────────────────────────────────────────────────
345
346// ─── AsyncDiskStorage ─────────────────────────────────────────────────────────
347//
348// Design: the write path is completely non-blocking. When write_entry() is
349// called, it serializes the entry to JSON and sends it over an unbounded MPSC
350// channel. A background Tokio task receives from that channel and writes to a
351// BufWriter. The BufWriter is flushed every 50 ms (on timeout) or whenever
352// the channel is drained.
353//
354// Trade-off: if the process is killed (SIGKILL / power loss) within the 50 ms
355// window, the last few writes may be lost. For analytics workloads this is
356// usually acceptable. Use SyncDiskStorage if you need zero data loss.
357// ─────────────────────────────────────────────────────────────────────────────
358
/// High-performance async disk writer.
///
/// `write_entry` serializes the entry and hands the line to a background
/// Tokio task over an unbounded MPSC channel; the task owns the file handle
/// and performs the actual buffered writes and flushes (see `new`).
pub struct AsyncDiskStorage {
    /// The sending half of the MPSC channel feeding the writer task.
    /// Wrapped in `Option` so `Drop` can take and drop it, which closes the
    /// channel and lets the writer task leave its loop.
    sender: Option<mpsc::UnboundedSender<String>>,
    /// Path to the log file on disk. Used for reading, compaction and
    /// deriving the snapshot path.
    path: String,
    /// Handle to the background writer task. Wrapped in `Option` so `Drop`
    /// can take it and wait for the task to finish.
    writer_task: Option<JoinHandle<()>>,
}
372
373impl AsyncDiskStorage {
374    /// Open (or create) the log file at `path` and spawn the background writer task.
375    pub fn new(path: &str) -> Result<Self, DbError> {
376        // Create an unbounded MPSC channel.
377        // `log_tx` (sender) is kept in the struct; `log_rx` (receiver) goes to the task.
378        let (log_tx, mut log_rx) = mpsc::unbounded_channel::<String>();
379        let path_clone = path.to_string();
380
381        // Spawn a Tokio task that owns the file handle and BufWriter.
382        // This task runs for the lifetime of the server.
383        let writer_task = tokio::spawn(async move {
384            // Open the file in append mode so existing data is preserved.
385            let file = OpenOptions::new()
386                .create(true)
387                .append(true)
388                .open(&path_clone)
389                .unwrap();
390            let mut w = BufWriter::new(file);
391
392            loop {
393                // Wait up to 50 ms for the next message.
394                // If a message arrives within 50 ms → process it immediately.
395                // If the timeout fires → flush the BufWriter to disk.
396                match tokio::time::timeout(
397                    std::time::Duration::from_millis(50),
398                    log_rx.recv(),
399                )
400                .await
401                {
402                    // A message arrived within the timeout window.
403                    Ok(Some(log_line)) => {
404                        // Special sentinel: the compact() method sends this to
405                        // tell us to swap the log file atomically.
406                        if log_line.starts_with("__RELOAD_FILE__") {
407                            // Extract the temp file path from the sentinel string.
408                            let temp_path = log_line.replace("__RELOAD_FILE__", "");
409                            // println!("🔥 Worker: Reloading file from {}", temp_path);
410
411                            // Flush and close the current file before renaming.
412                            // On Windows, a file cannot be renamed while it's open.
413                            w.flush().unwrap();
414                            drop(w); // Release the file handle / Windows lock
415
416                            // Atomically replace the live log with the compacted version.
417                            if let Err(e) = std::fs::rename(&temp_path, &path_clone) {
418                                tracing::error!("Failed to swap compacted file: {}", e);
419                            }
420
421                            // Re-open the (now compacted) log file for future writes.
422                            let new_file = OpenOptions::new()
423                                .create(true)
424                                .append(true)
425                                .open(&path_clone)
426                                .unwrap();
427                            w = BufWriter::new(new_file);
428                        } else {
429                            // Normal log line — append it to the BufWriter's buffer.
430                            if let Err(e) = writeln!(w, "{}", log_line) {
431                                tracing::error!("Failed to write to disk: {}", e);
432                            }
433                        }
434                    }
435                    // The channel was closed (sender dropped) — the server is shutting down.
436                    // The BufWriter will be dropped here, which flushes its buffer to the OS.
437                    Ok(None) => break,
438                    // Timeout fired — no message in the last 50 ms. Flush buffered data.
439                    Err(_) => {
440                        let _ = w.flush();
441                    }
442                }
443            }
444            // When the loop exits, `w` is dropped here, which flushes the BufWriter.
445            let _ = w.flush();
446        });
447
448        Ok(Self {
449            sender: Some(log_tx),
450            path: path.to_string(),
451            writer_task: Some(writer_task),
452        })
453    }
454}
455
456impl Drop for AsyncDiskStorage {
457    /// On drop, close the sender (signals the writer task to exit) then block
458    /// until the task has drained its queue and flushed everything to disk.
459    fn drop(&mut self) {
460        // Drop the sender — this closes the channel and causes log_rx.recv()
461        // to return None, which breaks the writer task's loop.
462        drop(self.sender.take());
463
464        // Now await the writer task so we don't return until all queued lines
465        // have been written and flushed to the OS.
466        if let Some(handle) = self.writer_task.take() {
467            tokio::task::block_in_place(|| {
468                tokio::runtime::Handle::current().block_on(handle)
469            })
470            .ok();
471        }
472    }
473}
474
475impl StorageBackend for AsyncDiskStorage {
476    /// Serialize `entry` to a JSON string and send it to the background writer.
477    /// This call returns immediately — it never blocks waiting for disk I/O.
478    fn write_entry(&self, entry: &LogEntry) -> Result<(), DbError> {
479        let json_line = serde_json::to_string(entry)?;
480        // send() only fails if the receiver (background task) has been dropped,
481        // which means the server is shutting down.
482        if let Some(ref sender) = self.sender {
483            sender.send(json_line).map_err(|_| DbError::WriteError)?;
484        }
485        Ok(())
486    }
487
488    /// Read all entries from the log file into a Vec.
489    /// Used by EncryptedStorage which needs the full list to decrypt.
490    fn read_log(&self) -> Result<Vec<LogEntry>, DbError> {
491        read_log_from_disk(&self.path)
492    }
493
494    /// Compact the log: write a binary snapshot, rewrite the log to be empty,
495    /// then signal the background task to swap the file.
496    fn compact(&self, entries: Vec<LogEntry>) -> Result<(), DbError> {
497        self.compact_with_hook(entries, None)
498    }
499
500    /// Internal compact implementation that can take a post-backup script.
501    fn compact_with_hook(&self, entries: Vec<LogEntry>, hook: Option<String>) -> Result<(), DbError> {
502        // Step 1: Write a binary snapshot.
503        // After compaction the log is reset to empty, so seq=0: all future log
504        // lines written after this snapshot must be replayed from the start.
505        let seq = 0u64;
506        if let Err(e) = write_snapshot(&self.path, &entries, seq) {
507            tracing::warn!("⚠️  Failed to write snapshot during compaction: {}", e);
508        } else if let Some(script_path) = hook {
509            // If snapshot was successful and we have a hook, execute it.
510            let snapshot_path = snapshot_path(&self.path);
511            let abs_snapshot_path = match std::fs::canonicalize(&snapshot_path) {
512                Ok(p) => p.to_string_lossy().to_string(),
513                Err(_) => snapshot_path,
514            };
515
516            // Execute in background
517            tokio::spawn(async move {
518                let res = if cfg!(target_os = "windows") {
519                    tokio::process::Command::new("powershell")
520                        .arg("-ExecutionPolicy")
521                        .arg("Bypass")
522                        .arg("-Command")
523                        .arg(format!("& '{}' '{}'", script_path, abs_snapshot_path))
524                        .output()
525                        .await
526                } else {
527                    tokio::process::Command::new("sh")
528                        .arg(script_path)
529                        .arg(abs_snapshot_path)
530                        .output()
531                        .await
532                };
533
534                match res {
535                    Ok(output) => {
536                        if !output.status.success() {
537                            let stderr = String::from_utf8_lossy(&output.stderr);
538                            tracing::error!("❌ Post-backup hook failed: {}", stderr);
539                        } else {
540                            tracing::info!("✅ Post-backup hook executed successfully");
541                        }
542                    }
543                    Err(e) => {
544                        tracing::error!("❌ Failed to spawn post-backup hook: {}", e);
545                    }
546                }
547            });
548        }
549
550        // Step 2: Write an empty compacted log to a temp file.
551        // Since the snapshot now contains the full state, we can start the log fresh.
552        let temp_path = format!("{}.tmp", self.path);
553        write_compacted_log_no_tx(&temp_path, &[])?;
554
555        // Step 3: Send the sentinel to the background task so it flushes,
556        // closes the current file, renames the temp file over it, and reopens.
557        if let Some(ref sender) = self.sender {
558            sender
559                .send(format!("__RELOAD_FILE__{}", temp_path))
560                .map_err(|_| DbError::WriteError)?;
561        }
562        Ok(())
563    }
564
565    /// Read exactly `length` bytes from the log at `offset`.
566    fn read_at(&self, offset: u64, length: u32) -> Result<Vec<u8>, DbError> {
567        use std::io::{Read, Seek, SeekFrom};
568        let mut file = File::open(&self.path)?;
569        file.seek(SeekFrom::Start(offset))?;
570        let mut buffer = vec![0u8; length as usize];
571        file.read_exact(&mut buffer)?;
572        Ok(buffer)
573    }
574
575    /// Stream log entries into state using snapshot + delta replay.
576    ///
577    /// Fast path (after first compaction):
578    ///   1. Load binary snapshot → apply all entries in it.
579    ///   2. Stream only the log lines written AFTER the snapshot (the "delta").
580    ///
581    /// Slow path (first run, no snapshot):
582    ///   Stream the entire log file line-by-line. No full Vec in RAM.
583    fn stream_log_into(
584        &self,
585        f: &mut dyn FnMut(LogEntry, u32) -> ControlFlow<(), ()>,
586    ) -> Result<u64, DbError> {
587        let mut count = 0u64;
588        // Attempt to load the binary snapshot for fast startup.
589        if let Some((snapshot_entries, seq)) = load_snapshot(&self.path) {
590            for entry in snapshot_entries {
591                // Entries from snapshot MUST be Hot because they are not in the log file
592                // and thus don't have a valid RecordPointer for this log instance.
593                if let ControlFlow::Break(_) = f(entry, 0) {
594                    return Ok(count);
595                }
596                count += 1;
597            }
598            // Then replay only the log lines that came after the snapshot.
599            // `seq` is the number of lines to skip (already in the snapshot).
600            if let ControlFlow::Break(_) = stream_log_entries(&self.path, seq, |e, l| {
601                let res = f(e, l);
602                if let ControlFlow::Continue(_) = res {
603                    count += 1;
604                }
605                res
606            })? {
607                return Ok(count);
608            }
609            return Ok(count);
610        }
611
612        // No snapshot found — stream the full log from the beginning.
613        let _ = stream_log_entries(&self.path, 0, |e, l| {
614            let res = f(e, l);
615            if let ControlFlow::Continue(_) = res {
616                count += 1;
617            }
618            res
619        })?;
620        Ok(count)
621    }
622}
623
624// ─── SyncDiskStorage ──────────────────────────────────────────────────────────
625//
626// Design: every write_entry() call acquires a Mutex, writes the JSON line to
627// a BufWriter, and immediately flushes the BufWriter. The flush() call blocks
628// until the OS confirms the data is in its write buffer (not necessarily on
629// physical disk, but durable enough for most crash scenarios).
630//
631// Trade-off: much lower throughput than AsyncDiskStorage because every write
632// blocks the caller. Use this when data loss is unacceptable.
633// ─────────────────────────────────────────────────────────────────────────────
634
/// High-durability synchronous disk writer.
///
/// Every `write_entry` call writes the line and flushes the `BufWriter`
/// before returning, i.e. the data has been handed to the OS. No `fsync` is
/// visible in `write_entry`, so this is weaker than "on physical disk".
/// Lower throughput than AsyncDiskStorage. Enable with WRITE_MODE=sync.
pub struct SyncDiskStorage {
    /// The BufWriter wrapped in a Mutex so only one thread writes at a time.
    /// NOTE(review): the `Arc` suggests the writer is meant to be shared, but
    /// no `Clone` impl for this struct is visible here — confirm it is needed.
    writer: Arc<Mutex<BufWriter<File>>>,
    /// Path to the log file. Stored for read/compact operations.
    path: String,
}
646
647impl SyncDiskStorage {
648    /// Open (or create) the log file at `path` in append mode.
649    pub fn new(path: &str) -> Result<Self, DbError> {
650        let file = OpenOptions::new().create(true).append(true).open(path)?;
651
652        Ok(Self {
653            writer: Arc::new(Mutex::new(BufWriter::new(file))),
654            path: path.to_string(),
655        })
656    }
657}
658
659impl StorageBackend for SyncDiskStorage {
660    /// Serialize `entry` to JSON, write it to the BufWriter, and flush immediately.
661    /// This call blocks until the OS has accepted the data.
662    fn write_entry(&self, entry: &LogEntry) -> Result<(), DbError> {
663        let json_line = serde_json::to_string(entry)?;
664        // Lock the Mutex — only one thread can write at a time.
665        let mut w = self.writer.lock().map_err(|_| DbError::LockPoisoned)?;
666        writeln!(w, "{}", json_line)?;
667        // Flush immediately so the data is durable before we return.
668        w.flush()?;
669        Ok(())
670    }
671
672    /// Read all entries from the log file into a Vec.
673    fn read_log(&self) -> Result<Vec<LogEntry>, DbError> {
674        read_log_from_disk(&self.path)
675    }
676
677    /// Compact the log: write a binary snapshot, swap the log file with an
678    /// empty one, then reopen the writer.
679    fn compact(&self, entries: Vec<LogEntry>) -> Result<(), DbError> {
680        self.compact_with_hook(entries, None)
681    }
682
683    fn compact_with_hook(&self, entries: Vec<LogEntry>, hook: Option<String>) -> Result<(), DbError> {
684        // Step 1: Write binary snapshot for fast next startup.
685        // After compaction the log is reset to empty, so seq=0: all future log
686        // lines written after this snapshot must be replayed from the start.
687        let seq = 0u64;
688        if let Err(e) = write_snapshot(&self.path, &entries, seq) {
689            tracing::warn!("⚠️  Failed to write snapshot during compaction: {}", e);
690        } else if let Some(script_path) = hook {
691            // If snapshot was successful and we have a hook, execute it.
692            let snapshot_path = snapshot_path(&self.path);
693            let abs_snapshot_path = match std::fs::canonicalize(&snapshot_path) {
694                Ok(p) => p.to_string_lossy().to_string(),
695                Err(_) => snapshot_path,
696            };
697
698            // Execute in background
699            tokio::spawn(async move {
700                let res = if cfg!(target_os = "windows") {
701                    tokio::process::Command::new("powershell")
702                        .arg("-ExecutionPolicy")
703                        .arg("Bypass")
704                        .arg("-Command")
705                        .arg(format!("& '{}' '{}'", script_path, abs_snapshot_path))
706                        .output()
707                        .await
708                } else {
709                    tokio::process::Command::new("sh")
710                        .arg(script_path)
711                        .arg(abs_snapshot_path)
712                        .output()
713                        .await
714                };
715
716                match res {
717                    Ok(output) => {
718                        if !output.status.success() {
719                            let stderr = String::from_utf8_lossy(&output.stderr);
720                            tracing::error!("❌ Post-backup hook failed: {}", stderr);
721                        } else {
722                            tracing::info!("✅ Post-backup hook executed successfully");
723                        }
724                    }
725                    Err(e) => {
726                        tracing::error!("❌ Failed to spawn post-backup hook: {}", e);
727                    }
728                }
729            });
730        }
731
732        // Step 2: Write an empty compacted log to a temp file.
733        let temp_path = format!("{}.tmp", self.path);
734        write_compacted_log_no_tx(&temp_path, &[])?;
735
736        // Step 3: Lock the writer, rename the temp file over the live log,
737        // then reopen the writer so future writes go to the compacted file.
738        let mut w = self.writer.lock().map_err(|_| DbError::LockPoisoned)?;
739        // On Unix this rename is atomic. On Windows the file must be closed first,
740        // but since we hold the Mutex no other thread can write concurrently.
741        if let Err(e) = std::fs::rename(&temp_path, &self.path) {
742             tracing::error!("Failed to swap compacted file: {}", e);
743             return Err(DbError::from(e));
744        }
745
746        // Reopen the file so the writer points at the new compacted log.
747        let new_file = OpenOptions::new()
748            .create(true)
749            .append(true)
750            .open(&self.path)?;
751        *w = BufWriter::new(new_file);
752        Ok(())
753    }
754
755    /// Read exactly `length` bytes from the log at `offset`.
756    fn read_at(&self, offset: u64, length: u32) -> Result<Vec<u8>, DbError> {
757        use std::io::{Read, Seek, SeekFrom};
758        let mut file = File::open(&self.path)?;
759        file.seek(SeekFrom::Start(offset))?;
760        let mut buffer = vec![0u8; length as usize];
761        file.read_exact(&mut buffer)?;
762        Ok(buffer)
763    }
764
765    /// Stream log entries into state using snapshot + delta replay.
766    /// Same logic as AsyncDiskStorage::stream_log_into — see that method for details.
767    fn stream_log_into(
768        &self,
769        f: &mut dyn FnMut(LogEntry, u32) -> ControlFlow<(), ()>,
770    ) -> Result<u64, DbError> {
771        let mut count = 0u64;
772        // Fast path: load snapshot and replay only the delta.
773        if let Some((snapshot_entries, seq)) = load_snapshot(&self.path) {
774            tracing::info!(
775                "⚡ Snapshot loaded ({} entries, seq {}). Replaying delta only...",
776                snapshot_entries.len(),
777                seq
778            );
779            for entry in snapshot_entries {
780                // Entries from snapshot MUST be Hot because they are not in the log file
781                // and thus don't have a valid RecordPointer for this log instance.
782                if let ControlFlow::Break(_) = f(entry, 0) {
783                    return Ok(count);
784                }
785                count += 1;
786            }
787            if let ControlFlow::Break(_) = stream_log_entries(&self.path, seq, |e, l| {
788                let res = f(e, l);
789                if let ControlFlow::Continue(_) = res {
790                    count += 1;
791                }
792                res
793            })? {
794                return Ok(count);
795            }
796            return Ok(count);
797        }
798
799        // Slow path: stream the full log line-by-line.
800        let _ = stream_log_entries(&self.path, 0, |e, l| {
801            let res = f(e, l);
802            if let ControlFlow::Continue(_) = res {
803                count += 1;
804            }
805            res
806        })?;
807        Ok(count)
808    }
809}