reddb_server/storage/cache/
mod.rs1pub mod aggregates;
30pub mod bgwriter;
31pub mod blob;
32pub mod compressor;
33pub mod extended_ttl;
34pub mod promotion_pool;
35pub mod result;
36pub mod ring;
37pub mod sieve;
38pub mod spill;
39pub mod strategy;
40pub mod sweeper;
41
42pub use aggregates::{AggCacheStats, AggValue, AggregationCache, CardinalityEstimate, NumericAgg};
43pub use blob::{
44 BlobCache, BlobCacheConfig, BlobCacheHit, BlobCachePolicy, BlobCachePut, BlobCacheStats,
45 CacheError, L1Admission, L2Compression, DEFAULT_BLOB_L1_BYTES_MAX, DEFAULT_BLOB_L2_BYTES_MAX,
46 DEFAULT_BLOB_MAX_NAMESPACES, METRIC_CACHE_BLOB_L1_BYTES_IN_USE,
47 METRIC_CACHE_BLOB_L2_BYTES_IN_USE, METRIC_CACHE_BLOB_L2_FULL_REJECTIONS_TOTAL,
48 METRIC_CACHE_VERSION_MISMATCH_TOTAL,
49};
50pub use compressor::{CompressError, CompressOpts, Compressed, L2BlobCompressor};
51pub use extended_ttl::{EffectiveExpiry, ExpiryDecision, ExtendedTtlPolicy};
52pub use promotion_pool::{
53 AsyncPromotionPool, PoolOpts, PromotionExecutor, PromotionMetrics, PromotionRequest,
54 ScheduleOutcome,
55};
56pub use result::{
57 CacheKey, CachePolicy, MaterializedViewCache, MaterializedViewDef, RefreshPolicy, ResultCache,
58 ResultCacheStats,
59};
60pub use ring::BufferRing;
61pub use sieve::{CacheConfig, CacheStats, PageCache, PageId};
62pub use spill::{SpillConfig, SpillError, SpillManager, SpillStats, SpillableGraph};
63pub use strategy::BufferAccessStrategy;
64
/// Suffix appended to the normalized backup prefix to form the remote key
/// under which the L2 pager file is uploaded and from which it is downloaded.
const L2_BACKUP_PAGER_SUFFIX: &str = "l2.pager";
/// Suffix appended to the normalized backup prefix to form the remote key
/// for the L2 control sidecar file.
const L2_BACKUP_CONTROL_SUFFIX: &str = "l2.ctl";
/// Extension substituted into the L2 path (via `Path::with_extension`) to
/// locate the control sidecar on local disk.
// NOTE(review): presumably this must match the sidecar naming used by the
// blob cache itself — confirm against `blob.rs`.
const L2_CONTROL_EXTENSION: &str = "blob-cache.ctl";
97
/// Returns `prefix` in a form that can be concatenated directly with an
/// object suffix.
///
/// An empty prefix stays empty; a prefix that already ends in `/` is returned
/// unchanged; any other prefix gets a single trailing `/` appended.
fn normalize_prefix(prefix: &str) -> String {
    let needs_separator = !(prefix.is_empty() || prefix.ends_with('/'));
    let mut normalized = String::with_capacity(prefix.len() + usize::from(needs_separator));
    normalized.push_str(prefix);
    if needs_separator {
        normalized.push('/');
    }
    normalized
}
105
106fn control_sidecar_for(l2_path: &std::path::Path) -> std::path::PathBuf {
107 l2_path.with_extension(L2_CONTROL_EXTENSION)
108}
109
110pub fn archive_blob_cache_l2(
117 backend: &dyn crate::storage::backend::RemoteBackend,
118 l2_path: &std::path::Path,
119 prefix: &str,
120) -> Result<usize, crate::storage::backend::BackendError> {
121 let prefix = normalize_prefix(prefix);
122 let mut count = 0usize;
123 if l2_path.is_file() {
124 backend.upload(l2_path, &format!("{prefix}{L2_BACKUP_PAGER_SUFFIX}"))?;
125 count += 1;
126 }
127 let control = control_sidecar_for(l2_path);
128 if control.is_file() {
129 backend.upload(&control, &format!("{prefix}{L2_BACKUP_CONTROL_SUFFIX}"))?;
130 count += 1;
131 }
132 Ok(count)
133}
134
135pub fn restore_blob_cache_l2(
145 backend: &dyn crate::storage::backend::RemoteBackend,
146 prefix: &str,
147 l2_path: &std::path::Path,
148) -> Result<usize, crate::storage::backend::BackendError> {
149 let prefix = normalize_prefix(prefix);
150 if let Some(parent) = l2_path.parent() {
151 if !parent.as_os_str().is_empty() {
152 std::fs::create_dir_all(parent)
153 .map_err(|err| crate::storage::backend::BackendError::Transport(err.to_string()))?;
154 }
155 }
156 let mut count = 0usize;
157 if backend.download(&format!("{prefix}{L2_BACKUP_PAGER_SUFFIX}"), l2_path)? {
158 count += 1;
159 }
160 let control = control_sidecar_for(l2_path);
161 if backend.download(&format!("{prefix}{L2_BACKUP_CONTROL_SUFFIX}"), &control)? {
162 count += 1;
163 }
164 Ok(count)
165}
166
167#[cfg(test)]
168mod backup_helpers_tests {
169 use super::*;
170 use crate::storage::backend::LocalBackend;
171 use std::sync::atomic::{AtomicU64, Ordering};
172
173 fn write_file(path: &std::path::Path, bytes: &[u8]) {
174 if let Some(parent) = path.parent() {
175 std::fs::create_dir_all(parent).unwrap();
176 }
177 std::fs::write(path, bytes).unwrap();
178 }
179
180 static SCRATCH_COUNTER: AtomicU64 = AtomicU64::new(0);
186 fn scratch(label: &str) -> std::path::PathBuf {
187 let pid = std::process::id();
188 let n = SCRATCH_COUNTER.fetch_add(1, Ordering::SeqCst);
189 let p = std::env::temp_dir().join(format!("reddb-blobcache-bk-{label}-{pid}-{n}"));
190 let _ = std::fs::remove_dir_all(&p);
191 std::fs::create_dir_all(&p).unwrap();
192 p
193 }
194
195 #[test]
201 fn archive_then_restore_round_trips_l2_pager_and_control_files() {
202 let scratch_dir = scratch("pair-src");
203 let l2_src = scratch_dir.join("cache.rdb");
204 write_file(&l2_src, b"pager-bytes-on-disk");
205 write_file(&control_sidecar_for(&l2_src), b"control-sidecar-bytes");
206
207 let backend_root = scratch("pair-be");
208 let prefix = format!("{}/blob_cache/", backend_root.display());
209
210 let uploaded =
211 archive_blob_cache_l2(&LocalBackend, &l2_src, &prefix).expect("archive succeeds");
212 assert_eq!(uploaded, 2, "pager + control sidecar uploaded");
213
214 let dst_dir = scratch("pair-dst");
215 let l2_dst = dst_dir.join("cache.rdb");
216 let downloaded =
217 restore_blob_cache_l2(&LocalBackend, &prefix, &l2_dst).expect("restore succeeds");
218 assert_eq!(downloaded, 2);
219
220 assert_eq!(std::fs::read(&l2_dst).unwrap(), b"pager-bytes-on-disk");
221 assert_eq!(
222 std::fs::read(control_sidecar_for(&l2_dst)).unwrap(),
223 b"control-sidecar-bytes"
224 );
225
226 let _ = std::fs::remove_dir_all(&scratch_dir);
227 let _ = std::fs::remove_dir_all(&backend_root);
228 let _ = std::fs::remove_dir_all(&dst_dir);
229 }
230
231 #[test]
232 fn archive_missing_l2_path_is_noop() {
233 let backend_root = scratch("be-missing");
234 let prefix = format!("{}/blob_cache/", backend_root.display());
235 let count = archive_blob_cache_l2(
236 &LocalBackend,
237 std::path::Path::new("/nonexistent/path/for/reddb-test.rdb"),
238 &prefix,
239 )
240 .expect("missing path treated as nothing to archive");
241 assert_eq!(count, 0);
242 let _ = std::fs::remove_dir_all(&backend_root);
243 }
244
245 #[test]
246 fn restore_with_no_objects_creates_empty_parent_dir() {
247 let backend_root = scratch("be-empty");
248 let prefix = format!("{}/blob_cache/", backend_root.display());
249 let dst_dir = scratch("dst-empty");
250 let l2_dst = dst_dir.join("cache.rdb");
251 let count =
252 restore_blob_cache_l2(&LocalBackend, &prefix, &l2_dst).expect("empty restore is ok");
253 assert_eq!(count, 0);
254 let _ = std::fs::remove_dir_all(&backend_root);
255 let _ = std::fs::remove_dir_all(&dst_dir);
256 }
257
258 #[test]
273 fn full_round_trip_via_blob_cache_preserves_entries_after_restore() {
274 use crate::storage::cache::blob::{BlobCache, BlobCacheConfig, BlobCachePut};
275
276 let src_dir = scratch("rt-src");
277 let dst_dir = scratch("rt-dst");
278 let backend_root = scratch("rt-be");
279 let l2_src = src_dir.join("blob-cache.rdb");
280 let l2_dst = dst_dir.join("blob-cache.rdb");
281 let prefix = format!("{}/blob_cache/", backend_root.display());
282
283 {
285 let cache = BlobCache::open_with_l2(
286 BlobCacheConfig::default()
287 .with_l1_bytes_max(64 * 1024)
288 .with_shard_count(2)
289 .with_max_namespaces(8)
290 .with_l2_path(&l2_src),
291 )
292 .expect("open l2 src");
293 cache
294 .put("ns-a", "k1", BlobCachePut::new(b"value-1".to_vec()))
295 .expect("put k1");
296 cache
297 .put(
298 "ns-b",
299 "k2",
300 BlobCachePut::new(b"value-2-longer-payload".to_vec()),
301 )
302 .expect("put k2");
303 assert_eq!(cache.l2_path(), Some(l2_src.as_path()));
305 } let uploaded = archive_blob_cache_l2(&LocalBackend, &l2_src, &prefix).expect("archive l2");
309 assert_eq!(uploaded, 2, "pager + control uploaded");
310
311 let restored = restore_blob_cache_l2(&LocalBackend, &prefix, &l2_dst).expect("restore l2");
313 assert_eq!(restored, 2, "pager + control downloaded");
314
315 let restored_cache = BlobCache::open_with_l2(
319 BlobCacheConfig::default()
320 .with_l1_bytes_max(64 * 1024)
321 .with_shard_count(2)
322 .with_max_namespaces(8)
323 .with_l2_path(&l2_dst),
324 )
325 .expect("open l2 dst");
326 let hit_a = restored_cache
327 .get("ns-a", "k1")
328 .expect("k1 survives restore");
329 assert_eq!(hit_a.value(), b"value-1");
330 let hit_b = restored_cache
331 .get("ns-b", "k2")
332 .expect("k2 survives restore");
333 assert_eq!(hit_b.value(), b"value-2-longer-payload");
334
335 let _ = std::fs::remove_dir_all(&src_dir);
336 let _ = std::fs::remove_dir_all(&dst_dir);
337 let _ = std::fs::remove_dir_all(&backend_root);
338 }
339
340 #[test]
346 fn skipped_archive_leaves_restored_cache_cold() {
347 use crate::storage::cache::blob::{BlobCache, BlobCacheConfig, BlobCachePut};
348
349 let src_dir = scratch("cold-src");
350 let dst_dir = scratch("cold-dst");
351 let l2_src = src_dir.join("blob-cache.rdb");
352 let l2_dst = dst_dir.join("blob-cache.rdb");
353
354 {
355 let cache = BlobCache::open_with_l2(BlobCacheConfig::default().with_l2_path(&l2_src))
356 .expect("open l2 src");
357 cache
358 .put("ns", "k", BlobCachePut::new(b"value".to_vec()))
359 .expect("put k");
360 }
361
362 let cold_cache = BlobCache::open_with_l2(BlobCacheConfig::default().with_l2_path(&l2_dst))
365 .expect("open l2 dst");
366 assert!(
367 cold_cache.get("ns", "k").is_none(),
368 "restore without include_blob_cache must yield a cold cache"
369 );
370
371 let _ = std::fs::remove_dir_all(&src_dir);
372 let _ = std::fs::remove_dir_all(&dst_dir);
373 }
374}