#![cfg(feature = "persistence")]
use crate::store::{
BlockIO, BlockKey, BlockMeta, DType, MetaLog, ReconstructPolicy, StoreError, Tier,
};
use std::collections::HashMap;
use std::fs;
use std::io::Write;
use std::path::{Path, PathBuf};
/// Size in bytes of one encoded `BlockMeta` record.
///
/// Every record has the same width, which is what lets the log be replayed in
/// fixed-size strides. The value is the sum of the field widths written by
/// `encode_meta` (byte offsets as read back by `decode_meta`):
///
/// | offset | width | field            |
/// |--------|-------|------------------|
/// | 0      | 16    | `key.tensor_id`  |
/// | 16     | 4     | `key.block_index`|
/// | 20     | 1     | `dtype` tag      |
/// | 21     | 1     | `tier` tag       |
/// | 22     | 1     | `bits`           |
/// | 23     | 4     | `scale`          |
/// | 27     | 2     | `zero_point`     |
/// | 29     | 8     | `created_at`     |
/// | 37     | 8     | `last_access_at` |
/// | 45     | 4     | `access_count`   |
/// | 49     | 4     | `ema_rate`       |
/// | 53     | 8     | `window`         |
/// | 61     | 4     | `checksum`       |
/// | 65     | 1     | `reconstruct` tag|
/// | 66     | 4     | `tier_age`       |
/// | 70     | 1     | lineage flag     |
/// | 71     | 16    | lineage parent   |
/// | 87     | 4     | `block_bytes`    |
const RECORD_SIZE: usize = 91;
/// Serializes `meta` into one fixed-width record of `RECORD_SIZE` bytes.
///
/// All multi-byte fields are written little-endian, in the order that
/// `decode_meta` reads them back. The enum fields (`dtype`, `tier`,
/// `reconstruct`) are stored as their `u8` discriminant.
pub fn encode_meta(meta: &BlockMeta) -> Vec<u8> {
    // The lineage slot is always 1 + 16 bytes wide: a presence flag followed by
    // the parent id, zero-filled when there is no parent. This keeps every
    // record the same length.
    let (lineage_flag, lineage_id) = match meta.lineage_parent {
        Some(parent) => (1u8, parent),
        None => (0u8, 0u128),
    };

    let mut record = Vec::with_capacity(RECORD_SIZE);

    // Block key.
    record.extend_from_slice(&meta.key.tensor_id.to_le_bytes());
    record.extend_from_slice(&meta.key.block_index.to_le_bytes());

    // Three consecutive single-byte fields.
    record.extend_from_slice(&[meta.dtype as u8, meta.tier as u8, meta.bits]);

    // Quantization parameters.
    record.extend_from_slice(&meta.scale.to_le_bytes());
    record.extend_from_slice(&meta.zero_point.to_le_bytes());

    // Access bookkeeping.
    record.extend_from_slice(&meta.created_at.to_le_bytes());
    record.extend_from_slice(&meta.last_access_at.to_le_bytes());
    record.extend_from_slice(&meta.access_count.to_le_bytes());
    record.extend_from_slice(&meta.ema_rate.to_le_bytes());
    record.extend_from_slice(&meta.window.to_le_bytes());

    record.extend_from_slice(&meta.checksum.to_le_bytes());
    record.push(meta.reconstruct as u8);
    record.extend_from_slice(&meta.tier_age.to_le_bytes());

    record.push(lineage_flag);
    record.extend_from_slice(&lineage_id.to_le_bytes());

    record.extend_from_slice(&meta.block_bytes.to_le_bytes());

    // Guards against the field list and RECORD_SIZE drifting apart.
    debug_assert_eq!(record.len(), RECORD_SIZE);
    record
}
/// Parses one record produced by `encode_meta` back into a `BlockMeta`.
///
/// Only the first `RECORD_SIZE` bytes of `bytes` are consumed; any extra
/// trailing bytes are ignored.
///
/// # Errors
///
/// Returns `StoreError::InvalidData` when `bytes` is shorter than
/// `RECORD_SIZE`, or when the `dtype`, `tier` or `reconstruct` tag byte is not
/// one of the known values.
pub fn decode_meta(bytes: &[u8]) -> Result<BlockMeta, StoreError> {
    /// Splits the next `N` bytes off the front of `rest` and returns them as a
    /// fixed-size array, advancing `rest` past them.
    fn take<const N: usize>(rest: &mut &[u8]) -> Result<[u8; N], StoreError> {
        if rest.len() < N {
            return Err(StoreError::InvalidData);
        }
        let (head, tail) = rest.split_at(N);
        *rest = tail;
        <[u8; N]>::try_from(head).map_err(|_| StoreError::InvalidData)
    }

    if bytes.len() < RECORD_SIZE {
        return Err(StoreError::InvalidData);
    }

    // Fields are consumed strictly in the order `encode_meta` wrote them, so
    // the cursor replaces hand-maintained byte offsets.
    let mut rest = bytes;

    let tensor_id = u128::from_le_bytes(take(&mut rest)?);
    let block_index = u32::from_le_bytes(take(&mut rest)?);

    let [dtype_tag] = take::<1>(&mut rest)?;
    let dtype = match dtype_tag {
        0 => DType::F32,
        1 => DType::F16,
        2 => DType::BF16,
        _ => return Err(StoreError::InvalidData),
    };

    let [tier_tag] = take::<1>(&mut rest)?;
    let tier = match tier_tag {
        0 => Tier::Tier0,
        1 => Tier::Tier1,
        2 => Tier::Tier2,
        3 => Tier::Tier3,
        _ => return Err(StoreError::InvalidData),
    };

    let [bits] = take::<1>(&mut rest)?;
    let scale = f32::from_le_bytes(take(&mut rest)?);
    let zero_point = i16::from_le_bytes(take(&mut rest)?);
    let created_at = u64::from_le_bytes(take(&mut rest)?);
    let last_access_at = u64::from_le_bytes(take(&mut rest)?);
    let access_count = u32::from_le_bytes(take(&mut rest)?);
    let ema_rate = f32::from_le_bytes(take(&mut rest)?);
    let window = u64::from_le_bytes(take(&mut rest)?);
    let checksum = u32::from_le_bytes(take(&mut rest)?);

    let [policy_tag] = take::<1>(&mut rest)?;
    let reconstruct = match policy_tag {
        0 => ReconstructPolicy::None,
        1 => ReconstructPolicy::Delta,
        2 => ReconstructPolicy::Factor,
        _ => return Err(StoreError::InvalidData),
    };

    let tier_age = u32::from_le_bytes(take(&mut rest)?);

    // Any non-zero flag byte means a parent id is present; the 16-byte slot is
    // read either way because it is always part of the record.
    let [lineage_flag] = take::<1>(&mut rest)?;
    let lineage_id = u128::from_le_bytes(take(&mut rest)?);
    let lineage_parent = match lineage_flag {
        0 => None,
        _ => Some(lineage_id),
    };

    let block_bytes = u32::from_le_bytes(take(&mut rest)?);

    let key = BlockKey {
        tensor_id,
        block_index,
    };
    Ok(BlockMeta {
        key,
        dtype,
        tier,
        bits,
        scale,
        zero_point,
        created_at,
        last_access_at,
        access_count,
        ema_rate,
        window,
        checksum,
        reconstruct,
        tier_age,
        lineage_parent,
        block_bytes,
    })
}
/// Block storage backed by one file per block, grouped into per-tier
/// subdirectories (`tier0` .. `tier3`) under a base directory.
pub struct FileBlockIO {
    /// Root directory that contains the `tierN` subdirectories.
    base_dir: PathBuf,
}

impl FileBlockIO {
    /// Creates the store rooted at `base_dir`, creating the directory and its
    /// `tier0` through `tier3` subdirectories if they do not exist yet.
    ///
    /// # Errors
    ///
    /// Returns `StoreError::IOError` if any directory cannot be created.
    pub fn new(base_dir: impl Into<PathBuf>) -> Result<Self, StoreError> {
        let base_dir = base_dir.into();
        (0..=3u8).try_for_each(|tier_num| {
            let tier_dir = base_dir.join(format!("tier{}", tier_num));
            fs::create_dir_all(&tier_dir).map_err(|_| StoreError::IOError)
        })?;
        Ok(Self { base_dir })
    }

    /// Builds the on-disk path for a block:
    /// `<base_dir>/tier<N>/<tensor_id as 32 hex digits>_<block_index>.bin`.
    fn block_path(&self, tier: Tier, key: BlockKey) -> PathBuf {
        let tier_dir = format!("tier{}", tier as u8);
        let file_name = format!("{:032x}_{}.bin", key.tensor_id, key.block_index);

        let mut path = self.base_dir.join(tier_dir);
        path.push(file_name);
        path
    }

    /// Returns the root directory this store was created with.
    pub fn base_dir(&self) -> &Path {
        self.base_dir.as_path()
    }
}
impl BlockIO for FileBlockIO {
fn read_block(&self, tier: Tier, key: BlockKey, dst: &mut [u8]) -> Result<usize, StoreError> {
let path = self.block_path(tier, key);
let data = fs::read(&path).map_err(|_| StoreError::BlockNotFound)?;
let n = data.len().min(dst.len());
dst[..n].copy_from_slice(&data[..n]);
Ok(n)
}
fn write_block(&mut self, tier: Tier, key: BlockKey, src: &[u8]) -> Result<(), StoreError> {
if tier == Tier::Tier0 {
return Err(StoreError::InvalidBlock);
}
let path = self.block_path(tier, key);
fs::write(&path, src).map_err(|_| StoreError::IOError)
}
fn delete_block(&mut self, tier: Tier, key: BlockKey) -> Result<(), StoreError> {
let path = self.block_path(tier, key);
fs::remove_file(&path).map_err(|_| StoreError::BlockNotFound)
}
}
/// Append-only metadata log stored in a single `meta.log` file, with an
/// in-memory index holding the most recent record for each block key.
pub struct FileMetaLog {
    /// Path of the `meta.log` file.
    log_path: PathBuf,
    /// Latest known metadata per key, rebuilt from the log on open.
    index: HashMap<BlockKey, BlockMeta>,
}

impl FileMetaLog {
    /// Opens (or prepares to create) the log at `<base_dir>/meta.log` and
    /// rebuilds the in-memory index by replaying every record in it.
    ///
    /// Replay rules:
    /// * Records are read in fixed strides of `RECORD_SIZE`; when a key occurs
    ///   more than once the later record wins.
    /// * A record that fails to decode is skipped (best-effort recovery).
    /// * A trailing partial record, as left by an interrupted append, is
    ///   truncated away so that subsequent appends start on a record boundary.
    ///   Without this, every record appended after the torn tail would be
    ///   misaligned and mis-decoded on the next open.
    ///
    /// A missing log file is not an error; the log starts out empty.
    ///
    /// # Errors
    ///
    /// Returns `StoreError::IOError` if `base_dir` cannot be created, if an
    /// existing log cannot be read, or if a torn tail cannot be truncated.
    pub fn new(base_dir: impl Into<PathBuf>) -> Result<Self, StoreError> {
        let base_dir = base_dir.into();
        fs::create_dir_all(&base_dir).map_err(|_| StoreError::IOError)?;
        let log_path = base_dir.join("meta.log");
        let mut index = HashMap::new();

        // Read directly instead of probing with `exists()` first: that avoids a
        // check-then-use race and stops an unreadable log from being mistaken
        // for an absent one.
        match fs::read(&log_path) {
            Ok(data) => {
                let records = data.chunks_exact(RECORD_SIZE);
                let torn_tail_len = records.remainder().len();

                for record in records {
                    if let Ok(meta) = decode_meta(record) {
                        index.insert(meta.key, meta);
                    }
                }

                if torn_tail_len != 0 {
                    // usize -> u64 is lossless on every supported target.
                    let aligned_len = (data.len() - torn_tail_len) as u64;
                    fs::OpenOptions::new()
                        .write(true)
                        .open(&log_path)
                        .and_then(|file| file.set_len(aligned_len))
                        .map_err(|_| StoreError::IOError)?;
                }
            }
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
            Err(_) => return Err(StoreError::IOError),
        }

        Ok(Self { log_path, index })
    }

    /// Returns the path of the underlying log file.
    pub fn log_path(&self) -> &Path {
        &self.log_path
    }

    /// Returns the number of distinct block keys currently indexed (not the
    /// number of records in the log file).
    pub fn len(&self) -> usize {
        self.index.len()
    }

    /// Returns `true` when no block key is indexed.
    pub fn is_empty(&self) -> bool {
        self.index.is_empty()
    }
}
impl MetaLog for FileMetaLog {
    /// Appends `rec` to the end of the log file and then records it as the
    /// current metadata for its key in the in-memory index.
    ///
    /// The log file is created on first use. The index is only updated after
    /// the write has succeeded, so a failed append leaves the index unchanged.
    ///
    /// NOTE(review): the file is flushed but not explicitly synced to disk;
    /// confirm whether stronger durability is required by callers.
    ///
    /// # Errors
    ///
    /// Returns `StoreError::IOError` if the log cannot be opened or written.
    fn append(&mut self, rec: &BlockMeta) -> Result<(), StoreError> {
        let record_bytes = encode_meta(rec);

        fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.log_path)
            .and_then(|mut log_file| {
                log_file.write_all(&record_bytes)?;
                log_file.flush()
            })
            .map_err(|_| StoreError::IOError)?;

        self.index.insert(rec.key, rec.clone());
        Ok(())
    }

    /// Looks up the most recently appended metadata for `key`, if any.
    fn get(&self, key: BlockKey) -> Option<&BlockMeta> {
        let Self { index, .. } = self;
        index.get(&key)
    }

    /// Iterates over the current metadata of every indexed key, in the
    /// unspecified order of the underlying hash map.
    fn iter(&self) -> Box<dyn Iterator<Item = &BlockMeta> + '_> {
        let latest_records = self.index.values();
        Box::new(latest_records)
    }
}
// Unit tests for the record codec, the file-backed block store and the
// file-backed metadata log. Tests that touch the filesystem each work in
// their own temporary directory.
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicU32, Ordering};
// Monotonic counter that gives each test directory a distinct suffix within
// this process.
static TEST_ID: AtomicU32 = AtomicU32::new(0);
// Creates a fresh, empty temp directory whose name combines `prefix`, the
// process id and a per-process counter; any leftover directory of the same
// name is removed first.
fn test_dir(prefix: &str) -> PathBuf {
let id = TEST_ID.fetch_add(1, Ordering::SeqCst);
let pid = std::process::id();
let dir =
std::env::temp_dir().join(format!("ruvector_persistence_{}_{}_{}", prefix, pid, id));
let _ = fs::remove_dir_all(&dir);
fs::create_dir_all(&dir).unwrap();
dir
}
// Best-effort removal of a test directory; failures are ignored. Not reached
// when an assertion earlier in the test panics.
fn cleanup(dir: &Path) {
let _ = fs::remove_dir_all(dir);
}
// Shorthand for building a `BlockKey`.
fn make_key(tid: u128, idx: u32) -> BlockKey {
BlockKey {
tensor_id: tid,
block_index: idx,
}
}
// Builds a `BlockMeta` for `key` with fixed, recognizable field values and no
// lineage parent.
fn sample_meta(key: BlockKey) -> BlockMeta {
BlockMeta {
key,
dtype: DType::F32,
tier: Tier::Tier1,
bits: 8,
scale: 0.03125,
zero_point: 0,
created_at: 1000,
last_access_at: 2000,
access_count: 42,
ema_rate: 0.75,
window: 0xAAAA_BBBB_CCCC_DDDD,
checksum: 0xDEAD_BEEF,
reconstruct: ReconstructPolicy::None,
tier_age: 15,
lineage_parent: None,
block_bytes: 512,
}
}
// Every field survives an encode/decode round trip and the encoded record has
// exactly RECORD_SIZE bytes.
#[test]
fn encode_decode_roundtrip_basic() {
let key = make_key(0x0123_4567_89AB_CDEF_FEDC_BA98_7654_3210, 7);
let meta = sample_meta(key);
let encoded = encode_meta(&meta);
assert_eq!(encoded.len(), RECORD_SIZE);
let decoded = decode_meta(&encoded).unwrap();
assert_eq!(decoded.key, meta.key);
assert_eq!(decoded.dtype, meta.dtype);
assert_eq!(decoded.tier, meta.tier);
assert_eq!(decoded.bits, meta.bits);
assert!((decoded.scale - meta.scale).abs() < 1e-10);
assert_eq!(decoded.zero_point, meta.zero_point);
assert_eq!(decoded.created_at, meta.created_at);
assert_eq!(decoded.last_access_at, meta.last_access_at);
assert_eq!(decoded.access_count, meta.access_count);
assert!((decoded.ema_rate - meta.ema_rate).abs() < 1e-6);
assert_eq!(decoded.window, meta.window);
assert_eq!(decoded.checksum, meta.checksum);
assert_eq!(decoded.reconstruct, meta.reconstruct);
assert_eq!(decoded.tier_age, meta.tier_age);
assert_eq!(decoded.lineage_parent, meta.lineage_parent);
assert_eq!(decoded.block_bytes, meta.block_bytes);
}
// A present lineage parent is preserved by the round trip.
#[test]
fn encode_decode_with_lineage() {
let key = make_key(1, 0);
let mut meta = sample_meta(key);
meta.lineage_parent = Some(0xFFFF_FFFF_FFFF_FFFF_0000_0000_0000_0001);
let encoded = encode_meta(&meta);
let decoded = decode_meta(&encoded).unwrap();
assert_eq!(
decoded.lineage_parent,
Some(0xFFFF_FFFF_FFFF_FFFF_0000_0000_0000_0001)
);
}
// Each DType variant round-trips.
#[test]
fn encode_decode_all_dtypes() {
for (dtype_val, expected) in [(0u8, DType::F32), (1, DType::F16), (2, DType::BF16)] {
let key = make_key(dtype_val as u128, 0);
let mut meta = sample_meta(key);
meta.dtype = expected;
let decoded = decode_meta(&encode_meta(&meta)).unwrap();
assert_eq!(decoded.dtype, expected);
}
}
// Each Tier variant round-trips.
#[test]
fn encode_decode_all_tiers() {
for (tier_val, expected) in [
(0u8, Tier::Tier0),
(1, Tier::Tier1),
(2, Tier::Tier2),
(3, Tier::Tier3),
] {
let key = make_key(tier_val as u128, 0);
let mut meta = sample_meta(key);
meta.tier = expected;
let decoded = decode_meta(&encode_meta(&meta)).unwrap();
assert_eq!(decoded.tier, expected);
}
}
// Each ReconstructPolicy variant round-trips. The numeric half of each tuple
// is unused by the loop body.
#[test]
fn encode_decode_all_reconstruct_policies() {
for (_, expected) in [
(0u8, ReconstructPolicy::None),
(1, ReconstructPolicy::Delta),
(2, ReconstructPolicy::Factor),
] {
let key = make_key(1, 0);
let mut meta = sample_meta(key);
meta.reconstruct = expected;
let decoded = decode_meta(&encode_meta(&meta)).unwrap();
assert_eq!(decoded.reconstruct, expected);
}
}
// Input one byte shorter than a record is rejected.
#[test]
fn decode_too_short() {
let result = decode_meta(&[0u8; RECORD_SIZE - 1]);
assert!(
matches!(result, Err(StoreError::InvalidData)),
"expected InvalidData, got {:?}",
result.err()
);
}
// Byte 20 is the dtype tag; an unknown value must be rejected.
#[test]
fn decode_invalid_dtype() {
let key = make_key(1, 0);
let mut encoded = encode_meta(&sample_meta(key));
encoded[20] = 255; assert!(
matches!(decode_meta(&encoded), Err(StoreError::InvalidData)),
"expected InvalidData for bad dtype"
);
}
// Byte 21 is the tier tag; an unknown value must be rejected.
#[test]
fn decode_invalid_tier() {
let key = make_key(1, 0);
let mut encoded = encode_meta(&sample_meta(key));
encoded[21] = 99; assert!(
matches!(decode_meta(&encoded), Err(StoreError::InvalidData)),
"expected InvalidData for bad tier"
);
}
// Byte 65 is the reconstruct-policy tag; an unknown value must be rejected.
#[test]
fn decode_invalid_reconstruct() {
let key = make_key(1, 0);
let mut encoded = encode_meta(&sample_meta(key));
encoded[65] = 77; assert!(
matches!(decode_meta(&encoded), Err(StoreError::InvalidData)),
"expected InvalidData for bad reconstruct"
);
}
// A written block can be read back into a larger buffer; the returned count
// is the stored length.
#[test]
fn file_block_io_write_read() {
let dir = test_dir("bio_wr");
let mut io = FileBlockIO::new(&dir).unwrap();
let key = make_key(0xABCD, 3);
let data = vec![1u8, 2, 3, 4, 5, 6, 7, 8];
io.write_block(Tier::Tier1, key, &data).unwrap();
let mut dst = vec![0u8; 16];
let n = io.read_block(Tier::Tier1, key, &mut dst).unwrap();
assert_eq!(n, 8);
assert_eq!(&dst[..8], &data);
cleanup(&dir);
}
// Writing into Tier0 is refused with InvalidBlock.
#[test]
fn file_block_io_write_tier0_rejected() {
let dir = test_dir("bio_t0");
let mut io = FileBlockIO::new(&dir).unwrap();
let key = make_key(1, 0);
assert_eq!(
io.write_block(Tier::Tier0, key, &[1]),
Err(StoreError::InvalidBlock)
);
cleanup(&dir);
}
// Reading a block that was never written yields BlockNotFound.
#[test]
fn file_block_io_read_not_found() {
let dir = test_dir("bio_nf");
let io = FileBlockIO::new(&dir).unwrap();
let key = make_key(99, 99);
let mut dst = vec![0u8; 4];
assert_eq!(
io.read_block(Tier::Tier2, key, &mut dst),
Err(StoreError::BlockNotFound)
);
cleanup(&dir);
}
// After deletion the block can no longer be read.
#[test]
fn file_block_io_delete() {
let dir = test_dir("bio_del");
let mut io = FileBlockIO::new(&dir).unwrap();
let key = make_key(5, 0);
io.write_block(Tier::Tier2, key, &[10, 20, 30]).unwrap();
io.delete_block(Tier::Tier2, key).unwrap();
let mut dst = vec![0u8; 4];
assert_eq!(
io.read_block(Tier::Tier2, key, &mut dst),
Err(StoreError::BlockNotFound)
);
cleanup(&dir);
}
// Deleting a block that does not exist yields BlockNotFound.
#[test]
fn file_block_io_delete_not_found() {
let dir = test_dir("bio_del_nf");
let mut io = FileBlockIO::new(&dir).unwrap();
let key = make_key(1, 0);
assert_eq!(
io.delete_block(Tier::Tier1, key),
Err(StoreError::BlockNotFound)
);
cleanup(&dir);
}
// A second write to the same tier/key replaces the first, including its
// length.
#[test]
fn file_block_io_overwrite() {
let dir = test_dir("bio_ow");
let mut io = FileBlockIO::new(&dir).unwrap();
let key = make_key(1, 0);
io.write_block(Tier::Tier1, key, &[1, 2, 3]).unwrap();
io.write_block(Tier::Tier1, key, &[4, 5, 6, 7]).unwrap();
let mut dst = vec![0u8; 8];
let n = io.read_block(Tier::Tier1, key, &mut dst).unwrap();
assert_eq!(n, 4);
assert_eq!(&dst[..4], &[4, 5, 6, 7]);
cleanup(&dir);
}
// The same key holds independent contents in each tier.
#[test]
fn file_block_io_multiple_tiers() {
let dir = test_dir("bio_mt");
let mut io = FileBlockIO::new(&dir).unwrap();
let key = make_key(1, 0);
io.write_block(Tier::Tier1, key, &[1]).unwrap();
io.write_block(Tier::Tier2, key, &[2]).unwrap();
io.write_block(Tier::Tier3, key, &[3]).unwrap();
let mut dst = [0u8; 1];
let n = io.read_block(Tier::Tier1, key, &mut dst).unwrap();
assert_eq!(n, 1);
assert_eq!(dst[0], 1);
let n = io.read_block(Tier::Tier2, key, &mut dst).unwrap();
assert_eq!(n, 1);
assert_eq!(dst[0], 2);
let n = io.read_block(Tier::Tier3, key, &mut dst).unwrap();
assert_eq!(n, 1);
assert_eq!(dst[0], 3);
cleanup(&dir);
}
// Pins the on-disk naming scheme: tier directory, tensor id as 32 zero-padded
// lowercase hex digits, decimal block index, `.bin` extension.
#[test]
fn file_block_io_path_format() {
let dir = test_dir("bio_path");
let io = FileBlockIO::new(&dir).unwrap();
let key = make_key(0xFF, 42);
let path = io.block_path(Tier::Tier1, key);
let expected = dir
.join("tier1")
.join("000000000000000000000000000000ff_42.bin");
assert_eq!(path, expected);
cleanup(&dir);
}
// An appended record is immediately visible through `get`.
#[test]
fn file_meta_log_append_get() {
let dir = test_dir("ml_ag");
let mut log = FileMetaLog::new(&dir).unwrap();
let key = make_key(1, 0);
let meta = sample_meta(key);
log.append(&meta).unwrap();
let retrieved = log.get(key).unwrap();
assert_eq!(retrieved.key, key);
assert_eq!(retrieved.created_at, 1000);
assert_eq!(log.len(), 1);
cleanup(&dir);
}
// Looking up a key that was never appended returns None.
#[test]
fn file_meta_log_get_missing() {
let dir = test_dir("ml_miss");
let log = FileMetaLog::new(&dir).unwrap();
assert!(log.get(make_key(99, 0)).is_none());
cleanup(&dir);
}
// Appending twice for the same key replaces the indexed entry rather than
// adding a second one.
#[test]
fn file_meta_log_upsert() {
let dir = test_dir("ml_ups");
let mut log = FileMetaLog::new(&dir).unwrap();
let key = make_key(1, 0);
let mut meta = sample_meta(key);
meta.access_count = 10;
log.append(&meta).unwrap();
meta.access_count = 20;
log.append(&meta).unwrap();
let retrieved = log.get(key).unwrap();
assert_eq!(retrieved.access_count, 20);
assert_eq!(log.len(), 1);
cleanup(&dir);
}
// `iter` yields one entry per distinct key.
#[test]
fn file_meta_log_iter() {
let dir = test_dir("ml_iter");
let mut log = FileMetaLog::new(&dir).unwrap();
for i in 0..5u32 {
let key = make_key(i as u128, 0);
log.append(&sample_meta(key)).unwrap();
}
let entries: Vec<_> = log.iter().collect();
assert_eq!(entries.len(), 5);
cleanup(&dir);
}
// Records appended by one instance are replayed by a later instance opened on
// the same directory.
#[test]
fn file_meta_log_persistence_across_opens() {
let dir = test_dir("ml_persist");
let key1 = make_key(1, 0);
let key2 = make_key(2, 5);
{
let mut log = FileMetaLog::new(&dir).unwrap();
log.append(&sample_meta(key1)).unwrap();
let mut meta2 = sample_meta(key2);
meta2.tier = Tier::Tier3;
meta2.bits = 3;
meta2.lineage_parent = Some(0x42);
log.append(&meta2).unwrap();
assert_eq!(log.len(), 2);
}
{
let log = FileMetaLog::new(&dir).unwrap();
assert_eq!(log.len(), 2);
let r1 = log.get(key1).unwrap();
assert_eq!(r1.tier, Tier::Tier1);
let r2 = log.get(key2).unwrap();
assert_eq!(r2.tier, Tier::Tier3);
assert_eq!(r2.lineage_parent, Some(0x42));
}
cleanup(&dir);
}
// On replay, the last record written for a key wins.
#[test]
fn file_meta_log_replay_last_wins() {
let dir = test_dir("ml_lw");
let key = make_key(1, 0);
{
let mut log = FileMetaLog::new(&dir).unwrap();
let mut meta = sample_meta(key);
meta.access_count = 100;
log.append(&meta).unwrap();
meta.access_count = 200;
log.append(&meta).unwrap();
}
{
let log = FileMetaLog::new(&dir).unwrap();
assert_eq!(log.len(), 1);
let retrieved = log.get(key).unwrap();
assert_eq!(retrieved.access_count, 200);
}
cleanup(&dir);
}
// Opening a log in a directory with no log file gives an empty index.
#[test]
fn file_meta_log_empty_on_fresh_dir() {
let dir = test_dir("ml_empty");
let log = FileMetaLog::new(&dir).unwrap();
assert!(log.is_empty());
assert_eq!(log.len(), 0);
assert_eq!(log.iter().count(), 0);
cleanup(&dir);
}
// The block store and the metadata log can share one base directory and be
// used together.
#[test]
fn integration_block_io_and_meta_log() {
let dir = test_dir("integ");
let mut io = FileBlockIO::new(&dir).unwrap();
let mut log = FileMetaLog::new(&dir).unwrap();
let key = make_key(0x1234, 0);
let block_data = vec![0xFFu8; 256];
io.write_block(Tier::Tier1, key, &block_data).unwrap();
let mut meta = sample_meta(key);
meta.block_bytes = 256;
log.append(&meta).unwrap();
let mut dst = vec![0u8; 512];
let n = io.read_block(Tier::Tier1, key, &mut dst).unwrap();
assert_eq!(n, 256);
assert!(dst[..256].iter().all(|&b| b == 0xFF));
let retrieved = log.get(key).unwrap();
assert_eq!(retrieved.block_bytes, 256);
cleanup(&dir);
}
// Guards RECORD_SIZE against drifting from what encode_meta actually writes.
#[test]
fn record_size_constant_matches() {
let meta = sample_meta(make_key(0, 0));
let encoded = encode_meta(&meta);
assert_eq!(encoded.len(), RECORD_SIZE);
}
}