ipfrs_storage/
blockstore.rs1use crate::traits::BlockStore;
4use async_trait::async_trait;
5use ipfrs_core::{Block, Cid, Error, Result};
6use sled::Db;
7use std::path::PathBuf;
8
9#[derive(Debug, Clone)]
11pub struct BlockStoreConfig {
12 pub path: PathBuf,
14 pub cache_size: usize,
16}
17
18impl Default for BlockStoreConfig {
19 fn default() -> Self {
20 Self {
21 path: PathBuf::from(".ipfrs/blocks"),
22 cache_size: 100 * 1024 * 1024, }
24 }
25}
26
27impl BlockStoreConfig {
28 pub fn development() -> Self {
32 Self {
33 path: PathBuf::from("/tmp/ipfrs-dev"),
34 cache_size: 50 * 1024 * 1024,
35 }
36 }
37
38 pub fn production(path: PathBuf) -> Self {
42 Self {
43 path,
44 cache_size: 500 * 1024 * 1024,
45 }
46 }
47
48 pub fn embedded(path: PathBuf) -> Self {
52 Self {
53 path,
54 cache_size: 10 * 1024 * 1024,
55 }
56 }
57
58 pub fn testing() -> Self {
62 let temp_dir = std::env::temp_dir().join(format!("ipfrs-test-{}", std::process::id()));
63 Self {
64 path: temp_dir,
65 cache_size: 5 * 1024 * 1024,
66 }
67 }
68
69 pub fn with_path(mut self, path: PathBuf) -> Self {
71 self.path = path;
72 self
73 }
74
75 pub fn with_cache_mb(mut self, cache_mb: usize) -> Self {
77 self.cache_size = cache_mb * 1024 * 1024;
78 self
79 }
80
81 pub fn with_cache_bytes(mut self, cache_bytes: usize) -> Self {
83 self.cache_size = cache_bytes;
84 self
85 }
86}
87
88pub struct SledBlockStore {
90 db: Db,
91}
92
93impl SledBlockStore {
94 pub fn new(config: BlockStoreConfig) -> Result<Self> {
96 if let Some(parent) = config.path.parent() {
98 std::fs::create_dir_all(parent)
99 .map_err(|e| Error::Storage(format!("Failed to create directory: {e}")))?;
100 }
101
102 let db = sled::Config::new()
103 .path(&config.path)
104 .cache_capacity(config.cache_size as u64)
105 .open()
106 .map_err(|e| Error::Storage(format!("Failed to open database: {e}")))?;
107
108 Ok(Self { db })
109 }
110}
111
112#[async_trait]
113impl BlockStore for SledBlockStore {
114 async fn put(&self, block: &Block) -> Result<()> {
116 let key = block.cid().to_bytes();
117 let value = block.data().to_vec();
118
119 self.db
120 .insert(key, value)
121 .map_err(|e| Error::Storage(format!("Failed to insert block: {e}")))?;
122
123 self.db
124 .flush_async()
125 .await
126 .map_err(|e| Error::Storage(format!("Failed to flush: {e}")))?;
127
128 Ok(())
129 }
130
131 async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
133 let key = cid.to_bytes();
134
135 match self.db.get(&key) {
136 Ok(Some(value)) => {
137 let data = bytes::Bytes::from(value.to_vec());
138 Ok(Some(Block::from_parts(*cid, data)))
139 }
140 Ok(None) => Ok(None),
141 Err(e) => Err(Error::Storage(format!("Failed to get block: {e}"))),
142 }
143 }
144
145 async fn has(&self, cid: &Cid) -> Result<bool> {
147 let key = cid.to_bytes();
148 self.db
149 .contains_key(&key)
150 .map_err(|e| Error::Storage(format!("Failed to check block: {e}")))
151 }
152
153 async fn delete(&self, cid: &Cid) -> Result<()> {
155 let key = cid.to_bytes();
156 self.db
157 .remove(&key)
158 .map_err(|e| Error::Storage(format!("Failed to delete block: {e}")))?;
159
160 self.db
161 .flush_async()
162 .await
163 .map_err(|e| Error::Storage(format!("Failed to flush: {e}")))?;
164
165 Ok(())
166 }
167
168 fn len(&self) -> usize {
170 self.db.len()
171 }
172
173 fn is_empty(&self) -> bool {
175 self.db.is_empty()
176 }
177
178 fn list_cids(&self) -> Result<Vec<Cid>> {
180 let mut cids = Vec::new();
181
182 for item in self.db.iter() {
183 let (key, _) = item.map_err(|e| Error::Storage(format!("Iteration error: {e}")))?;
184
185 let cid = Cid::try_from(key.to_vec())
187 .map_err(|e| Error::Cid(format!("Failed to parse CID: {e}")))?;
188
189 cids.push(cid);
190 }
191
192 Ok(cids)
193 }
194
195 async fn put_many(&self, blocks: &[Block]) -> Result<()> {
197 let mut batch = sled::Batch::default();
198
199 for block in blocks {
200 let key = block.cid().to_bytes();
201 let value = block.data().to_vec();
202 batch.insert(key, value);
203 }
204
205 self.db
206 .apply_batch(batch)
207 .map_err(|e| Error::Storage(format!("Failed to apply batch: {e}")))?;
208
209 self.db
210 .flush_async()
211 .await
212 .map_err(|e| Error::Storage(format!("Failed to flush: {e}")))?;
213
214 Ok(())
215 }
216
217 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Block>>> {
219 let mut results = Vec::with_capacity(cids.len());
220
221 for cid in cids {
222 let key = cid.to_bytes();
223 match self.db.get(&key) {
224 Ok(Some(value)) => {
225 let data = bytes::Bytes::from(value.to_vec());
226 results.push(Some(Block::from_parts(*cid, data)));
227 }
228 Ok(None) => results.push(None),
229 Err(e) => return Err(Error::Storage(format!("Failed to get block: {e}"))),
230 }
231 }
232
233 Ok(results)
234 }
235
236 async fn has_many(&self, cids: &[Cid]) -> Result<Vec<bool>> {
238 let mut results = Vec::with_capacity(cids.len());
239
240 for cid in cids {
241 let key = cid.to_bytes();
242 let exists = self
243 .db
244 .contains_key(&key)
245 .map_err(|e| Error::Storage(format!("Failed to check block: {e}")))?;
246 results.push(exists);
247 }
248
249 Ok(results)
250 }
251
252 async fn delete_many(&self, cids: &[Cid]) -> Result<()> {
254 let mut batch = sled::Batch::default();
255
256 for cid in cids {
257 let key = cid.to_bytes();
258 batch.remove(key);
259 }
260
261 self.db
262 .apply_batch(batch)
263 .map_err(|e| Error::Storage(format!("Failed to apply batch: {e}")))?;
264
265 self.db
266 .flush_async()
267 .await
268 .map_err(|e| Error::Storage(format!("Failed to flush: {e}")))?;
269
270 Ok(())
271 }
272
273 async fn flush(&self) -> Result<()> {
275 self.db
276 .flush_async()
277 .await
278 .map_err(|e| Error::Storage(format!("Failed to flush: {e}")))?;
279 Ok(())
280 }
281}
282
283#[cfg(test)]
284mod tests {
285 use super::*;
286 use bytes::Bytes;
287
288 #[tokio::test]
289 async fn test_put_get_block() {
290 let config = BlockStoreConfig {
291 path: PathBuf::from("/tmp/ipfrs-test-blockstore"),
292 cache_size: 1024 * 1024,
293 };
294
295 let _ = std::fs::remove_dir_all(&config.path);
297
298 let store = SledBlockStore::new(config).unwrap();
299 let data = Bytes::from("hello world");
300 let block = Block::new(data.clone()).unwrap();
301
302 store.put(&block).await.unwrap();
304
305 let retrieved = store.get(block.cid()).await.unwrap();
307 assert!(retrieved.is_some());
308 assert_eq!(retrieved.unwrap().data(), &data);
309
310 assert!(store.has(block.cid()).await.unwrap());
312
313 store.delete(block.cid()).await.unwrap();
315 assert!(!store.has(block.cid()).await.unwrap());
316 }
317}