#![cfg(not(target_arch = "wasm32"))]
use super::StorageBackend;
use super::disk::{AsyncDiskStorage, count_log_lines, write_snapshot, write_compacted_log};
use crate::engine::types::{DbError, LogEntry};
use memmap2::{Mmap, MmapOptions};
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, Cursor};
use std::sync::Arc;
/// Size threshold (50 MiB) for the hot log file: once it grows past this,
/// `TieredStorage::compact` promotes the compacted state into the append-only
/// cold tier instead of compacting the hot tier in place.
const HOT_TIER_MAX_BYTES: u64 = 50 * 1024 * 1024;
/// Read-only memory-mapped view of a newline-delimited JSON log file.
/// The OS pages the file in on demand, so large cold logs are replayed
/// without loading the whole file into heap memory.
pub struct MmapLogReader {
    // Owning the map keeps the file's mapping alive for the reader's lifetime.
    mmap: Mmap,
}
impl MmapLogReader {
    /// Memory-map `path` for reading.
    ///
    /// Returns `None` when the file cannot be opened, its metadata cannot be
    /// read, it is empty (a zero-length mapping would fail), or the mapping
    /// itself fails.
    pub fn open(path: &str) -> Option<Self> {
        let file = File::open(path).ok()?;
        if file.metadata().ok()?.len() == 0 {
            return None;
        }
        // SAFETY: the mapping is read-only; soundness relies on the file not
        // being truncated by another process while mapped (standard memmap2
        // caveat).
        let mmap = unsafe { MmapOptions::new().map(&file).ok()? };
        Some(Self { mmap })
    }

    /// Invoke `f` for every line that parses as a `LogEntry`, ignoring the
    /// first `skip_lines` lines. Lines that fail UTF-8 decoding or JSON
    /// parsing are silently skipped.
    pub fn stream_entries<F>(&self, skip_lines: u64, mut f: F)
    where
        F: FnMut(LogEntry),
    {
        let reader = BufReader::new(Cursor::new(&self.mmap[..]));
        // Count with u64 rather than `.skip(n as usize)` so a huge
        // `skip_lines` cannot truncate on 32-bit targets.
        let mut line_no: u64 = 0;
        for line in reader.lines() {
            let skipped = line_no < skip_lines;
            line_no += 1;
            if skipped {
                continue;
            }
            let parsed = line
                .ok()
                .and_then(|text| serde_json::from_str::<LogEntry>(&text).ok());
            if let Some(entry) = parsed {
                f(entry);
            }
        }
    }

    /// Number of newline-delimited lines in the mapped file.
    pub fn line_count(&self) -> u64 {
        BufReader::new(Cursor::new(&self.mmap[..])).lines().count() as u64
    }
}
/// Two-tier log storage: a mutable "hot" log backed by `AsyncDiskStorage`,
/// plus an append-only "cold" log file that the hot tier's compacted state is
/// promoted into once the hot log exceeds `HOT_TIER_MAX_BYTES`.
pub struct TieredStorage {
    hot: Arc<AsyncDiskStorage>,
    // Path of the hot log file; used for size checks, line counts, snapshots.
    hot_path: String,
    // Path of the cold log file, derived from `hot_path` in `new`.
    cold_path: String,
}
impl TieredStorage {
    /// Create a tiered store rooted at `hot_path`.
    ///
    /// The cold-tier path is derived from the hot path: `data.log` becomes
    /// `data.cold.log`; any other name simply gains a `.cold.log` suffix.
    pub fn new(hot_path: &str) -> Result<Self, DbError> {
        // Idiomatic `strip_suffix` instead of manual index slicing.
        let cold_path = match hot_path.strip_suffix(".log") {
            Some(stem) => format!("{}.cold.log", stem),
            None => format!("{}.cold.log", hot_path),
        };
        let hot = Arc::new(AsyncDiskStorage::new(hot_path)?);
        Ok(Self {
            hot,
            hot_path: hot_path.to_string(),
            cold_path,
        })
    }

    /// Current size of the hot log file in bytes (0 if the file is missing
    /// or its metadata cannot be read).
    fn hot_log_size(&self) -> u64 {
        std::fs::metadata(&self.hot_path)
            .map(|m| m.len())
            .unwrap_or(0)
    }

    /// Append `hot_entries` to the cold-tier log as JSON lines.
    ///
    /// The cold file is opened in append mode, so successive promotions
    /// accumulate; replay reads cold before hot, so later writes win.
    fn promote_hot_to_cold(&self, hot_entries: &[LogEntry]) -> Result<(), DbError> {
        // Bring `Write` into scope up front (the original placed this *after*
        // the `writeln!` call sites, which compiles but reads as a bug).
        use std::io::Write;

        let cold_file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.cold_path)?;
        let mut w = std::io::BufWriter::new(cold_file);
        for entry in hot_entries {
            writeln!(w, "{}", serde_json::to_string(entry)?)?;
        }
        // Flush explicitly: Drop would silently swallow any write error.
        w.flush()?;
        tracing::info!(
            "🧊 Promoted {} entries from hot tier to cold tier ({})",
            hot_entries.len(),
            self.cold_path
        );
        Ok(())
    }
}
impl StorageBackend for TieredStorage {
    /// All new writes land in the hot tier.
    fn write_entry(&self, entry: &LogEntry) -> Result<(), DbError> {
        self.hot.write_entry(entry)
    }

    /// Read the full logical log: cold-tier entries first, then hot-tier.
    fn read_log(&self) -> Result<Vec<LogEntry>, DbError> {
        let mut entries = Vec::new();
        // A missing or empty cold file is normal (nothing promoted yet).
        if let Some(reader) = MmapLogReader::open(&self.cold_path) {
            reader.stream_entries(0, |e| entries.push(e));
        }
        entries.extend(self.hot.read_log()?);
        Ok(entries)
    }

    /// Compact the store given the fully-compacted logical state `entries`.
    ///
    /// If the hot log has outgrown `HOT_TIER_MAX_BYTES`, the compacted state
    /// is appended to the cold tier and the hot tier is reset to empty;
    /// otherwise the hot tier is compacted in place.
    fn compact(&self, entries: Vec<LogEntry>) -> Result<(), DbError> {
        let hot_size = self.hot_log_size();
        if hot_size > HOT_TIER_MAX_BYTES {
            tracing::info!(
                "🔥→🧊 Hot tier is {:.1} MB — promoting to cold tier",
                hot_size as f64 / 1_048_576.0
            );
            self.promote_hot_to_cold(&entries)?;
            let seq = count_log_lines(&self.hot_path);
            // Snapshot failure is non-fatal: the cold log already holds the data.
            if let Err(e) = write_snapshot(&self.hot_path, &[], seq) {
                tracing::warn!("⚠️ Failed to write hot snapshot after promotion: {}", e);
            }
            let temp_path = format!("{}.tmp", self.hot_path);
            write_compacted_log(&temp_path, &[])?;
            self.hot.compact(vec![])?;
            // FIX: the `.tmp` file written above was never renamed nor
            // deleted, leaking one stray file per promotion. Best-effort
            // cleanup; the error is ignored in case `hot.compact` already
            // moved or removed the file.
            let _ = std::fs::remove_file(&temp_path);
            tracing::info!("✅ Promotion complete. Hot tier reset to empty.");
        } else {
            tracing::info!(
                "🗜️ Compacting hot tier in place ({:.1} MB)",
                hot_size as f64 / 1_048_576.0
            );
            let seq = count_log_lines(&self.hot_path);
            if let Err(e) = write_snapshot(&self.hot_path, &entries, seq) {
                tracing::warn!("⚠️ Failed to write hot snapshot: {}", e);
            }
            self.hot.compact(entries)?;
        }
        Ok(())
    }

    /// Replay the entire log at startup — cold tier (via mmap) then hot tier —
    /// streaming each entry into `f`. Returns the total number of entries.
    fn stream_log_into(&self, f: &mut dyn FnMut(LogEntry)) -> Result<u64, DbError> {
        let mut total = 0u64;
        if let Some(cold_reader) = MmapLogReader::open(&self.cold_path) {
            let cold_line_count = cold_reader.line_count();
            tracing::info!(
                "🧊 Replaying cold tier via mmap ({} lines, file paged by OS)",
                cold_line_count
            );
            cold_reader.stream_entries(0, |e| {
                f(e);
                total += 1;
            });
        }
        let hot_count = self.hot.stream_log_into(f)?;
        total += hot_count;
        tracing::info!(
            "✅ Tiered startup replay complete ({} total entries: cold + {} hot)",
            total,
            hot_count
        );
        Ok(total)
    }

    /// Random-access read over the logical byte space: offsets below the cold
    /// file's length address the cold tier; the remainder addresses the hot
    /// tier (shifted by the cold file's size).
    ///
    /// NOTE(review): a read that starts inside the cold tier but extends past
    /// its end fails in `read_exact` rather than continuing into the hot
    /// tier — assumes callers never request a range spanning the tier
    /// boundary. TODO confirm with call sites.
    fn read_at(&self, offset: u64, length: u32) -> Result<Vec<u8>, DbError> {
        use std::io::{Read, Seek, SeekFrom};
        let cold_size = std::fs::metadata(&self.cold_path)
            .map(|m| m.len())
            .unwrap_or(0);
        if offset < cold_size {
            let mut file = File::open(&self.cold_path)?;
            file.seek(SeekFrom::Start(offset))?;
            let mut buffer = vec![0u8; length as usize];
            file.read_exact(&mut buffer)?;
            Ok(buffer)
        } else {
            self.hot.read_at(offset - cold_size, length)
        }
    }
}