use crate::engine::types::{DbError, LogEntry};
use std::fs::{File, OpenOptions};
use std::ops::ControlFlow;
use std::io::{BufRead, BufReader, BufWriter, Write};
/// Rewrites the log file at `path` so that it contains exactly `entries`,
/// one JSON document per line, without any transaction marker records.
///
/// The file is created if it does not exist and any existing content is
/// truncated before writing.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, an entry cannot be
/// serialized to JSON, or writing/flushing the buffered output fails.
pub fn write_compacted_log_no_tx(path: &str, entries: &[LogEntry]) -> Result<(), DbError> {
    let mut options = OpenOptions::new();
    options.write(true).create(true).truncate(true);
    let mut out = BufWriter::new(options.open(path)?);

    for entry in entries {
        let line = serde_json::to_string(entry)?;
        writeln!(out, "{}", line)?;
    }

    // Flush explicitly: dropping a `BufWriter` would swallow a late write error.
    Ok(out.flush()?)
}
/// Rewrites the log file at `path` so that it contains `entries`, each one
/// wrapped in its own `TX_BEGIN` / `TX_COMMIT` pair of marker records.
///
/// For every entry three JSON lines are emitted, in this order:
///
/// 1. a `TX_BEGIN` marker,
/// 2. the entry itself,
/// 3. a `TX_COMMIT` marker.
///
/// Both markers carry the entry's `collection` and `_t`, a `null` value, and
/// the key `compact-<entry.key>`.
///
/// The file is created if it does not exist and any existing content is
/// truncated before writing.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, a record cannot be
/// serialized to JSON, or writing/flushing the buffered output fails.
pub fn write_compacted_log(path: &str, entries: &[LogEntry]) -> Result<(), DbError> {
    let mut options = OpenOptions::new();
    options.write(true).create(true).truncate(true);
    let mut out = BufWriter::new(options.open(path)?);

    for entry in entries {
        let tx_id = format!("compact-{}", entry.key);

        // Builds a marker record that mirrors the entry's collection and
        // timestamp but carries no payload.
        let marker = |cmd: &str| LogEntry {
            cmd: cmd.to_string(),
            collection: entry.collection.clone(),
            key: tx_id.clone(),
            value: serde_json::Value::Null,
            _t: entry._t,
        };
        let begin = marker("TX_BEGIN");
        let commit = marker("TX_COMMIT");

        // Each record is serialized immediately before it is written, so a
        // serialization failure leaves the preceding records already emitted.
        for record in [&begin, entry, &commit] {
            let line = serde_json::to_string(record)?;
            writeln!(out, "{}", line)?;
        }
    }

    // Flush explicitly: dropping a `BufWriter` would swallow a late write error.
    Ok(out.flush()?)
}
/// Streams the entries of the JSON-lines log at `path` to the callback `f`.
///
/// Lines are numbered from zero; the first `skip_lines` lines are not passed
/// to the callback. For every remaining line that parses as a [`LogEntry`],
/// `f` is called with the entry and the byte length of the line (without its
/// line terminator). Lines that are not valid UTF-8 or not valid JSON for a
/// `LogEntry` are skipped, but still count towards the line numbering.
///
/// A file that cannot be opened is treated as an empty log.
///
/// # Returns
///
/// `ControlFlow::Break(())` if the callback asked to stop early, otherwise
/// `ControlFlow::Continue(())` once the whole file has been read.
///
/// # Errors
///
/// Returns an error if reading from the file fails for any reason other than
/// a line containing invalid UTF-8.
pub fn stream_log_entries<F>(
    path: &str,
    skip_lines: u64,
    mut f: F,
) -> Result<ControlFlow<(), ()>, DbError>
where
    F: FnMut(LogEntry, u32) -> ControlFlow<(), ()>,
{
    // Best effort: a log that cannot be opened is treated as having no entries.
    let file = match File::open(path) {
        Ok(file) => file,
        Err(_) => return Ok(ControlFlow::Continue(())),
    };

    for (i, line) in BufReader::new(file).lines().enumerate() {
        let json_str = match line {
            Ok(json_str) => json_str,
            // A non-UTF-8 line has already been consumed from the reader, so
            // it is safe to skip it and carry on with the next line.
            Err(e) if e.kind() == std::io::ErrorKind::InvalidData => continue,
            // Any other read error (e.g. `path` is a directory) would be
            // yielded again on every iteration; ignoring it would spin
            // forever, so surface it to the caller instead.
            Err(e) => return Err(e.into()),
        };

        if (i as u64) < skip_lines {
            continue;
        }

        // NOTE: truncating cast; a single line longer than `u32::MAX` bytes
        // would report a wrapped length.
        let length = json_str.len() as u32;

        // Unparseable lines are skipped rather than treated as fatal.
        if let Ok(entry) = serde_json::from_str::<LogEntry>(&json_str) {
            if let ControlFlow::Break(_) = f(entry, length) {
                return Ok(ControlFlow::Break(()));
            }
        }
    }

    Ok(ControlFlow::Continue(()))
}
/// Counts the lines in the file at `path`.
///
/// A final line without a trailing newline is counted, and lines that are not
/// valid UTF-8 are counted like any other line. Returns `0` if the file cannot
/// be opened.
///
/// If reading fails part-way through, counting stops and the number of lines
/// read so far is returned.
pub fn count_log_lines(path: &str) -> u64 {
    let file = match File::open(path) {
        Ok(file) => file,
        Err(_) => return 0,
    };

    let mut reader = BufReader::new(file);
    let mut buf = Vec::new();
    let mut count = 0u64;

    // Read raw bytes up to each newline. Unlike `lines().count()`, this stops
    // on a read error instead of receiving the same error forever (e.g. when
    // `path` is a directory), and it needs no UTF-8 validation.
    loop {
        buf.clear();
        match reader.read_until(b'\n', &mut buf) {
            Ok(0) | Err(_) => break,
            Ok(_) => count += 1,
        }
    }

    count
}
/// Loads every parseable entry of the log at `path` into memory, in file order.
///
/// This is a convenience wrapper around `stream_log_entries` that starts at
/// the first line and never stops early.
///
/// # Errors
///
/// Propagates any error returned by `stream_log_entries`.
pub fn read_log_from_disk(path: &str) -> Result<Vec<LogEntry>, DbError> {
    let mut collected: Vec<LogEntry> = Vec::new();

    // The line length is not needed here; every entry is kept.
    let collect = |entry: LogEntry, _len: u32| -> ControlFlow<(), ()> {
        collected.push(entry);
        ControlFlow::Continue(())
    };

    // The callback never breaks, so the returned control-flow value carries
    // no information and is deliberately discarded.
    let _ = stream_log_entries(path, 0, collect)?;

    Ok(collected)
}