// moltendb_core/engine/storage/disk.rs
// ─── disk.rs ─────────────────────────────────────────────────────────────────
// This file implements two concrete StorageBackend implementations that write
// the database log to a real file on disk:
//
// • AsyncDiskStorage — high-throughput, non-blocking writes via an MPSC
//                      channel + background Tokio task. Writes are buffered
//                      in memory and flushed to disk every 50 ms. Ideal for
//                      analytics / high-write workloads where losing the last
//                      50 ms of data on a crash is acceptable.
//
// • SyncDiskStorage  — every write blocks until the OS confirms the data is
//                      on disk (flush after every entry). Zero data loss on
//                      crash, but much lower throughput. Enabled by setting
//                      the WRITE_MODE=sync environment variable.
//
// Both implementations share the same binary snapshot + streaming log replay
// logic for fast startup (see "Snapshot helpers" section below).
// ─────────────────────────────────────────────────────────────────────────────

20// StorageBackend is the trait defined in mod.rs that both structs implement.
21use super::StorageBackend;
22// LogEntry is the unit of data written to the log (cmd, collection, key, value).
23// DbError is our custom error enum.
24use crate::engine::types::{DbError, LogEntry};
25// Standard library file I/O types.
26use std::ops::ControlFlow;
27use std::fs::{File, OpenOptions};
28use std::path::Path;
29use std::time::SystemTime;
30// BufRead lets us iterate a file line-by-line without loading it all into RAM.
31// BufWriter batches small writes into larger OS-level write calls for efficiency.
32use std::io::{BufRead, BufReader, BufWriter, Write};
33// Arc = thread-safe reference-counted pointer (shared ownership across threads).
34// Mutex = mutual exclusion lock (only one thread can hold it at a time).
35use std::sync::{Arc, Mutex};
36// Tokio's async MPSC channel: multiple producers, single consumer.
37// Used to send log lines from the write path to the background flush task.
38use tokio::sync::mpsc;
39use tokio::task::JoinHandle;
40
// ─── Snapshot helpers ────────────────────────────────────────────────────────
//
// A "snapshot" is a compact binary file that captures the entire current state
// of the database at a point in time. On the next startup we load the snapshot
// first (fast binary deserialization) and then only replay the log lines that
// were written AFTER the snapshot was taken — instead of replaying the entire
// log from the beginning. This dramatically reduces startup time for large DBs.
//
// Snapshot file format (binary, little-endian):
//   [8 bytes]   magic header: "MOLTSNAP"
//   [8 bytes]   seq:   number of log lines captured in this snapshot
//   [8 bytes]   count: number of LogEntry records that follow
//   for each entry:
//     [8 bytes]   len: byte length of the bincode-encoded entry
//     [len bytes] bincode-encoded LogEntry
// ─────────────────────────────────────────────────────────────────────────────

58/// Returns the path of the binary snapshot file for a given log file path.
59/// Convention: `my_database.log` → `my_database.log.snapshot.bin`
60pub(super) fn snapshot_path(log_path: &str) -> String {
61 format!("{}.snapshot.bin", log_path)
62}
63
64/// Serialize all current `entries` into a binary snapshot file and write it
65/// atomically (write to `.tmp`, then rename over the real file so we never
66/// end up with a half-written snapshot).
67///
68/// `seq` is the number of log lines that were present in the log file at the
69/// time of compaction — it tells the next startup how many lines to skip.
70pub(super) fn write_snapshot(log_path: &str, entries: &[LogEntry], seq: u64) -> Result<(), DbError> {
71 let path = snapshot_path(log_path);
72 // Write to a temp file first so the swap is atomic.
73 let tmp = format!("{}.tmp", path);
74
75 let file = OpenOptions::new()
76 .create(true) // create if it doesn't exist
77 .write(true)
78 .truncate(true) // overwrite any existing content
79 .open(&tmp)?;
80 let mut w = BufWriter::new(file);
81
82 // Magic header so we can detect corrupt/wrong files on load.
83 w.write_all(b"MOLTSNAP")?;
84 // Sequence number: how many log lines are already captured here.
85 w.write_all(&seq.to_le_bytes())?;
86
87 // Number of entries, so the reader can pre-allocate a Vec.
88 let count = entries.len() as u64;
89 w.write_all(&count.to_le_bytes())?;
90
91 // Each entry is length-prefixed so the reader knows how many bytes to read.
92 for entry in entries {
93 // bincode is a compact binary format — much faster to deserialize than JSON.
94 let encoded = bincode::serialize(entry).map_err(|_| DbError::WriteError)?;
95 let len = encoded.len() as u64;
96 w.write_all(&len.to_le_bytes())?;
97 w.write_all(&encoded)?;
98 }
99
100 // Flush the BufWriter to ensure all bytes reach the OS buffer.
101 w.flush()?;
102 // Drop the writer to release the file handle before renaming (required on Windows).
103 drop(w);
104
105 // Before renaming the new snapshot, move the old one to the backup folder.
106 if Path::new(&path).exists() {
107 let log_dir = Path::new(log_path).parent().unwrap_or_else(|| Path::new("."));
108 let backup_dir = log_dir.join("backup");
109
110 // Ensure backup directory exists
111 std::fs::create_dir_all(&backup_dir)?;
112
113 let now = SystemTime::now()
114 .duration_since(SystemTime::UNIX_EPOCH)
115 .map(|d| d.as_secs())
116 .unwrap_or(0);
117
118 let filename = Path::new(&path).file_name()
119 .and_then(|n| n.to_str())
120 .unwrap_or("snapshot.bin");
121
122 let backup_path = backup_dir.join(format!("{}.{}.bak", filename, now));
123
124 // Move current snapshot to backup
125 let _ = std::fs::rename(&path, &backup_path);
126 }
127
128 // Atomic rename: replaces the old snapshot file in one OS operation.
129 std::fs::rename(&tmp, &path)?;
130 Ok(())
131}
132
133/// Try to load a previously written binary snapshot.
134/// Returns `Some((entries, seq))` on success, or `None` if:
135/// - the snapshot file doesn't exist (first run)
136/// - the magic header doesn't match (corrupt file)
137/// - any read fails (truncated file, wrong format)
138pub(super) fn load_snapshot(log_path: &str) -> Option<(Vec<LogEntry>, u64)> {
139 let path = snapshot_path(log_path);
140 // If the file doesn't exist, open() returns Err and we return None.
141 let mut file = File::open(&path).ok()?;
142
143 use std::io::Read;
144
145 // Validate the magic header — if it doesn't match, the file is not ours.
146 let mut magic = [0u8; 8];
147 file.read_exact(&mut magic).ok()?;
148 if &magic != b"MOLTSNAP" {
149 return None; // Not a valid snapshot file
150 }
151
152 // Read the sequence number (how many log lines to skip on replay).
153 let mut seq_bytes = [0u8; 8];
154 file.read_exact(&mut seq_bytes).ok()?;
155 let seq = u64::from_le_bytes(seq_bytes);
156
157 // Read the entry count so we can pre-allocate the Vec.
158 let mut count_bytes = [0u8; 8];
159 file.read_exact(&mut count_bytes).ok()?;
160 let count = u64::from_le_bytes(count_bytes) as usize;
161
162 let mut entries = Vec::with_capacity(count);
163 for _ in 0..count {
164 // Read the length prefix for this entry.
165 let mut len_bytes = [0u8; 8];
166 file.read_exact(&mut len_bytes).ok()?;
167 let len = u64::from_le_bytes(len_bytes) as usize;
168
169 // Read exactly `len` bytes and deserialize with bincode.
170 let mut buf = vec![0u8; len];
171 file.read_exact(&mut buf).ok()?;
172
173 // If deserialization fails (e.g. schema changed), return None so we
174 // fall back to full log replay instead of crashing.
175 let entry: LogEntry = bincode::deserialize(&buf).ok()?;
176 entries.push(entry);
177 }
178
179 Some((entries, seq))
180}
181
// ─── Streaming log reader ─────────────────────────────────────────────────────
//
// The log file is a plain text file where each line is a JSON-encoded LogEntry.
// Instead of reading the whole file into a Vec<String> and then parsing, we
// stream it line-by-line so only one entry is in memory at a time.
// ─────────────────────────────────────────────────────────────────────────────

189/// Open the log file at `path` and call `f` for each successfully parsed
190/// `LogEntry`, skipping the first `skip_lines` lines (those are already
191/// covered by a loaded snapshot and don't need to be replayed again).
192///
193/// Lines that fail to parse (e.g. partial writes from a crash) are silently
194/// skipped — the database will simply not see those entries, which is safe
195/// because the in-memory state is rebuilt from what we can read.
196pub fn stream_log_entries<F>(path: &str, skip_lines: u64, mut f: F) -> Result<ControlFlow<(), ()>, DbError>
197where
198 F: FnMut(LogEntry, u32) -> ControlFlow<(), ()>, // closure called once per valid entry + raw byte length
199{
200 // If the file doesn't exist yet (first run), just do nothing.
201 if let Ok(file) = File::open(path) {
202 // BufReader wraps the file with an internal buffer so we don't make
203 // one syscall per byte — it reads in chunks and serves lines from RAM.
204 let reader = BufReader::new(file);
205 for (i, line) in reader.lines().enumerate() {
206 // Skip lines already captured in the snapshot.
207 if (i as u64) < skip_lines {
208 continue;
209 }
210 // Ignore lines that fail to read (e.g. I/O error mid-line).
211 if let Ok(json_str) = line {
212 let length = json_str.len() as u32;
213 // Ignore lines that fail to parse (e.g. partial write on crash).
214 if let Ok(entry) = serde_json::from_str::<LogEntry>(&json_str) {
215 if let ControlFlow::Break(_) = f(entry, length) {
216 return Ok(ControlFlow::Break(()));
217 }
218 }
219 }
220 }
221 }
222 Ok(ControlFlow::Continue(()))
223}
224
225/// Count the total number of lines in the log file.
226/// This is used when writing a snapshot to record the current sequence number
227/// (i.e. "the snapshot covers the first N lines of the log").
228pub(super) fn count_log_lines(path: &str) -> u64 {
229 if let Ok(file) = File::open(path) {
230 // .lines() is lazy — it reads one line at a time, so this doesn't
231 // load the whole file into memory.
232 BufReader::new(file).lines().count() as u64
233 } else {
234 0 // File doesn't exist yet
235 }
236}
237
// ─── read_log (still needed by EncryptedStorage wrapper) ─────────────────────
//
// EncryptedStorage wraps another StorageBackend and decrypts entries before
// they can be applied to state. Because decryption must happen before we can
// call apply_entry(), EncryptedStorage uses read_log() (which returns a full
// Vec) rather than stream_log_into() (which applies entries on the fly).
// ─────────────────────────────────────────────────────────────────────────────

246/// Read all log entries from disk into a Vec<LogEntry>.
247/// This is a convenience wrapper around stream_log_entries that collects
248/// everything into a Vec. Used by EncryptedStorage.
249pub fn read_log_from_disk(path: &str) -> Result<Vec<LogEntry>, DbError> {
250 let mut entries = Vec::new();
251 // skip_lines = 0 means read from the very beginning (no snapshot skip here,
252 // because EncryptedStorage handles its own snapshot logic via read_log).
253 let _ = stream_log_entries(path, 0, |e, _| {
254 entries.push(e);
255 ControlFlow::Continue(())
256 })?;
257 Ok(entries)
258}
259
// ─── Compacted log writer ─────────────────────────────────────────────────────
//
// Compaction rewrites the log file to contain only the current state of the
// database — removing all superseded INSERT entries and all DELETE tombstones.
// This keeps the log file from growing unboundedly over time.
// ─────────────────────────────────────────────────────────────────────────────

267/// Write a compacted set of entries to `path` (which should be a `.tmp` file).
268/// The caller is responsible for atomically swapping this file over the real log.
269pub(super) fn write_compacted_log(path: &str, entries: &[LogEntry]) -> Result<(), DbError> {
270 let temp_file = OpenOptions::new()
271 .create(true)
272 .write(true)
273 .truncate(true) // start fresh — we're rewriting the whole log
274 .open(path)?;
275 let mut temp_writer = BufWriter::new(temp_file);
276
277 // Write each entry as a JSON line, same format as the live log.
278 for entry in entries {
279 writeln!(temp_writer, "{}", serde_json::to_string(&entry)?)?;
280 }
281
282 temp_writer.flush()?;
283 Ok(())
284}
285
// ─── AsyncDiskStorage ─────────────────────────────────────────────────────────
//
// Design: the write path is completely non-blocking. When write_entry() is
// called, it serializes the entry to JSON and sends it over an unbounded MPSC
// channel. A background Tokio task receives from that channel and writes to a
// BufWriter. The BufWriter is flushed every 50 ms (on timeout) or whenever
// the channel is drained.
//
// Trade-off: if the process is killed (SIGKILL / power loss) within the 50 ms
// window, the last few writes may be lost. For analytics workloads this is
// usually acceptable. Use SyncDiskStorage if you need zero data loss.
// ─────────────────────────────────────────────────────────────────────────────

299/// High-performance async disk writer.
300///
301/// Writes are sent over an MPSC channel and flushed to disk every 50 ms by a
302/// background Tokio task. The write path never blocks the caller.
303pub struct AsyncDiskStorage {
304 /// The sending half of the MPSC channel. Cloning this is cheap — all
305 /// clones share the same underlying channel.
306 sender: Option<mpsc::UnboundedSender<String>>,
307 /// Path to the log file on disk. Stored so we can read/compact it later.
308 path: String,
309 /// Handle to the background writer task. Stored so Drop can await it.
310 writer_task: Option<JoinHandle<()>>,
311}
312
313impl AsyncDiskStorage {
314 /// Open (or create) the log file at `path` and spawn the background writer task.
315 pub fn new(path: &str) -> Result<Self, DbError> {
316 // Create an unbounded MPSC channel.
317 // `log_tx` (sender) is kept in the struct; `log_rx` (receiver) goes to the task.
318 let (log_tx, mut log_rx) = mpsc::unbounded_channel::<String>();
319 let path_clone = path.to_string();
320
321 // Spawn a Tokio task that owns the file handle and BufWriter.
322 // This task runs for the lifetime of the server.
323 let writer_task = tokio::spawn(async move {
324 // Open the file in append mode so existing data is preserved.
325 let file = OpenOptions::new()
326 .create(true)
327 .append(true)
328 .open(&path_clone)
329 .unwrap();
330 let mut w = BufWriter::new(file);
331
332 loop {
333 // Wait up to 50 ms for the next message.
334 // If a message arrives within 50 ms → process it immediately.
335 // If the timeout fires → flush the BufWriter to disk.
336 match tokio::time::timeout(
337 std::time::Duration::from_millis(50),
338 log_rx.recv(),
339 )
340 .await
341 {
342 // A message arrived within the timeout window.
343 Ok(Some(log_line)) => {
344 // Special sentinel: the compact() method sends this to
345 // tell us to swap the log file atomically.
346 if log_line.starts_with("__RELOAD_FILE__") {
347 // Extract the temp file path from the sentinel string.
348 let temp_path = log_line.replace("__RELOAD_FILE__", "");
349
350 // Flush and close the current file before renaming.
351 // On Windows, a file cannot be renamed while it's open.
352 w.flush().unwrap();
353 drop(w); // Release the file handle / Windows lock
354
355 // Atomically replace the live log with the compacted version.
356 if let Err(e) = std::fs::rename(&temp_path, &path_clone) {
357 tracing::error!("Failed to swap compacted file: {}", e);
358 }
359
360 // Re-open the (now compacted) log file for future writes.
361 let new_file = OpenOptions::new()
362 .create(true)
363 .append(true)
364 .open(&path_clone)
365 .unwrap();
366 w = BufWriter::new(new_file);
367 } else {
368 // Normal log line — append it to the BufWriter's buffer.
369 if let Err(e) = writeln!(w, "{}", log_line) {
370 tracing::error!("Failed to write to disk: {}", e);
371 }
372 }
373 }
374 // The channel was closed (sender dropped) — the server is shutting down.
375 // The BufWriter will be dropped here, which flushes its buffer to the OS.
376 Ok(None) => break,
377 // Timeout fired — no message in the last 50 ms. Flush buffered data.
378 Err(_) => {
379 let _ = w.flush();
380 }
381 }
382 }
383 // When the loop exits, `w` is dropped here, which flushes the BufWriter.
384 let _ = w.flush();
385 });
386
387 Ok(Self {
388 sender: Some(log_tx),
389 path: path.to_string(),
390 writer_task: Some(writer_task),
391 })
392 }
393}
394
395impl Drop for AsyncDiskStorage {
396 /// On drop, close the sender (signals the writer task to exit) then block
397 /// until the task has drained its queue and flushed everything to disk.
398 fn drop(&mut self) {
399 // Drop the sender — this closes the channel and causes log_rx.recv()
400 // to return None, which breaks the writer task's loop.
401 drop(self.sender.take());
402
403 // Now await the writer task so we don't return until all queued lines
404 // have been written and flushed to the OS.
405 if let Some(handle) = self.writer_task.take() {
406 tokio::task::block_in_place(|| {
407 tokio::runtime::Handle::current().block_on(handle)
408 })
409 .ok();
410 }
411 }
412}
413
414impl StorageBackend for AsyncDiskStorage {
415 /// Serialize `entry` to a JSON string and send it to the background writer.
416 /// This call returns immediately — it never blocks waiting for disk I/O.
417 fn write_entry(&self, entry: &LogEntry) -> Result<(), DbError> {
418 let json_line = serde_json::to_string(entry)?;
419 // send() only fails if the receiver (background task) has been dropped,
420 // which means the server is shutting down.
421 if let Some(ref sender) = self.sender {
422 sender.send(json_line).map_err(|_| DbError::WriteError)?;
423 }
424 Ok(())
425 }
426
427 /// Read all entries from the log file into a Vec.
428 /// Used by EncryptedStorage which needs the full list to decrypt.
429 fn read_log(&self) -> Result<Vec<LogEntry>, DbError> {
430 read_log_from_disk(&self.path)
431 }
432
433 /// Compact the log: write a binary snapshot, rewrite the log to contain
434 /// only current state, then signal the background task to swap the file.
435 fn compact(&self, entries: Vec<LogEntry>) -> Result<(), DbError> {
436 // Step 1: Write a binary snapshot so the next startup can skip log replay.
437 // We record `seq` = current line count so we know where the snapshot ends.
438 let seq = count_log_lines(&self.path);
439 if let Err(e) = write_snapshot(&self.path, &entries, seq) {
440 tracing::warn!("⚠️ Failed to write snapshot during compaction: {}", e);
441 // Non-fatal: we continue without a snapshot. Next startup will do a
442 // full log replay instead, which is slower but still correct.
443 }
444
445 // Step 2: Write the compacted log to a temp file.
446 let temp_path = format!("{}.tmp", self.path);
447 write_compacted_log(&temp_path, &entries)?;
448
449 // Step 3: Send the sentinel to the background task so it flushes,
450 // closes the current file, renames the temp file over it, and reopens.
451 // This is the only safe way to swap the file on Windows (where you
452 // can't rename a file that's currently open by another handle).
453 if let Some(ref sender) = self.sender {
454 sender
455 .send(format!("__RELOAD_FILE__{}", temp_path))
456 .map_err(|_| DbError::WriteError)?;
457 }
458 Ok(())
459 }
460
461 /// Read exactly `length` bytes from the log at `offset`.
462 fn read_at(&self, offset: u64, length: u32) -> Result<Vec<u8>, DbError> {
463 use std::io::{Read, Seek, SeekFrom};
464 let mut file = File::open(&self.path)?;
465 file.seek(SeekFrom::Start(offset))?;
466 let mut buffer = vec![0u8; length as usize];
467 file.read_exact(&mut buffer)?;
468 Ok(buffer)
469 }
470
471 /// Stream log entries into state using snapshot + delta replay.
472 ///
473 /// Fast path (after first compaction):
474 /// 1. Load binary snapshot → apply all entries in it.
475 /// 2. Stream only the log lines written AFTER the snapshot (the "delta").
476 ///
477 /// Slow path (first run, no snapshot):
478 /// Stream the entire log file line-by-line. No full Vec in RAM.
479 fn stream_log_into(
480 &self,
481 f: &mut dyn FnMut(LogEntry, u32) -> ControlFlow<(), ()>,
482 ) -> Result<u64, DbError> {
483 let mut count = 0u64;
484 // Attempt to load the binary snapshot for fast startup.
485 if let Some((snapshot_entries, seq)) = load_snapshot(&self.path) {
486 tracing::info!(
487 "⚡ Snapshot loaded ({} entries, seq {}). Replaying delta only...",
488 snapshot_entries.len(),
489 seq
490 );
491 for entry in snapshot_entries {
492 // For snapshots, we re-serialize because they aren't in the log file
493 let json = serde_json::to_vec(&entry).unwrap_or_default();
494 let length = json.len() as u32;
495 if let ControlFlow::Break(_) = f(entry, length) {
496 return Ok(count);
497 }
498 count += 1;
499 }
500 // Then replay only the log lines that came after the snapshot.
501 // `seq` is the number of lines to skip (already in the snapshot).
502 if let ControlFlow::Break(_) = stream_log_entries(&self.path, seq, |e, l| {
503 let res = f(e, l);
504 if let ControlFlow::Continue(_) = res {
505 count += 1;
506 }
507 res
508 })? {
509 return Ok(count);
510 }
511 return Ok(count);
512 }
513
514 // No snapshot found — stream the full log from the beginning.
515 let _ = stream_log_entries(&self.path, 0, |e, l| {
516 let res = f(e, l);
517 if let ControlFlow::Continue(_) = res {
518 count += 1;
519 }
520 res
521 })?;
522 Ok(count)
523 }
524}
525
// ─── SyncDiskStorage ──────────────────────────────────────────────────────────
//
// Design: every write_entry() call acquires a Mutex, writes the JSON line to
// a BufWriter, and immediately flushes the BufWriter. The flush() call blocks
// until the OS confirms the data is in its write buffer (not necessarily on
// physical disk, but durable enough for most crash scenarios).
//
// Trade-off: much lower throughput than AsyncDiskStorage because every write
// blocks the caller. Use this when data loss is unacceptable.
// ─────────────────────────────────────────────────────────────────────────────

537/// High-durability synchronous disk writer.
538///
539/// Every write is flushed to disk before returning. Zero data loss on crash,
540/// but lower throughput than AsyncDiskStorage. Enable with WRITE_MODE=sync.
541pub struct SyncDiskStorage {
542 /// The BufWriter wrapped in a Mutex so multiple threads can write safely.
543 /// Arc allows the struct to be cloned (shared across Axum handler threads).
544 writer: Arc<Mutex<BufWriter<File>>>,
545 /// Path to the log file. Stored for read/compact operations.
546 path: String,
547}
548
549impl SyncDiskStorage {
550 /// Open (or create) the log file at `path` in append mode.
551 pub fn new(path: &str) -> Result<Self, DbError> {
552 let file = OpenOptions::new().create(true).append(true).open(path)?;
553
554 Ok(Self {
555 writer: Arc::new(Mutex::new(BufWriter::new(file))),
556 path: path.to_string(),
557 })
558 }
559}
560
561impl StorageBackend for SyncDiskStorage {
562 /// Serialize `entry` to JSON, write it to the BufWriter, and flush immediately.
563 /// This call blocks until the OS has accepted the data.
564 fn write_entry(&self, entry: &LogEntry) -> Result<(), DbError> {
565 let json_line = serde_json::to_string(entry)?;
566 // Lock the Mutex — only one thread can write at a time.
567 let mut w = self.writer.lock().map_err(|_| DbError::LockPoisoned)?;
568 writeln!(w, "{}", json_line)?;
569 // Flush immediately so the data is durable before we return.
570 w.flush()?;
571 Ok(())
572 }
573
574 /// Read all entries from the log file into a Vec.
575 fn read_log(&self) -> Result<Vec<LogEntry>, DbError> {
576 read_log_from_disk(&self.path)
577 }
578
579 /// Compact the log: write a binary snapshot, atomically swap the log file,
580 /// then reopen the writer pointing at the new (compacted) file.
581 fn compact(&self, entries: Vec<LogEntry>) -> Result<(), DbError> {
582 // Step 1: Write binary snapshot for fast next startup.
583 let seq = count_log_lines(&self.path);
584 if let Err(e) = write_snapshot(&self.path, &entries, seq) {
585 tracing::warn!("⚠️ Failed to write snapshot during compaction: {}", e);
586 }
587
588 // Step 2: Write compacted log to a temp file.
589 let temp_path = format!("{}.tmp", self.path);
590 write_compacted_log(&temp_path, &entries)?;
591
592 // Step 3: Lock the writer, rename the temp file over the live log,
593 // then reopen the writer so future writes go to the compacted file.
594 let mut w = self.writer.lock().map_err(|_| DbError::LockPoisoned)?;
595 // On Unix this rename is atomic. On Windows the file must be closed first,
596 // but since we hold the Mutex no other thread can write concurrently.
597 std::fs::rename(&temp_path, &self.path)?;
598
599 // Reopen the file so the writer points at the new compacted log.
600 let new_file = OpenOptions::new()
601 .create(true)
602 .append(true)
603 .open(&self.path)?;
604 *w = BufWriter::new(new_file);
605 Ok(())
606 }
607
608 /// Read exactly `length` bytes from the log at `offset`.
609 fn read_at(&self, offset: u64, length: u32) -> Result<Vec<u8>, DbError> {
610 use std::io::{Read, Seek, SeekFrom};
611 let mut file = File::open(&self.path)?;
612 file.seek(SeekFrom::Start(offset))?;
613 let mut buffer = vec![0u8; length as usize];
614 file.read_exact(&mut buffer)?;
615 Ok(buffer)
616 }
617
618 /// Stream log entries into state using snapshot + delta replay.
619 /// Same logic as AsyncDiskStorage::stream_log_into — see that method for details.
620 fn stream_log_into(
621 &self,
622 f: &mut dyn FnMut(LogEntry, u32) -> ControlFlow<(), ()>,
623 ) -> Result<u64, DbError> {
624 let mut count = 0u64;
625 // Fast path: load snapshot and replay only the delta.
626 if let Some((snapshot_entries, seq)) = load_snapshot(&self.path) {
627 tracing::info!(
628 "⚡ Snapshot loaded ({} entries, seq {}). Replaying delta only...",
629 snapshot_entries.len(),
630 seq
631 );
632 for entry in snapshot_entries {
633 // For snapshots, we re-serialize because they aren't in the log file
634 let json = serde_json::to_vec(&entry).unwrap_or_default();
635 let length = json.len() as u32;
636 if let ControlFlow::Break(_) = f(entry, length) {
637 return Ok(count);
638 }
639 count += 1;
640 }
641 if let ControlFlow::Break(_) = stream_log_entries(&self.path, seq, |e, l| {
642 let res = f(e, l);
643 if let ControlFlow::Continue(_) = res {
644 count += 1;
645 }
646 res
647 })? {
648 return Ok(count);
649 }
650 return Ok(count);
651 }
652
653 // Slow path: stream the full log line-by-line.
654 let _ = stream_log_entries(&self.path, 0, |e, l| {
655 let res = f(e, l);
656 if let ControlFlow::Continue(_) = res {
657 count += 1;
658 }
659 res
660 })?;
661 Ok(count)
662 }
663}