1use async_trait::async_trait;
7use bytes::Bytes;
8use cid::Cid;
9use helia_bitswap::{Bitswap, NotifyOptions, WantOptions};
10use helia_interface::{
11 blocks::{
12 Blocks, DeleteManyOptions, GetAllOptions, GetBlockOptions, GetManyOptions, HasOptions,
13 InputPair, Pair, PutBlockOptions, PutManyOptions,
14 },
15 AwaitIterable, HeliaError,
16};
17use std::sync::Arc;
18use std::time::Duration;
19use tracing::{debug, info, warn};
20
21use crate::SledBlockstore;
22
23pub struct BlockstoreWithBitswap {
25 local: Arc<SledBlockstore>,
27 bitswap: Arc<Bitswap>,
29}
30
31impl BlockstoreWithBitswap {
32 pub fn new(local: Arc<SledBlockstore>, bitswap: Arc<Bitswap>) -> Self {
34 Self { local, bitswap }
35 }
36
37 pub fn local(&self) -> &Arc<SledBlockstore> {
39 &self.local
40 }
41
42 pub fn bitswap(&self) -> &Arc<Bitswap> {
44 &self.bitswap
45 }
46}
47
48#[async_trait]
49impl Blocks for BlockstoreWithBitswap {
50 async fn get(&self, cid: &Cid, options: Option<GetBlockOptions>) -> Result<Bytes, HeliaError> {
51 debug!("BlockstoreWithBitswap: get() called for CID: {}", cid);
52
53 debug!(" Step 1: Checking local blockstore...");
55 match self.local.get(cid, options.clone()).await {
56 Ok(data) => {
57 debug!(" ✅ Found in local blockstore ({} bytes)", data.len());
58 return Ok(data);
59 }
60 Err(_) => {
61 debug!(" ⚠️ Not in local blockstore");
62 }
63 }
64
65 info!(
67 " Step 2: Block not in local storage, fetching via Bitswap: {}",
68 cid
69 );
70
71 let want_options = WantOptions {
72 timeout: Some(Duration::from_secs(30)),
73 priority: 10,
74 accept_block_presence: true,
75 peer: None,
76 };
77
78 match self.bitswap.want(cid, want_options).await {
79 Ok(data) => {
80 info!(" ✅ Retrieved from network ({} bytes)", data.len());
81
82 debug!(" Step 3: Storing in local blockstore for caching...");
84 if let Err(e) = self.local.put(cid, data.clone(), None).await {
85 warn!(" ⚠️ Failed to cache block locally: {}", e);
86 }
88
89 Ok(data)
90 }
91 Err(e) => {
92 warn!(" ❌ Failed to retrieve from network: {}", e);
93 Err(e)
94 }
95 }
96 }
97
98 async fn put(
99 &self,
100 cid: &Cid,
101 data: Bytes,
102 options: Option<PutBlockOptions>,
103 ) -> Result<Cid, HeliaError> {
104 debug!("BlockstoreWithBitswap: put() called for CID: {}", cid);
105
106 debug!(" Step 1: Storing in local blockstore...");
108 let returned_cid = self.local.put(cid, data.clone(), options).await?;
109 debug!(" ✅ Stored locally");
110
111 debug!(" Step 2: Announcing to network via Bitswap...");
113 let notify_options = NotifyOptions { broadcast: true };
114
115 match self
116 .bitswap
117 .notify_new_blocks(vec![(*cid, data)], notify_options)
118 .await
119 {
120 Ok(_) => {
121 debug!(" ✅ Announced to network");
122 Ok(returned_cid)
123 }
124 Err(e) => {
125 warn!(" ⚠️ Failed to announce to network: {}", e);
126 Ok(returned_cid)
129 }
130 }
131 }
132
133 async fn has(&self, cid: &Cid, options: Option<HasOptions>) -> Result<bool, HeliaError> {
134 self.local.has(cid, options).await
137 }
138
139 async fn get_many_cids(
140 &self,
141 cids: Vec<Cid>,
142 options: Option<GetManyOptions>,
143 ) -> Result<AwaitIterable<Result<Pair, HeliaError>>, HeliaError> {
144 self.local.get_many_cids(cids, options).await
147 }
148
149 async fn get_all(
150 &self,
151 options: Option<GetAllOptions>,
152 ) -> Result<AwaitIterable<Pair>, HeliaError> {
153 self.local.get_all(options).await
155 }
156
157 async fn put_many_blocks(
158 &self,
159 blocks: Vec<InputPair>,
160 options: Option<PutManyOptions>,
161 ) -> Result<AwaitIterable<Cid>, HeliaError> {
162 let cids = self.local.put_many_blocks(blocks.clone(), options).await?;
164
165 let blocks_to_announce: Vec<(Cid, Bytes)> = blocks
167 .into_iter()
168 .filter_map(|input_pair| input_pair.cid.map(|cid| (cid, input_pair.block)))
169 .collect();
170
171 if !blocks_to_announce.is_empty() {
172 let notify_options = NotifyOptions { broadcast: true };
173 if let Err(e) = self
174 .bitswap
175 .notify_new_blocks(blocks_to_announce, notify_options)
176 .await
177 {
178 warn!("Failed to announce blocks to network: {}", e);
179 }
181 }
182
183 Ok(cids)
184 }
185
186 async fn has_many_cids(
187 &self,
188 cids: Vec<Cid>,
189 options: Option<HasOptions>,
190 ) -> Result<AwaitIterable<bool>, HeliaError> {
191 self.local.has_many_cids(cids, options).await
193 }
194
195 async fn delete_many_cids(
196 &self,
197 cids: Vec<Cid>,
198 options: Option<DeleteManyOptions>,
199 ) -> Result<AwaitIterable<Cid>, HeliaError> {
200 self.local.delete_many_cids(cids, options).await
203 }
204}
205
#[cfg(test)]
mod tests {
    use super::*;
    use crate::BlockstoreConfig;
    use helia_bitswap::BitswapConfig;
    use sha2::Digest;

    /// Creates a default-configured local store plus a Bitswap instance that
    /// is backed by that same store.
    async fn fresh_stores() -> (Arc<SledBlockstore>, Arc<Bitswap>) {
        let local = Arc::new(SledBlockstore::new(BlockstoreConfig::default()).unwrap());
        let backing = local.clone() as Arc<dyn Blocks>;
        let bitswap = Arc::new(
            Bitswap::new(backing, BitswapConfig::default())
                .await
                .unwrap(),
        );
        (local, bitswap)
    }

    /// Builds a CIDv1 for `payload` from its SHA-256 digest.
    fn raw_sha256_cid(payload: &[u8]) -> Cid {
        let mut hasher = sha2::Sha256::new();
        hasher.update(payload);
        let digest = hasher.finalize();

        // Multihash framing: 0x12 is the sha2-256 code, 0x20 the 32-byte digest length.
        let mut encoded = vec![0x12, 0x20];
        encoded.extend_from_slice(&digest);
        let mh = multihash::Multihash::from_bytes(&encoded).unwrap();

        // 0x55 is the multicodec code for raw binary content.
        Cid::new_v1(0x55, mh)
    }

    /// The accessors must hand back the very same `Arc`s passed to `new`.
    #[tokio::test]
    async fn test_blockstore_with_bitswap_creation() {
        let (local, bitswap) = fresh_stores().await;

        let blockstore = BlockstoreWithBitswap::new(local.clone(), bitswap.clone());

        assert!(Arc::ptr_eq(blockstore.local(), &local));
        assert!(Arc::ptr_eq(blockstore.bitswap(), &bitswap));
    }

    /// A block written directly to the local store is readable through the wrapper.
    #[tokio::test]
    async fn test_local_get_works() {
        let (local, bitswap) = fresh_stores().await;
        let blockstore = BlockstoreWithBitswap::new(local.clone(), bitswap);

        let data = Bytes::from("test data");
        let cid = raw_sha256_cid(&data);

        local.put(&cid, data.clone(), None).await.unwrap();

        let retrieved = blockstore.get(&cid, None).await.unwrap();
        assert_eq!(retrieved, data);
    }
}