use std::collections::HashMap;
use std::path::{Path, PathBuf};
use anyhow::{Context, Result};
use base64::engine::general_purpose::STANDARD as B64;
use base64::Engine as _;
use serde::Serialize;
use crate::format::RecordType;
use crate::record_log::{Record, RecordLog};
/// Summary of a single [`DataWal::compact_to`] run.
///
/// All counters start at zero (see the derived `Default`). The type is plain data, so it is
/// `Copy` and can be passed around by value.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct CompactionStats {
    /// Number of live keys carried over into the compacted log.
    pub live_keys: u64,
    /// Number of records appended to the compacted log.
    pub records_written: u64,
    /// Sum of the `len` values reported by the log for each appended record.
    pub bytes_written: u64,
}
#[derive(Debug)]
pub struct DataWal {
log: RecordLog,
map: HashMap<Vec<u8>, Vec<u8>>,
}
impl DataWal {
pub fn open(dir: &Path) -> Result<Self> {
let mut log = RecordLog::open(dir)?;
let records = log.scan()?;
let map = rebuild_keydir(&records);
Ok(Self { log, map })
}
pub fn dir(&self) -> &Path {
self.log.dir()
}
pub fn len(&self) -> usize {
self.map.len()
}
pub fn is_empty(&self) -> bool {
self.map.is_empty()
}
pub fn contains_key(&self, key: &[u8]) -> bool {
self.map.contains_key(key)
}
pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
Ok(self.map.get(key).cloned())
}
pub fn keys(&self) -> Vec<Vec<u8>> {
self.map.keys().cloned().collect()
}
pub fn items(&self) -> Vec<(Vec<u8>, Vec<u8>)> {
self.map
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect()
}
pub fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
self.log.append_record(RecordType::Put, key, value)?;
self.map.insert(key.to_vec(), value.to_vec());
Ok(())
}
pub fn delete(&mut self, key: &[u8]) -> Result<()> {
self.log.append_record(RecordType::Delete, key, b"")?;
self.map.remove(key);
Ok(())
}
pub fn fsync(&mut self) -> Result<()> {
self.log.fsync()
}
pub fn log(&self) -> &RecordLog {
&self.log
}
pub fn compact_to(&self, out_dir: &Path) -> Result<CompactionStats> {
if out_dir.exists() {
let is_empty = std::fs::read_dir(out_dir)
.with_context(|| {
format!(
"datawal: read_dir on compact_to target {}",
out_dir.display()
)
})?
.next()
.is_none();
if !is_empty {
anyhow::bail!(
"datawal: compact_to target {} is not empty; refusing to overwrite",
out_dir.display()
);
}
}
let mut out_log = RecordLog::open(out_dir)?;
let mut stats = CompactionStats::default();
let mut sorted: Vec<(&Vec<u8>, &Vec<u8>)> = self.map.iter().collect();
sorted.sort_by(|a, b| a.0.cmp(b.0));
for (k, v) in sorted {
let r = out_log.append_record(RecordType::Put, k, v)?;
stats.records_written += 1;
stats.bytes_written += r.len as u64;
}
stats.live_keys = stats.records_written;
out_log.fsync()?;
out_log.close()?;
Ok(stats)
}
pub fn export_jsonl(&self, out_path: &Path) -> Result<()> {
#[derive(Serialize)]
struct Row<'a> {
key_b64: &'a str,
value_b64: &'a str,
}
let mut sorted: Vec<(&Vec<u8>, &Vec<u8>)> = self.map.iter().collect();
sorted.sort_by(|a, b| a.0.cmp(b.0));
let mut buf = String::new();
for (k, v) in sorted {
let k_b64 = B64.encode(k);
let v_b64 = B64.encode(v);
let line = serde_json::to_string(&Row {
key_b64: &k_b64,
value_b64: &v_b64,
})
.context("datawal: serialize JSONL row")?;
buf.push_str(&line);
buf.push('\n');
}
if let Some(parent) = out_path.parent() {
if !parent.as_os_str().is_empty() {
std::fs::create_dir_all(parent)
.with_context(|| format!("datawal: create_dir_all {}", parent.display()))?;
}
}
safeatomic_rs::write_atomic(out_path, buf.as_bytes())
.with_context(|| format!("datawal: write_atomic {}", out_path.display()))?;
Ok(())
}
pub fn dir_path_buf(&self) -> PathBuf {
self.log.dir().to_path_buf()
}
}
/// Replays `records` in order and returns the resulting live key -> value map.
///
/// - A `Put` installs or overwrites the key's value.
/// - A `Delete` drops the key.
/// - `Raw` records are ignored.
///
/// Because records are applied in sequence, later records win.
fn rebuild_keydir(records: &[Record]) -> HashMap<Vec<u8>, Vec<u8>> {
    records
        .iter()
        .fold(HashMap::<Vec<u8>, Vec<u8>>::new(), |mut keydir, rec| {
            match rec.record_type {
                RecordType::Put => {
                    keydir.insert(rec.key.clone(), rec.payload.clone());
                }
                RecordType::Delete => {
                    keydir.remove(&rec.key);
                }
                // Raw records carry nothing the key map tracks.
                RecordType::Raw => {}
            }
            keydir
        })
}