use super::StorageBackend;
use crate::engine::types::{DbError, LogEntry};
use std::ops::ControlFlow;
use std::fs::{File, OpenOptions};
use std::path::Path;
use std::time::SystemTime;
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
/// Derive the snapshot file path for a given log file path by appending
/// the fixed `.snapshot.bin` suffix.
pub(super) fn snapshot_path(log_path: &str) -> String {
    let mut path = String::with_capacity(log_path.len() + ".snapshot.bin".len());
    path.push_str(log_path);
    path.push_str(".snapshot.bin");
    path
}
/// Atomically persist `entries` as a binary snapshot next to the log file.
///
/// Layout: 8-byte magic `MOLTSNAP`, little-endian `seq` (u64), entry count
/// (u64), then length-prefixed JSON records (framing mirrored by
/// `load_snapshot`). The snapshot is written to a `.tmp` sibling, fsynced,
/// and renamed into place so readers never observe a partial file. Any
/// pre-existing snapshot is moved (best-effort) into a timestamped backup
/// under `<log_dir>/backup/`.
///
/// # Errors
/// Returns `DbError` on any I/O or serialization failure.
pub(super) fn write_snapshot(log_path: &str, entries: &[LogEntry], seq: u64) -> Result<(), DbError> {
    let path = snapshot_path(log_path);
    let tmp = format!("{}.tmp", path);
    let file = OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(true)
        .open(&tmp)?;
    let mut w = BufWriter::new(file);
    // Header: magic, sequence number, entry count.
    w.write_all(b"MOLTSNAP")?;
    w.write_all(&seq.to_le_bytes())?;
    let count = entries.len() as u64;
    w.write_all(&count.to_le_bytes())?;
    for entry in entries {
        // `?` relies on `DbError: From<serde_json::Error>` — consistent with
        // the other serialization sites in this module.
        let encoded = serde_json::to_vec(entry)?;
        let len = encoded.len() as u64;
        w.write_all(&len.to_le_bytes())?;
        w.write_all(&encoded)?;
    }
    w.flush()?;
    // fsync before the rename: the atomic-replace pattern is only crash-safe
    // if the tmp file's contents are durable before it becomes visible under
    // the final name.
    w.get_ref().sync_all()?;
    drop(w);
    if Path::new(&path).exists() {
        // Archive the previous snapshot instead of overwriting it.
        let log_dir = Path::new(log_path).parent().unwrap_or_else(|| Path::new("."));
        let backup_dir = log_dir.join("backup");
        std::fs::create_dir_all(&backup_dir)?;
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);
        let filename = Path::new(&path).file_name()
            .and_then(|n| n.to_str())
            .unwrap_or("snapshot.bin");
        let backup_path = backup_dir.join(format!("{}.{}.bak", filename, now));
        // Best-effort: a failed backup must not block the new snapshot.
        let _ = std::fs::rename(&path, &backup_path);
    }
    std::fs::rename(&tmp, &path)?;
    Ok(())
}
/// Write `entries` to `path` as newline-delimited JSON, truncating any
/// existing file. Unlike `write_compacted_log`, no transaction markers
/// are emitted around the records.
pub(super) fn write_compacted_log_no_tx(path: &str, entries: &[LogEntry]) -> Result<(), DbError> {
    let out = OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(true)
        .open(path)?;
    let mut writer = BufWriter::new(out);
    for record in entries {
        let line = serde_json::to_string(record)?;
        writeln!(writer, "{}", line)?;
    }
    writer.flush()?;
    Ok(())
}
/// Write `entries` to `path` as newline-delimited JSON, wrapping each entry
/// in a synthetic `TX_BEGIN` / `TX_COMMIT` pair keyed `compact-<entry.key>`
/// so transactional replay logic accepts the compacted records.
pub(super) fn write_compacted_log(path: &str, entries: &[LogEntry]) -> Result<(), DbError> {
    let out = OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(true)
        .open(path)?;
    let mut writer = BufWriter::new(out);
    for entry in entries {
        let tx_id = format!("compact-{}", entry.key);
        // Build a transaction marker that shares the entry's metadata.
        let marker = |cmd: &str, key: String| LogEntry {
            cmd: cmd.to_string(),
            collection: entry.collection.clone(),
            key,
            value: serde_json::Value::Null,
            _t: entry._t,
        };
        writeln!(writer, "{}", serde_json::to_string(&marker("TX_BEGIN", tx_id.clone()))?)?;
        writeln!(writer, "{}", serde_json::to_string(&entry)?)?;
        writeln!(writer, "{}", serde_json::to_string(&marker("TX_COMMIT", tx_id))?)?;
    }
    writer.flush()?;
    Ok(())
}
pub(super) fn load_snapshot(log_path: &str) -> Option<(Vec<LogEntry>, u64)> {
let path = snapshot_path(log_path);
if !Path::new(&path).exists() {
return None;
}
tracing::info!("🔍 Attempting to load snapshot from {}", path);
let mut file = File::open(&path).ok()?;
use std::io::Read;
let mut magic = [0u8; 8];
file.read_exact(&mut magic).ok()?;
if &magic != b"MOLTSNAP" {
tracing::warn!("❌ Invalid snapshot magic header");
return None; }
let mut seq_bytes = [0u8; 8];
file.read_exact(&mut seq_bytes).ok()?;
let seq = u64::from_le_bytes(seq_bytes);
let mut count_bytes = [0u8; 8];
file.read_exact(&mut count_bytes).ok()?;
let count = u64::from_le_bytes(count_bytes) as usize;
tracing::info!("📂 Snapshot header: seq={}, count={}", seq, count);
let mut entries = Vec::with_capacity(count);
for i in 0..count {
let mut len_bytes = [0u8; 8];
if let Err(e) = file.read_exact(&mut len_bytes) {
tracing::error!("❌ Failed to read entry {} length: {}", i, e);
return None;
}
let len = u64::from_le_bytes(len_bytes) as usize;
let mut buf = vec![0u8; len];
if let Err(e) = file.read_exact(&mut buf) {
tracing::error!("❌ Failed to read entry {} data: {}", i, e);
return None;
}
if len > 0 && buf.iter().all(|&b| b == 0) {
tracing::error!("❌ Entry {} data is all zeros. Snapshot might be corrupt.", i);
return None;
}
let entry: LogEntry = match serde_json::from_slice(&buf) {
Ok(e) => e,
Err(err) => {
let sample = if buf.len() > 20 { &buf[..20] } else { &buf };
tracing::error!(
"❌ Failed to deserialize entry {} (len {}): {}. Sample: {:?}. This usually happens if the snapshot was created with an older version of MoltenDB or is corrupt. Falling back to log replay.",
i, len, err, sample
);
return None;
}
};
entries.push(entry);
}
Some((entries, seq))
}
/// Stream log entries from `path` to callback `f`, skipping the first
/// `skip_lines` lines. Unreadable or unparseable lines are silently
/// skipped. Returns `Break` if the callback stopped iteration early,
/// `Continue` otherwise (including when the file cannot be opened).
pub fn stream_log_entries<F>(path: &str, skip_lines: u64, mut f: F) -> Result<ControlFlow<(), ()>, DbError>
where
    F: FnMut(LogEntry, u32) -> ControlFlow<(), ()>, {
    let file = match File::open(path) {
        Ok(file) => file,
        // A missing log is not an error: there is simply nothing to stream.
        Err(_) => return Ok(ControlFlow::Continue(())),
    };
    for (idx, line) in BufReader::new(file).lines().enumerate() {
        if (idx as u64) < skip_lines {
            continue;
        }
        let raw = match line {
            Ok(s) => s,
            Err(_) => continue,
        };
        let byte_len = raw.len() as u32;
        if let Ok(parsed) = serde_json::from_str::<LogEntry>(&raw) {
            if f(parsed, byte_len).is_break() {
                return Ok(ControlFlow::Break(()));
            }
        }
    }
    Ok(ControlFlow::Continue(()))
}
/// Count the newline-delimited records in the log at `path`.
/// Returns 0 when the file cannot be opened (e.g. it does not exist yet).
pub(super) fn count_log_lines(path: &str) -> u64 {
    match File::open(path) {
        Ok(file) => BufReader::new(file).lines().count() as u64,
        Err(_) => 0,
    }
}
/// Read every parseable log entry at `path` into memory, in file order.
/// Unparseable lines are skipped by the underlying streamer.
pub fn read_log_from_disk(path: &str) -> Result<Vec<LogEntry>, DbError> {
    let mut collected = Vec::new();
    let _ = stream_log_entries(path, 0, |entry, _len| {
        collected.push(entry);
        ControlFlow::Continue(())
    })?;
    Ok(collected)
}
/// Storage backend that offloads log appends to a background tokio task
/// fed through an unbounded channel, so callers never block on disk I/O.
pub struct AsyncDiskStorage {
    // Channel into the background writer task. `None` after shutdown;
    // taken in `Drop` to close the channel and stop the writer.
    sender: Option<mpsc::UnboundedSender<String>>,
    // Path of the append-only log file on disk.
    path: String,
    // Handle to the background writer task, awaited on drop for a final flush.
    writer_task: Option<JoinHandle<()>>,
}
impl AsyncDiskStorage {
pub fn new(path: &str) -> Result<Self, DbError> {
let (log_tx, mut log_rx) = mpsc::unbounded_channel::<String>();
let path_clone = path.to_string();
let writer_task = tokio::spawn(async move {
let file = OpenOptions::new()
.create(true)
.append(true)
.open(&path_clone)
.unwrap();
let mut w = BufWriter::new(file);
loop {
match tokio::time::timeout(
std::time::Duration::from_millis(50),
log_rx.recv(),
)
.await
{
Ok(Some(log_line)) => {
if log_line.starts_with("__RELOAD_FILE__") {
let temp_path = log_line.replace("__RELOAD_FILE__", "");
w.flush().unwrap();
drop(w);
if let Err(e) = std::fs::rename(&temp_path, &path_clone) {
tracing::error!("Failed to swap compacted file: {}", e);
}
let new_file = OpenOptions::new()
.create(true)
.append(true)
.open(&path_clone)
.unwrap();
w = BufWriter::new(new_file);
} else {
if let Err(e) = writeln!(w, "{}", log_line) {
tracing::error!("Failed to write to disk: {}", e);
}
}
}
Ok(None) => break,
Err(_) => {
let _ = w.flush();
}
}
}
let _ = w.flush();
});
Ok(Self {
sender: Some(log_tx),
path: path.to_string(),
writer_task: Some(writer_task),
})
}
}
impl Drop for AsyncDiskStorage {
    /// Close the channel and, where legal, block until the writer task has
    /// flushed and exited.
    ///
    /// Dropping the sender makes the writer's `recv()` yield `None`, which
    /// triggers its final flush and exit. The original code called
    /// `Handle::current()` (panics outside a runtime) and `block_in_place`
    /// (panics on a current-thread runtime); a panic in `Drop` while already
    /// unwinding aborts the process. We therefore probe the runtime with
    /// non-panicking APIs and fall back to detaching the task, which will
    /// still flush on its own once it observes the closed channel.
    fn drop(&mut self) {
        // Signal shutdown: the writer task sees `Ok(None)` and flushes.
        drop(self.sender.take());
        if let Some(handle) = self.writer_task.take() {
            match tokio::runtime::Handle::try_current() {
                // Only block when a multi-threaded runtime is available —
                // `block_in_place` is illegal on a current-thread runtime.
                Ok(rt) if rt.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread => {
                    tokio::task::block_in_place(|| rt.block_on(handle)).ok();
                }
                // No runtime, or a current-thread runtime: detach the task
                // and let it flush in the background.
                _ => {}
            }
        }
    }
}
impl StorageBackend for AsyncDiskStorage {
    /// Serialize `entry` to JSON and enqueue it for the background writer.
    ///
    /// NOTE: `Ok` means "queued", not "on disk" — persistence happens
    /// asynchronously in the writer task. If the sender has already been
    /// taken (shutdown in progress) the entry is silently dropped.
    fn write_entry(&self, entry: &LogEntry) -> Result<(), DbError> {
        let json_line = serde_json::to_string(entry)?;
        if let Some(ref sender) = self.sender {
            sender.send(json_line).map_err(|_| DbError::WriteError)?;
        }
        Ok(())
    }

    /// Read the entire on-disk log into memory (snapshot not included).
    fn read_log(&self) -> Result<Vec<LogEntry>, DbError> {
        read_log_from_disk(&self.path)
    }

    /// Compact the log: persist `entries` as a snapshot, then ask the writer
    /// task (via the `__RELOAD_FILE__` sentinel) to atomically swap in an
    /// empty log. Routing the swap through the channel orders it after any
    /// appends that were already queued.
    ///
    /// NOTE(review): `seq` is hard-coded to 0; after the swap the log is
    /// empty so replay starts at line 0 — presumably intentional, but
    /// confirm against `stream_log_into`'s `skip_lines` semantics.
    fn compact(&self, entries: Vec<LogEntry>) -> Result<(), DbError> {
        let seq = 0u64;
        // Snapshot failure is non-fatal: the full log still exists on disk.
        if let Err(e) = write_snapshot(&self.path, &entries, seq) {
            tracing::warn!("⚠️ Failed to write snapshot during compaction: {}", e);
        }
        let temp_path = format!("{}.tmp", self.path);
        // Empty replacement log; state is now carried by the snapshot.
        write_compacted_log_no_tx(&temp_path, &[])?;
        if let Some(ref sender) = self.sender {
            sender
                .send(format!("__RELOAD_FILE__{}", temp_path))
                .map_err(|_| DbError::WriteError)?;
        }
        Ok(())
    }

    /// Read `length` raw bytes starting at byte `offset` of the log file.
    /// Errors if the file is shorter than `offset + length`.
    fn read_at(&self, offset: u64, length: u32) -> Result<Vec<u8>, DbError> {
        use std::io::{Read, Seek, SeekFrom};
        let mut file = File::open(&self.path)?;
        file.seek(SeekFrom::Start(offset))?;
        let mut buffer = vec![0u8; length as usize];
        file.read_exact(&mut buffer)?;
        Ok(buffer)
    }

    /// Stream snapshot entries first (reported with length 0, since they
    /// have no position in the text log), then log entries past the
    /// snapshot's sequence number; with no snapshot, replay the whole log.
    /// Returns the number of entries delivered before the callback broke
    /// out — the entry that triggered `Break` is NOT counted.
    fn stream_log_into(
        &self,
        f: &mut dyn FnMut(LogEntry, u32) -> ControlFlow<(), ()>,
    ) -> Result<u64, DbError> {
        let mut count = 0u64;
        if let Some((snapshot_entries, seq)) = load_snapshot(&self.path) {
            for entry in snapshot_entries {
                if let ControlFlow::Break(_) = f(entry, 0) {
                    return Ok(count);
                }
                count += 1;
            }
            // `seq` is interpreted as a line count: skip entries already
            // captured by the snapshot.
            if let ControlFlow::Break(_) = stream_log_entries(&self.path, seq, |e, l| {
                let res = f(e, l);
                if let ControlFlow::Continue(_) = res {
                    count += 1;
                }
                res
            })? {
                return Ok(count);
            }
            return Ok(count);
        }
        // No snapshot: full replay from the first line.
        let _ = stream_log_entries(&self.path, 0, |e, l| {
            let res = f(e, l);
            if let ControlFlow::Continue(_) = res {
                count += 1;
            }
            res
        })?;
        Ok(count)
    }
}
/// Storage backend that writes (and flushes) every entry synchronously on
/// the caller's thread, trading latency for immediate durability.
pub struct SyncDiskStorage {
    // Shared buffered writer over the append-only log file. The mutex
    // serializes writers; `Arc` lets the backend be shared across threads.
    writer: Arc<Mutex<BufWriter<File>>>,
    // Path of the append-only log file on disk.
    path: String,
}
impl SyncDiskStorage {
    /// Open (creating if needed) the log file at `path` in append mode and
    /// wrap it in a shared, mutex-guarded buffered writer.
    pub fn new(path: &str) -> Result<Self, DbError> {
        let log_file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(path)?;
        let writer = BufWriter::new(log_file);
        Ok(Self {
            writer: Arc::new(Mutex::new(writer)),
            path: path.to_owned(),
        })
    }
}
impl StorageBackend for SyncDiskStorage {
    /// Serialize `entry` to JSON, append it to the log, and flush before
    /// returning — `Ok` here means the line reached the OS (durable modulo
    /// the OS page cache; no fsync is performed).
    fn write_entry(&self, entry: &LogEntry) -> Result<(), DbError> {
        let json_line = serde_json::to_string(entry)?;
        let mut w = self.writer.lock().map_err(|_| DbError::LockPoisoned)?;
        writeln!(w, "{}", json_line)?;
        w.flush()?;
        Ok(())
    }

    /// Read the entire on-disk log into memory (snapshot not included).
    fn read_log(&self) -> Result<Vec<LogEntry>, DbError> {
        read_log_from_disk(&self.path)
    }

    /// Compact the log: persist `entries` as a snapshot, then atomically
    /// swap an empty log file into place while holding the writer lock so
    /// no concurrent `write_entry` can interleave with the rename/reopen.
    ///
    /// NOTE: the lock must be held across the rename — the buffered writer
    /// still references the old (replaced) file until `*w` is reassigned.
    /// The buffer is known empty because `write_entry` flushes every line.
    fn compact(&self, entries: Vec<LogEntry>) -> Result<(), DbError> {
        let seq = 0u64;
        // Snapshot failure is non-fatal: the full log still exists on disk.
        if let Err(e) = write_snapshot(&self.path, &entries, seq) {
            tracing::warn!("⚠️ Failed to write snapshot during compaction: {}", e);
        }
        let temp_path = format!("{}.tmp", self.path);
        // Empty replacement log; state is now carried by the snapshot.
        write_compacted_log_no_tx(&temp_path, &[])?;
        let mut w = self.writer.lock().map_err(|_| DbError::LockPoisoned)?;
        if let Err(e) = std::fs::rename(&temp_path, &self.path) {
            tracing::error!("Failed to swap compacted file: {}", e);
            return Err(DbError::from(e));
        }
        let new_file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.path)?;
        // Point the shared writer at the freshly swapped-in file.
        *w = BufWriter::new(new_file);
        Ok(())
    }

    /// Read `length` raw bytes starting at byte `offset` of the log file.
    /// Errors if the file is shorter than `offset + length`.
    fn read_at(&self, offset: u64, length: u32) -> Result<Vec<u8>, DbError> {
        use std::io::{Read, Seek, SeekFrom};
        let mut file = File::open(&self.path)?;
        file.seek(SeekFrom::Start(offset))?;
        let mut buffer = vec![0u8; length as usize];
        file.read_exact(&mut buffer)?;
        Ok(buffer)
    }

    /// Stream snapshot entries first (reported with length 0, since they
    /// have no position in the text log), then log entries past the
    /// snapshot's sequence number; with no snapshot, replay the whole log.
    /// Returns the number of entries delivered before the callback broke
    /// out — the entry that triggered `Break` is NOT counted.
    fn stream_log_into(
        &self,
        f: &mut dyn FnMut(LogEntry, u32) -> ControlFlow<(), ()>,
    ) -> Result<u64, DbError> {
        let mut count = 0u64;
        if let Some((snapshot_entries, seq)) = load_snapshot(&self.path) {
            tracing::info!(
                "⚡ Snapshot loaded ({} entries, seq {}). Replaying delta only...",
                snapshot_entries.len(),
                seq
            );
            for entry in snapshot_entries {
                if let ControlFlow::Break(_) = f(entry, 0) {
                    return Ok(count);
                }
                count += 1;
            }
            // `seq` is interpreted as a line count: skip entries already
            // captured by the snapshot.
            if let ControlFlow::Break(_) = stream_log_entries(&self.path, seq, |e, l| {
                let res = f(e, l);
                if let ControlFlow::Continue(_) = res {
                    count += 1;
                }
                res
            })? {
                return Ok(count);
            }
            return Ok(count);
        }
        // No snapshot: full replay from the first line.
        let _ = stream_log_entries(&self.path, 0, |e, l| {
            let res = f(e, l);
            if let ControlFlow::Continue(_) = res {
                count += 1;
            }
            res
        })?;
        Ok(count)
    }
}