use std::path::Path;
use std::sync::Arc;
use redb::{Database, ReadableTable, TableDefinition};
use serde::{Deserialize, Serialize};
use zccache_core::NormalizedPath;
mod long_path {
    use std::path::Path;

    use zccache_core::NormalizedPath;

    /// Returns `dir` as a [`NormalizedPath`] suitable for long-path use on this platform.
    ///
    /// On Windows, a path that already carries the verbatim `\\?\` prefix is
    /// wrapped unchanged. Any other path is passed through
    /// [`std::fs::canonicalize`], so `dir` must already exist there; std documents
    /// that canonicalize yields extended-length syntax on Windows. On every other
    /// platform the path is wrapped as-is.
    ///
    /// # Errors
    /// On Windows, any I/O error reported by `std::fs::canonicalize`.
    pub(super) fn ensure_long_path(dir: &Path) -> std::io::Result<NormalizedPath> {
        #[cfg(windows)]
        {
            if !starts_with_verbatim(dir) {
                return std::fs::canonicalize(dir).map(NormalizedPath::new);
            }
            Ok(NormalizedPath::new(dir))
        }
        #[cfg(not(windows))]
        {
            Ok(NormalizedPath::new(dir))
        }
    }

    /// True when the textual form of `p` begins with the `\\?\` verbatim prefix.
    #[cfg(windows)]
    fn starts_with_verbatim(p: &Path) -> bool {
        p.as_os_str().to_string_lossy().starts_with(r"\\?\")
    }
}
/// redb table holding every row; keys are `"<namespace>::<64 lowercase hex chars>"`.
const KV_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("kv");
/// Values of at most this many bytes are stored inline in the redb row; larger
/// values are written to a spill file under `kv/<namespace>/`.
pub const INLINE_THRESHOLD: usize = 4 * 1024;
/// Largest value, in bytes, that [`KvStore::put`] accepts.
pub const MAX_VALUE_BYTES: usize = 64 * 1024 * 1024;
/// Version stamped into every stored row; [`KvStore::get`] reports rows with a
/// different version as `KvError::Corrupt`.
const SCHEMA_VERSION: u32 = 1;
/// Maximum namespace length in bytes (see `is_valid_namespace`).
const NAMESPACE_MAX: usize = 64;
/// A 32-byte store key; [`Key::from_hash`] builds one from a BLAKE3 digest.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Key(pub [u8; 32]);

impl Key {
    /// Wraps the 32 bytes of a BLAKE3 hash as a key.
    #[must_use]
    pub fn from_hash(h: blake3::Hash) -> Self {
        let digest: &[u8; 32] = h.as_bytes();
        Self(*digest)
    }

    /// Borrows the raw key bytes.
    #[must_use]
    pub fn as_bytes(&self) -> &[u8; 32] {
        &self.0
    }

    /// Renders the key as exactly 64 lowercase hex characters (high nibble first).
    #[must_use]
    pub fn to_hex(&self) -> String {
        let mut rendered = String::with_capacity(64);
        rendered.extend(
            self.0
                .iter()
                .flat_map(|&byte| [hex_nibble(byte >> 4), hex_nibble(byte & 0x0f)]),
        );
        rendered
    }

    /// Parses a 64-character hex string (either letter case) back into a key.
    ///
    /// # Errors
    /// [`KvError::BadKey`] if `hex` is not exactly 64 bytes long or contains a
    /// byte that is not an ASCII hex digit.
    pub fn from_hex(hex: &str) -> KvResult<Self> {
        let digits = hex.as_bytes();
        if digits.len() != 64 {
            return Err(KvError::BadKey);
        }
        let mut decoded = [0u8; 32];
        for (slot, pair) in decoded.iter_mut().zip(digits.chunks_exact(2)) {
            let hi = parse_nibble(pair[0]).ok_or(KvError::BadKey)?;
            let lo = parse_nibble(pair[1]).ok_or(KvError::BadKey)?;
            *slot = (hi << 4) | lo;
        }
        Ok(Self(decoded))
    }
}
/// Maps a nibble value (`0..=15`) to its lowercase ASCII hex digit.
///
/// # Panics
/// Panics (as unreachable code) if `n > 15`; callers only pass the two halves
/// of a byte.
fn hex_nibble(n: u8) -> char {
    char::from_digit(u32::from(n), 16).unwrap_or_else(|| unreachable!())
}
/// Decodes one ASCII hex digit (`0-9`, `a-f`, `A-F`) into its value, or `None`
/// for any other byte.
fn parse_nibble(b: u8) -> Option<u8> {
    char::from(b)
        .to_digit(16)
        .and_then(|value| u8::try_from(value).ok())
}
#[derive(Debug, thiserror::Error)]
pub enum KvError {
#[error("io: {0}")]
Io(#[from] std::io::Error),
#[error("redb: {0}")]
Redb(String),
#[error("namespace must be 1..=64 chars of [a-z0-9-] without `::`")]
BadNamespace,
#[error("key must be 32 bytes (64 hex chars)")]
BadKey,
#[error("corrupt entry for key {0}: {1}")]
Corrupt(String, String),
#[error("value too large: {0} bytes (max {1})")]
TooLarge(usize, usize),
}
impl From<redb::Error> for KvError {
fn from(e: redb::Error) -> Self {
Self::Redb(e.to_string())
}
}
impl From<redb::DatabaseError> for KvError {
fn from(e: redb::DatabaseError) -> Self {
Self::Redb(e.to_string())
}
}
impl From<redb::TransactionError> for KvError {
fn from(e: redb::TransactionError) -> Self {
Self::Redb(e.to_string())
}
}
impl From<redb::TableError> for KvError {
fn from(e: redb::TableError) -> Self {
Self::Redb(e.to_string())
}
}
impl From<redb::StorageError> for KvError {
fn from(e: redb::StorageError) -> Self {
Self::Redb(e.to_string())
}
}
impl From<redb::CommitError> for KvError {
fn from(e: redb::CommitError) -> Self {
Self::Redb(e.to_string())
}
}
pub type KvResult<T> = std::result::Result<T, KvError>;
/// Record stored, bincode-encoded, as the redb value for each composite key.
///
/// Field and variant order are part of the bincode encoding, so reordering them
/// would change how existing rows decode.
#[derive(Debug, Serialize, Deserialize)]
struct KvRow {
/// Must equal `SCHEMA_VERSION`; `KvStore::get` rejects other values as corrupt.
schema_version: u32,
/// Where the value's bytes live.
body: KvBody,
}
/// Location of a stored value's bytes.
#[derive(Debug, Serialize, Deserialize)]
enum KvBody {
/// The value itself (used when it is at most `INLINE_THRESHOLD` bytes).
Inline(Vec<u8>),
/// The value lives in the spill file `kv/<namespace>/<key hex>.bin`; `get`
/// checks the file against this recorded length and BLAKE3 digest.
Spilled { len: u64, blake3: [u8; 32] },
}
/// Reports whether `ns` is an acceptable namespace.
///
/// A valid namespace is 1 to `NAMESPACE_MAX` bytes long, uses only ASCII
/// lowercase letters, ASCII digits and `-`, and never contains the `::`
/// separator used to build composite keys.
#[must_use]
pub fn is_valid_namespace(ns: &str) -> bool {
    (1..=NAMESPACE_MAX).contains(&ns.len())
        && !ns.contains("::")
        && ns
            .bytes()
            .all(|c| matches!(c, b'a'..=b'z' | b'0'..=b'9' | b'-'))
}
/// Validates `ns` using [`is_valid_namespace`].
///
/// # Errors
/// [`KvError::BadNamespace`] when the namespace is rejected.
fn check_namespace(ns: &str) -> KvResult<()> {
    if !is_valid_namespace(ns) {
        return Err(KvError::BadNamespace);
    }
    Ok(())
}
/// Builds the redb row key `"<ns>::<64 lowercase hex chars>"` for `key` in `ns`.
fn composite(ns: &str, key: &Key) -> String {
    format!("{ns}::{}", key.to_hex())
}
#[derive(Clone)]
pub struct KvStore {
db: Arc<Database>,
cache_dir: Arc<NormalizedPath>,
}
impl KvStore {
pub fn open_default() -> KvResult<Self> {
let dir = zccache_core::config::default_cache_dir();
Self::open(dir)
}
pub fn open<P: AsRef<Path>>(dir: P) -> KvResult<Self> {
let mut dir = NormalizedPath::new(dir.as_ref());
std::fs::create_dir_all(&dir)?;
dir = long_path::ensure_long_path(dir.as_path())?;
let db_path = dir.join("index.redb");
let db = Database::create(&db_path).map_err(|e| KvError::Redb(e.to_string()))?;
let store = Self {
db: Arc::new(db),
cache_dir: Arc::new(dir),
};
store.ensure_table()?;
Ok(store)
}
pub fn from_database<P: AsRef<Path>>(db: Arc<Database>, cache_dir: P) -> KvResult<Self> {
let mut cache_dir = NormalizedPath::new(cache_dir.as_ref());
std::fs::create_dir_all(&cache_dir)?;
cache_dir = long_path::ensure_long_path(cache_dir.as_path())?;
let store = Self {
db,
cache_dir: Arc::new(cache_dir),
};
store.ensure_table()?;
Ok(store)
}
fn ensure_table(&self) -> KvResult<()> {
let txn = self.db.begin_write()?;
{
let _t = txn.open_table(KV_TABLE)?;
}
txn.commit()?;
Ok(())
}
fn spill_path(&self, namespace: &str, key: &Key) -> NormalizedPath {
self.cache_dir
.join("kv")
.join(namespace)
.join(format!("{}.bin", key.to_hex()))
}
pub fn get(&self, namespace: &str, key: &Key) -> KvResult<Option<Vec<u8>>> {
check_namespace(namespace)?;
let composite = composite(namespace, key);
let txn = self.db.begin_read()?;
let table = txn.open_table(KV_TABLE)?;
let raw = match table.get(composite.as_str())? {
Some(v) => v.value().to_vec(),
None => return Ok(None),
};
let row: KvRow = bincode::deserialize(&raw)
.map_err(|e| KvError::Corrupt(composite.clone(), format!("bincode: {e}")))?;
if row.schema_version != SCHEMA_VERSION {
return Err(KvError::Corrupt(
composite,
format!("schema_version={}", row.schema_version),
));
}
match row.body {
KvBody::Inline(bytes) => Ok(Some(bytes)),
KvBody::Spilled { len, blake3 } => {
let path = self.spill_path(namespace, key);
let bytes = std::fs::read(&path)?;
if bytes.len() as u64 != len {
return Err(KvError::Corrupt(
composite,
format!(
"spill length mismatch: got {}, expected {}",
bytes.len(),
len
),
));
}
let actual = *::blake3::hash(&bytes).as_bytes();
if actual != blake3 {
return Err(KvError::Corrupt(
composite,
"spill blake3 mismatch".to_string(),
));
}
Ok(Some(bytes))
}
}
}
pub fn put(&self, namespace: &str, key: &Key, value: &[u8]) -> KvResult<usize> {
check_namespace(namespace)?;
if value.len() > MAX_VALUE_BYTES {
return Err(KvError::TooLarge(value.len(), MAX_VALUE_BYTES));
}
let composite = composite(namespace, key);
let body = if value.len() <= INLINE_THRESHOLD {
let prev_path = self.spill_path(namespace, key);
if prev_path.exists() {
let _ = std::fs::remove_file(&prev_path);
}
KvBody::Inline(value.to_vec())
} else {
let path = self.spill_path(namespace, key);
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
}
let dir = path
.parent()
.expect("spill path always has a parent because we joined kv/<ns>/");
let mut tmp = tempfile::NamedTempFile::new_in(dir)?;
use std::io::Write;
tmp.write_all(value)?;
tmp.as_file().sync_all()?;
tmp.persist(&path).map_err(|e| KvError::Io(e.error))?;
let blake3 = *::blake3::hash(value).as_bytes();
KvBody::Spilled {
len: value.len() as u64,
blake3,
}
};
let row = KvRow {
schema_version: SCHEMA_VERSION,
body,
};
let bytes =
bincode::serialize(&row).map_err(|e| KvError::Redb(format!("serialize row: {e}")))?;
let txn = self.db.begin_write()?;
{
let mut table = txn.open_table(KV_TABLE)?;
table.insert(composite.as_str(), bytes.as_slice())?;
}
txn.commit()?;
Ok(value.len())
}
pub fn remove(&self, namespace: &str, key: &Key) -> KvResult<()> {
check_namespace(namespace)?;
let composite = composite(namespace, key);
let txn = self.db.begin_write()?;
let mut had_spill = None;
{
let mut table = txn.open_table(KV_TABLE)?;
let removed = table.remove(composite.as_str())?;
if let Some(existing) = removed {
if let Ok(row) = bincode::deserialize::<KvRow>(existing.value()) {
if matches!(row.body, KvBody::Spilled { .. }) {
had_spill = Some(self.spill_path(namespace, key));
}
}
}
}
txn.commit()?;
if let Some(path) = had_spill {
let _ = std::fs::remove_file(&path);
}
Ok(())
}
pub fn clear_namespace(&self, namespace: &str) -> KvResult<()> {
check_namespace(namespace)?;
let prefix = format!("{namespace}::");
let mut to_remove: Vec<String> = Vec::new();
{
let txn = self.db.begin_read()?;
let table = txn.open_table(KV_TABLE)?;
for entry in table.iter()? {
let (k, _v) = entry?;
let k_str = k.value().to_string();
if k_str.starts_with(&prefix) {
to_remove.push(k_str);
}
}
}
let txn = self.db.begin_write()?;
{
let mut table = txn.open_table(KV_TABLE)?;
for k in &to_remove {
table.remove(k.as_str())?;
}
}
txn.commit()?;
let ns_dir = self.cache_dir.join("kv").join(namespace);
if ns_dir.exists() {
let _ = std::fs::remove_dir_all(&ns_dir);
}
Ok(())
}
pub fn list_namespace(&self, namespace: &str) -> KvResult<Vec<(Key, u64)>> {
check_namespace(namespace)?;
let prefix = format!("{namespace}::");
let txn = self.db.begin_read()?;
let table = txn.open_table(KV_TABLE)?;
let mut out: Vec<(Key, u64)> = Vec::new();
for entry in table.iter()? {
let (k, v) = entry?;
let k_str = k.value().to_string();
if let Some(hex) = k_str.strip_prefix(&prefix) {
let key = Key::from_hex(hex)?;
let row: KvRow = bincode::deserialize(v.value())
.map_err(|e| KvError::Corrupt(k_str.clone(), format!("bincode: {e}")))?;
let len = match row.body {
KvBody::Inline(ref b) => b.len() as u64,
KvBody::Spilled { len, .. } => len,
};
out.push((key, len));
}
}
out.sort_by(|a, b| a.0.to_hex().cmp(&b.0.to_hex()));
Ok(out)
}
pub fn namespace_bytes(&self, namespace: &str) -> KvResult<u64> {
let entries = self.list_namespace(namespace)?;
Ok(entries.iter().map(|(_, l)| *l).sum())
}
pub fn total_bytes(&self) -> KvResult<u64> {
let txn = self.db.begin_read()?;
let table = txn.open_table(KV_TABLE)?;
let mut total: u64 = 0;
for entry in table.iter()? {
let (_, v) = entry?;
if let Ok(row) = bincode::deserialize::<KvRow>(v.value()) {
total += match row.body {
KvBody::Inline(b) => b.len() as u64,
KvBody::Spilled { len, .. } => len,
};
}
}
Ok(total)
}
pub fn stats(&self) -> KvResult<Vec<(String, u64)>> {
let txn = self.db.begin_read()?;
let table = txn.open_table(KV_TABLE)?;
let mut by_ns: std::collections::BTreeMap<String, u64> = Default::default();
for entry in table.iter()? {
let (k, v) = entry?;
let k_str = k.value().to_string();
let ns = match k_str.split_once("::") {
Some((ns, _)) => ns.to_string(),
None => continue,
};
if let Ok(row) = bincode::deserialize::<KvRow>(v.value()) {
let len = match row.body {
KvBody::Inline(b) => b.len() as u64,
KvBody::Spilled { len, .. } => len,
};
*by_ns.entry(ns).or_insert(0) += len;
}
}
Ok(by_ns.into_iter().collect())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // Fresh store in a temp dir; the TempDir guard must outlive the store.
    fn store() -> (tempfile::TempDir, KvStore) {
        let dir = tempfile::tempdir().unwrap();
        let s = KvStore::open(dir.path()).unwrap();
        (dir, s)
    }

    fn key_from(seed: &[u8]) -> Key {
        Key::from_hash(::blake3::hash(seed))
    }

    // Expected on-disk spill location for `key` in `ns` under store root `root`.
    fn spill_file(root: &std::path::Path, ns: &str, key: &Key) -> std::path::PathBuf {
        root.join("kv")
            .join(ns)
            .join(format!("{}.bin", key.to_hex()))
    }

    #[test]
    fn f1_round_trip_sizes() {
        let (_d, s) = store();
        let sizes = [
            0,
            1,
            100,
            INLINE_THRESHOLD - 1,
            INLINE_THRESHOLD,
            INLINE_THRESHOLD + 1,
            64 * 1024,
        ];
        for (i, n) in sizes.iter().enumerate() {
            let k = key_from(&i.to_le_bytes());
            let val: Vec<u8> = (0..*n).map(|j| (j % 251) as u8).collect();
            assert_eq!(s.put("ns", &k, &val).unwrap(), val.len());
            let got = s.get("ns", &k).unwrap().unwrap();
            assert_eq!(got, val, "size {n} round-trip mismatch");
        }
    }

    #[test]
    fn f2_miss_returns_none() {
        let (_d, s) = store();
        let k = key_from(b"nope");
        assert!(s.get("ns", &k).unwrap().is_none());
    }

    #[test]
    fn f3_overwrite() {
        let (_d, s) = store();
        let k = key_from(b"ow");
        s.put("ns", &k, b"v1").unwrap();
        s.put("ns", &k, b"v2").unwrap();
        assert_eq!(s.get("ns", &k).unwrap().unwrap(), b"v2");
    }

    #[test]
    fn f4_remove() {
        let (_d, s) = store();
        let k = key_from(b"r");
        s.put("ns", &k, b"x").unwrap();
        s.remove("ns", &k).unwrap();
        assert!(s.get("ns", &k).unwrap().is_none());
        // Removing an absent key is not an error.
        s.remove("ns", &k).unwrap();
    }

    #[test]
    fn f5_clear_namespace_isolation() {
        let (_d, s) = store();
        let k = key_from(b"k");
        s.put("a", &k, b"in-a").unwrap();
        s.put("b", &k, b"in-b").unwrap();
        s.clear_namespace("a").unwrap();
        assert!(s.get("a", &k).unwrap().is_none());
        assert_eq!(s.get("b", &k).unwrap().unwrap(), b"in-b");
    }

    #[test]
    fn f6_list_sorted_and_lengths() {
        let (_d, s) = store();
        let mut keys: Vec<Key> = (0u32..5).map(|i| key_from(&i.to_le_bytes())).collect();
        for (i, k) in keys.iter().enumerate() {
            let n = if i % 2 == 0 {
                10
            } else {
                INLINE_THRESHOLD + 100
            };
            let val = vec![i as u8; n];
            s.put("ns", k, &val).unwrap();
        }
        let listed = s.list_namespace("ns").unwrap();
        assert_eq!(listed.len(), 5);
        keys.sort_by_key(|k| k.to_hex());
        for (i, (k, _)) in listed.iter().enumerate() {
            assert_eq!(k.to_hex(), keys[i].to_hex(), "list not sorted at {i}");
        }
    }

    #[test]
    fn f7_total_eq_sum() {
        let (_d, s) = store();
        for ns in &["a", "b", "c"] {
            for i in 0..3 {
                let k = key_from(format!("{ns}-{i}").as_bytes());
                s.put(ns, &k, &vec![0u8; 50 + i]).unwrap();
            }
        }
        let total = s.total_bytes().unwrap();
        let sum: u64 = ["a", "b", "c"]
            .iter()
            .map(|ns| s.namespace_bytes(ns).unwrap())
            .sum();
        assert_eq!(total, sum);
    }

    #[test]
    fn f8_byte_exact_threshold() {
        let (d, s) = store();
        let inline_key = key_from(b"inline");
        let spill_key = key_from(b"spill");
        s.put("ns", &inline_key, &vec![1u8; INLINE_THRESHOLD]).unwrap();
        s.put("ns", &spill_key, &vec![2u8; INLINE_THRESHOLD + 1]).unwrap();
        let inline_path = spill_file(d.path(), "ns", &inline_key);
        let spill_path = spill_file(d.path(), "ns", &spill_key);
        assert!(!inline_path.exists(), "inline value must NOT spill");
        assert!(spill_path.exists(), "spill threshold + 1 must spill");
        assert_eq!(
            std::fs::metadata(&spill_path).unwrap().len(),
            (INLINE_THRESHOLD + 1) as u64
        );
    }

    #[test]
    fn f9_tampered_spill_detected() {
        let (d, s) = store();
        let k = key_from(b"corrupt");
        s.put("ns", &k, &vec![7u8; INLINE_THRESHOLD + 100]).unwrap();
        let path = spill_file(d.path(), "ns", &k);
        let mut bytes = std::fs::read(&path).unwrap();
        bytes[0] ^= 0xff;
        std::fs::write(&path, &bytes).unwrap();
        let err = s.get("ns", &k).unwrap_err();
        assert!(matches!(err, KvError::Corrupt(_, _)), "got {err:?}");
    }

    #[test]
    fn f10_key_hex_round_trip() {
        let h = ::blake3::hash(b"hello");
        let k = Key::from_hash(h);
        let hex = k.to_hex();
        assert_eq!(hex.len(), 64);
        assert!(hex
            .chars()
            .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit()));
        let k2 = Key::from_hex(&hex).unwrap();
        assert_eq!(k, k2);
        let upper = hex.to_ascii_uppercase();
        let k3 = Key::from_hex(&upper).unwrap();
        assert_eq!(k, k3);
        assert!(matches!(Key::from_hex(""), Err(KvError::BadKey)));
        assert!(matches!(Key::from_hex("zz"), Err(KvError::BadKey)));
        assert!(matches!(
            Key::from_hex(&"a".repeat(63)),
            Err(KvError::BadKey)
        ));
        assert!(matches!(
            Key::from_hex(&"a".repeat(65)),
            Err(KvError::BadKey)
        ));
        let mut bad = "a".repeat(64);
        bad.replace_range(0..1, "g");
        assert!(matches!(Key::from_hex(&bad), Err(KvError::BadKey)));
    }

    #[test]
    fn f11_namespace_validator() {
        assert!(is_valid_namespace("a"));
        assert!(is_valid_namespace("0"));
        assert!(is_valid_namespace("library-selection"));
        assert!(is_valid_namespace(&"x".repeat(64)));
        assert!(!is_valid_namespace(""));
        assert!(!is_valid_namespace("A"));
        assert!(!is_valid_namespace("name with space"));
        assert!(!is_valid_namespace("a/b"));
        assert!(!is_valid_namespace("日本語"));
        assert!(!is_valid_namespace(&"x".repeat(65)));
        assert!(!is_valid_namespace("a::b"));
    }

    #[test]
    fn f12_schema_version_mismatch() {
        let (_d, s) = store();
        let k = key_from(b"sv");
        let row = KvRow {
            schema_version: SCHEMA_VERSION + 1,
            body: KvBody::Inline(b"hi".to_vec()),
        };
        let bytes = bincode::serialize(&row).unwrap();
        let composite_key = composite("ns", &k);
        // Inject a future-version row directly, bypassing `put`.
        let txn = s.db.begin_write().unwrap();
        {
            let mut t = txn.open_table(KV_TABLE).unwrap();
            t.insert(composite_key.as_str(), bytes.as_slice()).unwrap();
        }
        txn.commit().unwrap();
        let err = s.get("ns", &k).unwrap_err();
        match err {
            KvError::Corrupt(_, msg) => assert!(msg.contains("schema_version="), "msg={msg}"),
            other => panic!("expected Corrupt, got {other:?}"),
        }
    }

    #[test]
    fn i1_empty_namespace_rejected() {
        let (_d, s) = store();
        let k = key_from(b"x");
        assert!(matches!(s.put("", &k, b"v"), Err(KvError::BadNamespace)));
    }

    #[test]
    fn i2_namespace_at_limit_ok() {
        let (_d, s) = store();
        let k = key_from(b"x");
        let ns = "a".repeat(64);
        s.put(&ns, &k, b"v").unwrap();
    }

    #[test]
    fn i3_namespace_too_long_rejected() {
        let (_d, s) = store();
        let k = key_from(b"x");
        let ns = "a".repeat(65);
        assert!(matches!(s.put(&ns, &k, b"v"), Err(KvError::BadNamespace)));
    }

    #[test]
    fn i4_namespace_with_double_colon_rejected() {
        let (_d, s) = store();
        let k = key_from(b"x");
        assert!(matches!(
            s.put("a::b", &k, b"v"),
            Err(KvError::BadNamespace)
        ));
    }

    #[test]
    #[ignore = "allocates 64 MiB; see tests/kv_stress.rs for max-cap coverage"]
    fn i6_too_large_rejected() {
        let (_d, s) = store();
        let k = key_from(b"big");
        let oversized = MAX_VALUE_BYTES + 1;
        let v = vec![0u8; oversized];
        let err = s.put("ns", &k, &v).unwrap_err();
        assert!(matches!(err, KvError::TooLarge(n, m) if n == oversized && m == MAX_VALUE_BYTES));
    }

    #[test]
    fn i7_namespaces_are_independent() {
        let (_d, s) = store();
        let k = key_from(b"shared");
        s.put("a", &k, b"a-val").unwrap();
        s.put("b", &k, b"b-val").unwrap();
        assert_eq!(s.get("a", &k).unwrap().unwrap(), b"a-val");
        assert_eq!(s.get("b", &k).unwrap().unwrap(), b"b-val");
    }

    #[test]
    fn p8_reopen_round_trip() {
        let dir = tempfile::tempdir().unwrap();
        let k = key_from(b"persist");
        {
            let s = KvStore::open(dir.path()).unwrap();
            s.put("ns", &k, &vec![3u8; INLINE_THRESHOLD + 10]).unwrap();
        }
        let s = KvStore::open(dir.path()).unwrap();
        let got = s.get("ns", &k).unwrap().unwrap();
        assert_eq!(got, vec![3u8; INLINE_THRESHOLD + 10]);
    }

    #[test]
    fn p3_case_insensitive_key_parses_to_same_key() {
        let h = ::blake3::hash(b"x");
        let k = Key::from_hash(h);
        let lower = k.to_hex();
        let upper = lower.to_ascii_uppercase();
        let k_lower = Key::from_hex(&lower).unwrap();
        let k_upper = Key::from_hex(&upper).unwrap();
        assert_eq!(k_lower, k_upper);
    }

    #[test]
    fn i8_durability_after_commit() {
        let dir = tempfile::tempdir().unwrap();
        let k = key_from(b"d");
        {
            let s = KvStore::open(dir.path()).unwrap();
            s.put("ns", &k, b"durable").unwrap();
        }
        let s = KvStore::open(dir.path()).unwrap();
        assert_eq!(s.get("ns", &k).unwrap().unwrap(), b"durable");
    }

    // Spill lifecycle: shrinking a spilled value to inline must drop its file.
    #[test]
    fn s1_inline_overwrite_removes_stale_spill_file() {
        let (d, s) = store();
        let k = key_from(b"shrink");
        s.put("ns", &k, &vec![5u8; INLINE_THRESHOLD + 1]).unwrap();
        let path = spill_file(d.path(), "ns", &k);
        assert!(path.exists(), "large value must spill");
        s.put("ns", &k, b"small").unwrap();
        assert!(!path.exists(), "inline overwrite must delete the old spill file");
        assert_eq!(s.get("ns", &k).unwrap().unwrap(), b"small");
    }

    // Spill lifecycle: growing an inline value past the threshold.
    #[test]
    fn s2_spilled_overwrite_of_inline_value() {
        let (_d, s) = store();
        let k = key_from(b"grow");
        s.put("ns", &k, b"small").unwrap();
        let big = vec![6u8; INLINE_THRESHOLD + 1];
        s.put("ns", &k, &big).unwrap();
        assert_eq!(s.get("ns", &k).unwrap().unwrap(), big);
    }

    #[test]
    fn s3_remove_spilled_deletes_file() {
        let (d, s) = store();
        let k = key_from(b"rm-spill");
        s.put("ns", &k, &vec![8u8; INLINE_THRESHOLD + 1]).unwrap();
        let path = spill_file(d.path(), "ns", &k);
        assert!(path.exists());
        s.remove("ns", &k).unwrap();
        assert!(!path.exists(), "remove must delete the spill file");
        assert!(s.get("ns", &k).unwrap().is_none());
    }

    #[test]
    fn s4_clear_namespace_removes_only_its_spills() {
        let (d, s) = store();
        let k = key_from(b"k");
        let big = vec![9u8; INLINE_THRESHOLD + 1];
        s.put("a", &k, &big).unwrap();
        s.put("b", &k, &big).unwrap();
        s.clear_namespace("a").unwrap();
        assert!(!d.path().join("kv").join("a").exists());
        assert!(s.list_namespace("a").unwrap().is_empty());
        assert!(spill_file(d.path(), "b", &k).exists());
        assert_eq!(s.get("b", &k).unwrap().unwrap(), big);
    }

    #[test]
    fn s5_stats_groups_by_namespace() {
        let (_d, s) = store();
        s.put("a", &key_from(b"1"), &[0u8; 10]).unwrap();
        s.put("a", &key_from(b"2"), &[0u8; 5]).unwrap();
        s.put("b", &key_from(b"3"), &vec![0u8; INLINE_THRESHOLD + 1])
            .unwrap();
        let stats = s.stats().unwrap();
        assert_eq!(
            stats,
            vec![
                ("a".to_string(), 15),
                ("b".to_string(), (INLINE_THRESHOLD + 1) as u64),
            ]
        );
    }
}