Skip to main content

moltendb_core/engine/storage/
disk.rs

1// ─── disk.rs ─────────────────────────────────────────────────────────────────
2// This file implements two concrete StorageBackend implementations that write
3// the database log to a real file on disk:
4//
5//   • AsyncDiskStorage  — high-throughput, non-blocking writes via an MPSC
6//                         channel + background Tokio task. Writes are buffered
7//                         in memory and flushed to disk every 50 ms. Ideal for
8//                         analytics / high-write workloads where losing the last
9//                         50 ms of data on a crash is acceptable.
10//
11//   • SyncDiskStorage   — every write blocks until the OS confirms the data is
12//                         on disk (flush after every entry). Zero data loss on
13//                         crash, but much lower throughput. Enabled by setting
14//                         the WRITE_MODE=sync environment variable.
15//
16// Both implementations share the same binary snapshot + streaming log replay
17// logic for fast startup (see "Snapshot helpers" section below).
18// ─────────────────────────────────────────────────────────────────────────────
19
20// StorageBackend is the trait defined in mod.rs that both structs implement.
21use super::StorageBackend;
22// LogEntry is the unit of data written to the log (cmd, collection, key, value).
23// DbError is our custom error enum.
24use crate::engine::types::{DbError, LogEntry};
25// Standard library file I/O types.
26use std::ops::ControlFlow;
27use std::fs::{File, OpenOptions};
28use std::path::Path;
29use std::time::SystemTime;
30// BufRead lets us iterate a file line-by-line without loading it all into RAM.
31// BufWriter batches small writes into larger OS-level write calls for efficiency.
32use std::io::{BufRead, BufReader, BufWriter, Write};
33// Arc = thread-safe reference-counted pointer (shared ownership across threads).
34// Mutex = mutual exclusion lock (only one thread can hold it at a time).
35use std::sync::{Arc, Mutex};
36// Tokio's async MPSC channel: multiple producers, single consumer.
37// Used to send log lines from the write path to the background flush task.
38use tokio::sync::mpsc;
39use tokio::task::JoinHandle;
40
41// ─── Snapshot helpers ────────────────────────────────────────────────────────
42//
43// A "snapshot" is a compact binary file that captures the entire current state
44// of the database at a point in time. On the next startup we load the snapshot
45// first (fast binary deserialization) and then only replay the log lines that
46// were written AFTER the snapshot was taken — instead of replaying the entire
47// log from the beginning. This dramatically reduces startup time for large DBs.
48//
49// Snapshot file format (binary, little-endian):
50//   [8 bytes]  magic header: "MOLTSNAP"
51//   [8 bytes]  seq: number of log lines captured in this snapshot
52//   [8 bytes]  count: number of LogEntry records that follow
53//   for each entry:
54//     [8 bytes]  len: byte length of the bincode-encoded entry
55//     [len bytes] bincode-encoded LogEntry
56// ─────────────────────────────────────────────────────────────────────────────
57
58/// Returns the path of the binary snapshot file for a given log file path.
59/// Convention: `my_database.log` → `my_database.log.snapshot.bin`
60pub(super) fn snapshot_path(log_path: &str) -> String {
61    format!("{}.snapshot.bin", log_path)
62}
63
64/// Serialize all current `entries` into a binary snapshot file and write it
65/// atomically (write to `.tmp`, then rename over the real file so we never
66/// end up with a half-written snapshot).
67///
68/// `seq` is the number of log lines that were present in the log file at the
69/// time of compaction — it tells the next startup how many lines to skip.
70pub(super) fn write_snapshot(log_path: &str, entries: &[LogEntry], seq: u64) -> Result<(), DbError> {
71    let path = snapshot_path(log_path);
72    // Write to a temp file first so the swap is atomic.
73    let tmp = format!("{}.tmp", path);
74
75    let file = OpenOptions::new()
76        .create(true)   // create if it doesn't exist
77        .write(true)
78        .truncate(true) // overwrite any existing content
79        .open(&tmp)?;
80    let mut w = BufWriter::new(file);
81
82    // Magic header so we can detect corrupt/wrong files on load.
83    w.write_all(b"MOLTSNAP")?;
84    // Sequence number: how many log lines are already captured here.
85    w.write_all(&seq.to_le_bytes())?;
86
87    // Number of entries, so the reader can pre-allocate a Vec.
88    let count = entries.len() as u64;
89    w.write_all(&count.to_le_bytes())?;
90
91    // Each entry is length-prefixed so the reader knows how many bytes to read.
92    for entry in entries {
93        // bincode is a compact binary format — much faster to deserialize than JSON.
94        let encoded = bincode::serialize(entry).map_err(|_| DbError::WriteError)?;
95        let len = encoded.len() as u64;
96        w.write_all(&len.to_le_bytes())?;
97        w.write_all(&encoded)?;
98    }
99
100    // Flush the BufWriter to ensure all bytes reach the OS buffer.
101    w.flush()?;
102    // Drop the writer to release the file handle before renaming (required on Windows).
103    drop(w);
104
105    // Before renaming the new snapshot, move the old one to the backup folder.
106    if Path::new(&path).exists() {
107        let log_dir = Path::new(log_path).parent().unwrap_or_else(|| Path::new("."));
108        let backup_dir = log_dir.join("backup");
109        
110        // Ensure backup directory exists
111        std::fs::create_dir_all(&backup_dir)?;
112
113        let now = SystemTime::now()
114            .duration_since(SystemTime::UNIX_EPOCH)
115            .map(|d| d.as_secs())
116            .unwrap_or(0);
117        
118        let filename = Path::new(&path).file_name()
119            .and_then(|n| n.to_str())
120            .unwrap_or("snapshot.bin");
121        
122        let backup_path = backup_dir.join(format!("{}.{}.bak", filename, now));
123        
124        // Move current snapshot to backup
125        let _ = std::fs::rename(&path, &backup_path);
126    }
127
128    // Atomic rename: replaces the old snapshot file in one OS operation.
129    std::fs::rename(&tmp, &path)?;
130    Ok(())
131}
132
133/// Try to load a previously written binary snapshot.
134/// Returns `Some((entries, seq))` on success, or `None` if:
135///   - the snapshot file doesn't exist (first run)
136///   - the magic header doesn't match (corrupt file)
137///   - any read fails (truncated file, wrong format)
138pub(super) fn load_snapshot(log_path: &str) -> Option<(Vec<LogEntry>, u64)> {
139    let path = snapshot_path(log_path);
140    // If the file doesn't exist, open() returns Err and we return None.
141    let mut file = File::open(&path).ok()?;
142
143    use std::io::Read;
144
145    // Validate the magic header — if it doesn't match, the file is not ours.
146    let mut magic = [0u8; 8];
147    file.read_exact(&mut magic).ok()?;
148    if &magic != b"MOLTSNAP" {
149        return None; // Not a valid snapshot file
150    }
151
152    // Read the sequence number (how many log lines to skip on replay).
153    let mut seq_bytes = [0u8; 8];
154    file.read_exact(&mut seq_bytes).ok()?;
155    let seq = u64::from_le_bytes(seq_bytes);
156
157    // Read the entry count so we can pre-allocate the Vec.
158    let mut count_bytes = [0u8; 8];
159    file.read_exact(&mut count_bytes).ok()?;
160    let count = u64::from_le_bytes(count_bytes) as usize;
161
162    let mut entries = Vec::with_capacity(count);
163    for _ in 0..count {
164        // Read the length prefix for this entry.
165        let mut len_bytes = [0u8; 8];
166        file.read_exact(&mut len_bytes).ok()?;
167        let len = u64::from_le_bytes(len_bytes) as usize;
168
169        // Read exactly `len` bytes and deserialize with bincode.
170        let mut buf = vec![0u8; len];
171        file.read_exact(&mut buf).ok()?;
172
173        // If deserialization fails (e.g. schema changed), return None so we
174        // fall back to full log replay instead of crashing.
175        let entry: LogEntry = bincode::deserialize(&buf).ok()?;
176        entries.push(entry);
177    }
178
179    Some((entries, seq))
180}
181
182// ─── Streaming log reader ─────────────────────────────────────────────────────
183//
184// The log file is a plain text file where each line is a JSON-encoded LogEntry.
185// Instead of reading the whole file into a Vec<String> and then parsing, we
186// stream it line-by-line so only one entry is in memory at a time.
187// ─────────────────────────────────────────────────────────────────────────────
188
189/// Open the log file at `path` and call `f` for each successfully parsed
190/// `LogEntry`, skipping the first `skip_lines` lines (those are already
191/// covered by a loaded snapshot and don't need to be replayed again).
192///
193/// Lines that fail to parse (e.g. partial writes from a crash) are silently
194/// skipped — the database will simply not see those entries, which is safe
195/// because the in-memory state is rebuilt from what we can read.
196pub fn stream_log_entries<F>(path: &str, skip_lines: u64, mut f: F) -> Result<ControlFlow<(), ()>, DbError>
197where
198    F: FnMut(LogEntry, u32) -> ControlFlow<(), ()>, // closure called once per valid entry + raw byte length
199{
200    // If the file doesn't exist yet (first run), just do nothing.
201    if let Ok(file) = File::open(path) {
202        // BufReader wraps the file with an internal buffer so we don't make
203        // one syscall per byte — it reads in chunks and serves lines from RAM.
204        let reader = BufReader::new(file);
205        for (i, line) in reader.lines().enumerate() {
206            // Skip lines already captured in the snapshot.
207            if (i as u64) < skip_lines {
208                continue;
209            }
210            // Ignore lines that fail to read (e.g. I/O error mid-line).
211            if let Ok(json_str) = line {
212                let length = json_str.len() as u32;
213                // Ignore lines that fail to parse (e.g. partial write on crash).
214                if let Ok(entry) = serde_json::from_str::<LogEntry>(&json_str) {
215                    if let ControlFlow::Break(_) = f(entry, length) {
216                        return Ok(ControlFlow::Break(()));
217                    }
218                }
219            }
220        }
221    }
222    Ok(ControlFlow::Continue(()))
223}
224
225/// Count the total number of lines in the log file.
226/// This is used when writing a snapshot to record the current sequence number
227/// (i.e. "the snapshot covers the first N lines of the log").
228pub(super) fn count_log_lines(path: &str) -> u64 {
229    if let Ok(file) = File::open(path) {
230        // .lines() is lazy — it reads one line at a time, so this doesn't
231        // load the whole file into memory.
232        BufReader::new(file).lines().count() as u64
233    } else {
234        0 // File doesn't exist yet
235    }
236}
237
238// ─── read_log (still needed by EncryptedStorage wrapper) ─────────────────────
239//
240// EncryptedStorage wraps another StorageBackend and decrypts entries before
241// they can be applied to state. Because decryption must happen before we can
242// call apply_entry(), EncryptedStorage uses read_log() (which returns a full
243// Vec) rather than stream_log_into() (which applies entries on the fly).
244// ─────────────────────────────────────────────────────────────────────────────
245
246/// Read all log entries from disk into a Vec<LogEntry>.
247/// This is a convenience wrapper around stream_log_entries that collects
248/// everything into a Vec. Used by EncryptedStorage.
249pub fn read_log_from_disk(path: &str) -> Result<Vec<LogEntry>, DbError> {
250    let mut entries = Vec::new();
251    // skip_lines = 0 means read from the very beginning (no snapshot skip here,
252    // because EncryptedStorage handles its own snapshot logic via read_log).
253    let _ = stream_log_entries(path, 0, |e, _| {
254        entries.push(e);
255        ControlFlow::Continue(())
256    })?;
257    Ok(entries)
258}
259
260// ─── Compacted log writer ─────────────────────────────────────────────────────
261//
262// Compaction rewrites the log file to contain only the current state of the
263// database — removing all superseded INSERT entries and all DELETE tombstones.
264// This keeps the log file from growing unboundedly over time.
265// ─────────────────────────────────────────────────────────────────────────────
266
267/// Write a compacted set of entries to `path` (which should be a `.tmp` file).
268/// The caller is responsible for atomically swapping this file over the real log.
269pub(super) fn write_compacted_log(path: &str, entries: &[LogEntry]) -> Result<(), DbError> {
270    let temp_file = OpenOptions::new()
271        .create(true)
272        .write(true)
273        .truncate(true) // start fresh — we're rewriting the whole log
274        .open(path)?;
275    let mut temp_writer = BufWriter::new(temp_file);
276
277    // Write each entry as a JSON line, same format as the live log.
278    for entry in entries {
279        writeln!(temp_writer, "{}", serde_json::to_string(&entry)?)?;
280    }
281
282    temp_writer.flush()?;
283    Ok(())
284}
285
286// ─── AsyncDiskStorage ─────────────────────────────────────────────────────────
287//
288// Design: the write path is completely non-blocking. When write_entry() is
289// called, it serializes the entry to JSON and sends it over an unbounded MPSC
290// channel. A background Tokio task receives from that channel and writes to a
291// BufWriter. The BufWriter is flushed every 50 ms (on timeout) or whenever
292// the channel is drained.
293//
294// Trade-off: if the process is killed (SIGKILL / power loss) within the 50 ms
295// window, the last few writes may be lost. For analytics workloads this is
296// usually acceptable. Use SyncDiskStorage if you need zero data loss.
297// ─────────────────────────────────────────────────────────────────────────────
298
/// High-performance async disk writer.
///
/// Writes are sent over an MPSC channel and flushed to disk every 50 ms by a
/// background Tokio task. The write path never blocks the caller.
/// Trade-off: anything still queued/buffered inside the flush window can be
/// lost on a hard crash (see module header).
pub struct AsyncDiskStorage {
    /// Sending half of the MPSC channel feeding the background writer task.
    /// Wrapped in `Option` so `Drop` can `take()` it and close the channel,
    /// which makes the task's `recv()` return `None` and end its loop.
    sender: Option<mpsc::UnboundedSender<String>>,
    /// Path to the log file on disk. Stored so read_log/compact/read_at can
    /// open the file independently of the background writer.
    path: String,
    /// Handle to the background writer task. Wrapped in `Option` so `Drop`
    /// can `take()` and await it, ensuring queued lines are flushed.
    writer_task: Option<JoinHandle<()>>,
}
312
313impl AsyncDiskStorage {
314    /// Open (or create) the log file at `path` and spawn the background writer task.
315    pub fn new(path: &str) -> Result<Self, DbError> {
316        // Create an unbounded MPSC channel.
317        // `log_tx` (sender) is kept in the struct; `log_rx` (receiver) goes to the task.
318        let (log_tx, mut log_rx) = mpsc::unbounded_channel::<String>();
319        let path_clone = path.to_string();
320
321        // Spawn a Tokio task that owns the file handle and BufWriter.
322        // This task runs for the lifetime of the server.
323        let writer_task = tokio::spawn(async move {
324            // Open the file in append mode so existing data is preserved.
325            let file = OpenOptions::new()
326                .create(true)
327                .append(true)
328                .open(&path_clone)
329                .unwrap();
330            let mut w = BufWriter::new(file);
331
332            loop {
333                // Wait up to 50 ms for the next message.
334                // If a message arrives within 50 ms → process it immediately.
335                // If the timeout fires → flush the BufWriter to disk.
336                match tokio::time::timeout(
337                    std::time::Duration::from_millis(50),
338                    log_rx.recv(),
339                )
340                .await
341                {
342                    // A message arrived within the timeout window.
343                    Ok(Some(log_line)) => {
344                        // Special sentinel: the compact() method sends this to
345                        // tell us to swap the log file atomically.
346                        if log_line.starts_with("__RELOAD_FILE__") {
347                            // Extract the temp file path from the sentinel string.
348                            let temp_path = log_line.replace("__RELOAD_FILE__", "");
349
350                            // Flush and close the current file before renaming.
351                            // On Windows, a file cannot be renamed while it's open.
352                            w.flush().unwrap();
353                            drop(w); // Release the file handle / Windows lock
354
355                            // Atomically replace the live log with the compacted version.
356                            if let Err(e) = std::fs::rename(&temp_path, &path_clone) {
357                                tracing::error!("Failed to swap compacted file: {}", e);
358                            }
359
360                            // Re-open the (now compacted) log file for future writes.
361                            let new_file = OpenOptions::new()
362                                .create(true)
363                                .append(true)
364                                .open(&path_clone)
365                                .unwrap();
366                            w = BufWriter::new(new_file);
367                        } else {
368                            // Normal log line — append it to the BufWriter's buffer.
369                            if let Err(e) = writeln!(w, "{}", log_line) {
370                                tracing::error!("Failed to write to disk: {}", e);
371                            }
372                        }
373                    }
374                    // The channel was closed (sender dropped) — the server is shutting down.
375                    // The BufWriter will be dropped here, which flushes its buffer to the OS.
376                    Ok(None) => break,
377                    // Timeout fired — no message in the last 50 ms. Flush buffered data.
378                    Err(_) => {
379                        let _ = w.flush();
380                    }
381                }
382            }
383            // When the loop exits, `w` is dropped here, which flushes the BufWriter.
384            let _ = w.flush();
385        });
386
387        Ok(Self {
388            sender: Some(log_tx),
389            path: path.to_string(),
390            writer_task: Some(writer_task),
391        })
392    }
393}
394
395impl Drop for AsyncDiskStorage {
396    /// On drop, close the sender (signals the writer task to exit) then block
397    /// until the task has drained its queue and flushed everything to disk.
398    fn drop(&mut self) {
399        // Drop the sender — this closes the channel and causes log_rx.recv()
400        // to return None, which breaks the writer task's loop.
401        drop(self.sender.take());
402
403        // Now await the writer task so we don't return until all queued lines
404        // have been written and flushed to the OS.
405        if let Some(handle) = self.writer_task.take() {
406            tokio::task::block_in_place(|| {
407                tokio::runtime::Handle::current().block_on(handle)
408            })
409            .ok();
410        }
411    }
412}
413
414impl StorageBackend for AsyncDiskStorage {
415    /// Serialize `entry` to a JSON string and send it to the background writer.
416    /// This call returns immediately — it never blocks waiting for disk I/O.
417    fn write_entry(&self, entry: &LogEntry) -> Result<(), DbError> {
418        let json_line = serde_json::to_string(entry)?;
419        // send() only fails if the receiver (background task) has been dropped,
420        // which means the server is shutting down.
421        if let Some(ref sender) = self.sender {
422            sender.send(json_line).map_err(|_| DbError::WriteError)?;
423        }
424        Ok(())
425    }
426
427    /// Read all entries from the log file into a Vec.
428    /// Used by EncryptedStorage which needs the full list to decrypt.
429    fn read_log(&self) -> Result<Vec<LogEntry>, DbError> {
430        read_log_from_disk(&self.path)
431    }
432
433    /// Compact the log: write a binary snapshot, rewrite the log to contain
434    /// only current state, then signal the background task to swap the file.
435    fn compact(&self, entries: Vec<LogEntry>) -> Result<(), DbError> {
436        // Step 1: Write a binary snapshot so the next startup can skip log replay.
437        // We record `seq` = current line count so we know where the snapshot ends.
438        let seq = count_log_lines(&self.path);
439        if let Err(e) = write_snapshot(&self.path, &entries, seq) {
440            tracing::warn!("⚠️  Failed to write snapshot during compaction: {}", e);
441            // Non-fatal: we continue without a snapshot. Next startup will do a
442            // full log replay instead, which is slower but still correct.
443        }
444
445        // Step 2: Write the compacted log to a temp file.
446        let temp_path = format!("{}.tmp", self.path);
447        write_compacted_log(&temp_path, &entries)?;
448
449        // Step 3: Send the sentinel to the background task so it flushes,
450        // closes the current file, renames the temp file over it, and reopens.
451        // This is the only safe way to swap the file on Windows (where you
452        // can't rename a file that's currently open by another handle).
453        if let Some(ref sender) = self.sender {
454            sender
455                .send(format!("__RELOAD_FILE__{}", temp_path))
456                .map_err(|_| DbError::WriteError)?;
457        }
458        Ok(())
459    }
460
461    /// Read exactly `length` bytes from the log at `offset`.
462    fn read_at(&self, offset: u64, length: u32) -> Result<Vec<u8>, DbError> {
463        use std::io::{Read, Seek, SeekFrom};
464        let mut file = File::open(&self.path)?;
465        file.seek(SeekFrom::Start(offset))?;
466        let mut buffer = vec![0u8; length as usize];
467        file.read_exact(&mut buffer)?;
468        Ok(buffer)
469    }
470
471    /// Stream log entries into state using snapshot + delta replay.
472    ///
473    /// Fast path (after first compaction):
474    ///   1. Load binary snapshot → apply all entries in it.
475    ///   2. Stream only the log lines written AFTER the snapshot (the "delta").
476    ///
477    /// Slow path (first run, no snapshot):
478    ///   Stream the entire log file line-by-line. No full Vec in RAM.
479    fn stream_log_into(
480        &self,
481        f: &mut dyn FnMut(LogEntry, u32) -> ControlFlow<(), ()>,
482    ) -> Result<u64, DbError> {
483        let mut count = 0u64;
484        // Attempt to load the binary snapshot for fast startup.
485        if let Some((snapshot_entries, seq)) = load_snapshot(&self.path) {
486            tracing::info!(
487                "⚡ Snapshot loaded ({} entries, seq {}). Replaying delta only...",
488                snapshot_entries.len(),
489                seq
490            );
491            for entry in snapshot_entries {
492                // For snapshots, we re-serialize because they aren't in the log file
493                let json = serde_json::to_vec(&entry).unwrap_or_default();
494                let length = json.len() as u32;
495                if let ControlFlow::Break(_) = f(entry, length) {
496                    return Ok(count);
497                }
498                count += 1;
499            }
500            // Then replay only the log lines that came after the snapshot.
501            // `seq` is the number of lines to skip (already in the snapshot).
502            if let ControlFlow::Break(_) = stream_log_entries(&self.path, seq, |e, l| {
503                let res = f(e, l);
504                if let ControlFlow::Continue(_) = res {
505                    count += 1;
506                }
507                res
508            })? {
509                return Ok(count);
510            }
511            return Ok(count);
512        }
513
514        // No snapshot found — stream the full log from the beginning.
515        let _ = stream_log_entries(&self.path, 0, |e, l| {
516            let res = f(e, l);
517            if let ControlFlow::Continue(_) = res {
518                count += 1;
519            }
520            res
521        })?;
522        Ok(count)
523    }
524}
525
526// ─── SyncDiskStorage ──────────────────────────────────────────────────────────
527//
528// Design: every write_entry() call acquires a Mutex, writes the JSON line to
529// a BufWriter, and immediately flushes the BufWriter. The flush() call blocks
530// until the OS confirms the data is in its write buffer (not necessarily on
531// physical disk, but durable enough for most crash scenarios).
532//
533// Trade-off: much lower throughput than AsyncDiskStorage because every write
534// blocks the caller. Use this when data loss is unacceptable.
535// ─────────────────────────────────────────────────────────────────────────────
536
/// High-durability synchronous disk writer.
///
/// Every write is flushed to disk before returning. Zero data loss on crash,
/// but lower throughput than AsyncDiskStorage. Enable with WRITE_MODE=sync.
pub struct SyncDiskStorage {
    /// The BufWriter wrapped in a Mutex so multiple threads can write safely;
    /// Arc allows the struct to be shared across handler threads.
    /// `compact()` also relies on this Mutex to keep writers out while the
    /// log file is being swapped.
    writer: Arc<Mutex<BufWriter<File>>>,
    /// Path to the log file. Stored so read_log/compact/read_at can open the
    /// file independently of the locked writer.
    path: String,
}
548
549impl SyncDiskStorage {
550    /// Open (or create) the log file at `path` in append mode.
551    pub fn new(path: &str) -> Result<Self, DbError> {
552        let file = OpenOptions::new().create(true).append(true).open(path)?;
553
554        Ok(Self {
555            writer: Arc::new(Mutex::new(BufWriter::new(file))),
556            path: path.to_string(),
557        })
558    }
559}
560
561impl StorageBackend for SyncDiskStorage {
562    /// Serialize `entry` to JSON, write it to the BufWriter, and flush immediately.
563    /// This call blocks until the OS has accepted the data.
564    fn write_entry(&self, entry: &LogEntry) -> Result<(), DbError> {
565        let json_line = serde_json::to_string(entry)?;
566        // Lock the Mutex — only one thread can write at a time.
567        let mut w = self.writer.lock().map_err(|_| DbError::LockPoisoned)?;
568        writeln!(w, "{}", json_line)?;
569        // Flush immediately so the data is durable before we return.
570        w.flush()?;
571        Ok(())
572    }
573
574    /// Read all entries from the log file into a Vec.
575    fn read_log(&self) -> Result<Vec<LogEntry>, DbError> {
576        read_log_from_disk(&self.path)
577    }
578
579    /// Compact the log: write a binary snapshot, atomically swap the log file,
580    /// then reopen the writer pointing at the new (compacted) file.
581    fn compact(&self, entries: Vec<LogEntry>) -> Result<(), DbError> {
582        // Step 1: Write binary snapshot for fast next startup.
583        let seq = count_log_lines(&self.path);
584        if let Err(e) = write_snapshot(&self.path, &entries, seq) {
585            tracing::warn!("⚠️  Failed to write snapshot during compaction: {}", e);
586        }
587
588        // Step 2: Write compacted log to a temp file.
589        let temp_path = format!("{}.tmp", self.path);
590        write_compacted_log(&temp_path, &entries)?;
591
592        // Step 3: Lock the writer, rename the temp file over the live log,
593        // then reopen the writer so future writes go to the compacted file.
594        let mut w = self.writer.lock().map_err(|_| DbError::LockPoisoned)?;
595        // On Unix this rename is atomic. On Windows the file must be closed first,
596        // but since we hold the Mutex no other thread can write concurrently.
597        std::fs::rename(&temp_path, &self.path)?;
598
599        // Reopen the file so the writer points at the new compacted log.
600        let new_file = OpenOptions::new()
601            .create(true)
602            .append(true)
603            .open(&self.path)?;
604        *w = BufWriter::new(new_file);
605        Ok(())
606    }
607
608    /// Read exactly `length` bytes from the log at `offset`.
609    fn read_at(&self, offset: u64, length: u32) -> Result<Vec<u8>, DbError> {
610        use std::io::{Read, Seek, SeekFrom};
611        let mut file = File::open(&self.path)?;
612        file.seek(SeekFrom::Start(offset))?;
613        let mut buffer = vec![0u8; length as usize];
614        file.read_exact(&mut buffer)?;
615        Ok(buffer)
616    }
617
618    /// Stream log entries into state using snapshot + delta replay.
619    /// Same logic as AsyncDiskStorage::stream_log_into — see that method for details.
620    fn stream_log_into(
621        &self,
622        f: &mut dyn FnMut(LogEntry, u32) -> ControlFlow<(), ()>,
623    ) -> Result<u64, DbError> {
624        let mut count = 0u64;
625        // Fast path: load snapshot and replay only the delta.
626        if let Some((snapshot_entries, seq)) = load_snapshot(&self.path) {
627            tracing::info!(
628                "⚡ Snapshot loaded ({} entries, seq {}). Replaying delta only...",
629                snapshot_entries.len(),
630                seq
631            );
632            for entry in snapshot_entries {
633                // For snapshots, we re-serialize because they aren't in the log file
634                let json = serde_json::to_vec(&entry).unwrap_or_default();
635                let length = json.len() as u32;
636                if let ControlFlow::Break(_) = f(entry, length) {
637                    return Ok(count);
638                }
639                count += 1;
640            }
641            if let ControlFlow::Break(_) = stream_log_entries(&self.path, seq, |e, l| {
642                let res = f(e, l);
643                if let ControlFlow::Continue(_) = res {
644                    count += 1;
645                }
646                res
647            })? {
648                return Ok(count);
649            }
650            return Ok(count);
651        }
652
653        // Slow path: stream the full log line-by-line.
654        let _ = stream_log_entries(&self.path, 0, |e, l| {
655            let res = f(e, l);
656            if let ControlFlow::Continue(_) = res {
657                count += 1;
658            }
659            res
660        })?;
661        Ok(count)
662    }
663}