1use crate::address::Address;
2use crate::cache::{Cache, Options};
3use crate::data_store::{Datastore, Key, Order, Query, ResultItem, Results};
4use crate::error::{GuardianError, Result};
5use sled::{Config, Db, IVec};
6use std::{
7 collections::HashMap,
8 path::{Path, PathBuf},
9 sync::{Arc, Mutex, Weak},
10};
11use tracing::{Span, debug, instrument};
12
13pub const IN_MEMORY_DIRECTORY: &str = ":memory:";
14pub struct WrappedCache {
15 id: String,
16 db: Db,
17 manager_map: Weak<Mutex<HashMap<String, Arc<WrappedCache>>>>,
18 #[allow(dead_code)]
19 span: Span,
20 closed: Mutex<bool>,
21}
22
23impl WrappedCache {
24 #[instrument(level = "debug", skip(self, _ctx))]
25 pub fn get(
26 &self,
27 _ctx: &mut dyn core::any::Any,
28 key: &Key,
29 ) -> std::result::Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
30 match self
31 .db
32 .get(key.as_bytes())
33 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?
34 {
35 Some(v) => Ok(v.to_vec()),
36 None => Err(format!("key not found: {}", key).into()),
37 }
38 }
39
40 pub fn has(
41 &self,
42 _ctx: &mut dyn core::any::Any,
43 key: &Key,
44 ) -> std::result::Result<bool, Box<dyn std::error::Error + Send + Sync>> {
45 self.db
46 .contains_key(key.as_bytes())
47 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
48 }
49
50 pub fn get_size(
51 &self,
52 _ctx: &mut dyn core::any::Any,
53 key: &Key,
54 ) -> std::result::Result<usize, Box<dyn std::error::Error + Send + Sync>> {
55 let v = self
56 .db
57 .get(key.as_bytes())
58 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?
59 .ok_or_else(|| format!("key not found: {}", key))?;
60 Ok(v.len())
61 }
62
63 pub fn query(
64 &self,
65 _ctx: &mut dyn core::any::Any,
66 q: &Query,
67 ) -> std::result::Result<Results, Box<dyn std::error::Error + Send + Sync>> {
68 let iter: Box<dyn Iterator<Item = sled::Result<(IVec, IVec)>>> =
69 if let Some(prefix_key) = &q.prefix {
70 let prefix_bytes = prefix_key.as_bytes();
72 Box::new(self.db.scan_prefix(prefix_bytes))
73 } else {
74 Box::new(self.db.iter())
75 };
76
77 let mut items: Results = Vec::new();
79 let mut count = 0;
80
81 let skip_count = q.offset.unwrap_or(0);
83 let mut skipped = 0;
84
85 for kv in iter {
86 let (k, v) = kv.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
87
88 if skipped < skip_count {
90 skipped += 1;
91 continue;
92 }
93
94 let key_str = String::from_utf8(k.to_vec()).unwrap_or_default();
95 items.push(ResultItem::new(Key::new(key_str), v.to_vec()));
96 count += 1;
97
98 if let Some(n) = q.limit
99 && count >= n
100 {
101 break;
102 }
103 }
104
105 if matches!(q.order, Order::Desc) {
106 items.reverse();
107 }
108
109 Ok(items)
110 }
111
112 #[instrument(level = "debug", skip(self, _ctx, value))]
113 pub fn put(
114 &self,
115 _ctx: &mut dyn core::any::Any,
116 key: &Key,
117 value: &[u8],
118 ) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
119 self.db
120 .insert(key.as_bytes(), value)
121 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
122 Ok(())
123 }
124
125 pub fn delete(
126 &self,
127 _ctx: &mut dyn core::any::Any,
128 key: &Key,
129 ) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
130 self.db
131 .remove(key.as_bytes())
132 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
133 Ok(())
134 }
135
136 pub fn sync(
137 &self,
138 _ctx: &mut dyn core::any::Any,
139 _key: &Key,
140 ) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
141 self.db
142 .flush()
143 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
144 Ok(())
145 }
146
147 #[instrument(level = "debug", skip(self))]
148 pub fn close(&self) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
149 let mut closed = self.closed.lock().unwrap();
150 if *closed {
151 return Ok(());
152 }
153
154 if let Some(map) = self.manager_map.upgrade() {
155 let mut m = map.lock().unwrap();
156 m.remove(&self.id);
157 }
158
159 self.db
160 .flush()
161 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
162 *closed = true;
163 Ok(())
164 }
165}
166
167pub struct DatastoreWrapper {
169 cache: Arc<WrappedCache>,
170}
171
172impl DatastoreWrapper {
173 pub fn new(cache: Arc<WrappedCache>) -> Self {
174 Self { cache }
175 }
176}
177
178#[async_trait::async_trait]
179impl Datastore for DatastoreWrapper {
180 #[instrument(level = "debug", skip(self, key))]
181 async fn has(&self, key: &[u8]) -> Result<bool> {
182 let key_obj = Key::new(String::from_utf8_lossy(key));
183 let mut any_ctx = ();
184 self.cache
185 .has(&mut any_ctx, &key_obj)
186 .map_err(|e| GuardianError::Other(format!("Cache has error: {}", e)))
187 }
188
189 #[instrument(level = "debug", skip(self, key, value))]
190 async fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
191 let key_obj = Key::new(String::from_utf8_lossy(key));
192 let mut any_ctx = ();
193 self.cache
194 .put(&mut any_ctx, &key_obj, value)
195 .map_err(|e| GuardianError::Other(format!("Cache put error: {}", e)))
196 }
197
198 #[instrument(level = "debug", skip(self, key))]
199 async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
200 let key_obj = Key::new(String::from_utf8_lossy(key));
201 let mut any_ctx = ();
202 match self.cache.get(&mut any_ctx, &key_obj) {
203 Ok(value) => Ok(Some(value)),
204 Err(_) => Ok(None), }
206 }
207
208 #[instrument(level = "debug", skip(self, key))]
209 async fn delete(&self, key: &[u8]) -> Result<()> {
210 let key_obj = Key::new(String::from_utf8_lossy(key));
211 let mut any_ctx = ();
212 self.cache
213 .delete(&mut any_ctx, &key_obj)
214 .map_err(|e| GuardianError::Other(format!("Cache delete error: {}", e)))
215 }
216
217 #[instrument(level = "debug", skip(self, query))]
218 async fn query(&self, query: &Query) -> Result<Results> {
219 let mut any_ctx = ();
220 self.cache
221 .query(&mut any_ctx, query)
222 .map_err(|e| GuardianError::Other(format!("Cache query error: {}", e)))
223 }
224
225 #[instrument(level = "debug", skip(self, prefix))]
226 async fn list_keys(&self, prefix: &[u8]) -> Result<Vec<Key>> {
227 let prefix_str = String::from_utf8_lossy(prefix);
229 let prefix_key = Key::new(prefix_str.to_string());
230
231 let query = Query {
232 prefix: Some(prefix_key),
233 limit: None,
234 order: Order::Asc,
235 offset: None,
236 };
237
238 let mut any_ctx = ();
239 let results = self
240 .cache
241 .query(&mut any_ctx, &query)
242 .map_err(|e| GuardianError::Other(format!("Cache list_keys error: {}", e)))?;
243
244 Ok(results.into_iter().map(|item| item.key).collect())
246 }
247
248 fn as_any(&self) -> &dyn std::any::Any {
249 self
250 }
251}
252
253pub struct LevelDownCache {
254 span: Span,
255 caches: Arc<Mutex<HashMap<String, Arc<WrappedCache>>>>,
256}
257
258impl LevelDownCache {
259 #[instrument(level = "debug")]
260 pub fn new(_opts: Option<&Options>) -> Self {
261 Self {
262 span: tracing::Span::current(),
263 caches: Arc::new(Mutex::new(HashMap::new())),
264 }
265 }
266
267 pub fn span(&self) -> &Span {
269 &self.span
270 }
271
272 #[instrument(level = "debug", skip(self, db_address))]
273 pub fn load_internal(
274 &self,
275 directory: &str,
276 db_address: &dyn Address,
277 ) -> std::result::Result<Arc<WrappedCache>, Box<dyn std::error::Error + Send + Sync>> {
278 let _entered = self.span.enter();
279 let key_path = datastore_key(directory, db_address);
280
281 if let Some(ds) = self.caches.lock().unwrap().get(&key_path).cloned() {
283 return Ok(ds);
284 }
285
286 debug!("opening cache db: path={}", key_path.as_str());
287
288 let db = if directory == IN_MEMORY_DIRECTORY {
289 Config::new()
290 .temporary(true)
291 .open()
292 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?
293 } else {
294 if let Some(parent) = Path::new(&key_path).parent() {
295 std::fs::create_dir_all(parent)?;
296 }
297 Config::new()
298 .path(&key_path)
299 .open()
300 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?
301 };
302
303 let wrapped = Arc::new(WrappedCache {
304 id: key_path.clone(),
305 db,
306 manager_map: Arc::downgrade(&self.caches),
307 span: tracing::Span::current(),
308 closed: Mutex::new(false),
309 });
310
311 self.caches
312 .lock()
313 .unwrap()
314 .insert(key_path, wrapped.clone());
315 Ok(wrapped)
316 }
317
318 #[instrument(level = "debug", skip(self))]
319 pub fn close_internal(
320 &self,
321 ) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
322 let _entered = self.span.enter();
323 let caches = {
324 let m = self.caches.lock().unwrap();
325 m.values().cloned().collect::<Vec<_>>()
326 };
327 for c in caches {
328 let _ = c.close();
329 }
330 Ok(())
331 }
332
333 #[instrument(level = "debug", skip(self, db_address))]
334 pub fn destroy_internal(
335 &self,
336 directory: &str,
337 db_address: &dyn Address,
338 ) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
339 let _entered = self.span.enter();
340 let key_path = datastore_key(directory, db_address);
341
342 if let Some(c) = self.caches.lock().unwrap().remove(&key_path) {
344 let _ = c.close();
345 }
346
347 if directory != IN_MEMORY_DIRECTORY && Path::new(&key_path).exists() {
348 std::fs::remove_dir_all(&key_path)?;
349 }
350
351 Ok(())
352 }
353}
354
355impl Cache for LevelDownCache {
357 #[instrument(level = "info", skip(self, db_address))]
358 fn load(
359 &self,
360 directory: &str,
361 db_address: &dyn Address,
362 ) -> Result<Box<dyn Datastore + Send + Sync>> {
363 let _entered = self.span.enter();
364 let wrapped_cache = self
365 .load_internal(directory, db_address)
366 .map_err(|e| GuardianError::Other(format!("Failed to load cache: {}", e)))?;
367 Ok(Box::new(DatastoreWrapper {
368 cache: wrapped_cache,
369 }))
370 }
371
372 #[instrument(level = "info", skip(self))]
373 fn close(&mut self) -> Result<()> {
374 let _entered = self.span.enter();
375 let caches = {
376 let m = self.caches.lock().unwrap();
377 m.values().cloned().collect::<Vec<_>>()
378 };
379 for c in caches {
380 let _ = c.close();
381 }
382 Ok(())
383 }
384
385 #[instrument(level = "info", skip(self, db_address))]
386 fn destroy(&self, directory: &str, db_address: &dyn Address) -> Result<()> {
387 let _entered = self.span.enter();
388 self.destroy_internal(directory, db_address)
389 .map_err(|e| GuardianError::Other(format!("Failed to destroy cache: {}", e)))?;
390 Ok(())
391 }
392}
393
394fn datastore_key(directory: &str, db_address: &dyn Address) -> String {
395 let db_path = PathBuf::from(db_address.get_root().to_string()).join(db_address.get_path());
396 PathBuf::from(directory)
397 .join(db_path)
398 .to_string_lossy()
399 .into_owned()
400}
401
402#[cfg(test)]
403mod tests {
404 use super::*;
405 use crate::address::Address;
406 use std::fmt;
407
408 #[derive(Debug)]
410 struct MockAddress {
411 root: cid::Cid,
412 path: String,
413 }
414
415 impl MockAddress {
416 fn new(root_str: &str, path: &str) -> Self {
417 let cid = if root_str == "root" {
420 cid::Cid::default()
422 } else {
423 cid::Cid::default()
424 };
425 Self {
426 root: cid,
427 path: path.to_string(),
428 }
429 }
430 }
431
432 impl Address for MockAddress {
433 fn get_root(&self) -> cid::Cid {
434 self.root
435 }
436
437 fn get_path(&self) -> &str {
438 &self.path
439 }
440
441 fn equals(&self, other: &dyn Address) -> bool {
442 self.root == other.get_root() && self.path == other.get_path()
443 }
444 }
445
446 impl fmt::Display for MockAddress {
447 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
448 write!(f, "{}/{}", self.root, self.path)
449 }
450 }
451
452 #[tokio::test]
453 async fn test_datastore_wrapper_basic_operations() {
454 let cache = LevelDownCache::new(None);
455 let mock_address = MockAddress::new("test_root", "test_path");
456
457 let datastore = cache.load(IN_MEMORY_DIRECTORY, &mock_address).unwrap();
458
459 let key = b"test_key";
461 let value = b"test_value";
462
463 datastore.put(key, value).await.unwrap();
464 let retrieved = datastore.get(key).await.unwrap();
465 assert_eq!(retrieved, Some(value.to_vec()));
466
467 assert!(datastore.has(key).await.unwrap());
469 assert!(!datastore.has(b"non_existent").await.unwrap());
470
471 datastore.delete(key).await.unwrap();
473 assert!(!datastore.has(key).await.unwrap());
474 }
475
476 #[tokio::test]
477 async fn test_datastore_wrapper_query() {
478 let cache = LevelDownCache::new(None);
479 let mock_address = MockAddress::new("test_root", "test_path");
480
481 let datastore = cache.load(IN_MEMORY_DIRECTORY, &mock_address).unwrap();
482
483 datastore.put(b"/users/alice", b"alice_data").await.unwrap();
485 datastore.put(b"/users/bob", b"bob_data").await.unwrap();
486 datastore
487 .put(b"/config/database", b"db_config")
488 .await
489 .unwrap();
490
491 let query = Query {
493 prefix: Some(Key::new("/users")),
494 limit: Some(10),
495 order: Order::Asc,
496 offset: None,
497 };
498
499 let results = datastore.query(&query).await.unwrap();
500 assert_eq!(results.len(), 2);
501
502 let keys = datastore.list_keys(b"/users").await.unwrap();
504 assert_eq!(keys.len(), 2);
505 }
506
507 #[test]
508 fn test_datastore_key_generation() {
509 let mock_address = MockAddress::new("root", "path/to/db");
510 let key = datastore_key("/cache", &mock_address);
511
512 println!("Generated key: {}", key);
514 println!("Root CID: {}", mock_address.get_root());
515 println!("Path: {}", mock_address.get_path());
516
517 assert!(key.contains("cache"));
519 assert!(key.contains("path"));
520 assert!(key.contains(&mock_address.get_root().to_string()));
522 }
523
524 #[tokio::test]
525 #[ignore] async fn test_cache_lifecycle() {
527 let mut cache = LevelDownCache::new(None);
528 let mock_address = MockAddress::new("test_root", "lifecycle_test");
529
530 let datastore = cache.load(IN_MEMORY_DIRECTORY, &mock_address).unwrap();
532 datastore.put(b"test", b"data").await.unwrap();
533
534 cache.destroy(IN_MEMORY_DIRECTORY, &mock_address).unwrap();
536
537 cache.close().unwrap();
539 }
540}