noosphere_storage/implementation/
sled.rs

1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use crate::store::Store;
5use crate::StorageConfig;
6use crate::{storage::Storage, ConfigurableStorage};
7
8use anyhow::Result;
9use async_trait::async_trait;
10use noosphere_common::ConditionalSend;
11use sled::{Db, Tree};
12
/// Storage implementation backed by a `sled` database.
#[derive(Clone)]
pub struct SledStorage {
    /// Handle to the opened database; named trees are opened from it.
    db: Db,
    /// The database path and the [`StorageConfig`] it was opened with,
    /// shared between clones and used by the `Debug` implementation.
    debug_data: Arc<(PathBuf, StorageConfig)>,
}
18
19impl SledStorage {
20    /// Open or create a database at directory `path`.
21    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
22        Self::with_config(path, StorageConfig::default())
23    }
24
25    pub fn with_config<P: AsRef<Path>>(path: P, config: StorageConfig) -> Result<Self> {
26        std::fs::create_dir_all(path.as_ref())?;
27        let db_path = path.as_ref().canonicalize()?;
28
29        let mut sled_config = sled::Config::default();
30        sled_config = sled_config.path(&db_path);
31        if let Some(memory_cache_limit) = config.memory_cache_limit {
32            // Maximum size in bytes for the system page cache. (default: 1GB)
33            sled_config = sled_config.cache_capacity(memory_cache_limit.try_into()?);
34        }
35
36        let db = sled_config.open()?;
37        let debug_data = Arc::new((db_path, config));
38        Ok(SledStorage { db, debug_data })
39    }
40
41    async fn get_store(&self, name: &str) -> Result<SledStore> {
42        Ok(SledStore::new(&self.db.open_tree(name)?))
43    }
44}
45
46#[async_trait]
47impl Storage for SledStorage {
48    type BlockStore = SledStore;
49
50    type KeyValueStore = SledStore;
51
52    async fn get_block_store(&self, name: &str) -> Result<Self::BlockStore> {
53        self.get_store(name).await
54    }
55
56    async fn get_key_value_store(&self, name: &str) -> Result<Self::KeyValueStore> {
57        self.get_store(name).await
58    }
59}
60
61#[async_trait]
62impl ConfigurableStorage for SledStorage {
63    async fn open_with_config<P: AsRef<Path> + ConditionalSend>(
64        path: P,
65        config: StorageConfig,
66    ) -> Result<Self> {
67        SledStorage::with_config(path, config)
68    }
69}
70
71impl std::fmt::Debug for SledStorage {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        f.debug_struct("SledStorage")
74            .field("path", &self.debug_data.0)
75            .field("config", &self.debug_data.1)
76            .finish()
77    }
78}
79
/// A store backed by a single `sled` tree.
#[derive(Clone)]
pub struct SledStore {
    /// The underlying tree (despite the field name, this is a `Tree`, not a `Db`).
    db: Tree,
}
84
85impl SledStore {
86    pub fn new(db: &Tree) -> Self {
87        SledStore { db: db.clone() }
88    }
89}
90
91#[async_trait]
92impl Store for SledStore {
93    async fn read(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
94        Ok(self.db.get(key)?.map(|entry| entry.to_vec()))
95    }
96
97    async fn write(&mut self, key: &[u8], bytes: &[u8]) -> Result<Option<Vec<u8>>> {
98        let old_bytes = self
99            .db
100            .insert(key, bytes)?
101            .map(|old_entry| old_entry.to_vec());
102        Ok(old_bytes)
103    }
104
105    /// Remove a value given a CID
106    async fn remove(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
107        Ok(self
108            .db
109            .remove(key)
110            .map(|maybe_entry| maybe_entry.map(|entry| entry.to_vec()))?)
111    }
112
113    /// Flushes pending writes if there are any
114    async fn flush(&self) -> Result<()> {
115        // `flush_async()` can deadlock when simultaneous calls are performed.
116        // This occurs often in tests and fixed in `sled`'s main branch,
117        // but no cargo release since 2021.
118        // https://github.com/spacejam/sled/issues/1308
119        self.db.flush()?;
120        Ok(())
121    }
122}
123
124impl Drop for SledStorage {
125    fn drop(&mut self) {
126        let _ = self.db.flush();
127    }
128}
129
130#[async_trait]
131impl crate::Space for SledStorage {
132    async fn get_space_usage(&self) -> Result<u64> {
133        self.db.size_on_disk().map_err(|e| e.into())
134    }
135}