moltendb_core/engine/storage/disk.rs
1// ─── disk.rs ─────────────────────────────────────────────────────────────────
2// This file implements two concrete StorageBackend implementations that write
3// the database log to a real file on disk:
4//
5// • AsyncDiskStorage — high-throughput, non-blocking writes via an MPSC
6// channel + background Tokio task. Writes are buffered
7// in memory and flushed to disk every 50 ms. Ideal for
8// analytics / high-write workloads where losing the last
9// 50 ms of data on a crash is acceptable.
10//
11// • SyncDiskStorage — every write blocks until the OS confirms the data is
12// on disk (flush after every entry). Zero data loss on
13// crash, but much lower throughput. Enabled by setting
14// the WRITE_MODE=sync environment variable.
15//
16// Both implementations share the same binary snapshot + streaming log replay
17// logic for fast startup (see "Snapshot helpers" section below).
18// ─────────────────────────────────────────────────────────────────────────────
19
20// StorageBackend is the trait defined in mod.rs that both structs implement.
21use super::StorageBackend;
22// LogEntry is the unit of data written to the log (cmd, collection, key, value).
23// DbError is our custom error enum.
24use crate::engine::types::{DbError, LogEntry};
25// Standard library file I/O types.
26use std::ops::ControlFlow;
27use std::fs::{File, OpenOptions};
28use std::path::Path;
29use std::time::SystemTime;
30// BufRead lets us iterate a file line-by-line without loading it all into RAM.
31// BufWriter batches small writes into larger OS-level write calls for efficiency.
32use std::io::{BufRead, BufReader, BufWriter, Write};
33// Arc = thread-safe reference-counted pointer (shared ownership across threads).
34// Mutex = mutual exclusion lock (only one thread can hold it at a time).
35use std::sync::{Arc, Mutex};
36// Tokio's async MPSC channel: multiple producers, single consumer.
37// Used to send log lines from the write path to the background flush task.
38use tokio::sync::mpsc;
39use tokio::task::JoinHandle;
40
41// ─── Snapshot helpers ────────────────────────────────────────────────────────
42//
43// A "snapshot" is a compact binary file that captures the entire current state
44// of the database at a point in time. On the next startup we load the snapshot
45// first (fast binary deserialization) and then only replay the log lines that
46// were written AFTER the snapshot was taken — instead of replaying the entire
47// log from the beginning. This dramatically reduces startup time for large DBs.
48//
49// Snapshot file format (binary, little-endian):
50// [8 bytes] magic header: "MOLTSNAP"
51// [8 bytes] seq: number of log lines captured in this snapshot
52// [8 bytes] count: number of LogEntry records that follow
53// for each entry:
54// [8 bytes] len: byte length of the bincode-encoded entry
55// [len bytes] bincode-encoded LogEntry
56// ─────────────────────────────────────────────────────────────────────────────
57
58/// Returns the path of the binary snapshot file for a given log file path.
59/// Convention: `my_database.log` → `my_database.log.snapshot.bin`
60pub(super) fn snapshot_path(log_path: &str) -> String {
61 format!("{}.snapshot.bin", log_path)
62}
63
64pub(super) fn write_snapshot(log_path: &str, entries: &[LogEntry], seq: u64) -> Result<(), DbError> {
65 let path = snapshot_path(log_path);
66 // Write to a temp file first so the swap is atomic.
67 let tmp = format!("{}.tmp", path);
68
69 let file = OpenOptions::new()
70 .create(true) // create if it doesn't exist
71 .write(true)
72 .truncate(true) // overwrite any existing content
73 .open(&tmp)?;
74 let mut w = BufWriter::new(file);
75
76 // Magic header so we can detect corrupt/wrong files on load.
77 w.write_all(b"MOLTSNAP")?;
78 // Sequence number: how many log lines are already captured here.
79 w.write_all(&seq.to_le_bytes())?;
80
81 // Number of entries, so the reader can pre-allocate a Vec.
82 let count = entries.len() as u64;
83 w.write_all(&count.to_le_bytes())?;
84
85 // Each entry is length-prefixed so the reader knows how many bytes to read.
86 for entry in entries {
87 // We use JSON for snapshots as well for now to avoid bincode issues with dynamic Value
88 let encoded = serde_json::to_vec(entry).map_err(|_| DbError::WriteError)?;
89 let len = encoded.len() as u64;
90 w.write_all(&len.to_le_bytes())?;
91 w.write_all(&encoded)?;
92 }
93
94 // Flush the BufWriter to ensure all bytes reach the OS buffer.
95 w.flush()?;
96 // Drop the writer to release the file handle before renaming (required on Windows).
97 drop(w);
98
99 // Before renaming the new snapshot, move the old one to the backup folder.
100 if Path::new(&path).exists() {
101 let log_dir = Path::new(log_path).parent().unwrap_or_else(|| Path::new("."));
102 let backup_dir = log_dir.join("backup");
103
104 // Ensure backup directory exists
105 std::fs::create_dir_all(&backup_dir)?;
106
107 let now = SystemTime::now()
108 .duration_since(SystemTime::UNIX_EPOCH)
109 .map(|d| d.as_secs())
110 .unwrap_or(0);
111
112 let filename = Path::new(&path).file_name()
113 .and_then(|n| n.to_str())
114 .unwrap_or("snapshot.bin");
115
116 let backup_path = backup_dir.join(format!("{}.{}.bak", filename, now));
117
118 // Move current snapshot to backup
119 let _ = std::fs::rename(&path, &backup_path);
120 }
121
122 // Atomic rename: replaces the old snapshot file in one OS operation.
123 std::fs::rename(&tmp, &path)?;
124 Ok(())
125}
126
127pub(super) fn write_compacted_log_no_tx(path: &str, entries: &[LogEntry]) -> Result<(), DbError> {
128 let temp_file = OpenOptions::new()
129 .create(true)
130 .write(true)
131 .truncate(true) // start fresh — we're rewriting the whole log
132 .open(path)?;
133 let mut temp_writer = BufWriter::new(temp_file);
134
135 // Write each entry as a JSON line, same format as the live log.
136 for entry in entries {
137 writeln!(temp_writer, "{}", serde_json::to_string(&entry)?)?;
138 }
139
140 temp_writer.flush()?;
141 Ok(())
142}
143
144pub(super) fn write_compacted_log(path: &str, entries: &[LogEntry]) -> Result<(), DbError> {
145 let temp_file = OpenOptions::new()
146 .create(true)
147 .write(true)
148 .truncate(true) // start fresh — we're rewriting the whole log
149 .open(path)?;
150 let mut temp_writer = BufWriter::new(temp_file);
151
152 // Write each entry as a JSON line, same format as the live log.
153 for entry in entries {
154 // We write each entry in its own transaction in the compacted log.
155 // This ensures they are replayed correctly even if followed by other log entries.
156 let tx_id = format!("compact-{}", entry.key);
157
158 let begin = LogEntry {
159 cmd: "TX_BEGIN".to_string(),
160 collection: entry.collection.clone(),
161 key: tx_id.clone(),
162 value: serde_json::Value::Null,
163 _t: entry._t,
164 };
165 writeln!(temp_writer, "{}", serde_json::to_string(&begin)?)?;
166
167 writeln!(temp_writer, "{}", serde_json::to_string(&entry)?)?;
168
169 let commit = LogEntry {
170 cmd: "TX_COMMIT".to_string(),
171 collection: entry.collection.clone(),
172 key: tx_id,
173 value: serde_json::Value::Null,
174 _t: entry._t,
175 };
176 writeln!(temp_writer, "{}", serde_json::to_string(&commit)?)?;
177 }
178
179 temp_writer.flush()?;
180 Ok(())
181}
182
183/// Try to load a previously written binary snapshot.
184/// Returns `Some((entries, seq))` on success, or `None` if:
185/// - the snapshot file doesn't exist (first run)
186/// - the magic header doesn't match (corrupt file)
187/// - any read fails (truncated file, wrong format)
188pub(super) fn load_snapshot(log_path: &str) -> Option<(Vec<LogEntry>, u64)> {
189 let path = snapshot_path(log_path);
190 if !Path::new(&path).exists() {
191 return None;
192 }
193 tracing::info!("🔍 Attempting to load snapshot from {}", path);
194 // If the file doesn't exist, open() returns Err and we return None.
195 let mut file = File::open(&path).ok()?;
196
197 use std::io::Read;
198
199 // Validate the magic header — if it doesn't match, the file is not ours.
200 let mut magic = [0u8; 8];
201 file.read_exact(&mut magic).ok()?;
202 if &magic != b"MOLTSNAP" {
203 tracing::warn!("❌ Invalid snapshot magic header");
204 return None; // Not a valid snapshot file
205 }
206
207 // Read the sequence number (how many log lines to skip on replay).
208 let mut seq_bytes = [0u8; 8];
209 file.read_exact(&mut seq_bytes).ok()?;
210 let seq = u64::from_le_bytes(seq_bytes);
211
212 // Read the entry count so we can pre-allocate the Vec.
213 let mut count_bytes = [0u8; 8];
214 file.read_exact(&mut count_bytes).ok()?;
215 let count = u64::from_le_bytes(count_bytes) as usize;
216
217 tracing::info!("📂 Snapshot header: seq={}, count={}", seq, count);
218
219 let mut entries = Vec::with_capacity(count);
220 for i in 0..count {
221 // Read the length prefix for this entry.
222 let mut len_bytes = [0u8; 8];
223 if let Err(e) = file.read_exact(&mut len_bytes) {
224 tracing::error!("❌ Failed to read entry {} length: {}", i, e);
225 return None;
226 }
227 let len = u64::from_le_bytes(len_bytes) as usize;
228
229 // Read exactly `len` bytes and deserialize with JSON.
230 let mut buf = vec![0u8; len];
231 if let Err(e) = file.read_exact(&mut buf) {
232 tracing::error!("❌ Failed to read entry {} data: {}", i, e);
233 return None;
234 }
235
236 // If the entry is all zeros or empty, it might be a partial write
237 if len > 0 && buf.iter().all(|&b| b == 0) {
238 tracing::error!("❌ Entry {} data is all zeros. Snapshot might be corrupt.", i);
239 return None;
240 }
241
242 // If deserialization fails (e.g. schema changed), return None so we
243 // fall back to full log replay instead of crashing.
244 let entry: LogEntry = match serde_json::from_slice(&buf) {
245 Ok(e) => e,
246 Err(err) => {
247 let sample = if buf.len() > 20 { &buf[..20] } else { &buf };
248 tracing::error!(
249 "❌ Failed to deserialize entry {} (len {}): {}. Sample: {:?}. This usually happens if the snapshot was created with an older version of MoltenDB or is corrupt. Falling back to log replay.",
250 i, len, err, sample
251 );
252 return None;
253 }
254 };
255 entries.push(entry);
256 }
257
258 Some((entries, seq))
259}
260
261// ─── Streaming log reader ─────────────────────────────────────────────────────
262//
263// The log file is a plain text file where each line is a JSON-encoded LogEntry.
264// Instead of reading the whole file into a Vec<String> and then parsing, we
265// stream it line-by-line so only one entry is in memory at a time.
266// ─────────────────────────────────────────────────────────────────────────────
267
268/// Open the log file at `path` and call `f` for each successfully parsed
269/// `LogEntry`, skipping the first `skip_lines` lines (those are already
270/// covered by a loaded snapshot and don't need to be replayed again).
271///
272/// Lines that fail to parse (e.g. partial writes from a crash) are silently
273/// skipped — the database will simply not see those entries, which is safe
274/// because the in-memory state is rebuilt from what we can read.
275pub fn stream_log_entries<F>(path: &str, skip_lines: u64, mut f: F) -> Result<ControlFlow<(), ()>, DbError>
276where
277 F: FnMut(LogEntry, u32) -> ControlFlow<(), ()>, // closure called once per valid entry + raw byte length
278{
279 // If the file doesn't exist yet (first run), just do nothing.
280 if let Ok(file) = File::open(path) {
281 // BufReader wraps the file with an internal buffer so we don't make
282 // one syscall per byte — it reads in chunks and serves lines from RAM.
283 let reader = BufReader::new(file);
284 for (i, line) in reader.lines().enumerate() {
285 // Skip lines already captured in the snapshot.
286 if (i as u64) < skip_lines {
287 continue;
288 }
289 // Ignore lines that fail to read (e.g. I/O error mid-line).
290 if let Ok(json_str) = line {
291 let length = json_str.len() as u32;
292 // Ignore lines that fail to parse (e.g. partial write on crash).
293 if let Ok(entry) = serde_json::from_str::<LogEntry>(&json_str) {
294 if let ControlFlow::Break(_) = f(entry, length) {
295 return Ok(ControlFlow::Break(()));
296 }
297 }
298 }
299 }
300 }
301 Ok(ControlFlow::Continue(()))
302}
303
304/// Count the total number of lines in the log file.
305/// This is used when writing a snapshot to record the current sequence number
306/// (i.e. "the snapshot covers the first N lines of the log").
307pub(super) fn count_log_lines(path: &str) -> u64 {
308 if let Ok(file) = File::open(path) {
309 // .lines() is lazy — it reads one line at a time, so this doesn't
310 // load the whole file into memory.
311 BufReader::new(file).lines().count() as u64
312 } else {
313 0 // File doesn't exist yet
314 }
315}
316
317// ─── read_log (still needed by EncryptedStorage wrapper) ─────────────────────
318//
319// EncryptedStorage wraps another StorageBackend and decrypts entries before
320// they can be applied to state. Because decryption must happen before we can
321// call apply_entry(), EncryptedStorage uses read_log() (which returns a full
322// Vec) rather than stream_log_into() (which applies entries on the fly).
323// ─────────────────────────────────────────────────────────────────────────────
324
325/// Read all log entries from disk into a Vec<LogEntry>.
326/// This is a convenience wrapper around stream_log_entries that collects
327/// everything into a Vec. Used by EncryptedStorage.
328pub fn read_log_from_disk(path: &str) -> Result<Vec<LogEntry>, DbError> {
329 let mut entries = Vec::new();
330 // skip_lines = 0 means read from the very beginning (no snapshot skip here,
331 // because EncryptedStorage handles its own snapshot logic via read_log).
332 let _ = stream_log_entries(path, 0, |e, _| {
333 entries.push(e);
334 ControlFlow::Continue(())
335 })?;
336 Ok(entries)
337}
338
339// ─── Compacted log writer ─────────────────────────────────────────────────────
340//
341// Compaction rewrites the log file to contain only the current state of the
342// database — removing all superseded INSERT entries and all DELETE tombstones.
343// This keeps the log file from growing unboundedly over time.
344// ─────────────────────────────────────────────────────────────────────────────
345
346// ─── AsyncDiskStorage ─────────────────────────────────────────────────────────
347//
348// Design: the write path is completely non-blocking. When write_entry() is
349// called, it serializes the entry to JSON and sends it over an unbounded MPSC
350// channel. A background Tokio task receives from that channel and writes to a
351// BufWriter. The BufWriter is flushed every 50 ms (on timeout) or whenever
352// the channel is drained.
353//
354// Trade-off: if the process is killed (SIGKILL / power loss) within the 50 ms
355// window, the last few writes may be lost. For analytics workloads this is
356// usually acceptable. Use SyncDiskStorage if you need zero data loss.
357// ─────────────────────────────────────────────────────────────────────────────
358
/// High-performance async disk writer.
///
/// Writes are sent over an MPSC channel and flushed to disk every 50 ms by a
/// background Tokio task. The write path never blocks the caller.
pub struct AsyncDiskStorage {
    /// The sending half of the MPSC channel. Cloning this is cheap — all
    /// clones share the same underlying channel. Wrapped in `Option` so that
    /// `Drop` can `take()` and drop it, closing the channel to signal the
    /// background task to exit.
    sender: Option<mpsc::UnboundedSender<String>>,
    /// Path to the log file on disk. Stored so we can read/compact it later.
    path: String,
    /// Handle to the background writer task. Wrapped in `Option` so `Drop`
    /// can `take()` it and block until the task has flushed everything.
    writer_task: Option<JoinHandle<()>>,
}
372
373impl AsyncDiskStorage {
374 /// Open (or create) the log file at `path` and spawn the background writer task.
375 pub fn new(path: &str) -> Result<Self, DbError> {
376 // Create an unbounded MPSC channel.
377 // `log_tx` (sender) is kept in the struct; `log_rx` (receiver) goes to the task.
378 let (log_tx, mut log_rx) = mpsc::unbounded_channel::<String>();
379 let path_clone = path.to_string();
380
381 // Spawn a Tokio task that owns the file handle and BufWriter.
382 // This task runs for the lifetime of the server.
383 let writer_task = tokio::spawn(async move {
384 // Open the file in append mode so existing data is preserved.
385 let file = OpenOptions::new()
386 .create(true)
387 .append(true)
388 .open(&path_clone)
389 .unwrap();
390 let mut w = BufWriter::new(file);
391
392 loop {
393 // Wait up to 50 ms for the next message.
394 // If a message arrives within 50 ms → process it immediately.
395 // If the timeout fires → flush the BufWriter to disk.
396 match tokio::time::timeout(
397 std::time::Duration::from_millis(50),
398 log_rx.recv(),
399 )
400 .await
401 {
402 // A message arrived within the timeout window.
403 Ok(Some(log_line)) => {
404 // Special sentinel: the compact() method sends this to
405 // tell us to swap the log file atomically.
406 if log_line.starts_with("__RELOAD_FILE__") {
407 // Extract the temp file path from the sentinel string.
408 let temp_path = log_line.replace("__RELOAD_FILE__", "");
409 // println!("🔥 Worker: Reloading file from {}", temp_path);
410
411 // Flush and close the current file before renaming.
412 // On Windows, a file cannot be renamed while it's open.
413 w.flush().unwrap();
414 drop(w); // Release the file handle / Windows lock
415
416 // Atomically replace the live log with the compacted version.
417 if let Err(e) = std::fs::rename(&temp_path, &path_clone) {
418 tracing::error!("Failed to swap compacted file: {}", e);
419 }
420
421 // Re-open the (now compacted) log file for future writes.
422 let new_file = OpenOptions::new()
423 .create(true)
424 .append(true)
425 .open(&path_clone)
426 .unwrap();
427 w = BufWriter::new(new_file);
428 } else {
429 // Normal log line — append it to the BufWriter's buffer.
430 if let Err(e) = writeln!(w, "{}", log_line) {
431 tracing::error!("Failed to write to disk: {}", e);
432 }
433 }
434 }
435 // The channel was closed (sender dropped) — the server is shutting down.
436 // The BufWriter will be dropped here, which flushes its buffer to the OS.
437 Ok(None) => break,
438 // Timeout fired — no message in the last 50 ms. Flush buffered data.
439 Err(_) => {
440 let _ = w.flush();
441 }
442 }
443 }
444 // When the loop exits, `w` is dropped here, which flushes the BufWriter.
445 let _ = w.flush();
446 });
447
448 Ok(Self {
449 sender: Some(log_tx),
450 path: path.to_string(),
451 writer_task: Some(writer_task),
452 })
453 }
454}
455
456impl Drop for AsyncDiskStorage {
457 /// On drop, close the sender (signals the writer task to exit) then block
458 /// until the task has drained its queue and flushed everything to disk.
459 fn drop(&mut self) {
460 // Drop the sender — this closes the channel and causes log_rx.recv()
461 // to return None, which breaks the writer task's loop.
462 drop(self.sender.take());
463
464 // Now await the writer task so we don't return until all queued lines
465 // have been written and flushed to the OS.
466 if let Some(handle) = self.writer_task.take() {
467 tokio::task::block_in_place(|| {
468 tokio::runtime::Handle::current().block_on(handle)
469 })
470 .ok();
471 }
472 }
473}
474
impl StorageBackend for AsyncDiskStorage {
    /// Serialize `entry` to a JSON string and send it to the background writer.
    /// This call returns immediately — it never blocks waiting for disk I/O.
    /// The line reaches disk on the task's next flush (within ~50 ms).
    fn write_entry(&self, entry: &LogEntry) -> Result<(), DbError> {
        let json_line = serde_json::to_string(entry)?;
        // send() only fails if the receiver (background task) has been dropped,
        // which means the server is shutting down. A missing sender (already
        // taken by Drop) is silently treated as success.
        if let Some(ref sender) = self.sender {
            sender.send(json_line).map_err(|_| DbError::WriteError)?;
        }
        Ok(())
    }

    /// Read all entries from the log file into a Vec.
    /// Used by EncryptedStorage which needs the full list to decrypt.
    /// Note: reads only the log file, not the snapshot.
    fn read_log(&self) -> Result<Vec<LogEntry>, DbError> {
        read_log_from_disk(&self.path)
    }

    /// Compact the log: write a binary snapshot, rewrite the log to be empty,
    /// then signal the background task to swap the file.
    fn compact(&self, entries: Vec<LogEntry>) -> Result<(), DbError> {
        // Step 1: Write a binary snapshot.
        // After compaction the log is reset to empty, so seq=0: all future log
        // lines written after this snapshot must be replayed from the start.
        // A snapshot failure is logged but not fatal — the log swap below
        // still proceeds.
        let seq = 0u64;
        if let Err(e) = write_snapshot(&self.path, &entries, seq) {
            tracing::warn!("⚠️ Failed to write snapshot during compaction: {}", e);
        }

        // Step 2: Write an empty compacted log to a temp file.
        // Since the snapshot now contains the full state, we can start the log fresh.
        let temp_path = format!("{}.tmp", self.path);
        write_compacted_log_no_tx(&temp_path, &[])?;

        // Step 3: Send the sentinel to the background task so it flushes,
        // closes the current file, renames the temp file over it, and reopens.
        // The swap happens asynchronously, in order with all queued writes.
        if let Some(ref sender) = self.sender {
            sender
                .send(format!("__RELOAD_FILE__{}", temp_path))
                .map_err(|_| DbError::WriteError)?;
        }
        Ok(())
    }

    /// Read exactly `length` bytes from the log at `offset`.
    /// NOTE(review): reads may race with the background writer's buffered
    /// flushes — bytes queued but not yet flushed are not visible here.
    fn read_at(&self, offset: u64, length: u32) -> Result<Vec<u8>, DbError> {
        use std::io::{Read, Seek, SeekFrom};
        let mut file = File::open(&self.path)?;
        file.seek(SeekFrom::Start(offset))?;
        let mut buffer = vec![0u8; length as usize];
        file.read_exact(&mut buffer)?;
        Ok(buffer)
    }

    /// Stream log entries into state using snapshot + delta replay.
    ///
    /// Fast path (after first compaction):
    /// 1. Load binary snapshot → apply all entries in it.
    /// 2. Stream only the log lines written AFTER the snapshot (the "delta").
    ///
    /// Slow path (first run, no snapshot):
    /// Stream the entire log file line-by-line. No full Vec in RAM.
    ///
    /// Returns the number of entries for which `f` returned Continue; the
    /// entry that triggers a Break is delivered to `f` but not counted.
    fn stream_log_into(
        &self,
        f: &mut dyn FnMut(LogEntry, u32) -> ControlFlow<(), ()>,
    ) -> Result<u64, DbError> {
        let mut count = 0u64;
        // Attempt to load the binary snapshot for fast startup.
        if let Some((snapshot_entries, seq)) = load_snapshot(&self.path) {
            for entry in snapshot_entries {
                // Snapshot entries are passed with length 0: they are not in
                // the log file, so they have no valid RecordPointer for this
                // log instance and must be treated as in-memory ("Hot").
                if let ControlFlow::Break(_) = f(entry, 0) {
                    return Ok(count);
                }
                count += 1;
            }
            // Then replay only the log lines that came after the snapshot.
            // `seq` is the number of lines to skip (already in the snapshot).
            if let ControlFlow::Break(_) = stream_log_entries(&self.path, seq, |e, l| {
                let res = f(e, l);
                if let ControlFlow::Continue(_) = res {
                    count += 1;
                }
                res
            })? {
                return Ok(count);
            }
            return Ok(count);
        }

        // No snapshot found — stream the full log from the beginning.
        let _ = stream_log_entries(&self.path, 0, |e, l| {
            let res = f(e, l);
            if let ControlFlow::Continue(_) = res {
                count += 1;
            }
            res
        })?;
        Ok(count)
    }
}
578
579// ─── SyncDiskStorage ──────────────────────────────────────────────────────────
580//
581// Design: every write_entry() call acquires a Mutex, writes the JSON line to
582// a BufWriter, and immediately flushes the BufWriter. The flush() call blocks
583// until the OS confirms the data is in its write buffer (not necessarily on
584// physical disk, but durable enough for most crash scenarios).
585//
586// Trade-off: much lower throughput than AsyncDiskStorage because every write
587// blocks the caller. Use this when data loss is unacceptable.
588// ─────────────────────────────────────────────────────────────────────────────
589
/// High-durability synchronous disk writer.
///
/// Every write is flushed to the OS before returning (BufWriter::flush — the
/// OS may still buffer before physical disk; there is no fsync here). Lower
/// throughput than AsyncDiskStorage. Enable with WRITE_MODE=sync.
pub struct SyncDiskStorage {
    /// The BufWriter wrapped in a Mutex so multiple threads can write safely.
    /// Arc allows the struct to be cloned (shared across Axum handler threads).
    writer: Arc<Mutex<BufWriter<File>>>,
    /// Path to the log file. Stored for read/compact operations.
    path: String,
}
601
602impl SyncDiskStorage {
603 /// Open (or create) the log file at `path` in append mode.
604 pub fn new(path: &str) -> Result<Self, DbError> {
605 let file = OpenOptions::new().create(true).append(true).open(path)?;
606
607 Ok(Self {
608 writer: Arc::new(Mutex::new(BufWriter::new(file))),
609 path: path.to_string(),
610 })
611 }
612}
613
impl StorageBackend for SyncDiskStorage {
    /// Serialize `entry` to JSON, write it to the BufWriter, and flush immediately.
    /// This call blocks until the OS has accepted the data (flush pushes to
    /// the OS buffer; no fsync, so a power loss can still lose it).
    fn write_entry(&self, entry: &LogEntry) -> Result<(), DbError> {
        let json_line = serde_json::to_string(entry)?;
        // Lock the Mutex — only one thread can write at a time.
        let mut w = self.writer.lock().map_err(|_| DbError::LockPoisoned)?;
        writeln!(w, "{}", json_line)?;
        // Flush immediately so the data reaches the OS before we return.
        w.flush()?;
        Ok(())
    }

    /// Read all entries from the log file into a Vec.
    /// Note: reads only the log file, not the snapshot.
    fn read_log(&self) -> Result<Vec<LogEntry>, DbError> {
        read_log_from_disk(&self.path)
    }

    /// Compact the log: write a binary snapshot, swap the log file with an
    /// empty one, then reopen the writer.
    fn compact(&self, entries: Vec<LogEntry>) -> Result<(), DbError> {
        // Step 1: Write binary snapshot for fast next startup.
        // After compaction the log is reset to empty, so seq=0: all future log
        // lines written after this snapshot must be replayed from the start.
        // A snapshot failure is logged but not fatal — the swap still proceeds.
        let seq = 0u64;
        if let Err(e) = write_snapshot(&self.path, &entries, seq) {
            tracing::warn!("⚠️ Failed to write snapshot during compaction: {}", e);
        }

        // Step 2: Write an empty compacted log to a temp file.
        let temp_path = format!("{}.tmp", self.path);
        write_compacted_log_no_tx(&temp_path, &[])?;

        // Step 3: Lock the writer, rename the temp file over the live log,
        // then reopen the writer so future writes go to the compacted file.
        let mut w = self.writer.lock().map_err(|_| DbError::LockPoisoned)?;
        // On Unix this rename is atomic. NOTE(review): the old log's handle
        // inside `w` is still open at this point; on Windows, renaming over a
        // file with an open handle typically fails — the Mutex prevents
        // concurrent writes but does not close the handle. TODO confirm the
        // Windows behavior and close the handle before the rename if needed.
        if let Err(e) = std::fs::rename(&temp_path, &self.path) {
            tracing::error!("Failed to swap compacted file: {}", e);
            return Err(DbError::from(e));
        }

        // Reopen the file so the writer points at the new compacted log.
        let new_file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.path)?;
        *w = BufWriter::new(new_file);
        Ok(())
    }

    /// Read exactly `length` bytes from the log at `offset`.
    fn read_at(&self, offset: u64, length: u32) -> Result<Vec<u8>, DbError> {
        use std::io::{Read, Seek, SeekFrom};
        let mut file = File::open(&self.path)?;
        file.seek(SeekFrom::Start(offset))?;
        let mut buffer = vec![0u8; length as usize];
        file.read_exact(&mut buffer)?;
        Ok(buffer)
    }

    /// Stream log entries into state using snapshot + delta replay.
    /// Same logic as AsyncDiskStorage::stream_log_into — see that method for details.
    /// Returns the number of entries for which `f` returned Continue; the
    /// entry that triggers a Break is delivered to `f` but not counted.
    fn stream_log_into(
        &self,
        f: &mut dyn FnMut(LogEntry, u32) -> ControlFlow<(), ()>,
    ) -> Result<u64, DbError> {
        let mut count = 0u64;
        // Fast path: load snapshot and replay only the delta.
        if let Some((snapshot_entries, seq)) = load_snapshot(&self.path) {
            tracing::info!(
                "⚡ Snapshot loaded ({} entries, seq {}). Replaying delta only...",
                snapshot_entries.len(),
                seq
            );
            for entry in snapshot_entries {
                // Snapshot entries are passed with length 0: they are not in
                // the log file, so they have no valid RecordPointer for this
                // log instance and must be treated as in-memory ("Hot").
                if let ControlFlow::Break(_) = f(entry, 0) {
                    return Ok(count);
                }
                count += 1;
            }
            // Replay only the lines written after the snapshot (`seq` lines skipped).
            if let ControlFlow::Break(_) = stream_log_entries(&self.path, seq, |e, l| {
                let res = f(e, l);
                if let ControlFlow::Continue(_) = res {
                    count += 1;
                }
                res
            })? {
                return Ok(count);
            }
            return Ok(count);
        }

        // Slow path: stream the full log line-by-line.
        let _ = stream_log_entries(&self.path, 0, |e, l| {
            let res = f(e, l);
            if let ControlFlow::Continue(_) = res {
                count += 1;
            }
            res
        })?;
        Ok(count)
    }
}