// guardian_db/cache/mod.rs

1use crate::address::Address;
2use crate::data_store::Datastore;
3use crate::error::{GuardianError, Result};
4use sled::{Config, Db};
5use std::collections::HashMap;
6use std::path::{Path, PathBuf};
7use std::sync::{Arc, Mutex};
8use tracing::{Span, debug, error, info, instrument, warn};
9
10#[allow(clippy::module_inception)]
11pub mod level_down;
12pub use level_down::LevelDownCache;
13
// Type aliases that keep the trait-object signatures in this module readable.
// Boxed, thread-safe `Datastore` trait object handed out to callers.
type DatastoreBox = Box<dyn Datastore + Send + Sync>;
// One-shot cleanup callback returned alongside a freshly created datastore.
type CleanupFn = Box<dyn FnOnce() -> Result<()> + Send + Sync>;
// Result of creating a cache: the datastore plus its cleanup callback.
type NewCacheResult = Result<(DatastoreBox, CleanupFn)>;
18
/// Options used when creating a cache.
#[derive(Debug, Clone)]
pub struct Options {
    /// Span for structured logging with `tracing`.
    ///
    /// When `None`, the datastore falls back to the span that is current at
    /// the time it is constructed.
    pub span: Option<Span>,
    /// Maximum cache size in bytes (default: 100 MB); forwarded to sled's
    /// `cache_capacity` setting.
    pub max_cache_size: Option<u64>,
    /// Cache mode: persistent or in-memory.
    pub cache_mode: CacheMode,
}
29
/// Operating mode of the cache.
#[derive(Debug, Clone, PartialEq)]
pub enum CacheMode {
    /// Cache persisted on disk.
    Persistent,
    /// In-memory (temporary) cache.
    InMemory,
    /// Automatic: decided from the directory.
    ///
    /// NOTE(review): in the code visible here the only directory-based
    /// detection is the literal path `":memory:"`; otherwise `Auto` behaves
    /// like `Persistent` — confirm whether more detection is intended.
    Auto,
}
40
41impl Default for Options {
42    fn default() -> Self {
43        Self {
44            span: None,
45            max_cache_size: Some(100 * 1024 * 1024), // 100MB
46            cache_mode: CacheMode::Auto,
47        }
48    }
49}
50
/// The `Cache` trait defines the interface of a caching mechanism
/// for GuardianDB databases.
pub trait Cache: Send + Sync {
    /// Creates a new cache instance at the given path.
    ///
    /// Returns a boxed datastore together with a cleanup function. The default
    /// implementation delegates to [`SledCache::create_cache_instance`], using
    /// `Options::default()` when `opts` is `None`.
    // `new` deliberately returns a (datastore, cleanup) pair instead of `Self`.
    #[allow(clippy::new_ret_no_self)]
    fn new(path: &str, opts: Option<Options>) -> NewCacheResult
    where
        Self: Sized,
    {
        SledCache::create_cache_instance(path, opts.unwrap_or_default())
    }

    /// Loads a cache for the given database address under a root directory.
    fn load(&self, directory: &str, db_address: &dyn Address) -> Result<DatastoreBox>;

    /// Closes the cache and all of its associated datastores.
    fn close(&mut self) -> Result<()>;

    /// Removes all cached data of a database.
    fn destroy(&self, directory: &str, db_address: &dyn Address) -> Result<()>;
}
73
/// Cache implementation that uses sled as its backend.
pub struct SledCache {
    /// Open datastores, keyed by the cache key derived from the directory and
    /// the database address.
    caches: Arc<Mutex<HashMap<String, Arc<SledDatastore>>>>,
    /// Options applied to every datastore this cache opens.
    options: Options,
}
79
80impl SledCache {
81    /// Cria uma nova instância do SledCache
82    pub fn new(opts: Options) -> Self {
83        Self {
84            caches: Arc::new(Mutex::new(HashMap::new())),
85            options: opts,
86        }
87    }
88
89    /// Factory method para criar instâncias de cache
90    #[instrument(level = "info")]
91    pub fn create_cache_instance(path: &str, opts: Options) -> NewCacheResult {
92        info!("Creating cache instance: path={}", path);
93
94        let datastore = SledDatastore::new(path, opts.clone())?;
95        let path_clone = path.to_string();
96
97        // Função de cleanup que remove o cache do disco (apenas se não for em memória)
98        let cleanup: Box<dyn FnOnce() -> Result<()> + Send + Sync> = Box::new(move || {
99            if path_clone != ":memory:" && Path::new(&path_clone).exists() {
100                match std::fs::remove_dir_all(&path_clone) {
101                    Ok(_) => {
102                        debug!("Cache directory cleaned up: path={}", &path_clone);
103                        Ok(())
104                    }
105                    Err(e) => {
106                        warn!(
107                            "Failed to cleanup cache directory: path={}, error={}",
108                            &path_clone, e
109                        );
110                        Err(GuardianError::Other(format!(
111                            "Failed to cleanup cache: {}",
112                            e
113                        )))
114                    }
115                }
116            } else {
117                Ok(())
118            }
119        });
120
121        Ok((Box::new(datastore), cleanup))
122    }
123
124    /// Gera uma chave única para o cache baseada no diretório e endereço
125    fn generate_cache_key(directory: &str, db_address: &dyn Address) -> String {
126        let db_path = PathBuf::from(db_address.get_root().to_string()).join(db_address.get_path());
127        PathBuf::from(directory)
128            .join(db_path)
129            .to_string_lossy()
130            .to_string()
131    }
132}
133
134impl Cache for SledCache {
135    #[instrument(level = "info", skip(self, db_address))]
136    fn load(
137        &self,
138        directory: &str,
139        db_address: &dyn Address,
140    ) -> Result<Box<dyn Datastore + Send + Sync>> {
141        let cache_key = Self::generate_cache_key(directory, db_address);
142
143        info!(
144            "Loading cache: directory={}, cache_key={}",
145            directory, &cache_key
146        );
147
148        let mut caches = self.caches.lock().unwrap();
149
150        if let Some(existing_cache) = caches.get(&cache_key) {
151            debug!("Using existing cache: cache_key={}", &cache_key);
152            return Ok(Box::new(existing_cache.as_ref().clone()));
153        }
154
155        // Cria um novo cache se não existir
156        let datastore = SledDatastore::new(&cache_key, self.options.clone())?;
157        let arc_datastore = Arc::new(datastore.clone());
158        caches.insert(cache_key.clone(), arc_datastore);
159
160        info!("Created new cache: cache_key={}", &cache_key);
161        Ok(Box::new(datastore))
162    }
163
164    #[instrument(level = "info", skip(self))]
165    fn close(&mut self) -> Result<()> {
166        info!("Closing all caches");
167
168        let caches = {
169            let mut cache_map = self.caches.lock().unwrap();
170            let caches: Vec<Arc<SledDatastore>> = cache_map.values().cloned().collect();
171            cache_map.clear();
172            caches
173        };
174
175        for cache in caches {
176            if let Err(e) = cache.close() {
177                warn!("Failed to close cache: error={}", e);
178            }
179        }
180
181        info!("All caches closed");
182        Ok(())
183    }
184
185    #[instrument(level = "info", skip(self, db_address))]
186    fn destroy(&self, directory: &str, db_address: &dyn Address) -> Result<()> {
187        let cache_key = Self::generate_cache_key(directory, db_address);
188
189        info!(
190            "Destroying cache: directory={}, cache_key={}",
191            directory, &cache_key
192        );
193
194        // Remove do mapa de caches
195        let cache_to_close = {
196            let mut caches = self.caches.lock().unwrap();
197            caches.remove(&cache_key)
198        };
199
200        // Fecha o cache se existir
201        if let Some(cache) = cache_to_close {
202            cache.close()?;
203        }
204
205        // Remove arquivos do disco (apenas se não for em memória)
206        if directory != ":memory:" && Path::new(&cache_key).exists() {
207            std::fs::remove_dir_all(&cache_key).map_err(|e| {
208                GuardianError::Other(format!("Failed to remove cache directory: {}", e))
209            })?;
210
211            info!("Cache directory removed: path={}", &cache_key);
212        }
213
214        Ok(())
215    }
216}
217
/// `Datastore` implementation backed by sled.
#[derive(Clone)]
pub struct SledDatastore {
    /// Handle to the underlying sled database.
    db: Db,
    /// Path the datastore was created with; used in log messages.
    path: String,
    /// Tracing span entered by the datastore's operations.
    span: Span,
}
225
226impl SledDatastore {
227    /// Cria uma nova instância do SledDatastore
228    #[instrument(level = "debug")]
229    pub fn new(path: &str, opts: Options) -> Result<Self> {
230        debug!("Creating SledDatastore: path={}", path);
231
232        let db = if path == ":memory:" || matches!(opts.cache_mode, CacheMode::InMemory) {
233            // Cache em memória
234            debug!("Creating in-memory cache");
235            Config::new().temporary(true).open().map_err(|e| {
236                GuardianError::Store(format!("Failed to create in-memory cache: {}", e))
237            })?
238        } else {
239            // Cache persistente
240            debug!("Creating persistent cache: path={}", path);
241
242            // Cria o diretório se não existir
243            if let Some(parent) = Path::new(path).parent() {
244                std::fs::create_dir_all(parent).map_err(|e| {
245                    GuardianError::Store(format!("Failed to create cache directory: {}", e))
246                })?;
247            }
248
249            let mut config = Config::new();
250
251            // Configura tamanho máximo se especificado
252            if let Some(max_size) = opts.max_cache_size {
253                config = config.cache_capacity(max_size);
254            }
255
256            config.path(path).open().map_err(|e| {
257                GuardianError::Store(format!("Failed to open cache at {}: {}", path, e))
258            })?
259        };
260
261        info!(
262            "SledDatastore created successfully: path={}, memory_mode={}",
263            path,
264            path == ":memory:"
265        );
266
267        Ok(Self {
268            db,
269            path: path.to_string(),
270            span: opts.span.unwrap_or_else(tracing::Span::current),
271        })
272    }
273
274    /// Retorna uma referência ao span de tracing para instrumentação
275    pub fn span(&self) -> &Span {
276        &self.span
277    }
278
279    /// Fecha o datastore
280    #[instrument(level = "debug", skip(self))]
281    pub fn close(&self) -> Result<()> {
282        let _entered = self.span.enter();
283        debug!("Closing SledDatastore: path={}", &self.path);
284
285        self.db
286            .flush()
287            .map_err(|e| GuardianError::Store(format!("Failed to flush cache: {}", e)))?;
288
289        info!("SledDatastore closed: path={}", &self.path);
290        Ok(())
291    }
292}
293
294#[async_trait::async_trait]
295impl Datastore for SledDatastore {
296    #[instrument(level = "debug", skip(self, key))]
297    async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
298        let _entered = self.span.enter();
299        match self.db.get(key) {
300            Ok(Some(value)) => {
301                debug!("Cache hit: key_len={}", key.len());
302                Ok(Some(value.to_vec()))
303            }
304            Ok(None) => {
305                debug!("Cache miss: key_len={}", key.len());
306                Ok(None)
307            }
308            Err(e) => {
309                error!("Cache get error: key_len={}, error={}", key.len(), e);
310                Err(GuardianError::Store(format!("Cache get error: {}", e)))
311            }
312        }
313    }
314
315    #[instrument(level = "debug", skip(self, key, value))]
316    async fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
317        let _entered = self.span.enter();
318        match self.db.insert(key, value) {
319            Ok(_) => {
320                debug!(
321                    "Cache put success: key_len={}, value_len={}",
322                    key.len(),
323                    value.len()
324                );
325                Ok(())
326            }
327            Err(e) => {
328                error!("Cache put error: key_len={}, error={}", key.len(), e);
329                Err(GuardianError::Store(format!("Cache put error: {}", e)))
330            }
331        }
332    }
333
334    #[instrument(level = "debug", skip(self, key))]
335    async fn has(&self, key: &[u8]) -> Result<bool> {
336        let _entered = self.span.enter();
337        match self.db.contains_key(key) {
338            Ok(exists) => {
339                debug!("Cache has check: key_len={}, exists={}", key.len(), exists);
340                Ok(exists)
341            }
342            Err(e) => {
343                error!("Cache has error: key_len={}, error={}", key.len(), e);
344                Err(GuardianError::Store(format!("Cache has error: {}", e)))
345            }
346        }
347    }
348
349    #[instrument(level = "debug", skip(self, key))]
350    async fn delete(&self, key: &[u8]) -> Result<()> {
351        let _entered = self.span.enter();
352        match self.db.remove(key) {
353            Ok(_) => {
354                debug!("Cache delete success: key_len={}", key.len());
355                Ok(())
356            }
357            Err(e) => {
358                error!("Cache delete error: key_len={}, error={}", key.len(), e);
359                Err(GuardianError::Store(format!("Cache delete error: {}", e)))
360            }
361        }
362    }
363
364    #[instrument(level = "debug", skip(self, query))]
365    async fn query(&self, query: &crate::data_store::Query) -> Result<crate::data_store::Results> {
366        let _entered = self.span.enter();
367        use crate::data_store::{Key, ResultItem};
368
369        debug!(
370            "Cache query: has_prefix={}, limit={:?}, order={:?}",
371            query.prefix.is_some(),
372            query.limit,
373            query.order
374        );
375
376        let iter: Box<dyn Iterator<Item = sled::Result<(sled::IVec, sled::IVec)>>> =
377            if let Some(prefix_key) = &query.prefix {
378                // Converte Key para bytes para usar como prefixo
379                let prefix_bytes = prefix_key.as_bytes();
380                Box::new(self.db.scan_prefix(prefix_bytes))
381            } else {
382                Box::new(self.db.iter())
383            };
384
385        let mut results = Vec::new();
386        let mut count = 0;
387
388        // Aplica offset se especificado
389        let skip_count = query.offset.unwrap_or(0);
390        let mut skipped = 0;
391
392        for kv_result in iter {
393            match kv_result {
394                Ok((key_bytes, value_bytes)) => {
395                    // Aplica offset
396                    if skipped < skip_count {
397                        skipped += 1;
398                        continue;
399                    }
400
401                    // Converte bytes de volta para Key
402                    let key_str = String::from_utf8_lossy(&key_bytes);
403                    let key = Key::new(key_str.to_string());
404                    let value = value_bytes.to_vec();
405
406                    results.push(ResultItem::new(key, value));
407                    count += 1;
408
409                    // Aplica limite se especificado
410                    if let Some(limit) = query.limit
411                        && count >= limit
412                    {
413                        break;
414                    }
415                }
416                Err(e) => {
417                    error!("Cache query iteration error: error={}", e);
418                    return Err(GuardianError::Store(format!("Cache query error: {}", e)));
419                }
420            }
421        }
422
423        // Aplica ordenação se necessário (Sled retorna em ordem ascendente por padrão)
424        if matches!(query.order, crate::data_store::Order::Desc) {
425            results.reverse();
426        }
427
428        debug!(
429            "Cache query completed: results_count={}, skipped={}",
430            results.len(),
431            skipped
432        );
433
434        Ok(results)
435    }
436
437    #[instrument(level = "debug", skip(self, prefix))]
438    async fn list_keys(&self, prefix: &[u8]) -> Result<Vec<crate::data_store::Key>> {
439        let _entered = self.span.enter();
440        use crate::data_store::Key;
441
442        debug!("Cache list_keys: prefix_len={}", prefix.len());
443
444        let mut keys = Vec::new();
445
446        for kv_result in self.db.scan_prefix(prefix) {
447            match kv_result {
448                Ok((key_bytes, _)) => {
449                    let key_str = String::from_utf8_lossy(&key_bytes);
450                    let key = Key::new(key_str.to_string());
451                    keys.push(key);
452                }
453                Err(e) => {
454                    error!("Cache list_keys iteration error: error={}", e);
455                    return Err(GuardianError::Store(format!(
456                        "Cache list_keys error: {}",
457                        e
458                    )));
459                }
460            }
461        }
462
463        debug!("Cache list_keys completed: keys_count={}", keys.len());
464        Ok(keys)
465    }
466
467    fn as_any(&self) -> &dyn std::any::Any {
468        self
469    }
470}
471
/// Factory function that builds a [`SledCache`] configured with `opts`.
pub fn create_cache(opts: Options) -> SledCache {
    SledCache::new(opts)
}
476
477/// Cria um cache padrão com configurações otimizadas
478pub fn create_default_cache() -> SledCache {
479    create_cache(Options::default())
480}
481
482/// Cria um cache em memória para testes
483pub fn create_memory_cache() -> SledCache {
484    create_cache(Options {
485        cache_mode: CacheMode::InMemory,
486        ..Default::default()
487    })
488}
489
// Unit tests for the sled-backed datastore; every store is opened with the
// ":memory:" path so nothing persistent is written.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data_store::{Key, Order, Query};

    /// Round-trips a single key through put, get, has and delete.
    #[tokio::test]
    async fn test_sled_datastore_basic_operations() {
        let datastore = SledDatastore::new(":memory:", Options::default()).unwrap();

        // Test put and get
        let key = b"test_key";
        let value = b"test_value";

        datastore.put(key, value).await.unwrap();
        let retrieved = datastore.get(key).await.unwrap();
        assert_eq!(retrieved, Some(value.to_vec()));

        // Test has
        assert!(datastore.has(key).await.unwrap());
        assert!(!datastore.has(b"non_existent").await.unwrap());

        // Test delete
        datastore.delete(key).await.unwrap();
        assert!(!datastore.has(key).await.unwrap());
        assert_eq!(datastore.get(key).await.unwrap(), None);
    }

    /// Checks result counts of prefix queries with and without limit/offset.
    /// Only the number of results is asserted, not their contents or order.
    #[tokio::test]
    async fn test_sled_datastore_query() {
        let datastore = SledDatastore::new(":memory:", Options::default()).unwrap();

        // Insert test data: three keys under /users and one outside of it
        datastore.put(b"/users/alice", b"alice_data").await.unwrap();
        datastore.put(b"/users/bob", b"bob_data").await.unwrap();
        datastore
            .put(b"/users/charlie", b"charlie_data")
            .await
            .unwrap();
        datastore
            .put(b"/config/database", b"db_config")
            .await
            .unwrap();

        // Test query with prefix
        let query = Query {
            prefix: Some(Key::new("/users")),
            limit: None,
            order: Order::Asc,
            offset: None,
        };

        let results = datastore.query(&query).await.unwrap();
        assert_eq!(results.len(), 3);

        // Test query with limit
        let query_limited = Query {
            prefix: Some(Key::new("/users")),
            limit: Some(2),
            order: Order::Asc,
            offset: None,
        };

        let results_limited = datastore.query(&query_limited).await.unwrap();
        assert_eq!(results_limited.len(), 2);

        // Test query with offset: skipping one of the three matches leaves two
        let query_offset = Query {
            prefix: Some(Key::new("/users")),
            limit: None,
            order: Order::Asc,
            offset: Some(1),
        };

        let results_offset = datastore.query(&query_offset).await.unwrap();
        assert_eq!(results_offset.len(), 2);
    }

    /// Checks that `list_keys` returns exactly the keys under the prefix.
    #[tokio::test]
    async fn test_sled_datastore_list_keys() {
        let datastore = SledDatastore::new(":memory:", Options::default()).unwrap();

        // Insert test data
        datastore.put(b"/users/alice", b"alice_data").await.unwrap();
        datastore.put(b"/users/bob", b"bob_data").await.unwrap();
        datastore
            .put(b"/config/database", b"db_config")
            .await
            .unwrap();

        // Test list_keys with prefix
        let keys = datastore.list_keys(b"/users").await.unwrap();
        assert_eq!(keys.len(), 2);

        // NOTE(review): this collects into Vec<String>, so `Key::as_str`
        // presumably returns an owned String — confirm against `Key`.
        let key_strings: Vec<String> = keys.iter().map(|k| k.as_str()).collect();
        assert!(key_strings.contains(&"/users/alice".to_string()));
        assert!(key_strings.contains(&"/users/bob".to_string()));
    }

    /// Checks that struct-update syntax sets the requested cache mode.
    /// No datastore is opened here, so mode handling itself is not exercised.
    #[test]
    fn test_cache_mode_detection() {
        let persistent_opts = Options {
            cache_mode: CacheMode::Persistent,
            ..Default::default()
        };

        let memory_opts = Options {
            cache_mode: CacheMode::InMemory,
            ..Default::default()
        };

        assert_eq!(persistent_opts.cache_mode, CacheMode::Persistent);
        assert_eq!(memory_opts.cache_mode, CacheMode::InMemory);
    }
}