1use crate::address::Address;
2use crate::data_store::Datastore;
3use crate::error::{GuardianError, Result};
4use sled::{Config, Db};
5use std::collections::HashMap;
6use std::path::{Path, PathBuf};
7use std::sync::{Arc, Mutex};
8use tracing::{Span, debug, error, info, instrument, warn};
9
10#[allow(clippy::module_inception)]
11pub mod level_down;
12pub use level_down::LevelDownCache;
13
14type DatastoreBox = Box<dyn Datastore + Send + Sync>;
16type CleanupFn = Box<dyn FnOnce() -> Result<()> + Send + Sync>;
17type NewCacheResult = Result<(DatastoreBox, CleanupFn)>;
18
19#[derive(Debug, Clone)]
21pub struct Options {
22 pub span: Option<Span>,
24 pub max_cache_size: Option<u64>,
26 pub cache_mode: CacheMode,
28}
29
30#[derive(Debug, Clone, PartialEq)]
32pub enum CacheMode {
33 Persistent,
35 InMemory,
37 Auto,
39}
40
41impl Default for Options {
42 fn default() -> Self {
43 Self {
44 span: None,
45 max_cache_size: Some(100 * 1024 * 1024), cache_mode: CacheMode::Auto,
47 }
48 }
49}
50
51pub trait Cache: Send + Sync {
54 #[allow(clippy::new_ret_no_self)]
57 fn new(path: &str, opts: Option<Options>) -> NewCacheResult
58 where
59 Self: Sized,
60 {
61 SledCache::create_cache_instance(path, opts.unwrap_or_default())
62 }
63
64 fn load(&self, directory: &str, db_address: &dyn Address) -> Result<DatastoreBox>;
66
67 fn close(&mut self) -> Result<()>;
69
70 fn destroy(&self, directory: &str, db_address: &dyn Address) -> Result<()>;
72}
73
74pub struct SledCache {
76 caches: Arc<Mutex<HashMap<String, Arc<SledDatastore>>>>,
77 options: Options,
78}
79
80impl SledCache {
81 pub fn new(opts: Options) -> Self {
83 Self {
84 caches: Arc::new(Mutex::new(HashMap::new())),
85 options: opts,
86 }
87 }
88
89 #[instrument(level = "info")]
91 pub fn create_cache_instance(path: &str, opts: Options) -> NewCacheResult {
92 info!("Creating cache instance: path={}", path);
93
94 let datastore = SledDatastore::new(path, opts.clone())?;
95 let path_clone = path.to_string();
96
97 let cleanup: Box<dyn FnOnce() -> Result<()> + Send + Sync> = Box::new(move || {
99 if path_clone != ":memory:" && Path::new(&path_clone).exists() {
100 match std::fs::remove_dir_all(&path_clone) {
101 Ok(_) => {
102 debug!("Cache directory cleaned up: path={}", &path_clone);
103 Ok(())
104 }
105 Err(e) => {
106 warn!(
107 "Failed to cleanup cache directory: path={}, error={}",
108 &path_clone, e
109 );
110 Err(GuardianError::Other(format!(
111 "Failed to cleanup cache: {}",
112 e
113 )))
114 }
115 }
116 } else {
117 Ok(())
118 }
119 });
120
121 Ok((Box::new(datastore), cleanup))
122 }
123
124 fn generate_cache_key(directory: &str, db_address: &dyn Address) -> String {
126 let db_path = PathBuf::from(db_address.get_root().to_string()).join(db_address.get_path());
127 PathBuf::from(directory)
128 .join(db_path)
129 .to_string_lossy()
130 .to_string()
131 }
132}
133
134impl Cache for SledCache {
135 #[instrument(level = "info", skip(self, db_address))]
136 fn load(
137 &self,
138 directory: &str,
139 db_address: &dyn Address,
140 ) -> Result<Box<dyn Datastore + Send + Sync>> {
141 let cache_key = Self::generate_cache_key(directory, db_address);
142
143 info!(
144 "Loading cache: directory={}, cache_key={}",
145 directory, &cache_key
146 );
147
148 let mut caches = self.caches.lock().unwrap();
149
150 if let Some(existing_cache) = caches.get(&cache_key) {
151 debug!("Using existing cache: cache_key={}", &cache_key);
152 return Ok(Box::new(existing_cache.as_ref().clone()));
153 }
154
155 let datastore = SledDatastore::new(&cache_key, self.options.clone())?;
157 let arc_datastore = Arc::new(datastore.clone());
158 caches.insert(cache_key.clone(), arc_datastore);
159
160 info!("Created new cache: cache_key={}", &cache_key);
161 Ok(Box::new(datastore))
162 }
163
164 #[instrument(level = "info", skip(self))]
165 fn close(&mut self) -> Result<()> {
166 info!("Closing all caches");
167
168 let caches = {
169 let mut cache_map = self.caches.lock().unwrap();
170 let caches: Vec<Arc<SledDatastore>> = cache_map.values().cloned().collect();
171 cache_map.clear();
172 caches
173 };
174
175 for cache in caches {
176 if let Err(e) = cache.close() {
177 warn!("Failed to close cache: error={}", e);
178 }
179 }
180
181 info!("All caches closed");
182 Ok(())
183 }
184
185 #[instrument(level = "info", skip(self, db_address))]
186 fn destroy(&self, directory: &str, db_address: &dyn Address) -> Result<()> {
187 let cache_key = Self::generate_cache_key(directory, db_address);
188
189 info!(
190 "Destroying cache: directory={}, cache_key={}",
191 directory, &cache_key
192 );
193
194 let cache_to_close = {
196 let mut caches = self.caches.lock().unwrap();
197 caches.remove(&cache_key)
198 };
199
200 if let Some(cache) = cache_to_close {
202 cache.close()?;
203 }
204
205 if directory != ":memory:" && Path::new(&cache_key).exists() {
207 std::fs::remove_dir_all(&cache_key).map_err(|e| {
208 GuardianError::Other(format!("Failed to remove cache directory: {}", e))
209 })?;
210
211 info!("Cache directory removed: path={}", &cache_key);
212 }
213
214 Ok(())
215 }
216}
217
218#[derive(Clone)]
220pub struct SledDatastore {
221 db: Db,
222 path: String,
223 span: Span,
224}
225
226impl SledDatastore {
227 #[instrument(level = "debug")]
229 pub fn new(path: &str, opts: Options) -> Result<Self> {
230 debug!("Creating SledDatastore: path={}", path);
231
232 let db = if path == ":memory:" || matches!(opts.cache_mode, CacheMode::InMemory) {
233 debug!("Creating in-memory cache");
235 Config::new().temporary(true).open().map_err(|e| {
236 GuardianError::Store(format!("Failed to create in-memory cache: {}", e))
237 })?
238 } else {
239 debug!("Creating persistent cache: path={}", path);
241
242 if let Some(parent) = Path::new(path).parent() {
244 std::fs::create_dir_all(parent).map_err(|e| {
245 GuardianError::Store(format!("Failed to create cache directory: {}", e))
246 })?;
247 }
248
249 let mut config = Config::new();
250
251 if let Some(max_size) = opts.max_cache_size {
253 config = config.cache_capacity(max_size);
254 }
255
256 config.path(path).open().map_err(|e| {
257 GuardianError::Store(format!("Failed to open cache at {}: {}", path, e))
258 })?
259 };
260
261 info!(
262 "SledDatastore created successfully: path={}, memory_mode={}",
263 path,
264 path == ":memory:"
265 );
266
267 Ok(Self {
268 db,
269 path: path.to_string(),
270 span: opts.span.unwrap_or_else(tracing::Span::current),
271 })
272 }
273
274 pub fn span(&self) -> &Span {
276 &self.span
277 }
278
279 #[instrument(level = "debug", skip(self))]
281 pub fn close(&self) -> Result<()> {
282 let _entered = self.span.enter();
283 debug!("Closing SledDatastore: path={}", &self.path);
284
285 self.db
286 .flush()
287 .map_err(|e| GuardianError::Store(format!("Failed to flush cache: {}", e)))?;
288
289 info!("SledDatastore closed: path={}", &self.path);
290 Ok(())
291 }
292}
293
294#[async_trait::async_trait]
295impl Datastore for SledDatastore {
296 #[instrument(level = "debug", skip(self, key))]
297 async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
298 let _entered = self.span.enter();
299 match self.db.get(key) {
300 Ok(Some(value)) => {
301 debug!("Cache hit: key_len={}", key.len());
302 Ok(Some(value.to_vec()))
303 }
304 Ok(None) => {
305 debug!("Cache miss: key_len={}", key.len());
306 Ok(None)
307 }
308 Err(e) => {
309 error!("Cache get error: key_len={}, error={}", key.len(), e);
310 Err(GuardianError::Store(format!("Cache get error: {}", e)))
311 }
312 }
313 }
314
315 #[instrument(level = "debug", skip(self, key, value))]
316 async fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
317 let _entered = self.span.enter();
318 match self.db.insert(key, value) {
319 Ok(_) => {
320 debug!(
321 "Cache put success: key_len={}, value_len={}",
322 key.len(),
323 value.len()
324 );
325 Ok(())
326 }
327 Err(e) => {
328 error!("Cache put error: key_len={}, error={}", key.len(), e);
329 Err(GuardianError::Store(format!("Cache put error: {}", e)))
330 }
331 }
332 }
333
334 #[instrument(level = "debug", skip(self, key))]
335 async fn has(&self, key: &[u8]) -> Result<bool> {
336 let _entered = self.span.enter();
337 match self.db.contains_key(key) {
338 Ok(exists) => {
339 debug!("Cache has check: key_len={}, exists={}", key.len(), exists);
340 Ok(exists)
341 }
342 Err(e) => {
343 error!("Cache has error: key_len={}, error={}", key.len(), e);
344 Err(GuardianError::Store(format!("Cache has error: {}", e)))
345 }
346 }
347 }
348
349 #[instrument(level = "debug", skip(self, key))]
350 async fn delete(&self, key: &[u8]) -> Result<()> {
351 let _entered = self.span.enter();
352 match self.db.remove(key) {
353 Ok(_) => {
354 debug!("Cache delete success: key_len={}", key.len());
355 Ok(())
356 }
357 Err(e) => {
358 error!("Cache delete error: key_len={}, error={}", key.len(), e);
359 Err(GuardianError::Store(format!("Cache delete error: {}", e)))
360 }
361 }
362 }
363
364 #[instrument(level = "debug", skip(self, query))]
365 async fn query(&self, query: &crate::data_store::Query) -> Result<crate::data_store::Results> {
366 let _entered = self.span.enter();
367 use crate::data_store::{Key, ResultItem};
368
369 debug!(
370 "Cache query: has_prefix={}, limit={:?}, order={:?}",
371 query.prefix.is_some(),
372 query.limit,
373 query.order
374 );
375
376 let iter: Box<dyn Iterator<Item = sled::Result<(sled::IVec, sled::IVec)>>> =
377 if let Some(prefix_key) = &query.prefix {
378 let prefix_bytes = prefix_key.as_bytes();
380 Box::new(self.db.scan_prefix(prefix_bytes))
381 } else {
382 Box::new(self.db.iter())
383 };
384
385 let mut results = Vec::new();
386 let mut count = 0;
387
388 let skip_count = query.offset.unwrap_or(0);
390 let mut skipped = 0;
391
392 for kv_result in iter {
393 match kv_result {
394 Ok((key_bytes, value_bytes)) => {
395 if skipped < skip_count {
397 skipped += 1;
398 continue;
399 }
400
401 let key_str = String::from_utf8_lossy(&key_bytes);
403 let key = Key::new(key_str.to_string());
404 let value = value_bytes.to_vec();
405
406 results.push(ResultItem::new(key, value));
407 count += 1;
408
409 if let Some(limit) = query.limit
411 && count >= limit
412 {
413 break;
414 }
415 }
416 Err(e) => {
417 error!("Cache query iteration error: error={}", e);
418 return Err(GuardianError::Store(format!("Cache query error: {}", e)));
419 }
420 }
421 }
422
423 if matches!(query.order, crate::data_store::Order::Desc) {
425 results.reverse();
426 }
427
428 debug!(
429 "Cache query completed: results_count={}, skipped={}",
430 results.len(),
431 skipped
432 );
433
434 Ok(results)
435 }
436
437 #[instrument(level = "debug", skip(self, prefix))]
438 async fn list_keys(&self, prefix: &[u8]) -> Result<Vec<crate::data_store::Key>> {
439 let _entered = self.span.enter();
440 use crate::data_store::Key;
441
442 debug!("Cache list_keys: prefix_len={}", prefix.len());
443
444 let mut keys = Vec::new();
445
446 for kv_result in self.db.scan_prefix(prefix) {
447 match kv_result {
448 Ok((key_bytes, _)) => {
449 let key_str = String::from_utf8_lossy(&key_bytes);
450 let key = Key::new(key_str.to_string());
451 keys.push(key);
452 }
453 Err(e) => {
454 error!("Cache list_keys iteration error: error={}", e);
455 return Err(GuardianError::Store(format!(
456 "Cache list_keys error: {}",
457 e
458 )));
459 }
460 }
461 }
462
463 debug!("Cache list_keys completed: keys_count={}", keys.len());
464 Ok(keys)
465 }
466
467 fn as_any(&self) -> &dyn std::any::Any {
468 self
469 }
470}
471
472pub fn create_cache(opts: Options) -> SledCache {
474 SledCache::new(opts)
475}
476
477pub fn create_default_cache() -> SledCache {
479 create_cache(Options::default())
480}
481
482pub fn create_memory_cache() -> SledCache {
484 create_cache(Options {
485 cache_mode: CacheMode::InMemory,
486 ..Default::default()
487 })
488}
489
490#[cfg(test)]
491mod tests {
492 use super::*;
493 use crate::data_store::{Key, Order, Query};
494
495 #[tokio::test]
496 async fn test_sled_datastore_basic_operations() {
497 let datastore = SledDatastore::new(":memory:", Options::default()).unwrap();
498
499 let key = b"test_key";
501 let value = b"test_value";
502
503 datastore.put(key, value).await.unwrap();
504 let retrieved = datastore.get(key).await.unwrap();
505 assert_eq!(retrieved, Some(value.to_vec()));
506
507 assert!(datastore.has(key).await.unwrap());
509 assert!(!datastore.has(b"non_existent").await.unwrap());
510
511 datastore.delete(key).await.unwrap();
513 assert!(!datastore.has(key).await.unwrap());
514 assert_eq!(datastore.get(key).await.unwrap(), None);
515 }
516
517 #[tokio::test]
518 async fn test_sled_datastore_query() {
519 let datastore = SledDatastore::new(":memory:", Options::default()).unwrap();
520
521 datastore.put(b"/users/alice", b"alice_data").await.unwrap();
523 datastore.put(b"/users/bob", b"bob_data").await.unwrap();
524 datastore
525 .put(b"/users/charlie", b"charlie_data")
526 .await
527 .unwrap();
528 datastore
529 .put(b"/config/database", b"db_config")
530 .await
531 .unwrap();
532
533 let query = Query {
535 prefix: Some(Key::new("/users")),
536 limit: None,
537 order: Order::Asc,
538 offset: None,
539 };
540
541 let results = datastore.query(&query).await.unwrap();
542 assert_eq!(results.len(), 3);
543
544 let query_limited = Query {
546 prefix: Some(Key::new("/users")),
547 limit: Some(2),
548 order: Order::Asc,
549 offset: None,
550 };
551
552 let results_limited = datastore.query(&query_limited).await.unwrap();
553 assert_eq!(results_limited.len(), 2);
554
555 let query_offset = Query {
557 prefix: Some(Key::new("/users")),
558 limit: None,
559 order: Order::Asc,
560 offset: Some(1),
561 };
562
563 let results_offset = datastore.query(&query_offset).await.unwrap();
564 assert_eq!(results_offset.len(), 2);
565 }
566
567 #[tokio::test]
568 async fn test_sled_datastore_list_keys() {
569 let datastore = SledDatastore::new(":memory:", Options::default()).unwrap();
570
571 datastore.put(b"/users/alice", b"alice_data").await.unwrap();
573 datastore.put(b"/users/bob", b"bob_data").await.unwrap();
574 datastore
575 .put(b"/config/database", b"db_config")
576 .await
577 .unwrap();
578
579 let keys = datastore.list_keys(b"/users").await.unwrap();
581 assert_eq!(keys.len(), 2);
582
583 let key_strings: Vec<String> = keys.iter().map(|k| k.as_str()).collect();
584 assert!(key_strings.contains(&"/users/alice".to_string()));
585 assert!(key_strings.contains(&"/users/bob".to_string()));
586 }
587
588 #[test]
589 fn test_cache_mode_detection() {
590 let persistent_opts = Options {
591 cache_mode: CacheMode::Persistent,
592 ..Default::default()
593 };
594
595 let memory_opts = Options {
596 cache_mode: CacheMode::InMemory,
597 ..Default::default()
598 };
599
600 assert_eq!(persistent_opts.cache_mode, CacheMode::Persistent);
601 assert_eq!(memory_opts.cache_mode, CacheMode::InMemory);
602 }
603}