use crate::bloom::BloomFilter;
use crate::error::Result;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
pub(crate) mod flex_key {
use serde::{Deserialize, Deserializer, Serialize, Serializer};
pub fn serialize<S: Serializer>(key: &[u8], s: S) -> Result<S::Ok, S::Error> {
key.serialize(s)
}
pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Vec<u8>, D::Error> {
#[derive(Deserialize)]
#[serde(untagged)]
enum KeyRepr {
Bytes(Vec<u8>),
Str(String),
}
match KeyRepr::deserialize(d)? {
KeyRepr::Bytes(b) => Ok(b),
KeyRepr::Str(s) => Ok(s.into_bytes()),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct SstInfo {
pub id: u32,
pub records: u64,
pub bytes: u64,
pub min_ts: i64,
pub max_ts: i64,
pub min_expire: i64,
pub max_expire: i64,
#[serde(default)]
pub bloom: Option<BloomFilter>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct BlockInfo {
pub block_idx: u32,
#[serde(with = "flex_key")]
pub min_key: Vec<u8>,
#[serde(with = "flex_key")]
pub max_key: Vec<u8>,
pub min_ts: i64,
pub max_ts: i64,
pub min_expire: i64,
pub max_expire: i64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub(crate) enum ManifestEntry {
#[serde(rename = "flush")]
Flush {
seq: u64,
sst: SstInfo,
blocks: Vec<BlockInfo>,
},
#[serde(rename = "delete_sst")]
DeleteSst { sst_id: u32 },
#[serde(rename = "compaction")]
Compaction {
removed: Vec<u32>,
added: Vec<SstInfo>,
blocks: Vec<(u32, Vec<BlockInfo>)>,
},
#[serde(rename = "checkpoint")]
Checkpoint { last_flushed_seq: u64 },
#[serde(rename = "gc_delete_sst")]
GcDeleteSst { sst_id: u32 },
#[serde(rename = "update_bloom")]
UpdateBloom { sst_id: u32, bloom: BloomFilter },
}
#[derive(Debug, Clone, Default)]
pub(crate) struct ManifestState {
pub last_flushed_seq: u64,
pub sstables: HashMap<u32, SstInfo>,
pub block_infos: HashMap<u32, Vec<BlockInfo>>,
pub active_sst_ids: Vec<u32>,
}
pub(crate) struct Manifest {
path: PathBuf,
state: ManifestState,
dir_file: std::fs::File,
entry_count: usize,
}
const SNAPSHOT_THRESHOLD: usize = 500;
impl Manifest {
pub fn open(dir: &Path) -> Result<Self> {
std::fs::create_dir_all(dir)?;
let dir_file = std::fs::File::open(dir)?;
let path = dir.join("MANIFEST");
let mut state = ManifestState::default();
if path.exists() {
let content = std::fs::read_to_string(&path)?;
for line in content.lines() {
let line = line.trim();
if line.is_empty() {
continue;
}
if let Ok(entry) = serde_json::from_str::<ManifestEntry>(line) {
apply_entry(&mut state, &entry);
}
}
}
Ok(Self {
path,
state,
dir_file,
entry_count: 0,
})
}
pub fn append(&mut self, entry: &ManifestEntry) -> Result<()> {
let line = serde_json::to_string(entry)?;
let mut file = std::fs::OpenOptions::new()
.append(true)
.create(true)
.open(&self.path)?;
use std::io::Write;
writeln!(file, "{}", line)?;
file.flush()?;
file.sync_all()?;
self.dir_file.sync_all()?;
apply_entry(&mut self.state, entry);
self.entry_count += 1;
Ok(())
}
pub fn maybe_snapshot(&mut self) -> Result<()> {
if self.entry_count <= SNAPSHOT_THRESHOLD {
return Ok(());
}
self.snapshot()
}
fn snapshot(&mut self) -> Result<()> {
use std::io::Write;
let tmp_path = self.path.with_extension("MANIFEST.tmp");
{
let mut file = std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&tmp_path)?;
for (sst_id, info) in &self.state.sstables {
let blocks = self
.state
.block_infos
.get(sst_id)
.cloned()
.unwrap_or_default();
let entry = ManifestEntry::Flush {
seq: self.state.last_flushed_seq,
sst: info.clone(),
blocks,
};
let line = serde_json::to_string(&entry)?;
writeln!(file, "{}", line)?;
}
let checkpoint = ManifestEntry::Checkpoint {
last_flushed_seq: self.state.last_flushed_seq,
};
let line = serde_json::to_string(&checkpoint)?;
writeln!(file, "{}", line)?;
file.flush()?;
file.sync_all()?;
}
std::fs::rename(&tmp_path, &self.path)?;
self.dir_file.sync_all()?;
self.entry_count = self.state.sstables.len() + 1;
tracing::info!(
"Manifest snapshot complete: {} active SSTs, {} entries",
self.state.sstables.len(),
self.entry_count
);
Ok(())
}
pub fn state(&self) -> &ManifestState {
&self.state
}
pub fn next_sst_id(&self) -> u32 {
self.state.sstables.keys().max().copied().unwrap_or(0) + 1
}
#[cfg(test)]
pub fn entry_count(&self) -> usize {
self.entry_count
}
}
fn apply_entry(state: &mut ManifestState, entry: &ManifestEntry) {
match entry {
ManifestEntry::Flush { seq, sst, blocks } => {
state.last_flushed_seq = state.last_flushed_seq.max(*seq);
state.sstables.insert(sst.id, sst.clone());
state.block_infos.insert(sst.id, blocks.clone());
if !state.active_sst_ids.contains(&sst.id) {
state.active_sst_ids.push(sst.id);
}
}
ManifestEntry::DeleteSst { sst_id } | ManifestEntry::GcDeleteSst { sst_id } => {
state.sstables.remove(sst_id);
state.block_infos.remove(sst_id);
state.active_sst_ids.retain(|id| id != sst_id);
}
ManifestEntry::Compaction {
removed,
added,
blocks,
} => {
for id in removed {
state.sstables.remove(id);
state.block_infos.remove(id);
state.active_sst_ids.retain(|sid| sid != id);
}
for info in added {
state.sstables.insert(info.id, info.clone());
if !state.active_sst_ids.contains(&info.id) {
state.active_sst_ids.push(info.id);
}
}
for (sst_id, blks) in blocks {
state.block_infos.insert(*sst_id, blks.clone());
}
}
ManifestEntry::Checkpoint { last_flushed_seq } => {
state.last_flushed_seq = state.last_flushed_seq.max(*last_flushed_seq);
}
ManifestEntry::UpdateBloom { sst_id, bloom } => {
if let Some(info) = state.sstables.get_mut(sst_id) {
info.bloom = Some(bloom.clone());
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[test]
fn test_manifest_append_and_replay() {
let dir = TempDir::new().unwrap();
let sst_dir = dir.path().join("data");
std::fs::create_dir_all(&sst_dir).unwrap();
{
let mut mf = Manifest::open(&sst_dir).unwrap();
mf.append(&ManifestEntry::Flush {
seq: 100,
sst: SstInfo {
id: 1,
records: 100,
bytes: 4096,
min_ts: 1000,
max_ts: 2000,
min_expire: i64::MAX,
max_expire: i64::MAX,
bloom: None,
},
blocks: vec![BlockInfo {
block_idx: 0,
min_key: "a".into(),
max_key: "b".into(),
min_ts: 1000,
max_ts: 2000,
min_expire: i64::MAX,
max_expire: i64::MAX,
}],
})
.unwrap();
mf.append(&ManifestEntry::Flush {
seq: 200,
sst: SstInfo {
id: 2,
records: 50,
bytes: 2048,
min_ts: 2000,
max_ts: 3000,
min_expire: i64::MAX,
max_expire: i64::MAX,
bloom: None,
},
blocks: vec![],
})
.unwrap();
assert_eq!(mf.state().sstables.len(), 2);
assert_eq!(mf.state().last_flushed_seq, 200);
}
let mf2 = Manifest::open(&sst_dir).unwrap();
assert_eq!(mf2.state().sstables.len(), 2);
assert_eq!(mf2.state().last_flushed_seq, 200);
}
#[test]
fn test_manifest_delete() {
let dir = TempDir::new().unwrap();
let data_dir = dir.path().join("data");
std::fs::create_dir_all(&data_dir).unwrap();
let mut mf = Manifest::open(&data_dir).unwrap();
mf.append(&ManifestEntry::Flush {
seq: 100,
sst: SstInfo {
id: 1,
records: 10,
bytes: 100,
min_ts: 0,
max_ts: 0,
min_expire: 0,
max_expire: 0,
bloom: None,
},
blocks: vec![],
})
.unwrap();
mf.append(&ManifestEntry::DeleteSst { sst_id: 1 }).unwrap();
assert!(!mf.state().sstables.contains_key(&1));
assert!(mf.state().active_sst_ids.is_empty());
}
#[test]
fn test_manifest_legacy_string_keys_load() {
let dir = TempDir::new().unwrap();
let data_dir = dir.path().join("data");
std::fs::create_dir_all(&data_dir).unwrap();
let legacy_line = concat!(
r#"{"type":"flush","seq":10,"sst":{"id":1,"records":3,"bytes":128,"#,
r#""min_ts":0,"max_ts":100,"min_expire":0,"max_expire":0,"#,
r#""bloom":{"bits":[18446744073709551615,1024],"num_hashes":2}},"#,
r#""blocks":[{"block_idx":0,"min_key":"metric.cpu","max_key":"metric.mem","#,
r#""min_ts":0,"max_ts":100,"min_expire":0,"max_expire":0}]}"#
);
std::fs::write(data_dir.join("MANIFEST"), format!("{}\n", legacy_line)).unwrap();
let mf = Manifest::open(&data_dir).unwrap();
assert_eq!(mf.state().sstables.len(), 1);
assert!(mf.state().sstables.contains_key(&1));
let blocks = mf.state().block_infos.get(&1).unwrap();
assert_eq!(blocks.len(), 1);
assert_eq!(blocks[0].min_key, b"metric.cpu");
assert_eq!(blocks[0].max_key, b"metric.mem");
let bloom = mf.state().sstables[&1].bloom.as_ref().unwrap();
assert_eq!(bloom.hash_version(), 0);
}
#[test]
fn test_manifest_new_byte_keys_roundtrip() {
let dir = TempDir::new().unwrap();
let data_dir = dir.path().join("data");
std::fs::create_dir_all(&data_dir).unwrap();
{
let mut mf = Manifest::open(&data_dir).unwrap();
mf.append(&ManifestEntry::Flush {
seq: 1,
sst: SstInfo {
id: 1,
records: 1,
bytes: 32,
min_ts: 0,
max_ts: 0,
min_expire: 0,
max_expire: 0,
bloom: None,
},
blocks: vec![BlockInfo {
block_idx: 0,
min_key: b"\xff\x00key".to_vec(), max_key: b"\xff\x01key".to_vec(),
min_ts: 0,
max_ts: 0,
min_expire: 0,
max_expire: 0,
}],
})
.unwrap();
}
let mf2 = Manifest::open(&data_dir).unwrap();
let blocks = mf2.state().block_infos.get(&1).unwrap();
assert_eq!(blocks[0].min_key, b"\xff\x00key");
assert_eq!(blocks[0].max_key, b"\xff\x01key");
}
#[test]
fn test_manifest_update_bloom_entry() {
use crate::bloom::BloomFilter;
let dir = TempDir::new().unwrap();
let data_dir = dir.path().join("data");
std::fs::create_dir_all(&data_dir).unwrap();
let mut mf = Manifest::open(&data_dir).unwrap();
let legacy_bloom = {
let mut b = BloomFilter::from_keys_with_bits(&[b"k".to_vec()], 10);
b.mark_current(); b
};
mf.append(&ManifestEntry::Flush {
seq: 1,
sst: SstInfo {
id: 7,
records: 1,
bytes: 16,
min_ts: 0,
max_ts: 0,
min_expire: 0,
max_expire: 0,
bloom: Some(legacy_bloom),
},
blocks: vec![],
})
.unwrap();
let new_bloom = BloomFilter::from_keys_with_bits(&[b"new".to_vec()], 10);
mf.append(&ManifestEntry::UpdateBloom {
sst_id: 7,
bloom: new_bloom,
})
.unwrap();
assert_eq!(
mf.state().sstables[&7]
.bloom
.as_ref()
.unwrap()
.hash_version(),
crate::bloom::CURRENT_HASH_VERSION
);
drop(mf);
let mf2 = Manifest::open(&data_dir).unwrap();
assert!(mf2.state().sstables[&7].bloom.is_some());
assert_eq!(
mf2.state().sstables[&7]
.bloom
.as_ref()
.unwrap()
.hash_version(),
crate::bloom::CURRENT_HASH_VERSION
);
assert!(mf2.state().sstables[&7]
.bloom
.as_ref()
.unwrap()
.may_contain(b"new"));
}
}