// moltendb_core/engine/storage/disk.rs
1// ─── disk.rs ─────────────────────────────────────────────────────────────────
2// This file implements two concrete StorageBackend implementations that write
3// the database log to a real file on disk:
4//
5// • AsyncDiskStorage — high-throughput, non-blocking writes via an MPSC
6// channel + background Tokio task. Writes are buffered
7// in memory and flushed to disk every 50 ms. Ideal for
8// analytics / high-write workloads where losing the last
9// 50 ms of data on a crash is acceptable.
10//
11// • SyncDiskStorage — every write blocks until the OS confirms the data is
12// on disk (flush after every entry). Zero data loss on
13// crash, but much lower throughput. Enabled by setting
14// the WRITE_MODE=sync environment variable.
15//
16// Both implementations share the same binary snapshot + streaming log replay
17// logic for fast startup (see "Snapshot helpers" section below).
18// ─────────────────────────────────────────────────────────────────────────────
19
20// StorageBackend is the trait defined in mod.rs that both structs implement.
21use super::StorageBackend;
22// LogEntry is the unit of data written to the log (cmd, collection, key, value).
23// DbError is our custom error enum.
24use crate::engine::types::{DbError, LogEntry};
25// Standard library file I/O types.
26use std::ops::ControlFlow;
27use std::fs::{File, OpenOptions};
28use std::path::Path;
29use std::time::SystemTime;
30// BufRead lets us iterate a file line-by-line without loading it all into RAM.
31// BufWriter batches small writes into larger OS-level write calls for efficiency.
32use std::io::{BufRead, BufReader, BufWriter, Write};
33// Arc = thread-safe reference-counted pointer (shared ownership across threads).
34// Mutex = mutual exclusion lock (only one thread can hold it at a time).
35use std::sync::{Arc, Mutex};
36// Tokio's async MPSC channel: multiple producers, single consumer.
37// Used to send log lines from the write path to the background flush task.
38use tokio::sync::mpsc;
39use tokio::task::JoinHandle;
40
41// ─── Snapshot helpers ────────────────────────────────────────────────────────
42//
43// A "snapshot" is a compact binary file that captures the entire current state
44// of the database at a point in time. On the next startup we load the snapshot
45// first (fast binary deserialization) and then only replay the log lines that
46// were written AFTER the snapshot was taken — instead of replaying the entire
47// log from the beginning. This dramatically reduces startup time for large DBs.
48//
49// Snapshot file format (binary, little-endian):
50// [8 bytes] magic header: "MOLTSNAP"
51// [8 bytes] seq: number of log lines captured in this snapshot
52// [8 bytes] count: number of LogEntry records that follow
53// for each entry:
54// [8 bytes] len: byte length of the bincode-encoded entry
55// [len bytes] bincode-encoded LogEntry
56// ─────────────────────────────────────────────────────────────────────────────
57
58/// Returns the path of the binary snapshot file for a given log file path.
59/// Convention: `my_database.log` → `my_database.log.snapshot.bin`
60pub(super) fn snapshot_path(log_path: &str) -> String {
61 format!("{}.snapshot.bin", log_path)
62}
63
64pub(super) fn write_snapshot(log_path: &str, entries: &[LogEntry], seq: u64) -> Result<(), DbError> {
65 let path = snapshot_path(log_path);
66 // Write to a temp file first so the swap is atomic.
67 let tmp = format!("{}.tmp", path);
68
69 let file = OpenOptions::new()
70 .create(true) // create if it doesn't exist
71 .write(true)
72 .truncate(true) // overwrite any existing content
73 .open(&tmp)?;
74 let mut w = BufWriter::new(file);
75
76 // Magic header so we can detect corrupt/wrong files on load.
77 w.write_all(b"MOLTSNAP")?;
78 // Sequence number: how many log lines are already captured here.
79 w.write_all(&seq.to_le_bytes())?;
80
81 // Number of entries, so the reader can pre-allocate a Vec.
82 let count = entries.len() as u64;
83 w.write_all(&count.to_le_bytes())?;
84
85 // Each entry is length-prefixed so the reader knows how many bytes to read.
86 for entry in entries {
87 // We use JSON for snapshots as well for now to avoid bincode issues with dynamic Value
88 let encoded = serde_json::to_vec(entry).map_err(|_| DbError::WriteError)?;
89 let len = encoded.len() as u64;
90 w.write_all(&len.to_le_bytes())?;
91 w.write_all(&encoded)?;
92 }
93
94 // Flush the BufWriter to ensure all bytes reach the OS buffer.
95 w.flush()?;
96 // Drop the writer to release the file handle before renaming (required on Windows).
97 drop(w);
98
99 // Before renaming the new snapshot, move the old one to the backup folder.
100 if Path::new(&path).exists() {
101 let log_dir = Path::new(log_path).parent().unwrap_or_else(|| Path::new("."));
102 let backup_dir = log_dir.join("backup");
103
104 // Ensure backup directory exists
105 std::fs::create_dir_all(&backup_dir)?;
106
107 let now = SystemTime::now()
108 .duration_since(SystemTime::UNIX_EPOCH)
109 .map(|d| d.as_secs())
110 .unwrap_or(0);
111
112 let filename = Path::new(&path).file_name()
113 .and_then(|n| n.to_str())
114 .unwrap_or("snapshot.bin");
115
116 let backup_path = backup_dir.join(format!("{}.{}.bak", filename, now));
117
118 // Move current snapshot to backup
119 let _ = std::fs::rename(&path, &backup_path);
120 }
121
122 // Atomic rename: replaces the old snapshot file in one OS operation.
123 std::fs::rename(&tmp, &path)?;
124 Ok(())
125}
126
127pub(super) fn write_compacted_log_no_tx(path: &str, entries: &[LogEntry]) -> Result<(), DbError> {
128 let temp_file = OpenOptions::new()
129 .create(true)
130 .write(true)
131 .truncate(true) // start fresh — we're rewriting the whole log
132 .open(path)?;
133 let mut temp_writer = BufWriter::new(temp_file);
134
135 // Write each entry as a JSON line, same format as the live log.
136 for entry in entries {
137 writeln!(temp_writer, "{}", serde_json::to_string(&entry)?)?;
138 }
139
140 temp_writer.flush()?;
141 Ok(())
142}
143
144pub(super) fn write_compacted_log(path: &str, entries: &[LogEntry]) -> Result<(), DbError> {
145 let temp_file = OpenOptions::new()
146 .create(true)
147 .write(true)
148 .truncate(true) // start fresh — we're rewriting the whole log
149 .open(path)?;
150 let mut temp_writer = BufWriter::new(temp_file);
151
152 // Write each entry as a JSON line, same format as the live log.
153 for entry in entries {
154 // We write each entry in its own transaction in the compacted log.
155 // This ensures they are replayed correctly even if followed by other log entries.
156 let tx_id = format!("compact-{}", entry.key);
157
158 let begin = LogEntry {
159 cmd: "TX_BEGIN".to_string(),
160 collection: entry.collection.clone(),
161 key: tx_id.clone(),
162 value: serde_json::Value::Null,
163 _t: entry._t,
164 };
165 writeln!(temp_writer, "{}", serde_json::to_string(&begin)?)?;
166
167 writeln!(temp_writer, "{}", serde_json::to_string(&entry)?)?;
168
169 let commit = LogEntry {
170 cmd: "TX_COMMIT".to_string(),
171 collection: entry.collection.clone(),
172 key: tx_id,
173 value: serde_json::Value::Null,
174 _t: entry._t,
175 };
176 writeln!(temp_writer, "{}", serde_json::to_string(&commit)?)?;
177 }
178
179 temp_writer.flush()?;
180 Ok(())
181}
182
183/// Try to load a previously written binary snapshot.
184/// Returns `Some((entries, seq))` on success, or `None` if:
185/// - the snapshot file doesn't exist (first run)
186/// - the magic header doesn't match (corrupt file)
187/// - any read fails (truncated file, wrong format)
188pub(super) fn load_snapshot(log_path: &str) -> Option<(Vec<LogEntry>, u64)> {
189 let path = snapshot_path(log_path);
190 if !Path::new(&path).exists() {
191 return None;
192 }
193 tracing::info!("🔍 Attempting to load snapshot from {}", path);
194 // If the file doesn't exist, open() returns Err and we return None.
195 let mut file = File::open(&path).ok()?;
196
197 use std::io::Read;
198
199 // Validate the magic header — if it doesn't match, the file is not ours.
200 let mut magic = [0u8; 8];
201 file.read_exact(&mut magic).ok()?;
202 if &magic != b"MOLTSNAP" {
203 tracing::warn!("❌ Invalid snapshot magic header");
204 return None; // Not a valid snapshot file
205 }
206
207 // Read the sequence number (how many log lines to skip on replay).
208 let mut seq_bytes = [0u8; 8];
209 file.read_exact(&mut seq_bytes).ok()?;
210 let seq = u64::from_le_bytes(seq_bytes);
211
212 // Read the entry count so we can pre-allocate the Vec.
213 let mut count_bytes = [0u8; 8];
214 file.read_exact(&mut count_bytes).ok()?;
215 let count = u64::from_le_bytes(count_bytes) as usize;
216
217 tracing::info!("📂 Snapshot header: seq={}, count={}", seq, count);
218
219 let mut entries = Vec::with_capacity(count);
220 for i in 0..count {
221 // Read the length prefix for this entry.
222 let mut len_bytes = [0u8; 8];
223 if let Err(e) = file.read_exact(&mut len_bytes) {
224 tracing::error!("❌ Failed to read entry {} length: {}", i, e);
225 return None;
226 }
227 let len = u64::from_le_bytes(len_bytes) as usize;
228
229 // Read exactly `len` bytes and deserialize with JSON.
230 let mut buf = vec![0u8; len];
231 if let Err(e) = file.read_exact(&mut buf) {
232 tracing::error!("❌ Failed to read entry {} data: {}", i, e);
233 return None;
234 }
235
236 // If the entry is all zeros or empty, it might be a partial write
237 if len > 0 && buf.iter().all(|&b| b == 0) {
238 tracing::error!("❌ Entry {} data is all zeros. Snapshot might be corrupt.", i);
239 return None;
240 }
241
242 // If deserialization fails (e.g. schema changed), return None so we
243 // fall back to full log replay instead of crashing.
244 let entry: LogEntry = match serde_json::from_slice(&buf) {
245 Ok(e) => e,
246 Err(err) => {
247 let sample = if buf.len() > 20 { &buf[..20] } else { &buf };
248 tracing::error!(
249 "❌ Failed to deserialize entry {} (len {}): {}. Sample: {:?}. This usually happens if the snapshot was created with an older version of MoltenDB or is corrupt. Falling back to log replay.",
250 i, len, err, sample
251 );
252 return None;
253 }
254 };
255 entries.push(entry);
256 }
257
258 Some((entries, seq))
259}
260
261// ─── Streaming log reader ─────────────────────────────────────────────────────
262//
263// The log file is a plain text file where each line is a JSON-encoded LogEntry.
264// Instead of reading the whole file into a Vec<String> and then parsing, we
265// stream it line-by-line so only one entry is in memory at a time.
266// ─────────────────────────────────────────────────────────────────────────────
267
268/// Open the log file at `path` and call `f` for each successfully parsed
269/// `LogEntry`, skipping the first `skip_lines` lines (those are already
270/// covered by a loaded snapshot and don't need to be replayed again).
271///
272/// Lines that fail to parse (e.g. partial writes from a crash) are silently
273/// skipped — the database will simply not see those entries, which is safe
274/// because the in-memory state is rebuilt from what we can read.
275pub fn stream_log_entries<F>(path: &str, skip_lines: u64, mut f: F) -> Result<ControlFlow<(), ()>, DbError>
276where
277 F: FnMut(LogEntry, u32) -> ControlFlow<(), ()>, // closure called once per valid entry + raw byte length
278{
279 // If the file doesn't exist yet (first run), just do nothing.
280 if let Ok(file) = File::open(path) {
281 // BufReader wraps the file with an internal buffer so we don't make
282 // one syscall per byte — it reads in chunks and serves lines from RAM.
283 let reader = BufReader::new(file);
284 for (i, line) in reader.lines().enumerate() {
285 // Skip lines already captured in the snapshot.
286 if (i as u64) < skip_lines {
287 continue;
288 }
289 // Ignore lines that fail to read (e.g. I/O error mid-line).
290 if let Ok(json_str) = line {
291 let length = json_str.len() as u32;
292 // Ignore lines that fail to parse (e.g. partial write on crash).
293 if let Ok(entry) = serde_json::from_str::<LogEntry>(&json_str) {
294 if let ControlFlow::Break(_) = f(entry, length) {
295 return Ok(ControlFlow::Break(()));
296 }
297 }
298 }
299 }
300 }
301 Ok(ControlFlow::Continue(()))
302}
303
304/// Count the total number of lines in the log file.
305/// This is used when writing a snapshot to record the current sequence number
306/// (i.e. "the snapshot covers the first N lines of the log").
307pub(super) fn count_log_lines(path: &str) -> u64 {
308 if let Ok(file) = File::open(path) {
309 // .lines() is lazy — it reads one line at a time, so this doesn't
310 // load the whole file into memory.
311 BufReader::new(file).lines().count() as u64
312 } else {
313 0 // File doesn't exist yet
314 }
315}
316
317// ─── read_log (still needed by EncryptedStorage wrapper) ─────────────────────
318//
319// EncryptedStorage wraps another StorageBackend and decrypts entries before
320// they can be applied to state. Because decryption must happen before we can
321// call apply_entry(), EncryptedStorage uses read_log() (which returns a full
322// Vec) rather than stream_log_into() (which applies entries on the fly).
323// ─────────────────────────────────────────────────────────────────────────────
324
325/// Read all log entries from disk into a Vec<LogEntry>.
326/// This is a convenience wrapper around stream_log_entries that collects
327/// everything into a Vec. Used by EncryptedStorage.
328pub fn read_log_from_disk(path: &str) -> Result<Vec<LogEntry>, DbError> {
329 let mut entries = Vec::new();
330 // skip_lines = 0 means read from the very beginning (no snapshot skip here,
331 // because EncryptedStorage handles its own snapshot logic via read_log).
332 let _ = stream_log_entries(path, 0, |e, _| {
333 entries.push(e);
334 ControlFlow::Continue(())
335 })?;
336 Ok(entries)
337}
338
339// ─── Compacted log writer ─────────────────────────────────────────────────────
340//
341// Compaction rewrites the log file to contain only the current state of the
342// database — removing all superseded INSERT entries and all DELETE tombstones.
343// This keeps the log file from growing unboundedly over time.
344// ─────────────────────────────────────────────────────────────────────────────
345
346// ─── AsyncDiskStorage ─────────────────────────────────────────────────────────
347//
348// Design: the write path is completely non-blocking. When write_entry() is
349// called, it serializes the entry to JSON and sends it over an unbounded MPSC
350// channel. A background Tokio task receives from that channel and writes to a
351// BufWriter. The BufWriter is flushed every 50 ms (on timeout) or whenever
352// the channel is drained.
353//
354// Trade-off: if the process is killed (SIGKILL / power loss) within the 50 ms
355// window, the last few writes may be lost. For analytics workloads this is
356// usually acceptable. Use SyncDiskStorage if you need zero data loss.
357// ─────────────────────────────────────────────────────────────────────────────
358
/// High-performance async disk writer.
///
/// Writes are sent over an MPSC channel and flushed to disk every 50 ms by a
/// background Tokio task. The write path never blocks the caller.
pub struct AsyncDiskStorage {
    /// The sending half of the MPSC channel. Cloning this is cheap — all
    /// clones share the same underlying channel. Wrapped in `Option` so
    /// `Drop` can `take()` it, which closes the channel and tells the
    /// background task to exit.
    sender: Option<mpsc::UnboundedSender<String>>,
    /// Path to the log file on disk. Stored so we can read/compact it later.
    path: String,
    /// Handle to the background writer task. Wrapped in `Option` so `Drop`
    /// can `take()` and await it, ensuring queued lines reach disk.
    writer_task: Option<JoinHandle<()>>,
}
372
373impl AsyncDiskStorage {
374 /// Open (or create) the log file at `path` and spawn the background writer task.
375 pub fn new(path: &str) -> Result<Self, DbError> {
376 // Create an unbounded MPSC channel.
377 // `log_tx` (sender) is kept in the struct; `log_rx` (receiver) goes to the task.
378 let (log_tx, mut log_rx) = mpsc::unbounded_channel::<String>();
379 let path_clone = path.to_string();
380
381 // Spawn a Tokio task that owns the file handle and BufWriter.
382 // This task runs for the lifetime of the server.
383 let writer_task = tokio::spawn(async move {
384 // Open the file in append mode so existing data is preserved.
385 let file = OpenOptions::new()
386 .create(true)
387 .append(true)
388 .open(&path_clone)
389 .unwrap();
390 let mut w = BufWriter::new(file);
391
392 loop {
393 // Wait up to 50 ms for the next message.
394 // If a message arrives within 50 ms → process it immediately.
395 // If the timeout fires → flush the BufWriter to disk.
396 match tokio::time::timeout(
397 std::time::Duration::from_millis(50),
398 log_rx.recv(),
399 )
400 .await
401 {
402 // A message arrived within the timeout window.
403 Ok(Some(log_line)) => {
404 // Special sentinel: the compact() method sends this to
405 // tell us to swap the log file atomically.
406 if log_line.starts_with("__RELOAD_FILE__") {
407 // Extract the temp file path from the sentinel string.
408 let temp_path = log_line.replace("__RELOAD_FILE__", "");
409 // println!("🔥 Worker: Reloading file from {}", temp_path);
410
411 // Flush and close the current file before renaming.
412 // On Windows, a file cannot be renamed while it's open.
413 w.flush().unwrap();
414 drop(w); // Release the file handle / Windows lock
415
416 // Atomically replace the live log with the compacted version.
417 if let Err(e) = std::fs::rename(&temp_path, &path_clone) {
418 tracing::error!("Failed to swap compacted file: {}", e);
419 }
420
421 // Re-open the (now compacted) log file for future writes.
422 let new_file = OpenOptions::new()
423 .create(true)
424 .append(true)
425 .open(&path_clone)
426 .unwrap();
427 w = BufWriter::new(new_file);
428 } else {
429 // Normal log line — append it to the BufWriter's buffer.
430 if let Err(e) = writeln!(w, "{}", log_line) {
431 tracing::error!("Failed to write to disk: {}", e);
432 }
433 }
434 }
435 // The channel was closed (sender dropped) — the server is shutting down.
436 // The BufWriter will be dropped here, which flushes its buffer to the OS.
437 Ok(None) => break,
438 // Timeout fired — no message in the last 50 ms. Flush buffered data.
439 Err(_) => {
440 let _ = w.flush();
441 }
442 }
443 }
444 // When the loop exits, `w` is dropped here, which flushes the BufWriter.
445 let _ = w.flush();
446 });
447
448 Ok(Self {
449 sender: Some(log_tx),
450 path: path.to_string(),
451 writer_task: Some(writer_task),
452 })
453 }
454}
455
456impl Drop for AsyncDiskStorage {
457 /// On drop, close the sender (signals the writer task to exit) then block
458 /// until the task has drained its queue and flushed everything to disk.
459 fn drop(&mut self) {
460 // Drop the sender — this closes the channel and causes log_rx.recv()
461 // to return None, which breaks the writer task's loop.
462 drop(self.sender.take());
463
464 // Now await the writer task so we don't return until all queued lines
465 // have been written and flushed to the OS.
466 if let Some(handle) = self.writer_task.take() {
467 tokio::task::block_in_place(|| {
468 tokio::runtime::Handle::current().block_on(handle)
469 })
470 .ok();
471 }
472 }
473}
474
impl StorageBackend for AsyncDiskStorage {
    /// Serialize `entry` to a JSON string and send it to the background writer.
    /// This call returns immediately — it never blocks waiting for disk I/O.
    ///
    /// Durability: the line is only queued here; the background task flushes
    /// it on its 50 ms cycle (see the module header for the trade-off).
    fn write_entry(&self, entry: &LogEntry) -> Result<(), DbError> {
        let json_line = serde_json::to_string(entry)?;
        // send() only fails if the receiver (background task) has been dropped,
        // which means the server is shutting down.
        if let Some(ref sender) = self.sender {
            sender.send(json_line).map_err(|_| DbError::WriteError)?;
        }
        Ok(())
    }

    /// Read all entries from the log file into a Vec.
    /// Used by EncryptedStorage which needs the full list to decrypt.
    fn read_log(&self) -> Result<Vec<LogEntry>, DbError> {
        read_log_from_disk(&self.path)
    }

    /// Compact the log: write a binary snapshot, rewrite the log to be empty,
    /// then signal the background task to swap the file.
    fn compact(&self, entries: Vec<LogEntry>) -> Result<(), DbError> {
        self.compact_with_hook(entries, None)
    }

    /// Internal compact implementation that can take a post-backup script.
    ///
    /// `entries` is the state captured into the snapshot; `hook` is an
    /// optional script path spawned in the background after a successful
    /// snapshot, receiving the snapshot path as its argument.
    ///
    /// NOTE(review): this returns as soon as the swap sentinel is queued —
    /// the actual file swap happens later in the writer task. Any entry
    /// written after `entries` was captured but before the swap is processed
    /// lands in the old file and is discarded by the swap; confirm callers
    /// quiesce writes during compaction.
    fn compact_with_hook(&self, entries: Vec<LogEntry>, hook: Option<String>) -> Result<(), DbError> {
        // Step 1: Write a binary snapshot.
        // After compaction the log is reset to empty, so seq=0: all future log
        // lines written after this snapshot must be replayed from the start.
        let seq = 0u64;
        if let Err(e) = write_snapshot(&self.path, &entries, seq) {
            // Snapshot failure is logged but non-fatal: the swap below still
            // produces a consistent (just slower-to-load) log.
            tracing::warn!("⚠️ Failed to write snapshot during compaction: {}", e);
        } else if let Some(script_path) = hook {
            // If snapshot was successful and we have a hook, execute it.
            let snapshot_path = snapshot_path(&self.path);
            // Hand the hook an absolute path so it works regardless of the
            // hook's working directory; fall back to the relative path.
            let abs_snapshot_path = match std::fs::canonicalize(&snapshot_path) {
                Ok(p) => p.to_string_lossy().to_string(),
                Err(_) => snapshot_path,
            };

            // Execute in background
            tokio::spawn(async move {
                // NOTE(review): on Windows the paths are interpolated into a
                // PowerShell command string; a quote character in either path
                // would break the invocation. Assumed operator-controlled.
                let res = if cfg!(target_os = "windows") {
                    tokio::process::Command::new("powershell")
                        .arg("-ExecutionPolicy")
                        .arg("Bypass")
                        .arg("-Command")
                        .arg(format!("& '{}' '{}'", script_path, abs_snapshot_path))
                        .output()
                        .await
                } else {
                    tokio::process::Command::new("sh")
                        .arg(script_path)
                        .arg(abs_snapshot_path)
                        .output()
                        .await
                };

                match res {
                    Ok(output) => {
                        if !output.status.success() {
                            let stderr = String::from_utf8_lossy(&output.stderr);
                            tracing::error!("❌ Post-backup hook failed: {}", stderr);
                        } else {
                            tracing::info!("✅ Post-backup hook executed successfully");
                        }
                    }
                    Err(e) => {
                        tracing::error!("❌ Failed to spawn post-backup hook: {}", e);
                    }
                }
            });
        }

        // Step 2: Write an empty compacted log to a temp file.
        // Since the snapshot now contains the full state, we can start the log fresh.
        let temp_path = format!("{}.tmp", self.path);
        write_compacted_log_no_tx(&temp_path, &[])?;

        // Step 3: Send the sentinel to the background task so it flushes,
        // closes the current file, renames the temp file over it, and reopens.
        if let Some(ref sender) = self.sender {
            sender
                .send(format!("__RELOAD_FILE__{}", temp_path))
                .map_err(|_| DbError::WriteError)?;
        }
        Ok(())
    }

    /// Read exactly `length` bytes from the log at `offset`.
    ///
    /// NOTE(review): offsets refer to the log file as it exists right now;
    /// after a compaction swap, pointers recorded earlier no longer line up
    /// with the new file — confirm callers invalidate them on compaction.
    fn read_at(&self, offset: u64, length: u32) -> Result<Vec<u8>, DbError> {
        use std::io::{Read, Seek, SeekFrom};
        let mut file = File::open(&self.path)?;
        file.seek(SeekFrom::Start(offset))?;
        let mut buffer = vec![0u8; length as usize];
        file.read_exact(&mut buffer)?;
        Ok(buffer)
    }

    /// Stream log entries into state using snapshot + delta replay.
    ///
    /// Fast path (after first compaction):
    /// 1. Load binary snapshot → apply all entries in it.
    /// 2. Stream only the log lines written AFTER the snapshot (the "delta").
    ///
    /// Slow path (first run, no snapshot):
    /// Stream the entire log file line-by-line. No full Vec in RAM.
    ///
    /// Returns the number of entries `f` accepted with `Continue`; an entry
    /// consumed by `f` immediately before a `Break` is not counted.
    fn stream_log_into(
        &self,
        f: &mut dyn FnMut(LogEntry, u32) -> ControlFlow<(), ()>,
    ) -> Result<u64, DbError> {
        let mut count = 0u64;
        // Attempt to load the binary snapshot for fast startup.
        if let Some((snapshot_entries, seq)) = load_snapshot(&self.path) {
            for entry in snapshot_entries {
                // Entries from snapshot MUST be Hot because they are not in the log file
                // and thus don't have a valid RecordPointer for this log instance.
                // (Length 0 is passed — presumably interpreted upstream as
                // "no on-disk location"; confirm against the callback's use.)
                if let ControlFlow::Break(_) = f(entry, 0) {
                    return Ok(count);
                }
                count += 1;
            }
            // Then replay only the log lines that came after the snapshot.
            // `seq` is the number of lines to skip (already in the snapshot).
            if let ControlFlow::Break(_) = stream_log_entries(&self.path, seq, |e, l| {
                let res = f(e, l);
                if let ControlFlow::Continue(_) = res {
                    count += 1;
                }
                res
            })? {
                return Ok(count);
            }
            return Ok(count);
        }

        // No snapshot found — stream the full log from the beginning.
        let _ = stream_log_entries(&self.path, 0, |e, l| {
            let res = f(e, l);
            if let ControlFlow::Continue(_) = res {
                count += 1;
            }
            res
        })?;
        Ok(count)
    }
}
623
624// ─── SyncDiskStorage ──────────────────────────────────────────────────────────
625//
626// Design: every write_entry() call acquires a Mutex, writes the JSON line to
627// a BufWriter, and immediately flushes the BufWriter. The flush() call blocks
628// until the OS confirms the data is in its write buffer (not necessarily on
629// physical disk, but durable enough for most crash scenarios).
630//
631// Trade-off: much lower throughput than AsyncDiskStorage because every write
632// blocks the caller. Use this when data loss is unacceptable.
633// ─────────────────────────────────────────────────────────────────────────────
634
/// High-durability synchronous disk writer.
///
/// Every write is flushed to disk before returning. Zero data loss on crash,
/// but lower throughput than AsyncDiskStorage. Enable with WRITE_MODE=sync.
pub struct SyncDiskStorage {
    /// The BufWriter wrapped in a Mutex so multiple threads can write safely.
    /// Arc allows the struct to be cloned (shared across Axum handler threads).
    /// Note: the per-write flush() hands data to the OS write buffer; it is
    /// not an fsync to physical media (see the section header above).
    writer: Arc<Mutex<BufWriter<File>>>,
    /// Path to the log file. Stored for read/compact operations.
    path: String,
}
646
647impl SyncDiskStorage {
648 /// Open (or create) the log file at `path` in append mode.
649 pub fn new(path: &str) -> Result<Self, DbError> {
650 let file = OpenOptions::new().create(true).append(true).open(path)?;
651
652 Ok(Self {
653 writer: Arc::new(Mutex::new(BufWriter::new(file))),
654 path: path.to_string(),
655 })
656 }
657}
658
impl StorageBackend for SyncDiskStorage {
    /// Serialize `entry` to JSON, write it to the BufWriter, and flush immediately.
    /// This call blocks until the OS has accepted the data.
    ///
    /// Note: flush() pushes the bytes into the OS write buffer — not an
    /// fsync to physical media (see the section header above).
    fn write_entry(&self, entry: &LogEntry) -> Result<(), DbError> {
        let json_line = serde_json::to_string(entry)?;
        // Lock the Mutex — only one thread can write at a time.
        let mut w = self.writer.lock().map_err(|_| DbError::LockPoisoned)?;
        writeln!(w, "{}", json_line)?;
        // Flush immediately so the data is durable before we return.
        w.flush()?;
        Ok(())
    }

    /// Read all entries from the log file into a Vec.
    /// Used by EncryptedStorage which needs the full list to decrypt.
    fn read_log(&self) -> Result<Vec<LogEntry>, DbError> {
        read_log_from_disk(&self.path)
    }

    /// Compact the log: write a binary snapshot, swap the log file with an
    /// empty one, then reopen the writer.
    fn compact(&self, entries: Vec<LogEntry>) -> Result<(), DbError> {
        self.compact_with_hook(entries, None)
    }

    /// Internal compact implementation that can additionally run a
    /// post-backup script (`hook`) after a successful snapshot. Mirrors
    /// AsyncDiskStorage::compact_with_hook, but performs the file swap
    /// inline under the writer Mutex instead of via a sentinel message.
    fn compact_with_hook(&self, entries: Vec<LogEntry>, hook: Option<String>) -> Result<(), DbError> {
        // Step 1: Write binary snapshot for fast next startup.
        // After compaction the log is reset to empty, so seq=0: all future log
        // lines written after this snapshot must be replayed from the start.
        let seq = 0u64;
        if let Err(e) = write_snapshot(&self.path, &entries, seq) {
            // Snapshot failure is logged but non-fatal: the swap below still
            // produces a consistent (just slower-to-load) log.
            tracing::warn!("⚠️ Failed to write snapshot during compaction: {}", e);
        } else if let Some(script_path) = hook {
            // If snapshot was successful and we have a hook, execute it.
            let snapshot_path = snapshot_path(&self.path);
            // Hand the hook an absolute path so it works from any CWD;
            // fall back to the relative path if canonicalization fails.
            let abs_snapshot_path = match std::fs::canonicalize(&snapshot_path) {
                Ok(p) => p.to_string_lossy().to_string(),
                Err(_) => snapshot_path,
            };

            // Execute in background
            tokio::spawn(async move {
                // NOTE(review): on Windows the paths are interpolated into a
                // PowerShell command string; a quote character in either path
                // would break the invocation. Assumed operator-controlled.
                let res = if cfg!(target_os = "windows") {
                    tokio::process::Command::new("powershell")
                        .arg("-ExecutionPolicy")
                        .arg("Bypass")
                        .arg("-Command")
                        .arg(format!("& '{}' '{}'", script_path, abs_snapshot_path))
                        .output()
                        .await
                } else {
                    tokio::process::Command::new("sh")
                        .arg(script_path)
                        .arg(abs_snapshot_path)
                        .output()
                        .await
                };

                match res {
                    Ok(output) => {
                        if !output.status.success() {
                            let stderr = String::from_utf8_lossy(&output.stderr);
                            tracing::error!("❌ Post-backup hook failed: {}", stderr);
                        } else {
                            tracing::info!("✅ Post-backup hook executed successfully");
                        }
                    }
                    Err(e) => {
                        tracing::error!("❌ Failed to spawn post-backup hook: {}", e);
                    }
                }
            });
        }

        // Step 2: Write an empty compacted log to a temp file.
        let temp_path = format!("{}.tmp", self.path);
        write_compacted_log_no_tx(&temp_path, &[])?;

        // Step 3: Lock the writer, rename the temp file over the live log,
        // then reopen the writer so future writes go to the compacted file.
        let mut w = self.writer.lock().map_err(|_| DbError::LockPoisoned)?;
        // On Unix this rename is atomic even though `w` still holds an open
        // handle to the old file. NOTE(review): on Windows, renaming over a
        // file that is held open generally fails — holding the Mutex stops
        // concurrent writes but does NOT close the handle inside `w`, so
        // this rename may error there; the error is logged and propagated.
        if let Err(e) = std::fs::rename(&temp_path, &self.path) {
            tracing::error!("Failed to swap compacted file: {}", e);
            return Err(DbError::from(e));
        }

        // Reopen the file so the writer points at the new compacted log.
        // (Assigning through the guard drops the old handle.)
        let new_file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.path)?;
        *w = BufWriter::new(new_file);
        Ok(())
    }

    /// Read exactly `length` bytes from the log at `offset`.
    ///
    /// NOTE(review): offsets refer to the log file as it exists right now;
    /// after a compaction swap, pointers recorded earlier no longer line up.
    fn read_at(&self, offset: u64, length: u32) -> Result<Vec<u8>, DbError> {
        use std::io::{Read, Seek, SeekFrom};
        let mut file = File::open(&self.path)?;
        file.seek(SeekFrom::Start(offset))?;
        let mut buffer = vec![0u8; length as usize];
        file.read_exact(&mut buffer)?;
        Ok(buffer)
    }

    /// Stream log entries into state using snapshot + delta replay.
    /// Same logic as AsyncDiskStorage::stream_log_into — see that method for details.
    ///
    /// Returns the number of entries `f` accepted with `Continue`; an entry
    /// consumed by `f` immediately before a `Break` is not counted.
    fn stream_log_into(
        &self,
        f: &mut dyn FnMut(LogEntry, u32) -> ControlFlow<(), ()>,
    ) -> Result<u64, DbError> {
        let mut count = 0u64;
        // Fast path: load snapshot and replay only the delta.
        if let Some((snapshot_entries, seq)) = load_snapshot(&self.path) {
            tracing::info!(
                "⚡ Snapshot loaded ({} entries, seq {}). Replaying delta only...",
                snapshot_entries.len(),
                seq
            );
            for entry in snapshot_entries {
                // Entries from snapshot MUST be Hot because they are not in the log file
                // and thus don't have a valid RecordPointer for this log instance.
                // (Length 0 is passed — presumably interpreted upstream as
                // "no on-disk location"; confirm against the callback's use.)
                if let ControlFlow::Break(_) = f(entry, 0) {
                    return Ok(count);
                }
                count += 1;
            }
            // Replay only the log lines written after the snapshot was taken.
            if let ControlFlow::Break(_) = stream_log_entries(&self.path, seq, |e, l| {
                let res = f(e, l);
                if let ControlFlow::Continue(_) = res {
                    count += 1;
                }
                res
            })? {
                return Ok(count);
            }
            return Ok(count);
        }

        // Slow path: stream the full log line-by-line.
        let _ = stream_log_entries(&self.path, 0, |e, l| {
            let res = f(e, l);
            if let ControlFlow::Continue(_) = res {
                count += 1;
            }
            res
        })?;
        Ok(count)
    }
}
809}