use crate::core::error::Result;
use crate::core::id::VersionId;
use crate::core::version::{EdgeVersion, EntityVersion, NodeVersion};
use crate::storage::redb_cold_storage::{ColdStorageStats, RedbColdStorage};
use parking_lot::Mutex;
use quick_cache::sync::Cache;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
#[cfg(feature = "config-toml")]
use serde::{Deserialize, Serialize};
/// Number of recent cold-read latency samples retained per tracker for
/// percentile estimation (older samples are evicted FIFO).
const LATENCY_SAMPLE_SIZE: usize = 1000;
/// Tuning knobs for the tiered storage read path: warm-cache capacity and
/// predecessor-chain prefetching.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "config-toml", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "config-toml", serde(default))]
pub struct TieredStorageConfig {
    /// Capacity of each warm cache (node and edge caches are sized
    /// independently with this value).
    pub warm_cache_size: usize,
    /// When true, a cold hit eagerly pulls earlier versions of the same
    /// entity into the warm cache.
    pub enable_prefetch: bool,
    /// Maximum number of predecessor versions a single prefetch walks.
    pub prefetch_depth: usize,
}

impl Default for TieredStorageConfig {
    /// 10k-entry warm caches with prefetching enabled, five versions deep.
    fn default() -> Self {
        TieredStorageConfig {
            warm_cache_size: 10_000,
            enable_prefetch: true,
            prefetch_depth: 5,
        }
    }
}
/// Point-in-time snapshot of tier hit/miss counters plus the cold-read
/// latency distribution.
#[derive(Debug, Clone, Default)]
pub struct TieredStorageMetrics {
    /// Lookups satisfied by the hot tier (reported externally).
    pub hot_hits: u64,
    /// Lookups satisfied by the warm in-memory cache.
    pub warm_hits: u64,
    /// Lookups that read cold storage and found the value.
    pub cold_hits: u64,
    /// Lookups that found nothing in any tier.
    pub misses: u64,
    /// Versions pulled into the warm cache speculatively.
    pub prefetches: u64,
    /// Latency statistics of demanded cold-storage reads, in microseconds.
    pub cold_latency: LatencyPercentiles,
}

/// Summary statistics (percentiles, extrema, mean) over a latency window.
#[derive(Debug, Clone, Default)]
pub struct LatencyPercentiles {
    pub p50_us: u64,
    pub p95_us: u64,
    pub p99_us: u64,
    pub min_us: u64,
    pub max_us: u64,
    pub avg_us: u64,
    pub sample_count: usize,
}

impl LatencyPercentiles {
    /// True when the median latency is below one millisecond.
    pub fn meets_target(&self) -> bool {
        self.p50_us < 1000
    }
}

impl TieredStorageMetrics {
    /// Fraction of all successful lookups served by the hot tier.
    /// Zero when no lookup has succeeded yet.
    pub fn hot_ratio(&self) -> f64 {
        match self.hot_hits + self.warm_hits + self.cold_hits {
            0 => 0.0,
            total => self.hot_hits as f64 / total as f64,
        }
    }

    /// Fraction of cache-tier lookups (warm + cold) served from the warm
    /// cache. Zero when the cache tier has not been consulted.
    pub fn warm_ratio(&self) -> f64 {
        match self.warm_hits + self.cold_hits {
            0 => 0.0,
            requests => self.warm_hits as f64 / requests as f64,
        }
    }

    /// Fraction of all lookups (misses included) answered without touching
    /// cold storage. Zero when nothing has been looked up.
    pub fn cache_hit_ratio(&self) -> f64 {
        match self.hot_hits + self.warm_hits + self.cold_hits + self.misses {
            0 => 0.0,
            total => (self.hot_hits + self.warm_hits) as f64 / total as f64,
        }
    }
}
/// Sliding window of recent latency samples, stored in microseconds.
///
/// Once `max_samples` entries exist, the oldest is evicted on each new
/// record, so the derived percentiles always reflect recent behavior.
#[derive(Debug)]
struct LatencyTracker {
    // parking_lot mutex lets `record`/`percentiles` take `&self`.
    samples: Mutex<VecDeque<u64>>,
    // Window capacity; 0 disables recording entirely.
    max_samples: usize,
}
impl Default for LatencyTracker {
    /// Tracker using the standard window size (`LATENCY_SAMPLE_SIZE`).
    fn default() -> Self {
        Self::new(LATENCY_SAMPLE_SIZE)
    }
}
impl LatencyTracker {
    /// Creates a tracker retaining at most `max_samples` recent samples.
    /// A capacity of 0 turns the tracker into a no-op.
    fn new(max_samples: usize) -> Self {
        Self {
            samples: Mutex::new(VecDeque::with_capacity(max_samples)),
            max_samples,
        }
    }

    /// Records one sample, evicting the oldest when the window is full.
    fn record(&self, duration: Duration) {
        if self.max_samples == 0 {
            return;
        }
        // Truncating u128 -> u64 cast; cannot overflow for any realistic
        // duration expressed in microseconds.
        let us = duration.as_micros() as u64;
        let mut samples = self.samples.lock();
        if samples.len() >= self.max_samples {
            samples.pop_front();
        }
        samples.push_back(us);
    }

    /// Computes summary statistics over the current window.
    ///
    /// Percentiles use the floor(p * n) rank over the sorted samples, so
    /// e.g. the p50 of 100 samples is the 51st smallest value. Returns
    /// all-zero stats when no samples have been recorded.
    fn percentiles(&self) -> LatencyPercentiles {
        let samples = self.samples.lock();
        if samples.is_empty() {
            return LatencyPercentiles::default();
        }
        let mut sorted: Vec<u64> = samples.iter().copied().collect();
        sorted.sort_unstable();
        let len = sorted.len();
        // Clamp every rank to the last index. p50's rank mathematically
        // never exceeds len - 1, but clamping it too keeps all three
        // lookups uniform (previously only p95/p99 were clamped, with a
        // silent `unwrap_or(0)` fallback that could mask a future bug).
        let rank = |p: f64| ((len as f64 * p) as usize).min(len - 1);
        let sum: u64 = sorted.iter().sum();
        LatencyPercentiles {
            p50_us: sorted[rank(0.50)],
            p95_us: sorted[rank(0.95)],
            p99_us: sorted[rank(0.99)],
            min_us: sorted[0],
            max_us: sorted[len - 1],
            avg_us: sum / len as u64,
            sample_count: len,
        }
    }
}
/// Lock-free counters behind the public `TieredStorageMetrics` snapshot.
///
/// Counters are read and written with `Ordering::Relaxed` throughout: they
/// are statistics only, so no ordering with cache operations is needed.
#[derive(Debug)]
struct AtomicTieredMetrics {
    hot_hits: AtomicU64,
    warm_hits: AtomicU64,
    cold_hits: AtomicU64,
    misses: AtomicU64,
    prefetches: AtomicU64,
    // Latency window for demanded (non-prefetch) cold reads.
    cold_latency: LatencyTracker,
}
impl Default for AtomicTieredMetrics {
    /// All counters start at zero with an empty latency window.
    fn default() -> Self {
        Self {
            hot_hits: AtomicU64::default(),
            warm_hits: AtomicU64::default(),
            cold_hits: AtomicU64::default(),
            misses: AtomicU64::default(),
            prefetches: AtomicU64::default(),
            cold_latency: LatencyTracker::default(),
        }
    }
}
impl AtomicTieredMetrics {
fn new() -> Self {
Self::default()
}
fn snapshot(&self) -> TieredStorageMetrics {
TieredStorageMetrics {
hot_hits: self.hot_hits.load(Ordering::Relaxed),
warm_hits: self.warm_hits.load(Ordering::Relaxed),
cold_hits: self.cold_hits.load(Ordering::Relaxed),
misses: self.misses.load(Ordering::Relaxed),
prefetches: self.prefetches.load(Ordering::Relaxed),
cold_latency: self.cold_latency.percentiles(),
}
}
}
/// Two-level read path in front of redb-backed cold storage.
///
/// Lookups consult a bounded in-memory "warm" cache first and fall back to
/// cold storage, optionally prefetching earlier versions of the same entity.
/// Hot-tier hits happen elsewhere; callers report them via `record_hot_hit`.
pub struct TieredStorage {
    config: TieredStorageConfig,
    cold: Arc<RedbColdStorage>,
    // Separate warm caches for node and edge versions, each bounded by
    // `config.warm_cache_size`.
    node_warm_cache: Cache<VersionId, Arc<NodeVersion>>,
    edge_warm_cache: Cache<VersionId, Arc<EdgeVersion>>,
    metrics: AtomicTieredMetrics,
}
impl TieredStorage {
    /// Builds a tiered store over `cold` with the given configuration.
    pub fn new(config: TieredStorageConfig, cold: Arc<RedbColdStorage>) -> Self {
        let warm_cache_size = config.warm_cache_size;
        Self {
            config,
            cold,
            node_warm_cache: Cache::new(warm_cache_size),
            edge_warm_cache: Cache::new(warm_cache_size),
            metrics: AtomicTieredMetrics::new(),
        }
    }

    /// Convenience constructor using `TieredStorageConfig::default()`.
    pub fn with_default_config(cold: Arc<RedbColdStorage>) -> Self {
        Self::new(TieredStorageConfig::default(), cold)
    }

    /// Direct access to the underlying cold storage.
    pub fn cold_storage(&self) -> &RedbColdStorage {
        &self.cold
    }

    /// Reports a hot-tier hit. The hot tier lives outside this struct, so
    /// its hits are counted here only to complete the combined metrics.
    pub fn record_hot_hit(&self) {
        self.metrics.hot_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Shared warm-cache read path for nodes and edges.
    ///
    /// Order of effects matters (the tests pin the exact counts): a warm hit
    /// bumps `warm_hits` and returns; otherwise the cold fetch is timed, a
    /// found value bumps `cold_hits`, records one latency sample, is cached,
    /// and may trigger a synchronous prefetch of its predecessors; a missing
    /// value bumps `misses`.
    fn get_version_through_cache<V, F>(
        &self,
        id: VersionId,
        cache: &Cache<VersionId, Arc<V>>,
        fetch_fn: F,
    ) -> Result<Option<Arc<V>>>
    where
        V: EntityVersion + 'static,
        F: Fn(VersionId) -> Result<Option<V>>,
    {
        if let Some(cached) = cache.get(&id) {
            self.metrics.warm_hits.fetch_add(1, Ordering::Relaxed);
            return Ok(Some(cached));
        }
        // Time only the demanded fetch; prefetch reads are untimed.
        let start = Instant::now();
        let result = fetch_fn(id)?;
        let elapsed = start.elapsed();
        match result {
            Some(version) => {
                self.metrics.cold_hits.fetch_add(1, Ordering::Relaxed);
                self.metrics.cold_latency.record(elapsed);
                let version_arc = Arc::new(version);
                cache.insert(id, version_arc.clone());
                if self.config.enable_prefetch {
                    self.prefetch_chain(&*version_arc, cache, &fetch_fn);
                }
                Ok(Some(version_arc))
            }
            None => {
                self.metrics.misses.fetch_add(1, Ordering::Relaxed);
                Ok(None)
            }
        }
    }

    /// Reads a node version: warm cache first, then cold storage.
    pub fn get_node_version_cold(&self, id: VersionId) -> Result<Option<Arc<NodeVersion>>> {
        self.get_version_through_cache(id, &self.node_warm_cache, |id| {
            self.cold.get_node_version(id)
        })
    }

    /// Reads an edge version: warm cache first, then cold storage.
    pub fn get_edge_version_cold(&self, id: VersionId) -> Result<Option<Arc<EdgeVersion>>> {
        self.get_version_through_cache(id, &self.edge_warm_cache, |id| {
            self.cold.get_edge_version(id)
        })
    }

    /// Walks `prev_version` links from `start`, caching up to
    /// `config.prefetch_depth` predecessors.
    ///
    /// The walk stops at an already-cached version (the rest of the chain is
    /// assumed warm), at the end of the chain, or on any fetch error/missing
    /// link. Fetches here bump `prefetches` only — not `cold_hits`, not the
    /// latency tracker.
    fn prefetch_chain<V, F>(&self, start: &V, cache: &Cache<VersionId, Arc<V>>, fetch_fn: &F)
    where
        V: EntityVersion + 'static,
        F: Fn(VersionId) -> Result<Option<V>>,
    {
        let mut current_prev = start.prev_version();
        let mut depth = 0;
        while let Some(prev_id) = current_prev {
            if depth >= self.config.prefetch_depth {
                break;
            }
            if cache.get(&prev_id).is_some() {
                break;
            }
            match fetch_fn(prev_id) {
                Ok(Some(version)) => {
                    self.metrics.prefetches.fetch_add(1, Ordering::Relaxed);
                    // Capture the next link before moving the version into
                    // the cache.
                    current_prev = version.prev_version();
                    cache.insert(prev_id, Arc::new(version));
                    depth += 1;
                }
                _ => break,
            }
        }
    }

    /// Writes go straight to cold storage; the warm cache is read-through
    /// only and is not populated or invalidated here.
    pub fn store_node_version(&self, version: &NodeVersion) -> Result<()> {
        self.cold.store_node_version(version)
    }

    /// Edge counterpart of `store_node_version`.
    pub fn store_edge_version(&self, version: &EdgeVersion) -> Result<()> {
        self.cold.store_edge_version(version)
    }

    /// Batch write of node versions, delegated to cold storage.
    pub fn store_node_versions_batch(&self, versions: &[NodeVersion]) -> Result<()> {
        self.cold.store_node_versions_batch(versions)
    }

    /// Batch write of edge versions, delegated to cold storage.
    pub fn store_edge_versions_batch(&self, versions: &[EdgeVersion]) -> Result<()> {
        self.cold.store_edge_versions_batch(versions)
    }

    /// Existence check; a warm-cache hit short-circuits the cold lookup.
    /// Note: does not update the hit/miss counters.
    pub fn contains_node_version(&self, id: VersionId) -> Result<bool> {
        if self.node_warm_cache.get(&id).is_some() {
            return Ok(true);
        }
        self.cold.contains_node_version(id)
    }

    /// Edge counterpart of `contains_node_version`.
    pub fn contains_edge_version(&self, id: VersionId) -> Result<bool> {
        if self.edge_warm_cache.get(&id).is_some() {
            return Ok(true);
        }
        self.cold.contains_edge_version(id)
    }

    /// Snapshot of the current counters and latency percentiles.
    pub fn metrics(&self) -> TieredStorageMetrics {
        self.metrics.snapshot()
    }

    /// Statistics reported by the cold backend itself.
    pub fn cold_stats(&self) -> ColdStorageStats {
        self.cold.stats()
    }

    /// Flushes the cold backend.
    pub fn flush(&self) -> Result<()> {
        self.cold.flush()
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::id::{EdgeId, NodeId};
use crate::core::interning::GLOBAL_INTERNER;
use crate::core::property::PropertyMapBuilder;
use crate::core::temporal::BiTemporalInterval;
use crate::storage::redb_cold_storage::RedbColdStorage;
// Anchor node version with the given version id, fixed node id 100, and a
// small two-property map.
fn create_test_node_version(id: u64) -> NodeVersion {
    let properties = PropertyMapBuilder::new()
        .insert("name", "Alice")
        .insert("age", 30i64)
        .build();
    NodeVersion::new_anchor(
        VersionId::new(id).unwrap(),
        NodeId::new(100).unwrap(),
        BiTemporalInterval::current(1000.into()),
        GLOBAL_INTERNER.intern("Person").unwrap(),
        properties,
    )
}

// Anchor edge version (node 100 -KNOWS-> node 101) with one weight property.
fn create_test_edge_version(id: u64) -> EdgeVersion {
    let properties = PropertyMapBuilder::new().insert("weight", 1.0f64).build();
    EdgeVersion::new_anchor(
        VersionId::new(id).unwrap(),
        EdgeId::new(200).unwrap(),
        BiTemporalInterval::current(1000.into()),
        GLOBAL_INTERNER.intern("KNOWS").unwrap(),
        NodeId::new(100).unwrap(),
        NodeId::new(101).unwrap(),
        properties,
    )
}

// Five-version chain for one node: anchor v1 followed by deltas v2..=v5,
// each delta created with its predecessor's id so prefetch can walk back.
fn create_test_version_chain() -> Vec<NodeVersion> {
    let mut versions = Vec::new();
    let v1 = {
        let props = PropertyMapBuilder::new().insert("step", 1i64).build();
        NodeVersion::new_anchor(
            VersionId::new(1).unwrap(),
            NodeId::new(100).unwrap(),
            BiTemporalInterval::current(1000.into()),
            GLOBAL_INTERNER.intern("Test").unwrap(),
            props,
        )
    };
    versions.push(v1);
    for i in 2..=5 {
        let old_props = PropertyMapBuilder::new()
            .insert("step", (i - 1) as i64)
            .build();
        let new_props = PropertyMapBuilder::new().insert("step", i as i64).build();
        let v = NodeVersion::new_delta(
            VersionId::new(i).unwrap(),
            NodeId::new(100).unwrap(),
            BiTemporalInterval::current(((1000 + i * 100) as i64).into()),
            GLOBAL_INTERNER.intern("Test").unwrap(),
            &old_props,
            &new_props,
            VersionId::new(i - 1).unwrap(),
        );
        versions.push(v);
    }
    versions
}
// First lookup of a stored node must come from cold storage, not the cache.
#[test]
fn test_tiered_storage_cold_lookup() {
    let temp_dir = tempfile::tempdir().unwrap();
    let db_path = temp_dir.path().join("test.redb");
    let cold = RedbColdStorage::with_default_config(&db_path).unwrap();
    let tiered = TieredStorage::with_default_config(Arc::new(cold));
    let version = create_test_node_version(1);
    tiered.store_node_version(&version).unwrap();
    let retrieved = tiered.get_node_version_cold(version.id).unwrap().unwrap();
    assert_eq!(retrieved.id, version.id);
    let metrics = tiered.metrics();
    assert_eq!(metrics.cold_hits, 1);
    assert_eq!(metrics.warm_hits, 0);
}

// Edge counterpart of the cold-lookup test above.
#[test]
fn test_tiered_storage_edge_cold_lookup() {
    let temp_dir = tempfile::tempdir().unwrap();
    let db_path = temp_dir.path().join("test.redb");
    let cold = RedbColdStorage::with_default_config(&db_path).unwrap();
    let tiered = TieredStorage::with_default_config(Arc::new(cold));
    let version = create_test_edge_version(1);
    tiered.store_edge_version(&version).unwrap();
    let retrieved = tiered.get_edge_version_cold(version.id).unwrap().unwrap();
    assert_eq!(retrieved.id, version.id);
    let metrics = tiered.metrics();
    assert_eq!(metrics.cold_hits, 1);
    assert_eq!(metrics.warm_hits, 0);
}

// Second edge lookup must be served by the warm cache (one cold, one warm).
#[test]
fn test_tiered_storage_edge_warm_cache() {
    let temp_dir = tempfile::tempdir().unwrap();
    let db_path = temp_dir.path().join("test.redb");
    let cold = RedbColdStorage::with_default_config(&db_path).unwrap();
    let tiered = TieredStorage::with_default_config(Arc::new(cold));
    let version = create_test_edge_version(1);
    tiered.store_edge_version(&version).unwrap();
    let _ = tiered.get_edge_version_cold(version.id).unwrap();
    let retrieved = tiered.get_edge_version_cold(version.id).unwrap().unwrap();
    assert_eq!(retrieved.id, version.id);
    let metrics = tiered.metrics();
    assert_eq!(metrics.cold_hits, 1);
    assert_eq!(metrics.warm_hits, 1);
}

// Node counterpart: second lookup hits the warm cache.
#[test]
fn test_tiered_storage_warm_cache() {
    let temp_dir = tempfile::tempdir().unwrap();
    let db_path = temp_dir.path().join("test.redb");
    let cold = RedbColdStorage::with_default_config(&db_path).unwrap();
    let tiered = TieredStorage::with_default_config(Arc::new(cold));
    let version = create_test_node_version(1);
    tiered.store_node_version(&version).unwrap();
    let _ = tiered.get_node_version_cold(version.id).unwrap();
    let retrieved = tiered.get_node_version_cold(version.id).unwrap().unwrap();
    assert_eq!(retrieved.id, version.id);
    let metrics = tiered.metrics();
    assert_eq!(metrics.cold_hits, 1);
    assert_eq!(metrics.warm_hits, 1);
}

// A lookup of an id that was never stored counts as a miss, not an error.
#[test]
fn test_tiered_storage_miss() {
    let temp_dir = tempfile::tempdir().unwrap();
    let db_path = temp_dir.path().join("test.redb");
    let cold = RedbColdStorage::with_default_config(&db_path).unwrap();
    let tiered = TieredStorage::with_default_config(Arc::new(cold));
    let result = tiered
        .get_node_version_cold(VersionId::new(999).unwrap())
        .unwrap();
    assert!(result.is_none());
    let metrics = tiered.metrics();
    assert_eq!(metrics.misses, 1);
}
// Looking up the newest version of a 5-long chain should speculatively pull
// earlier versions into the warm cache, bounded by `prefetch_depth`.
#[test]
fn test_tiered_storage_prefetch() {
    let temp_dir = tempfile::tempdir().unwrap();
    let db_path = temp_dir.path().join("test.redb");
    let cold = RedbColdStorage::with_default_config(&db_path).unwrap();
    let config = TieredStorageConfig {
        warm_cache_size: 100,
        enable_prefetch: true,
        prefetch_depth: 3,
    };
    let tiered = TieredStorage::new(config, Arc::new(cold));
    for version in &create_test_version_chain() {
        tiered.store_node_version(version).unwrap();
    }
    // Demand the head of the chain (v5); its predecessors get prefetched.
    let _ = tiered
        .get_node_version_cold(VersionId::new(5).unwrap())
        .unwrap();
    let metrics = tiered.metrics();
    assert!(metrics.prefetches > 0);
    assert!(metrics.prefetches <= 3);
}
// With prefetching disabled, a chain lookup must not prefetch anything even
// though `prefetch_depth` is non-zero.
#[test]
fn test_tiered_storage_no_prefetch() {
    let temp_dir = tempfile::tempdir().unwrap();
    let db_path = temp_dir.path().join("test.redb");
    let cold = RedbColdStorage::with_default_config(&db_path).unwrap();
    let config = TieredStorageConfig {
        warm_cache_size: 100,
        enable_prefetch: false,
        prefetch_depth: 3,
    };
    let tiered = TieredStorage::new(config, Arc::new(cold));
    let versions = create_test_version_chain();
    for v in &versions {
        tiered.store_node_version(v).unwrap();
    }
    let _ = tiered
        .get_node_version_cold(VersionId::new(5).unwrap())
        .unwrap();
    let metrics = tiered.metrics();
    assert_eq!(metrics.prefetches, 0);
}

// `record_hot_hit` is the externally-reported hot-tier counter; each call
// should increment `hot_hits` by exactly one.
#[test]
fn test_tiered_storage_hot_hit_recording() {
    let temp_dir = tempfile::tempdir().unwrap();
    let db_path = temp_dir.path().join("test.redb");
    let cold = RedbColdStorage::with_default_config(&db_path).unwrap();
    let tiered = TieredStorage::with_default_config(Arc::new(cold));
    tiered.record_hot_hit();
    tiered.record_hot_hit();
    tiered.record_hot_hit();
    let metrics = tiered.metrics();
    assert_eq!(metrics.hot_hits, 3);
}
// Ratio math on a hand-built snapshot: 70 hot / 20 warm / 10 cold, no misses.
#[test]
fn test_metrics_ratios() {
    let metrics = TieredStorageMetrics {
        hot_hits: 70,
        warm_hits: 20,
        cold_hits: 10,
        misses: 0,
        prefetches: 5,
        cold_latency: LatencyPercentiles::default(),
    };
    // hot_ratio = 70 / 100.
    assert!((metrics.hot_ratio() - 0.7).abs() < 0.001);
    // warm_ratio = 20 / 30 ~= 0.667.
    assert!((metrics.warm_ratio() - 0.666).abs() < 0.01);
    // cache_hit_ratio = (70 + 20) / 100.
    assert!((metrics.cache_hit_ratio() - 0.9).abs() < 0.001);
}
// Batch-stored versions must all be visible through `contains_node_version`.
#[test]
fn test_batch_store() {
    let temp_dir = tempfile::tempdir().unwrap();
    let db_path = temp_dir.path().join("test.redb");
    let cold = RedbColdStorage::with_default_config(&db_path).unwrap();
    let tiered = TieredStorage::with_default_config(Arc::new(cold));
    let versions: Vec<NodeVersion> = (1..=10).map(create_test_node_version).collect();
    tiered.store_node_versions_batch(&versions).unwrap();
    for v in &versions {
        assert!(tiered.contains_node_version(v.id).unwrap());
    }
}

// Pin the documented default configuration values.
#[test]
fn test_default_config() {
    let config = TieredStorageConfig::default();
    assert_eq!(config.warm_cache_size, 10_000);
    assert!(config.enable_prefetch);
    assert_eq!(config.prefetch_depth, 5);
}

// An empty tracker reports all-zero percentiles rather than panicking.
#[test]
fn test_latency_tracker_empty() {
    let tracker = LatencyTracker::new(100);
    let percentiles = tracker.percentiles();
    assert_eq!(percentiles.sample_count, 0);
    assert_eq!(percentiles.p50_us, 0);
    assert_eq!(percentiles.p95_us, 0);
    assert_eq!(percentiles.p99_us, 0);
}

// A zero-capacity tracker silently drops every sample.
#[test]
fn test_latency_tracker_zero_max_samples() {
    let tracker = LatencyTracker::new(0);
    tracker.record(Duration::from_micros(500));
    let percentiles = tracker.percentiles();
    assert_eq!(percentiles.sample_count, 0);
    assert_eq!(percentiles.p50_us, 0);
}
// With an even sample count the floor(p * n) rank picks the upper-middle
// value: p50 of [10, 20, 30, 40] is index 2, i.e. 30.
#[test]
fn test_latency_percentiles_even_samples() {
    let tracker = LatencyTracker::new(10);
    tracker.record(Duration::from_micros(10));
    tracker.record(Duration::from_micros(20));
    tracker.record(Duration::from_micros(30));
    tracker.record(Duration::from_micros(40));
    let percentiles = tracker.percentiles();
    assert_eq!(percentiles.sample_count, 4);
    assert_eq!(percentiles.p50_us, 30);
}

// A single sample is simultaneously min, max, and every percentile.
#[test]
fn test_latency_tracker_single_sample() {
    let tracker = LatencyTracker::new(100);
    tracker.record(Duration::from_micros(500));
    let percentiles = tracker.percentiles();
    assert_eq!(percentiles.sample_count, 1);
    assert_eq!(percentiles.p50_us, 500);
    assert_eq!(percentiles.min_us, 500);
    assert_eq!(percentiles.max_us, 500);
}

// Full window of 1..=100 us: p50 is the 51st value, p95 the 96th, p99 the
// 100th, and the integer mean of 1..=100 is 50 (5050 / 100 truncated).
#[test]
fn test_latency_tracker_percentiles() {
    let tracker = LatencyTracker::new(100);
    for i in 1..=100 {
        tracker.record(Duration::from_micros(i));
    }
    let percentiles = tracker.percentiles();
    assert_eq!(percentiles.sample_count, 100);
    assert_eq!(percentiles.min_us, 1);
    assert_eq!(percentiles.max_us, 100);
    assert_eq!(percentiles.p50_us, 51);
    assert_eq!(percentiles.p95_us, 96);
    assert_eq!(percentiles.p99_us, 100);
    assert_eq!(percentiles.avg_us, 50);
}
// Capacity 10 with 20 samples recorded: only the newest ten (11..=20)
// survive the FIFO eviction.
#[test]
fn test_latency_tracker_circular_buffer() {
    let tracker = LatencyTracker::new(10);
    for us in 1..=20 {
        tracker.record(Duration::from_micros(us));
    }
    let stats = tracker.percentiles();
    assert_eq!(stats.sample_count, 10);
    assert_eq!(stats.min_us, 11);
    assert_eq!(stats.max_us, 20);
}
// `meets_target` depends only on the median: sub-millisecond p50 passes,
// 1.5 ms p50 fails, regardless of the other fields.
#[test]
fn test_latency_percentiles_meets_target() {
    let good = LatencyPercentiles {
        p50_us: 500,
        p95_us: 900,
        p99_us: 950,
        min_us: 100,
        max_us: 1000,
        avg_us: 500,
        sample_count: 100,
    };
    assert!(good.meets_target());
    let bad = LatencyPercentiles {
        p50_us: 1500,
        p95_us: 2000,
        p99_us: 3000,
        min_us: 500,
        max_us: 5000,
        avg_us: 1500,
        sample_count: 100,
    };
    assert!(!bad.meets_target());
}
// Every demanded cold read contributes one latency sample; ten distinct
// lookups mean ten cold hits and a non-empty latency window.
#[test]
fn test_cold_latency_tracking() {
    let temp_dir = tempfile::tempdir().unwrap();
    let db_path = temp_dir.path().join("test.redb");
    let cold = RedbColdStorage::with_default_config(&db_path).unwrap();
    let tiered = TieredStorage::with_default_config(Arc::new(cold));
    for i in 1..=10 {
        let version = create_test_node_version(i);
        tiered.store_node_version(&version).unwrap();
    }
    for i in 1..=10 {
        let _ = tiered
            .get_node_version_cold(VersionId::new(i).unwrap())
            .unwrap();
    }
    let metrics = tiered.metrics();
    assert_eq!(metrics.cold_hits, 10);
    assert!(metrics.cold_latency.sample_count > 0);
}

// Four threads each perform 50 lookups of the same id. Counters must not
// lose updates: cold + warm hits total exactly 200, with at least one cold
// hit (the first fetch; more are possible if threads race the cache fill).
#[test]
fn test_concurrent_access() {
    use std::sync::Arc;
    use std::thread;
    let temp_dir = tempfile::tempdir().unwrap();
    let db_path = temp_dir.path().join("test.redb");
    let cold = RedbColdStorage::with_default_config(&db_path).unwrap();
    let tiered = Arc::new(TieredStorage::with_default_config(Arc::new(cold)));
    let version = create_test_node_version(1);
    tiered.store_node_version(&version).unwrap();
    let mut handles = vec![];
    for _ in 0..4 {
        let tiered_clone = Arc::clone(&tiered);
        let handle = thread::spawn(move || {
            for _ in 0..50 {
                let _ = tiered_clone
                    .get_node_version_cold(VersionId::new(1).unwrap())
                    .unwrap();
            }
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    let metrics = tiered.metrics();
    assert_eq!(metrics.cold_hits + metrics.warm_hits, 200);
    assert!(metrics.cold_hits >= 1);
    assert!(metrics.cold_latency.sample_count > 0);
}
// Depth-limited prefetch over a 10-long chain: demanding v10 prefetches
// exactly v9, v8, v7 (depth 3); later lookups of those are warm hits, while
// the first un-prefetched version (v6) is a cold hit again.
#[test]
fn test_prefetch_limits() {
    let temp_dir = tempfile::tempdir().unwrap();
    let db_path = temp_dir.path().join("test.redb");
    let cold = RedbColdStorage::with_default_config(&db_path).unwrap();
    // Build the chain: anchor v1 plus deltas v2..=v10, each linked to its
    // predecessor.
    let mut versions = Vec::new();
    let v1 = {
        let props = PropertyMapBuilder::new().insert("step", 1i64).build();
        NodeVersion::new_anchor(
            VersionId::new(1).unwrap(),
            NodeId::new(100).unwrap(),
            BiTemporalInterval::current(1000.into()),
            GLOBAL_INTERNER.intern("Test").unwrap(),
            props,
        )
    };
    versions.push(v1);
    for i in 2..=10 {
        let old_props = PropertyMapBuilder::new()
            .insert("step", (i - 1) as i64)
            .build();
        let new_props = PropertyMapBuilder::new().insert("step", i as i64).build();
        let v = NodeVersion::new_delta(
            VersionId::new(i).unwrap(),
            NodeId::new(100).unwrap(),
            BiTemporalInterval::current(((1000 + i * 100) as i64).into()),
            GLOBAL_INTERNER.intern("Test").unwrap(),
            &old_props,
            &new_props,
            VersionId::new(i - 1).unwrap(),
        );
        versions.push(v);
    }
    let config = TieredStorageConfig {
        warm_cache_size: 100,
        enable_prefetch: true,
        prefetch_depth: 3,
    };
    let tiered = TieredStorage::new(config, Arc::new(cold));
    for v in &versions {
        tiered.store_node_version(v).unwrap();
    }
    // Cold hit on v10 prefetches v9..v7 and nothing more.
    let _ = tiered
        .get_node_version_cold(VersionId::new(10).unwrap())
        .unwrap();
    let metrics = tiered.metrics();
    assert_eq!(metrics.prefetches, 3);
    // v9 and v7 were prefetched, so these are warm hits.
    let _ = tiered
        .get_node_version_cold(VersionId::new(9).unwrap())
        .unwrap();
    assert_eq!(tiered.metrics().warm_hits, 1);
    let _ = tiered
        .get_node_version_cold(VersionId::new(7).unwrap())
        .unwrap();
    assert_eq!(tiered.metrics().warm_hits, 2);
    // v6 is beyond the prefetch depth: a second cold hit. (Its own prefetch
    // of v5 stops at... nothing cached below depth, but warm_hits stays 2.)
    let _ = tiered
        .get_node_version_cold(VersionId::new(6).unwrap())
        .unwrap();
    let final_metrics = tiered.metrics();
    assert_eq!(final_metrics.warm_hits, 2);
    assert_eq!(final_metrics.cold_hits, 2);
}
// With a warm cache of 5 and 10 distinct ids looked up round-robin, at most
// 5 of the second pass can be warm hits; the rest must go back to cold.
#[test]
fn test_cache_eviction() {
    let temp_dir = tempfile::tempdir().unwrap();
    let db_path = temp_dir.path().join("test.redb");
    let cold = RedbColdStorage::with_default_config(&db_path).unwrap();
    let config = TieredStorageConfig {
        warm_cache_size: 5,
        enable_prefetch: false,
        prefetch_depth: 0,
    };
    let tiered = TieredStorage::new(config, Arc::new(cold));
    for i in 1..=10 {
        let version = create_test_node_version(i);
        tiered.store_node_version(&version).unwrap();
    }
    // First pass fills (and overflows) the cache.
    for i in 1..=10 {
        tiered
            .get_node_version_cold(VersionId::new(i).unwrap())
            .unwrap();
    }
    let metrics_before = tiered.metrics();
    // Second pass: count only its deltas against the first pass.
    for i in 1..=10 {
        tiered
            .get_node_version_cold(VersionId::new(i).unwrap())
            .unwrap();
    }
    let metrics_after = tiered.metrics();
    let warm_hits = metrics_after.warm_hits - metrics_before.warm_hits;
    let cold_hits = metrics_after.cold_hits - metrics_before.cold_hits;
    assert!(
        warm_hits <= 5,
        "Expected at most 5 warm hits, got {}",
        warm_hits
    );
    assert!(
        cold_hits >= 5,
        "Expected at least 5 cold hits, got {}",
        cold_hits
    );
}

// Injected cold-storage write failures must surface through both the single
// and batch store paths rather than being swallowed.
#[test]
fn test_error_propagation_writes() {
    let temp_dir = tempfile::tempdir().unwrap();
    let db_path = temp_dir.path().join("test.redb");
    let cold = RedbColdStorage::with_default_config(&db_path).unwrap();
    let tiered = TieredStorage::with_default_config(Arc::new(cold));
    tiered.cold_storage().set_fail_writes(true);
    let version = create_test_node_version(1);
    let result = tiered.store_node_version(&version);
    assert!(result.is_err());
    let err_msg = format!("{:?}", result.unwrap_err());
    assert!(err_msg.contains("Simulated write failure"));
    let result = tiered.store_node_versions_batch(&[version]);
    assert!(result.is_err());
}
// Prefetch reads must not pollute the cold-latency window: demanding v4 of
// a 4-long chain records exactly one latency sample while prefetching the
// other three versions, which then serve as warm hits.
#[test]
fn test_prefetch_latency_metric_behavior() {
    let temp_dir = tempfile::tempdir().unwrap();
    let db_path = temp_dir.path().join("test.redb");
    let cold = RedbColdStorage::with_default_config(&db_path).unwrap();
    let config = TieredStorageConfig {
        warm_cache_size: 100,
        enable_prefetch: true,
        prefetch_depth: 3,
    };
    let tiered = TieredStorage::new(config, Arc::new(cold));
    let mut versions = Vec::new();
    let v1 = create_test_node_version(1);
    versions.push(v1);
    for i in 2..=4 {
        let v = NodeVersion::new_delta(
            VersionId::new(i).unwrap(),
            NodeId::new(100).unwrap(),
            BiTemporalInterval::current(((1000 + i * 100) as i64).into()),
            GLOBAL_INTERNER.intern("Test").unwrap(),
            &PropertyMapBuilder::new().build(),
            &PropertyMapBuilder::new().build(),
            VersionId::new(i - 1).unwrap(),
        );
        versions.push(v);
    }
    for v in &versions {
        tiered.store_node_version(v).unwrap();
    }
    let _ = tiered
        .get_node_version_cold(VersionId::new(4).unwrap())
        .unwrap();
    let metrics = tiered.metrics();
    assert_eq!(
        metrics.cold_latency.sample_count, 1,
        "Should record latency only for the requested item, not prefetches"
    );
    assert_eq!(metrics.prefetches, 3, "Should have prefetched 3 items");
    // Every version (demanded + prefetched) is now retrievable from warm.
    for i in 1..=4 {
        assert!(
            tiered
                .get_node_version_cold(VersionId::new(i).unwrap())
                .unwrap()
                .is_some()
        );
    }
    let final_metrics = tiered.metrics();
    assert_eq!(final_metrics.cold_hits, 1);
    assert_eq!(final_metrics.warm_hits, 4);
}
}