ipfrs_storage/
blockstore.rs

1//! Block storage implementation using Sled
2
3use crate::traits::BlockStore;
4use async_trait::async_trait;
5use ipfrs_core::{Block, Cid, Error, Result};
6use sled::Db;
7use std::path::PathBuf;
8
/// Settings used to open a [`SledBlockStore`].
#[derive(Clone, Debug)]
pub struct BlockStoreConfig {
    /// Location of the Sled database on disk.
    pub path: PathBuf,
    /// Capacity of Sled's in-memory cache, in bytes.
    pub cache_size: usize,
}
17
18impl Default for BlockStoreConfig {
19    fn default() -> Self {
20        Self {
21            path: PathBuf::from(".ipfrs/blocks"),
22            cache_size: 100 * 1024 * 1024, // 100MB
23        }
24    }
25}
26
27impl BlockStoreConfig {
28    /// Create a configuration optimized for development
29    /// - Small cache (50MB)
30    /// - Stored in /tmp for easy cleanup
31    pub fn development() -> Self {
32        Self {
33            path: PathBuf::from("/tmp/ipfrs-dev"),
34            cache_size: 50 * 1024 * 1024,
35        }
36    }
37
38    /// Create a configuration optimized for production
39    /// - Large cache (500MB)
40    /// - Stored in standard location
41    pub fn production(path: PathBuf) -> Self {
42        Self {
43            path,
44            cache_size: 500 * 1024 * 1024,
45        }
46    }
47
48    /// Create a configuration optimized for embedded devices
49    /// - Minimal cache (10MB)
50    /// - Configurable path
51    pub fn embedded(path: PathBuf) -> Self {
52        Self {
53            path,
54            cache_size: 10 * 1024 * 1024,
55        }
56    }
57
58    /// Create a configuration optimized for testing
59    /// - Minimal cache (5MB)
60    /// - Temporary directory with unique name
61    pub fn testing() -> Self {
62        let temp_dir = std::env::temp_dir().join(format!("ipfrs-test-{}", std::process::id()));
63        Self {
64            path: temp_dir,
65            cache_size: 5 * 1024 * 1024,
66        }
67    }
68
69    /// Builder: Set the storage path
70    pub fn with_path(mut self, path: PathBuf) -> Self {
71        self.path = path;
72        self
73    }
74
75    /// Builder: Set the cache size in MB
76    pub fn with_cache_mb(mut self, cache_mb: usize) -> Self {
77        self.cache_size = cache_mb * 1024 * 1024;
78        self
79    }
80
81    /// Builder: Set the cache size in bytes
82    pub fn with_cache_bytes(mut self, cache_bytes: usize) -> Self {
83        self.cache_size = cache_bytes;
84        self
85    }
86}
87
88/// Block storage using Sled embedded database
89pub struct SledBlockStore {
90    db: Db,
91}
92
93impl SledBlockStore {
94    /// Create a new block store
95    pub fn new(config: BlockStoreConfig) -> Result<Self> {
96        // Create parent directory if it doesn't exist
97        if let Some(parent) = config.path.parent() {
98            std::fs::create_dir_all(parent)
99                .map_err(|e| Error::Storage(format!("Failed to create directory: {e}")))?;
100        }
101
102        let db = sled::Config::new()
103            .path(&config.path)
104            .cache_capacity(config.cache_size as u64)
105            .open()
106            .map_err(|e| Error::Storage(format!("Failed to open database: {e}")))?;
107
108        Ok(Self { db })
109    }
110}
111
112#[async_trait]
113impl BlockStore for SledBlockStore {
114    /// Store a block
115    async fn put(&self, block: &Block) -> Result<()> {
116        let key = block.cid().to_bytes();
117        let value = block.data().to_vec();
118
119        self.db
120            .insert(key, value)
121            .map_err(|e| Error::Storage(format!("Failed to insert block: {e}")))?;
122
123        self.db
124            .flush_async()
125            .await
126            .map_err(|e| Error::Storage(format!("Failed to flush: {e}")))?;
127
128        Ok(())
129    }
130
131    /// Retrieve a block by CID
132    async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
133        let key = cid.to_bytes();
134
135        match self.db.get(&key) {
136            Ok(Some(value)) => {
137                let data = bytes::Bytes::from(value.to_vec());
138                Ok(Some(Block::from_parts(*cid, data)))
139            }
140            Ok(None) => Ok(None),
141            Err(e) => Err(Error::Storage(format!("Failed to get block: {e}"))),
142        }
143    }
144
145    /// Check if a block exists
146    async fn has(&self, cid: &Cid) -> Result<bool> {
147        let key = cid.to_bytes();
148        self.db
149            .contains_key(&key)
150            .map_err(|e| Error::Storage(format!("Failed to check block: {e}")))
151    }
152
153    /// Delete a block
154    async fn delete(&self, cid: &Cid) -> Result<()> {
155        let key = cid.to_bytes();
156        self.db
157            .remove(&key)
158            .map_err(|e| Error::Storage(format!("Failed to delete block: {e}")))?;
159
160        self.db
161            .flush_async()
162            .await
163            .map_err(|e| Error::Storage(format!("Failed to flush: {e}")))?;
164
165        Ok(())
166    }
167
168    /// Get the number of blocks stored
169    fn len(&self) -> usize {
170        self.db.len()
171    }
172
173    /// Check if the store is empty
174    fn is_empty(&self) -> bool {
175        self.db.is_empty()
176    }
177
178    /// Get all CIDs in the store
179    fn list_cids(&self) -> Result<Vec<Cid>> {
180        let mut cids = Vec::new();
181
182        for item in self.db.iter() {
183            let (key, _) = item.map_err(|e| Error::Storage(format!("Iteration error: {e}")))?;
184
185            // Parse CID from key bytes
186            let cid = Cid::try_from(key.to_vec())
187                .map_err(|e| Error::Cid(format!("Failed to parse CID: {e}")))?;
188
189            cids.push(cid);
190        }
191
192        Ok(cids)
193    }
194
195    /// Store multiple blocks atomically using Sled's batch API
196    async fn put_many(&self, blocks: &[Block]) -> Result<()> {
197        let mut batch = sled::Batch::default();
198
199        for block in blocks {
200            let key = block.cid().to_bytes();
201            let value = block.data().to_vec();
202            batch.insert(key, value);
203        }
204
205        self.db
206            .apply_batch(batch)
207            .map_err(|e| Error::Storage(format!("Failed to apply batch: {e}")))?;
208
209        self.db
210            .flush_async()
211            .await
212            .map_err(|e| Error::Storage(format!("Failed to flush: {e}")))?;
213
214        Ok(())
215    }
216
217    /// Retrieve multiple blocks efficiently
218    async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Block>>> {
219        let mut results = Vec::with_capacity(cids.len());
220
221        for cid in cids {
222            let key = cid.to_bytes();
223            match self.db.get(&key) {
224                Ok(Some(value)) => {
225                    let data = bytes::Bytes::from(value.to_vec());
226                    results.push(Some(Block::from_parts(*cid, data)));
227                }
228                Ok(None) => results.push(None),
229                Err(e) => return Err(Error::Storage(format!("Failed to get block: {e}"))),
230            }
231        }
232
233        Ok(results)
234    }
235
236    /// Check if multiple blocks exist efficiently
237    async fn has_many(&self, cids: &[Cid]) -> Result<Vec<bool>> {
238        let mut results = Vec::with_capacity(cids.len());
239
240        for cid in cids {
241            let key = cid.to_bytes();
242            let exists = self
243                .db
244                .contains_key(&key)
245                .map_err(|e| Error::Storage(format!("Failed to check block: {e}")))?;
246            results.push(exists);
247        }
248
249        Ok(results)
250    }
251
252    /// Delete multiple blocks atomically
253    async fn delete_many(&self, cids: &[Cid]) -> Result<()> {
254        let mut batch = sled::Batch::default();
255
256        for cid in cids {
257            let key = cid.to_bytes();
258            batch.remove(key);
259        }
260
261        self.db
262            .apply_batch(batch)
263            .map_err(|e| Error::Storage(format!("Failed to apply batch: {e}")))?;
264
265        self.db
266            .flush_async()
267            .await
268            .map_err(|e| Error::Storage(format!("Failed to flush: {e}")))?;
269
270        Ok(())
271    }
272
273    /// Flush pending writes to disk
274    async fn flush(&self) -> Result<()> {
275        self.db
276            .flush_async()
277            .await
278            .map_err(|e| Error::Storage(format!("Failed to flush: {e}")))?;
279        Ok(())
280    }
281}
282
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    /// Builds a small-cache config under the system temp dir.
    ///
    /// Avoids a hard-coded `/tmp` path (non-portable). The process id is
    /// appended so concurrent test runs do not share one database.
    fn temp_config(name: &str) -> BlockStoreConfig {
        let path = std::env::temp_dir().join(format!("{name}-{}", std::process::id()));
        BlockStoreConfig {
            path,
            cache_size: 1024 * 1024,
        }
    }

    #[tokio::test]
    async fn test_put_get_block() {
        let config = temp_config("ipfrs-test-blockstore");
        let db_path = config.path.clone();

        // Remove leftovers from an earlier run that reused this pid.
        let _ = std::fs::remove_dir_all(&db_path);

        let store = SledBlockStore::new(config).unwrap();
        let data = Bytes::from("hello world");
        let block = Block::new(data.clone()).unwrap();

        // Put block
        store.put(&block).await.unwrap();

        // Get block
        let retrieved = store.get(block.cid()).await.unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().data(), &data);

        // Check has
        assert!(store.has(block.cid()).await.unwrap());

        // Delete block: it must no longer be reported or returned.
        store.delete(block.cid()).await.unwrap();
        assert!(!store.has(block.cid()).await.unwrap());
        assert!(store.get(block.cid()).await.unwrap().is_none());

        // Release the database handle before deleting its files (best effort).
        drop(store);
        let _ = std::fs::remove_dir_all(&db_path);
    }
}