pub mod aggregates;
pub mod bgwriter;
pub mod blob;
pub mod compressor;
pub mod extended_ttl;
pub mod promotion_pool;
pub mod result;
pub mod ring;
pub mod sieve;
pub mod spill;
pub mod strategy;
pub mod sweeper;
pub use aggregates::{AggCacheStats, AggValue, AggregationCache, CardinalityEstimate, NumericAgg};
pub use blob::{
BlobCache, BlobCacheConfig, BlobCacheHit, BlobCachePolicy, BlobCachePut, BlobCacheStats,
CacheError, L1Admission, L2Compression, DEFAULT_BLOB_L1_BYTES_MAX, DEFAULT_BLOB_L2_BYTES_MAX,
DEFAULT_BLOB_MAX_NAMESPACES, METRIC_CACHE_BLOB_L1_BYTES_IN_USE,
METRIC_CACHE_BLOB_L2_BYTES_IN_USE, METRIC_CACHE_BLOB_L2_FULL_REJECTIONS_TOTAL,
METRIC_CACHE_VERSION_MISMATCH_TOTAL,
};
pub use compressor::{CompressError, CompressOpts, Compressed, L2BlobCompressor};
pub use extended_ttl::{EffectiveExpiry, ExpiryDecision, ExtendedTtlPolicy};
pub use promotion_pool::{
AsyncPromotionPool, PoolOpts, PromotionExecutor, PromotionMetrics, PromotionRequest,
ScheduleOutcome,
};
pub use result::{
CacheKey, CachePolicy, MaterializedViewCache, MaterializedViewDef, RefreshPolicy, ResultCache,
ResultCacheStats,
};
pub use ring::BufferRing;
pub use sieve::{CacheConfig, CacheStats, PageCache, PageId};
pub use spill::{SpillConfig, SpillError, SpillManager, SpillStats, SpillableGraph};
pub use strategy::BufferAccessStrategy;
const L2_BACKUP_PAGER_SUFFIX: &str = "l2.pager";
const L2_BACKUP_CONTROL_SUFFIX: &str = "l2.ctl";
const L2_CONTROL_EXTENSION: &str = "blob-cache.ctl";
fn normalize_prefix(prefix: &str) -> String {
if prefix.is_empty() || prefix.ends_with('/') {
prefix.to_string()
} else {
format!("{prefix}/")
}
}
fn control_sidecar_for(l2_path: &std::path::Path) -> std::path::PathBuf {
l2_path.with_extension(L2_CONTROL_EXTENSION)
}
pub fn archive_blob_cache_l2(
backend: &dyn crate::storage::backend::RemoteBackend,
l2_path: &std::path::Path,
prefix: &str,
) -> Result<usize, crate::storage::backend::BackendError> {
let prefix = normalize_prefix(prefix);
let mut count = 0usize;
if l2_path.is_file() {
backend.upload(l2_path, &format!("{prefix}{L2_BACKUP_PAGER_SUFFIX}"))?;
count += 1;
}
let control = control_sidecar_for(l2_path);
if control.is_file() {
backend.upload(&control, &format!("{prefix}{L2_BACKUP_CONTROL_SUFFIX}"))?;
count += 1;
}
Ok(count)
}
pub fn restore_blob_cache_l2(
backend: &dyn crate::storage::backend::RemoteBackend,
prefix: &str,
l2_path: &std::path::Path,
) -> Result<usize, crate::storage::backend::BackendError> {
let prefix = normalize_prefix(prefix);
if let Some(parent) = l2_path.parent() {
if !parent.as_os_str().is_empty() {
std::fs::create_dir_all(parent)
.map_err(|err| crate::storage::backend::BackendError::Transport(err.to_string()))?;
}
}
let mut count = 0usize;
if backend.download(&format!("{prefix}{L2_BACKUP_PAGER_SUFFIX}"), l2_path)? {
count += 1;
}
let control = control_sidecar_for(l2_path);
if backend.download(&format!("{prefix}{L2_BACKUP_CONTROL_SUFFIX}"), &control)? {
count += 1;
}
Ok(count)
}
#[cfg(test)]
mod backup_helpers_tests {
use super::*;
use crate::storage::backend::LocalBackend;
use std::sync::atomic::{AtomicU64, Ordering};
fn write_file(path: &std::path::Path, bytes: &[u8]) {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).unwrap();
}
std::fs::write(path, bytes).unwrap();
}
static SCRATCH_COUNTER: AtomicU64 = AtomicU64::new(0);
fn scratch(label: &str) -> std::path::PathBuf {
let pid = std::process::id();
let n = SCRATCH_COUNTER.fetch_add(1, Ordering::SeqCst);
let p = std::env::temp_dir().join(format!("reddb-blobcache-bk-{label}-{pid}-{n}"));
let _ = std::fs::remove_dir_all(&p);
std::fs::create_dir_all(&p).unwrap();
p
}
#[test]
fn archive_then_restore_round_trips_l2_pager_and_control_files() {
let scratch_dir = scratch("pair-src");
let l2_src = scratch_dir.join("cache.rdb");
write_file(&l2_src, b"pager-bytes-on-disk");
write_file(&control_sidecar_for(&l2_src), b"control-sidecar-bytes");
let backend_root = scratch("pair-be");
let prefix = format!("{}/blob_cache/", backend_root.display());
let uploaded =
archive_blob_cache_l2(&LocalBackend, &l2_src, &prefix).expect("archive succeeds");
assert_eq!(uploaded, 2, "pager + control sidecar uploaded");
let dst_dir = scratch("pair-dst");
let l2_dst = dst_dir.join("cache.rdb");
let downloaded =
restore_blob_cache_l2(&LocalBackend, &prefix, &l2_dst).expect("restore succeeds");
assert_eq!(downloaded, 2);
assert_eq!(std::fs::read(&l2_dst).unwrap(), b"pager-bytes-on-disk");
assert_eq!(
std::fs::read(control_sidecar_for(&l2_dst)).unwrap(),
b"control-sidecar-bytes"
);
let _ = std::fs::remove_dir_all(&scratch_dir);
let _ = std::fs::remove_dir_all(&backend_root);
let _ = std::fs::remove_dir_all(&dst_dir);
}
#[test]
fn archive_missing_l2_path_is_noop() {
let backend_root = scratch("be-missing");
let prefix = format!("{}/blob_cache/", backend_root.display());
let count = archive_blob_cache_l2(
&LocalBackend,
std::path::Path::new("/nonexistent/path/for/reddb-test.rdb"),
&prefix,
)
.expect("missing path treated as nothing to archive");
assert_eq!(count, 0);
let _ = std::fs::remove_dir_all(&backend_root);
}
#[test]
fn restore_with_no_objects_creates_empty_parent_dir() {
let backend_root = scratch("be-empty");
let prefix = format!("{}/blob_cache/", backend_root.display());
let dst_dir = scratch("dst-empty");
let l2_dst = dst_dir.join("cache.rdb");
let count =
restore_blob_cache_l2(&LocalBackend, &prefix, &l2_dst).expect("empty restore is ok");
assert_eq!(count, 0);
let _ = std::fs::remove_dir_all(&backend_root);
let _ = std::fs::remove_dir_all(&dst_dir);
}
#[test]
fn full_round_trip_via_blob_cache_preserves_entries_after_restore() {
use crate::storage::cache::blob::{BlobCache, BlobCacheConfig, BlobCachePut};
let src_dir = scratch("rt-src");
let dst_dir = scratch("rt-dst");
let backend_root = scratch("rt-be");
let l2_src = src_dir.join("blob-cache.rdb");
let l2_dst = dst_dir.join("blob-cache.rdb");
let prefix = format!("{}/blob_cache/", backend_root.display());
{
let cache = BlobCache::open_with_l2(
BlobCacheConfig::default()
.with_l1_bytes_max(64 * 1024)
.with_shard_count(2)
.with_max_namespaces(8)
.with_l2_path(&l2_src),
)
.expect("open l2 src");
cache
.put("ns-a", "k1", BlobCachePut::new(b"value-1".to_vec()))
.expect("put k1");
cache
.put(
"ns-b",
"k2",
BlobCachePut::new(b"value-2-longer-payload".to_vec()),
)
.expect("put k2");
assert_eq!(cache.l2_path(), Some(l2_src.as_path()));
}
let uploaded = archive_blob_cache_l2(&LocalBackend, &l2_src, &prefix).expect("archive l2");
assert_eq!(uploaded, 2, "pager + control uploaded");
let restored = restore_blob_cache_l2(&LocalBackend, &prefix, &l2_dst).expect("restore l2");
assert_eq!(restored, 2, "pager + control downloaded");
let restored_cache = BlobCache::open_with_l2(
BlobCacheConfig::default()
.with_l1_bytes_max(64 * 1024)
.with_shard_count(2)
.with_max_namespaces(8)
.with_l2_path(&l2_dst),
)
.expect("open l2 dst");
let hit_a = restored_cache
.get("ns-a", "k1")
.expect("k1 survives restore");
assert_eq!(hit_a.value(), b"value-1");
let hit_b = restored_cache
.get("ns-b", "k2")
.expect("k2 survives restore");
assert_eq!(hit_b.value(), b"value-2-longer-payload");
let _ = std::fs::remove_dir_all(&src_dir);
let _ = std::fs::remove_dir_all(&dst_dir);
let _ = std::fs::remove_dir_all(&backend_root);
}
#[test]
fn skipped_archive_leaves_restored_cache_cold() {
use crate::storage::cache::blob::{BlobCache, BlobCacheConfig, BlobCachePut};
let src_dir = scratch("cold-src");
let dst_dir = scratch("cold-dst");
let l2_src = src_dir.join("blob-cache.rdb");
let l2_dst = dst_dir.join("blob-cache.rdb");
{
let cache = BlobCache::open_with_l2(BlobCacheConfig::default().with_l2_path(&l2_src))
.expect("open l2 src");
cache
.put("ns", "k", BlobCachePut::new(b"value".to_vec()))
.expect("put k");
}
let cold_cache = BlobCache::open_with_l2(BlobCacheConfig::default().with_l2_path(&l2_dst))
.expect("open l2 dst");
assert!(
cold_cache.get("ns", "k").is_none(),
"restore without include_blob_cache must yield a cold cache"
);
let _ = std::fs::remove_dir_all(&src_dir);
let _ = std::fs::remove_dir_all(&dst_dir);
}
}