helia_utils/
blockstore.rs

1//! Blockstore implementations
2
3use async_trait::async_trait;
4use bytes::Bytes;
5use cid::Cid;
6use futures::stream;
7use sled::Db;
8
9use crate::BlockstoreConfig;
10use helia_interface::*;
11
12/// Sled-based blockstore implementation
13pub struct SledBlockstore {
14    db: Db,
15}
16
17impl SledBlockstore {
18    pub fn new(config: BlockstoreConfig) -> Result<Self, HeliaError> {
19        let db = if let Some(path) = config.path {
20            sled::open(path)
21                .map_err(|e| HeliaError::other(format!("Failed to open blockstore: {}", e)))?
22        } else {
23            sled::Config::new().temporary(true).open().map_err(|e| {
24                HeliaError::other(format!("Failed to create temporary blockstore: {}", e))
25            })?
26        };
27
28        Ok(Self { db })
29    }
30
31    fn cid_to_key(&self, cid: &Cid) -> Vec<u8> {
32        format!("block:{}", cid).into_bytes()
33    }
34}
35
36#[async_trait]
37impl Blocks for SledBlockstore {
38    async fn get(&self, cid: &Cid, _options: Option<GetBlockOptions>) -> Result<Bytes, HeliaError> {
39        let key = self.cid_to_key(cid);
40        match self.db.get(&key) {
41            Ok(Some(data)) => Ok(Bytes::from(data.to_vec())),
42            Ok(None) => Err(HeliaError::BlockNotFound { cid: *cid }),
43            Err(e) => Err(HeliaError::other(format!("Blockstore get error: {}", e))),
44        }
45    }
46
47    async fn get_many_cids(
48        &self,
49        cids: Vec<Cid>,
50        _options: Option<GetManyOptions>,
51    ) -> Result<AwaitIterable<Result<Pair, HeliaError>>, HeliaError> {
52        let mut results = Vec::new();
53
54        for cid in cids {
55            let result = match self.get(&cid, None).await {
56                Ok(block) => Ok(Pair { cid, block }),
57                Err(e) => Err(e),
58            };
59            results.push(result);
60        }
61
62        Ok(Box::pin(stream::iter(results)))
63    }
64
65    async fn get_all(
66        &self,
67        _options: Option<GetAllOptions>,
68    ) -> Result<AwaitIterable<Pair>, HeliaError> {
69        let mut results = Vec::new();
70
71        // Iterate through all blocks in the database
72        for item in self.db.iter() {
73            match item {
74                Ok((key_bytes, value_bytes)) => {
75                    // Parse the key to extract CID
76                    if let Ok(key_str) = std::str::from_utf8(&key_bytes) {
77                        if let Some(cid_str) = key_str.strip_prefix("block:") {
78                            if let Ok(cid) = cid_str.parse::<Cid>() {
79                                let block = Bytes::from(value_bytes.to_vec());
80                                results.push(Pair { cid, block });
81                            }
82                        }
83                    }
84                }
85                Err(e) => {
86                    return Err(HeliaError::other(format!("Error iterating blocks: {}", e)));
87                }
88            }
89        }
90
91        Ok(Box::pin(stream::iter(results)))
92    }
93
94    async fn put(
95        &self,
96        cid: &Cid,
97        block: Bytes,
98        _options: Option<PutBlockOptions>,
99    ) -> Result<Cid, HeliaError> {
100        let key = self.cid_to_key(cid);
101        self.db
102            .insert(&key, block.as_ref())
103            .map_err(|e| HeliaError::other(format!("Blockstore put error: {}", e)))?;
104        Ok(*cid)
105    }
106
107    async fn put_many_blocks(
108        &self,
109        blocks: Vec<InputPair>,
110        _options: Option<PutManyOptions>,
111    ) -> Result<AwaitIterable<Cid>, HeliaError> {
112        let mut results = Vec::new();
113
114        for input_pair in blocks {
115            // If CID is not provided, we'd need to compute it from the block
116            // For now, we'll require CID to be provided
117            let cid = input_pair
118                .cid
119                .ok_or_else(|| HeliaError::other("CID is required for putting block"))?;
120
121            match self.put(&cid, input_pair.block, None).await {
122                Ok(returned_cid) => results.push(returned_cid),
123                Err(e) => return Err(e), // Fail fast on any error
124            }
125        }
126
127        Ok(Box::pin(stream::iter(results)))
128    }
129
130    async fn has(&self, cid: &Cid, _options: Option<HasOptions>) -> Result<bool, HeliaError> {
131        let key = self.cid_to_key(cid);
132        match self.db.contains_key(&key) {
133            Ok(exists) => Ok(exists),
134            Err(e) => Err(HeliaError::other(format!("Blockstore has error: {}", e))),
135        }
136    }
137
138    async fn has_many_cids(
139        &self,
140        cids: Vec<Cid>,
141        _options: Option<HasOptions>,
142    ) -> Result<AwaitIterable<bool>, HeliaError> {
143        let mut results = Vec::new();
144
145        for cid in cids {
146            match self.has(&cid, None).await {
147                Ok(exists) => results.push(exists),
148                Err(e) => return Err(e), // Fail fast on any error
149            }
150        }
151
152        Ok(Box::pin(stream::iter(results)))
153    }
154
155    async fn delete_many_cids(
156        &self,
157        cids: Vec<Cid>,
158        _options: Option<DeleteManyOptions>,
159    ) -> Result<AwaitIterable<Cid>, HeliaError> {
160        let mut results = Vec::new();
161
162        for cid in cids {
163            let key = self.cid_to_key(&cid);
164            match self.db.remove(&key) {
165                Ok(_) => results.push(cid), // Successfully deleted
166                Err(e) => {
167                    return Err(HeliaError::other(format!(
168                        "Delete error for {}: {}",
169                        cid, e
170                    )))
171                }
172            }
173        }
174
175        Ok(Box::pin(stream::iter(results)))
176    }
177}