helia_utils/
blockstore_with_bitswap.rs

1//! Blockstore with Bitswap integration
2//!
3//! This module provides a blockstore wrapper that integrates local storage
4//! with Bitswap for network-based block retrieval.
5
6use async_trait::async_trait;
7use bytes::Bytes;
8use cid::Cid;
9use helia_bitswap::{Bitswap, NotifyOptions, WantOptions};
10use helia_interface::{
11    blocks::{
12        Blocks, DeleteManyOptions, GetAllOptions, GetBlockOptions, GetManyOptions, HasOptions,
13        InputPair, Pair, PutBlockOptions, PutManyOptions,
14    },
15    AwaitIterable, HeliaError,
16};
17use std::sync::Arc;
18use std::time::Duration;
19use tracing::{debug, info, warn};
20
21use crate::SledBlockstore;
22
23/// Blockstore that integrates local storage with Bitswap for network retrieval
24pub struct BlockstoreWithBitswap {
25    /// Local blockstore (fast path)
26    local: Arc<SledBlockstore>,
27    /// Bitswap coordinator (network path)
28    bitswap: Arc<Bitswap>,
29}
30
31impl BlockstoreWithBitswap {
32    /// Create a new blockstore with Bitswap integration
33    pub fn new(local: Arc<SledBlockstore>, bitswap: Arc<Bitswap>) -> Self {
34        Self { local, bitswap }
35    }
36
37    /// Get the underlying local blockstore
38    pub fn local(&self) -> &Arc<SledBlockstore> {
39        &self.local
40    }
41
42    /// Get the Bitswap coordinator
43    pub fn bitswap(&self) -> &Arc<Bitswap> {
44        &self.bitswap
45    }
46}
47
48#[async_trait]
49impl Blocks for BlockstoreWithBitswap {
50    async fn get(&self, cid: &Cid, options: Option<GetBlockOptions>) -> Result<Bytes, HeliaError> {
51        debug!("BlockstoreWithBitswap: get() called for CID: {}", cid);
52
53        // Try local blockstore first (fast path)
54        debug!("  Step 1: Checking local blockstore...");
55        match self.local.get(cid, options.clone()).await {
56            Ok(data) => {
57                debug!("  ✅ Found in local blockstore ({} bytes)", data.len());
58                return Ok(data);
59            }
60            Err(_) => {
61                debug!("  ⚠️  Not in local blockstore");
62            }
63        }
64
65        // Not in local storage, fetch via Bitswap (slow path)
66        info!(
67            "  Step 2: Block not in local storage, fetching via Bitswap: {}",
68            cid
69        );
70
71        let want_options = WantOptions {
72            timeout: Some(Duration::from_secs(30)),
73            priority: 10,
74            accept_block_presence: true,
75            peer: None,
76        };
77
78        match self.bitswap.want(cid, want_options).await {
79            Ok(data) => {
80                info!("  ✅ Retrieved from network ({} bytes)", data.len());
81
82                // Store in local blockstore for future use
83                debug!("  Step 3: Storing in local blockstore for caching...");
84                if let Err(e) = self.local.put(cid, data.clone(), None).await {
85                    warn!("  ⚠️  Failed to cache block locally: {}", e);
86                    // Don't fail the operation if caching fails
87                }
88
89                Ok(data)
90            }
91            Err(e) => {
92                warn!("  ❌ Failed to retrieve from network: {}", e);
93                Err(e)
94            }
95        }
96    }
97
98    async fn put(
99        &self,
100        cid: &Cid,
101        data: Bytes,
102        options: Option<PutBlockOptions>,
103    ) -> Result<Cid, HeliaError> {
104        debug!("BlockstoreWithBitswap: put() called for CID: {}", cid);
105
106        // Store in local blockstore first
107        debug!("  Step 1: Storing in local blockstore...");
108        let returned_cid = self.local.put(cid, data.clone(), options).await?;
109        debug!("  ✅ Stored locally");
110
111        // Announce to network via Bitswap
112        debug!("  Step 2: Announcing to network via Bitswap...");
113        let notify_options = NotifyOptions { broadcast: true };
114
115        match self
116            .bitswap
117            .notify_new_blocks(vec![(*cid, data)], notify_options)
118            .await
119        {
120            Ok(_) => {
121                debug!("  ✅ Announced to network");
122                Ok(returned_cid)
123            }
124            Err(e) => {
125                warn!("  ⚠️  Failed to announce to network: {}", e);
126                // Don't fail the operation if announcement fails
127                // The block is already stored locally
128                Ok(returned_cid)
129            }
130        }
131    }
132
133    async fn has(&self, cid: &Cid, options: Option<HasOptions>) -> Result<bool, HeliaError> {
134        // Only check local blockstore
135        // We don't query the network for has() to avoid unnecessary traffic
136        self.local.has(cid, options).await
137    }
138
139    async fn get_many_cids(
140        &self,
141        cids: Vec<Cid>,
142        options: Option<GetManyOptions>,
143    ) -> Result<AwaitIterable<Result<Pair, HeliaError>>, HeliaError> {
144        // For each CID, try local first, then network
145        // This is similar to get() but for multiple CIDs
146        self.local.get_many_cids(cids, options).await
147    }
148
149    async fn get_all(
150        &self,
151        options: Option<GetAllOptions>,
152    ) -> Result<AwaitIterable<Pair>, HeliaError> {
153        // Only return blocks from local storage
154        self.local.get_all(options).await
155    }
156
157    async fn put_many_blocks(
158        &self,
159        blocks: Vec<InputPair>,
160        options: Option<PutManyOptions>,
161    ) -> Result<AwaitIterable<Cid>, HeliaError> {
162        // Store locally first
163        let cids = self.local.put_many_blocks(blocks.clone(), options).await?;
164
165        // Announce all blocks to network
166        let blocks_to_announce: Vec<(Cid, Bytes)> = blocks
167            .into_iter()
168            .filter_map(|input_pair| input_pair.cid.map(|cid| (cid, input_pair.block)))
169            .collect();
170
171        if !blocks_to_announce.is_empty() {
172            let notify_options = NotifyOptions { broadcast: true };
173            if let Err(e) = self
174                .bitswap
175                .notify_new_blocks(blocks_to_announce, notify_options)
176                .await
177            {
178                warn!("Failed to announce blocks to network: {}", e);
179                // Don't fail the operation if announcement fails
180            }
181        }
182
183        Ok(cids)
184    }
185
186    async fn has_many_cids(
187        &self,
188        cids: Vec<Cid>,
189        options: Option<HasOptions>,
190    ) -> Result<AwaitIterable<bool>, HeliaError> {
191        // Only check local blockstore
192        self.local.has_many_cids(cids, options).await
193    }
194
195    async fn delete_many_cids(
196        &self,
197        cids: Vec<Cid>,
198        options: Option<DeleteManyOptions>,
199    ) -> Result<AwaitIterable<Cid>, HeliaError> {
200        // Only delete from local blockstore
201        // We can't "un-announce" to the network
202        self.local.delete_many_cids(cids, options).await
203    }
204}
205
#[cfg(test)]
mod tests {
    use super::*;
    use crate::BlockstoreConfig;
    use helia_bitswap::BitswapConfig;

    /// Creates a default local blockstore plus a Bitswap instance backed by it.
    async fn make_backends() -> (Arc<SledBlockstore>, Arc<Bitswap>) {
        let local = Arc::new(SledBlockstore::new(BlockstoreConfig::default()).unwrap());
        let bitswap = Arc::new(
            Bitswap::new(local.clone() as Arc<dyn Blocks>, BitswapConfig::default())
                .await
                .unwrap(),
        );
        (local, bitswap)
    }

    /// The wrapper must hand back the very same `Arc`s it was built from.
    #[tokio::test]
    async fn test_blockstore_with_bitswap_creation() {
        let (local, bitswap) = make_backends().await;

        let blockstore = BlockstoreWithBitswap::new(local.clone(), bitswap.clone());

        assert!(Arc::ptr_eq(blockstore.local(), &local));
        assert!(Arc::ptr_eq(blockstore.bitswap(), &bitswap));
    }

    /// A block already present locally is served without a network call.
    #[tokio::test]
    async fn test_local_get_works() {
        use sha2::Digest;

        let (local, bitswap) = make_backends().await;
        let blockstore = BlockstoreWithBitswap::new(local.clone(), bitswap);

        // Build a raw-codec (0x55) CIDv1 over the SHA-256 digest of the payload.
        let data = Bytes::from("test data");
        let mut hasher = sha2::Sha256::new();
        hasher.update(&data);
        let hash = hasher.finalize();

        // Multihash prefix: 0x12 = sha2-256, 0x20 = 32-byte digest length.
        let mut mh_bytes = Vec::with_capacity(2 + hash.len());
        mh_bytes.extend_from_slice(&[0x12, 0x20]);
        mh_bytes.extend_from_slice(&hash);
        let mh = multihash::Multihash::from_bytes(&mh_bytes).unwrap();
        let cid = Cid::new_v1(0x55, mh);

        // Seed the local store directly, bypassing the wrapper.
        local.put(&cid, data.clone(), None).await.unwrap();

        // Reading through the wrapper should hit the local fast path.
        let retrieved = blockstore.get(&cid, None).await.unwrap();
        assert_eq!(retrieved, data);
    }
}