use std::collections::HashMap;
use std::path::{Path, PathBuf};
use anyhow::{Context, Result};
use base64::engine::general_purpose::STANDARD as B64;
use base64::Engine as _;
use serde::Serialize;
use crate::fd_pool::{FdPool, DEFAULT_CAPACITY};
use crate::format::{decode_next, DecodeOutcome, RecordType, HEADER_LEN};
use crate::record_log::{RecordLog, RecordRef};
/// Counters returned by `DataWal::compact_to` describing what was written
/// into the compaction target.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CompactionStats {
/// Number of live keys copied; `compact_to` sets this equal to
/// `records_written`.
pub live_keys: u64,
/// Number of `Put` records appended to the output log (one per key).
pub records_written: u64,
/// Sum of the `len` field of every record reference returned while
/// appending to the output log.
pub bytes_written: u64,
}
/// Key/value view over a `RecordLog`: writes are appended to the log and an
/// in-memory map remembers where the current value of each key lives.
#[derive(Debug)]
pub struct DataWal {
// Underlying log that `put` / `delete` append records to.
log: RecordLog,
// Key directory: key bytes -> reference (segment, offset, len) of the
// record currently holding that key's value.
map: HashMap<Vec<u8>, RecordRef>,
// Pool used by `get` to read record bytes back from segment files.
fd_pool: FdPool,
}
impl DataWal {
pub fn open(dir: &Path) -> Result<Self> {
let log = RecordLog::open(dir)?;
let map = rebuild_keydir_lazy(&log)?;
Ok(Self {
log,
map,
fd_pool: FdPool::with_capacity(DEFAULT_CAPACITY),
})
}
pub fn dir(&self) -> &Path {
self.log.dir()
}
pub fn len(&self) -> usize {
self.map.len()
}
pub fn is_empty(&self) -> bool {
self.map.is_empty()
}
pub fn contains_key(&self, key: &[u8]) -> bool {
self.map.contains_key(key)
}
pub fn ref_of(&self, key: &[u8]) -> Option<RecordRef> {
self.map.get(key).copied()
}
pub fn get(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
let rref = match self.map.get(key).copied() {
Some(r) => r,
None => return Ok(None),
};
let bytes =
self.fd_pool
.read_at(self.log.dir(), rref.segment, rref.offset, rref.len as usize)?;
decode_payload_for_get(&bytes, rref).map(Some)
}
pub fn keys(&self) -> Vec<Vec<u8>> {
self.map.keys().cloned().collect()
}
pub fn items(&mut self) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
let keys: Vec<Vec<u8>> = self.map.keys().cloned().collect();
let mut out = Vec::with_capacity(keys.len());
for k in keys {
let v = self.get(&k)?.ok_or_else(|| {
anyhow::anyhow!("datawal: keydir desync for key (len={})", k.len())
})?;
out.push((k, v));
}
Ok(out)
}
pub fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
let rref = self.log.append_record(RecordType::Put, key, value)?;
self.map.insert(key.to_vec(), rref);
Ok(())
}
pub fn delete(&mut self, key: &[u8]) -> Result<()> {
self.log.append_record(RecordType::Delete, key, b"")?;
self.map.remove(key);
Ok(())
}
pub fn fsync(&mut self) -> Result<()> {
self.log.fsync()
}
pub fn log(&self) -> &RecordLog {
&self.log
}
pub fn compact_to(&mut self, out_dir: &Path) -> Result<CompactionStats> {
if out_dir.exists() {
let is_empty = std::fs::read_dir(out_dir)
.with_context(|| {
format!(
"datawal: read_dir on compact_to target {}",
out_dir.display()
)
})?
.next()
.is_none();
if !is_empty {
anyhow::bail!(
"datawal: compact_to target {} is not empty; refusing to overwrite",
out_dir.display()
);
}
}
let mut sorted_keys: Vec<Vec<u8>> = self.map.keys().cloned().collect();
sorted_keys.sort();
let mut out_log = RecordLog::open(out_dir)?;
let mut stats = CompactionStats::default();
for k in sorted_keys {
let v = self
.get(&k)?
.ok_or_else(|| anyhow::anyhow!("datawal: keydir desync during compact_to"))?;
let r = out_log.append_record(RecordType::Put, &k, &v)?;
stats.records_written += 1;
stats.bytes_written += r.len as u64;
}
stats.live_keys = stats.records_written;
out_log.fsync()?;
out_log.close()?;
Ok(stats)
}
pub fn export_jsonl(&mut self, out_path: &Path) -> Result<()> {
#[derive(Serialize)]
struct Row<'a> {
key_b64: &'a str,
value_b64: &'a str,
}
let mut sorted_keys: Vec<Vec<u8>> = self.map.keys().cloned().collect();
sorted_keys.sort();
let mut buf = String::new();
for k in sorted_keys {
let v = self
.get(&k)?
.ok_or_else(|| anyhow::anyhow!("datawal: keydir desync during export_jsonl"))?;
let k_b64 = B64.encode(&k);
let v_b64 = B64.encode(&v);
let line = serde_json::to_string(&Row {
key_b64: &k_b64,
value_b64: &v_b64,
})
.context("datawal: serialize JSONL row")?;
buf.push_str(&line);
buf.push('\n');
}
if let Some(parent) = out_path.parent() {
if !parent.as_os_str().is_empty() {
std::fs::create_dir_all(parent)
.with_context(|| format!("datawal: create_dir_all {}", parent.display()))?;
}
}
safeatomic_rs::write_atomic(out_path, buf.as_bytes())
.with_context(|| format!("datawal: write_atomic {}", out_path.display()))?;
Ok(())
}
pub fn dir_path_buf(&self) -> PathBuf {
self.log.dir().to_path_buf()
}
}
/// Rebuilds the key directory by replaying every record yielded by
/// `log.scan_iter()` in order.
///
/// A `Put` stores (or overwrites) the key's record reference, a `Delete`
/// removes the key, and `Raw` records are ignored. The first scan error
/// aborts the rebuild and is returned.
fn rebuild_keydir_lazy(log: &RecordLog) -> Result<HashMap<Vec<u8>, RecordRef>> {
    let mut keydir = HashMap::new();
    for scanned in log.scan_iter()? {
        let record = scanned?;
        match record.record_type {
            RecordType::Delete => {
                keydir.remove(&record.key);
            }
            // Raw records carry no key/value state for the directory.
            RecordType::Raw => {}
            RecordType::Put => {
                let location = RecordRef {
                    segment: record.segment,
                    offset: record.offset,
                    len: record.len,
                };
                keydir.insert(record.key, location);
            }
        }
    }
    Ok(keydir)
}
fn decode_payload_for_get(bytes: &[u8], rref: RecordRef) -> Result<Vec<u8>> {
if bytes.len() < HEADER_LEN {
anyhow::bail!(
"datawal: short read for segment {:08} offset {} (got {}, need at least {})",
rref.segment,
rref.offset,
bytes.len(),
HEADER_LEN
);
}
match decode_next(bytes, 0) {
Ok(DecodeOutcome::Ok {
record_type,
key: _key,
payload,
bytes_consumed,
..
}) => {
if bytes_consumed != rref.len {
anyhow::bail!(
"datawal: frame length mismatch at segment {:08} offset {} (decoded {} vs ref {})",
rref.segment,
rref.offset,
bytes_consumed,
rref.len
);
}
if !matches!(record_type, RecordType::Put) {
anyhow::bail!(
"datawal: record at segment {:08} offset {} is not a Put (found {:?})",
rref.segment,
rref.offset,
record_type
);
}
Ok(payload)
}
Ok(DecodeOutcome::Truncated { available, needed }) => {
anyhow::bail!(
"datawal: truncated frame at segment {:08} offset {} (available {}, needed {})",
rref.segment,
rref.offset,
available,
needed
);
}
Ok(DecodeOutcome::CrcMismatch { bytes_consumed }) => {
anyhow::bail!(
"datawal: CRC mismatch at segment {:08} offset {} (size {})",
rref.segment,
rref.offset,
bytes_consumed
);
}
Err(e) => Err(anyhow::Error::new(e).context(format!(
"datawal: structural decode error at segment {:08} offset {}",
rref.segment, rref.offset
))),
}
}