helia_utils/
blockstore.rs1use async_trait::async_trait;
4use bytes::Bytes;
5use cid::Cid;
6use futures::stream;
7use sled::Db;
8
9use crate::BlockstoreConfig;
10use helia_interface::*;
11
12pub struct SledBlockstore {
14 db: Db,
15}
16
17impl SledBlockstore {
18 pub fn new(config: BlockstoreConfig) -> Result<Self, HeliaError> {
19 let db = if let Some(path) = config.path {
20 sled::open(path)
21 .map_err(|e| HeliaError::other(format!("Failed to open blockstore: {}", e)))?
22 } else {
23 sled::Config::new().temporary(true).open().map_err(|e| {
24 HeliaError::other(format!("Failed to create temporary blockstore: {}", e))
25 })?
26 };
27
28 Ok(Self { db })
29 }
30
31 fn cid_to_key(&self, cid: &Cid) -> Vec<u8> {
32 format!("block:{}", cid).into_bytes()
33 }
34}
35
36#[async_trait]
37impl Blocks for SledBlockstore {
38 async fn get(&self, cid: &Cid, _options: Option<GetBlockOptions>) -> Result<Bytes, HeliaError> {
39 let key = self.cid_to_key(cid);
40 match self.db.get(&key) {
41 Ok(Some(data)) => Ok(Bytes::from(data.to_vec())),
42 Ok(None) => Err(HeliaError::BlockNotFound { cid: *cid }),
43 Err(e) => Err(HeliaError::other(format!("Blockstore get error: {}", e))),
44 }
45 }
46
47 async fn get_many_cids(
48 &self,
49 cids: Vec<Cid>,
50 _options: Option<GetManyOptions>,
51 ) -> Result<AwaitIterable<Result<Pair, HeliaError>>, HeliaError> {
52 let mut results = Vec::new();
53
54 for cid in cids {
55 let result = match self.get(&cid, None).await {
56 Ok(block) => Ok(Pair { cid, block }),
57 Err(e) => Err(e),
58 };
59 results.push(result);
60 }
61
62 Ok(Box::pin(stream::iter(results)))
63 }
64
65 async fn get_all(
66 &self,
67 _options: Option<GetAllOptions>,
68 ) -> Result<AwaitIterable<Pair>, HeliaError> {
69 let mut results = Vec::new();
70
71 for item in self.db.iter() {
73 match item {
74 Ok((key_bytes, value_bytes)) => {
75 if let Ok(key_str) = std::str::from_utf8(&key_bytes) {
77 if let Some(cid_str) = key_str.strip_prefix("block:") {
78 if let Ok(cid) = cid_str.parse::<Cid>() {
79 let block = Bytes::from(value_bytes.to_vec());
80 results.push(Pair { cid, block });
81 }
82 }
83 }
84 }
85 Err(e) => {
86 return Err(HeliaError::other(format!("Error iterating blocks: {}", e)));
87 }
88 }
89 }
90
91 Ok(Box::pin(stream::iter(results)))
92 }
93
94 async fn put(
95 &self,
96 cid: &Cid,
97 block: Bytes,
98 _options: Option<PutBlockOptions>,
99 ) -> Result<Cid, HeliaError> {
100 let key = self.cid_to_key(cid);
101 self.db
102 .insert(&key, block.as_ref())
103 .map_err(|e| HeliaError::other(format!("Blockstore put error: {}", e)))?;
104 Ok(*cid)
105 }
106
107 async fn put_many_blocks(
108 &self,
109 blocks: Vec<InputPair>,
110 _options: Option<PutManyOptions>,
111 ) -> Result<AwaitIterable<Cid>, HeliaError> {
112 let mut results = Vec::new();
113
114 for input_pair in blocks {
115 let cid = input_pair
118 .cid
119 .ok_or_else(|| HeliaError::other("CID is required for putting block"))?;
120
121 match self.put(&cid, input_pair.block, None).await {
122 Ok(returned_cid) => results.push(returned_cid),
123 Err(e) => return Err(e), }
125 }
126
127 Ok(Box::pin(stream::iter(results)))
128 }
129
130 async fn has(&self, cid: &Cid, _options: Option<HasOptions>) -> Result<bool, HeliaError> {
131 let key = self.cid_to_key(cid);
132 match self.db.contains_key(&key) {
133 Ok(exists) => Ok(exists),
134 Err(e) => Err(HeliaError::other(format!("Blockstore has error: {}", e))),
135 }
136 }
137
138 async fn has_many_cids(
139 &self,
140 cids: Vec<Cid>,
141 _options: Option<HasOptions>,
142 ) -> Result<AwaitIterable<bool>, HeliaError> {
143 let mut results = Vec::new();
144
145 for cid in cids {
146 match self.has(&cid, None).await {
147 Ok(exists) => results.push(exists),
148 Err(e) => return Err(e), }
150 }
151
152 Ok(Box::pin(stream::iter(results)))
153 }
154
155 async fn delete_many_cids(
156 &self,
157 cids: Vec<Cid>,
158 _options: Option<DeleteManyOptions>,
159 ) -> Result<AwaitIterable<Cid>, HeliaError> {
160 let mut results = Vec::new();
161
162 for cid in cids {
163 let key = self.cid_to_key(&cid);
164 match self.db.remove(&key) {
165 Ok(_) => results.push(cid), Err(e) => {
167 return Err(HeliaError::other(format!(
168 "Delete error for {}: {}",
169 cid, e
170 )))
171 }
172 }
173 }
174
175 Ok(Box::pin(stream::iter(results)))
176 }
177}