use std::path::Path;
use fjall::{Config, Database, Keyspace, KeyspaceCreateOptions, PersistMode};
use crate::kv::{KvEntry, KvUpdate, VersionToken, WatchCursor};
use crate::snapshot::{SnapshotError, SnapshotStore};
const DATA_PARTITION: &str = "data";
const META_PARTITION: &str = "meta";
const CURSOR_KEY: &[u8] = b"cursor";
#[derive(Debug, Clone, Copy, Default)]
pub struct FjallConfig {
pub sync: bool,
}
pub struct FjallSnapshot {
db: Database,
data: Keyspace,
meta: Keyspace,
config: FjallConfig,
cursor: WatchCursor,
}
impl FjallSnapshot {
pub fn open(path: &Path, config: FjallConfig) -> Result<(WatchCursor, Self), SnapshotError> {
std::fs::create_dir_all(path)?;
let db = Database::open(Config::new(path)).map_err(map_fjall)?;
let data = db
.keyspace(DATA_PARTITION, KeyspaceCreateOptions::default)
.map_err(map_fjall)?;
let meta = db
.keyspace(META_PARTITION, KeyspaceCreateOptions::default)
.map_err(map_fjall)?;
let cursor = match meta.get(CURSOR_KEY).map_err(map_fjall)? {
Some(raw) => VersionToken::from_raw(&raw)
.map(WatchCursor::from_version)
.ok_or_else(|| {
SnapshotError::InvalidFormat(format!(
"stored cursor is {} bytes, exceeds version token capacity",
raw.len()
))
})?,
None => WatchCursor::none(),
};
Ok((
cursor.clone(),
Self {
db,
data,
meta,
config,
cursor,
},
))
}
pub fn cursor(&self) -> &WatchCursor {
&self.cursor
}
}
impl SnapshotStore for FjallSnapshot {
fn load(path: &Path) -> Result<(WatchCursor, Self), SnapshotError> {
Self::open(path, FjallConfig::default())
}
fn apply(&mut self, batch: &[KvUpdate], cursor: &WatchCursor) -> Result<(), SnapshotError> {
let mut wb = self.db.batch().durability(self.durability());
let mut scratch = Vec::new();
for update in batch {
match update {
KvUpdate::Put(entry) => {
encode_value_into(&mut scratch, &entry.value, &entry.version)?;
wb.insert(&self.data, entry.key.as_bytes(), scratch.as_slice());
}
KvUpdate::Delete { key, .. } | KvUpdate::Purge { key, .. } => {
wb.remove(&self.data, key.as_bytes());
}
}
}
wb.insert(&self.meta, CURSOR_KEY, cursor.version().as_bytes());
wb.commit().map_err(map_fjall)?;
self.cursor = cursor.clone();
Ok(())
}
fn get(&self, key: &str) -> Result<Option<KvEntry>, SnapshotError> {
match self.data.get(key.as_bytes()).map_err(map_fjall)? {
Some(raw) => Ok(Some(decode_entry(key, &raw)?)),
None => Ok(None),
}
}
fn range(&self, prefix: &str) -> Result<Vec<KvEntry>, SnapshotError> {
let mut out = Vec::new();
self.for_each_in_range(prefix, |entry| {
out.push(entry);
Ok(())
})?;
Ok(out)
}
fn for_each_in_range(
&self,
prefix: &str,
mut f: impl FnMut(KvEntry) -> Result<(), SnapshotError>,
) -> Result<(), SnapshotError> {
for guard in self.data.prefix(prefix.as_bytes()) {
let (raw_key, raw_val) = guard.into_inner().map_err(map_fjall)?;
let key = std::str::from_utf8(&raw_key).map_err(|e| {
SnapshotError::InvalidFormat(format!("non-UTF-8 key in fjall store: {e}"))
})?;
f(decode_entry(key, &raw_val)?)?;
}
Ok(())
}
}
impl FjallSnapshot {
fn durability(&self) -> Option<PersistMode> {
if self.config.sync {
Some(PersistMode::SyncAll)
} else {
Some(PersistMode::Buffer)
}
}
}
fn encode_value_into(
buf: &mut Vec<u8>,
value: &[u8],
version: &VersionToken,
) -> Result<(), SnapshotError> {
let vb = version.as_bytes();
let ver_len = u8::try_from(vb.len()).map_err(|_| {
SnapshotError::InvalidFormat(format!(
"version too long: {} bytes (max {})",
vb.len(),
u8::MAX
))
})?;
buf.clear();
buf.reserve(1 + vb.len() + value.len());
buf.push(ver_len);
buf.extend_from_slice(vb);
buf.extend_from_slice(value);
Ok(())
}
fn decode_entry(key: &str, raw: &[u8]) -> Result<KvEntry, SnapshotError> {
let ver_len = *raw.first().ok_or_else(|| {
SnapshotError::InvalidFormat("fjall value record is empty (no version length)".into())
})? as usize;
let value_off = 1 + ver_len;
if raw.len() < value_off {
return Err(SnapshotError::InvalidFormat(format!(
"fjall value record truncated: need {value_off} bytes for version, have {}",
raw.len()
)));
}
let version = VersionToken::from_raw(&raw[1..value_off]).ok_or_else(|| {
SnapshotError::InvalidFormat(format!(
"version length {ver_len} exceeds version token capacity"
))
})?;
Ok(KvEntry {
key: key.to_string(),
value: raw[value_off..].to_vec(),
version,
})
}
fn map_fjall(e: fjall::Error) -> SnapshotError {
match e {
fjall::Error::Io(io) => SnapshotError::Io(io),
other => SnapshotError::Backend(other.to_string()),
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[test]
fn encode_decode_round_trips_fdb_versionstamp() {
let vs = VersionToken::from_fdb_versionstamp(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
let mut enc = Vec::new();
encode_value_into(&mut enc, b"payload", &vs).expect("encode");
let entry = decode_entry("k", &enc).expect("decode");
assert_eq!(entry.version.as_bytes(), &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
assert!(
entry.version.as_u64().is_none(),
"a 10-byte token has no u64 form — it must not be flattened"
);
assert_eq!(entry.value, b"payload");
}
#[test]
fn encode_decode_round_trips_empty_value() {
let mut enc = Vec::new();
encode_value_into(&mut enc, b"", &VersionToken::from_u64(7)).expect("encode");
let entry = decode_entry("k", &enc).expect("decode");
assert!(entry.value.is_empty());
assert_eq!(entry.version.as_u64(), Some(7));
}
#[test]
fn decode_entry_rejects_empty_record() {
let err = decode_entry("k", &[]).unwrap_err();
assert!(
matches!(err, SnapshotError::InvalidFormat(_)),
"empty record must be a format error, got {err:?}"
);
}
#[test]
fn decode_entry_rejects_truncated_version() {
let raw = [5u8, 0xAA, 0xBB];
let err = decode_entry("k", &raw).unwrap_err();
assert!(
matches!(err, SnapshotError::InvalidFormat(_)),
"truncated version must be a format error, got {err:?}"
);
}
#[test]
fn decode_entry_rejects_oversized_version() {
let mut raw = vec![11u8];
raw.extend_from_slice(&[0u8; 11]);
let err = decode_entry("k", &raw).unwrap_err();
assert!(
matches!(err, SnapshotError::InvalidFormat(_)),
"oversized version must be a format error, got {err:?}"
);
}
#[test]
fn open_rejects_corrupted_cursor() {
let dir = TempDir::new().unwrap();
let path = dir.path().join("store");
{
let (_c, store) =
FjallSnapshot::open(&path, FjallConfig::default()).expect("initial open");
store
.meta
.insert(CURSOR_KEY, [0u8; 11])
.expect("insert oversized cursor");
store.db.persist(PersistMode::SyncAll).expect("persist");
}
match FjallSnapshot::open(&path, FjallConfig::default()) {
Err(SnapshotError::InvalidFormat(_)) => {}
Err(other) => panic!("expected InvalidFormat, got {other:?}"),
Ok(_) => panic!("expected open to reject the oversized cursor"),
}
}
}