use super::StorageBackend;
use crate::engine::types::{DbError, LogEntry};
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
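/// Derives the snapshot path for a log file: `<log_path>.snapshot.bin`.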
pub(super) fn snapshot_path(log_path: &str) -> String {
    format!("{}.snapshot.bin", log_path)
}
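/// Serializes a point-in-time snapshot next to the log, atomically via a
/// temp file and rename. Layout: the 8-byte magic `MOLTSNAP`, the covered
/// log sequence number (u64 LE), the entry count (u64 LE), then each entry
/// as a u64 LE length prefix followed by its bincode bytes.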
pub(super) fn write_snapshot(log_path: &str, entries: &[LogEntry], seq: u64) -> Result<(), DbError> {
    let path = snapshot_path(log_path);
    let tmp = format!("{}.tmp", path);
    let file = OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(true)
        .open(&tmp)?;
    let mut w = BufWriter::new(file);
    w.write_all(b"MOLTSNAP")?;
    w.write_all(&seq.to_le_bytes())?;
    let count = entries.len() as u64;
    w.write_all(&count.to_le_bytes())?;
    for entry in entries {
        let encoded = bincode::serialize(entry).map_err(|_| DbError::WriteError)?;
        let len = encoded.len() as u64;
        w.write_all(&len.to_le_bytes())?;
        w.write_all(&encoded)?;
    }
    w.flush()?;
    // Fsync before the rename so a crash cannot publish a torn snapshot.
    w.get_ref().sync_all()?;
    drop(w);
    std::fs::rename(&tmp, &path)?;
    Ok(())
}
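/// Loads the snapshot written by [`write_snapshot`], returning the entries
/// and the log sequence number they cover. Any I/O, magic, or decode failure
/// yields `None`, in which case callers fall back to a full log replay.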
pub(super) fn load_snapshot(log_path: &str) -> Option<(Vec<LogEntry>, u64)> {
    use std::io::Read;
    let path = snapshot_path(log_path);
    let mut file = File::open(&path).ok()?;
    let mut magic = [0u8; 8];
    file.read_exact(&mut magic).ok()?;
    if &magic != b"MOLTSNAP" {
        return None;
    }
    let mut seq_bytes = [0u8; 8];
    file.read_exact(&mut seq_bytes).ok()?;
    let seq = u64::from_le_bytes(seq_bytes);
    let mut count_bytes = [0u8; 8];
    file.read_exact(&mut count_bytes).ok()?;
    let count = u64::from_le_bytes(count_bytes) as usize;
    let mut entries = Vec::with_capacity(count);
    for _ in 0..count {
        let mut len_bytes = [0u8; 8];
        file.read_exact(&mut len_bytes).ok()?;
        let len = u64::from_le_bytes(len_bytes) as usize;
        let mut buf = vec![0u8; len];
        file.read_exact(&mut buf).ok()?;
        let entry: LogEntry = bincode::deserialize(&buf).ok()?;
        entries.push(entry);
    }
    Some((entries, seq))
}
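/// Streams JSON-encoded entries from the log at `path`, calling `f` for each.
/// The first `skip_lines` lines are skipped, which lets snapshot recovery
/// replay only the post-snapshot delta. A missing file is an empty log.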
pub fn stream_log_entries<F>(path: &str, skip_lines: u64, mut f: F) -> Result<(), DbError>
where
    F: FnMut(LogEntry),
{
    if let Ok(file) = File::open(path) {
        let reader = BufReader::new(file);
        for (i, line) in reader.lines().enumerate() {
            if (i as u64) < skip_lines {
                continue;
            }
            if let Ok(json_str) = line {
                // Unreadable or malformed lines are skipped instead of
                // aborting the whole replay.
                if let Ok(entry) = serde_json::from_str::<LogEntry>(&json_str) {
                    f(entry);
                }
            }
        }
    }
    Ok(())
}
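/// Counts the lines in the log file; a missing file counts as zero.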
pub(super) fn count_log_lines(path: &str) -> u64 {
    if let Ok(file) = File::open(path) {
        BufReader::new(file).lines().count() as u64
    } else {
        0
    }
}
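/// Reads the entire log into memory. Prefer [`stream_log_entries`] when the
/// entries can be consumed one at a time.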
pub fn read_log_from_disk(path: &str) -> Result<Vec<LogEntry>, DbError> {
    let mut entries = Vec::new();
    stream_log_entries(path, 0, |e| entries.push(e))?;
    Ok(entries)
}
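/// Writes the compacted entry set as JSON lines to `path`. Callers pass a
/// temp path and atomically rename it over the live log afterwards.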
pub(super) fn write_compacted_log(path: &str, entries: &[LogEntry]) -> Result<(), DbError> {
    let temp_file = OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(true)
        .open(path)?;
    let mut temp_writer = BufWriter::new(temp_file);
    for entry in entries {
        writeln!(temp_writer, "{}", serde_json::to_string(&entry)?)?;
    }
    temp_writer.flush()?;
    Ok(())
}
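/// Storage backend that appends to the log from a dedicated writer task.
/// `write_entry` only serializes and queues the line on an unbounded channel;
/// the spawned task appends queued lines and flushes its buffer whenever the
/// channel has been idle for 50 ms. Compaction is coordinated through the
/// same channel with a `__RELOAD_FILE__` sentinel so the file swap stays
/// ordered with respect to in-flight writes.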
pub struct AsyncDiskStorage {
    sender: Option<mpsc::UnboundedSender<String>>,
    path: String,
    writer_task: Option<JoinHandle<()>>,
}
impl AsyncDiskStorage {
    pub fn new(path: &str) -> Result<Self, DbError> {
        let (log_tx, mut log_rx) = mpsc::unbounded_channel::<String>();
        let path_clone = path.to_string();
        let writer_task = tokio::spawn(async move {
            let file = OpenOptions::new()
                .create(true)
                .append(true)
                .open(&path_clone)
                .unwrap();
            let mut w = BufWriter::new(file);
            loop {
                // Wait up to 50 ms for the next line; a timeout flushes the
                // buffer so recent writes are not held in memory indefinitely.
                match tokio::time::timeout(
                    std::time::Duration::from_millis(50),
                    log_rx.recv(),
                )
                .await
                {
                    Ok(Some(log_line)) => {
                        if let Some(temp_path) = log_line.strip_prefix("__RELOAD_FILE__") {
                            // Compaction handshake: flush and close the old
                            // handle, swap in the compacted file, then reopen
                            // the log for appending.
                            w.flush().unwrap();
                            drop(w);
                            if let Err(e) = std::fs::rename(temp_path, &path_clone) {
                                tracing::error!("Failed to swap compacted file: {}", e);
                            }
                            let new_file = OpenOptions::new()
                                .create(true)
                                .append(true)
                                .open(&path_clone)
                                .unwrap();
                            w = BufWriter::new(new_file);
                        } else if let Err(e) = writeln!(w, "{}", log_line) {
                            tracing::error!("Failed to write to disk: {}", e);
                        }
                    }
                    // Channel closed: the storage handle was dropped.
                    Ok(None) => break,
                    // Timeout: flush whatever has accumulated.
                    Err(_) => {
                        let _ = w.flush();
                    }
                }
            }
            let _ = w.flush();
        });
        Ok(Self {
            sender: Some(log_tx),
            path: path.to_string(),
            writer_task: Some(writer_task),
        })
    }
}
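// Dropping the storage closes the channel, which terminates the writer loop,
// then waits for the task's final flush when a runtime is available.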
impl Drop for AsyncDiskStorage {
    fn drop(&mut self) {
        drop(self.sender.take());
        if let Some(handle) = self.writer_task.take() {
            // `block_in_place` requires a multi-thread runtime; if no runtime
            // is active we skip the wait rather than panic.
            if let Ok(rt) = tokio::runtime::Handle::try_current() {
                tokio::task::block_in_place(|| rt.block_on(handle)).ok();
            }
        }
    }
}
impl StorageBackend for AsyncDiskStorage {
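    /// Serializes the entry and queues it for the writer task; returns once
    /// the line is enqueued, not once it reaches disk.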
    fn write_entry(&self, entry: &LogEntry) -> Result<(), DbError> {
        let json_line = serde_json::to_string(entry)?;
        if let Some(ref sender) = self.sender {
            sender.send(json_line).map_err(|_| DbError::WriteError)?;
        }
        Ok(())
    }
    fn read_log(&self) -> Result<Vec<LogEntry>, DbError> {
        read_log_from_disk(&self.path)
    }
    fn compact(&self, entries: Vec<LogEntry>) -> Result<(), DbError> {
        // After the swap the log will contain exactly `entries.len()` lines,
        // so that is the count replay must skip to reach the post-snapshot
        // delta (the pre-compaction line count would over-skip new entries).
        let seq = entries.len() as u64;
        if let Err(e) = write_snapshot(&self.path, &entries, seq) {
            tracing::warn!("⚠️ Failed to write snapshot during compaction: {}", e);
        }
        let temp_path = format!("{}.tmp", self.path);
        write_compacted_log(&temp_path, &entries)?;
        // Route the swap through the writer channel so the rename happens
        // after all previously queued lines have been written.
        if let Some(ref sender) = self.sender {
            sender
                .send(format!("__RELOAD_FILE__{}", temp_path))
                .map_err(|_| DbError::WriteError)?;
        }
        Ok(())
    }
    fn stream_log_into(&self, f: &mut dyn FnMut(LogEntry)) -> Result<u64, DbError> {
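        // Fast path: restore from the snapshot, then replay only the lines
        // appended after it (the first `seq` lines are already covered).
        // The returned count is the current on-disk line count.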
        if let Some((snapshot_entries, seq)) = load_snapshot(&self.path) {
            tracing::info!(
                "⚡ Snapshot loaded ({} entries, seq {}). Replaying delta only...",
                snapshot_entries.len(),
                seq
            );
            for entry in snapshot_entries {
                f(entry);
            }
            stream_log_entries(&self.path, seq, |e| f(e))?;
            let total = count_log_lines(&self.path);
            return Ok(total);
        }
        let mut count = 0u64;
        stream_log_entries(&self.path, 0, |e| {
            f(e);
            count += 1;
        })?;
        Ok(count)
    }
}
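/// Storage backend that writes on the caller's thread, flushing the buffered
/// writer after every entry. Simpler than [`AsyncDiskStorage`] at the cost
/// of blocking each write on I/O.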
pub struct SyncDiskStorage {
    writer: Arc<Mutex<BufWriter<File>>>,
    path: String,
}
impl SyncDiskStorage {
    pub fn new(path: &str) -> Result<Self, DbError> {
        let file = OpenOptions::new().create(true).append(true).open(path)?;
        Ok(Self {
            writer: Arc::new(Mutex::new(BufWriter::new(file))),
            path: path.to_string(),
        })
    }
}
impl StorageBackend for SyncDiskStorage {
    fn write_entry(&self, entry: &LogEntry) -> Result<(), DbError> {
        let json_line = serde_json::to_string(entry)?;
        let mut w = self.writer.lock().map_err(|_| DbError::LockPoisoned)?;
        writeln!(w, "{}", json_line)?;
        w.flush()?;
        Ok(())
    }
    fn read_log(&self) -> Result<Vec<LogEntry>, DbError> {
        read_log_from_disk(&self.path)
    }
    fn compact(&self, entries: Vec<LogEntry>) -> Result<(), DbError> {
        // After the rename the log will contain exactly `entries.len()` lines,
        // so that is the count replay must skip to reach the post-snapshot
        // delta (the pre-compaction line count would over-skip new entries).
        let seq = entries.len() as u64;
        if let Err(e) = write_snapshot(&self.path, &entries, seq) {
            tracing::warn!("⚠️ Failed to write snapshot during compaction: {}", e);
        }
        let temp_path = format!("{}.tmp", self.path);
        write_compacted_log(&temp_path, &entries)?;
        // Hold the writer lock across the swap so no entry is appended to the
        // old file while it is being replaced.
        let mut w = self.writer.lock().map_err(|_| DbError::LockPoisoned)?;
        std::fs::rename(&temp_path, &self.path)?;
        let new_file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.path)?;
        *w = BufWriter::new(new_file);
        Ok(())
    }
    fn stream_log_into(&self, f: &mut dyn FnMut(LogEntry)) -> Result<u64, DbError> {
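        // Fast path: restore from the snapshot, then replay only the lines
        // appended after it (the first `seq` lines are already covered).
        // The returned count is the current on-disk line count.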
        if let Some((snapshot_entries, seq)) = load_snapshot(&self.path) {
            tracing::info!(
                "⚡ Snapshot loaded ({} entries, seq {}). Replaying delta only...",
                snapshot_entries.len(),
                seq
            );
            for entry in snapshot_entries {
                f(entry);
            }
            stream_log_entries(&self.path, seq, |e| f(e))?;
            let total = count_log_lines(&self.path);
            return Ok(total);
        }
        let mut count = 0u64;
        stream_log_entries(&self.path, 0, |e| {
            f(e);
            count += 1;
        })?;
        Ok(count)
    }
}