ipfrs_network/
bitswap.rs

1//! Bitswap protocol implementation for block exchange
2//!
3//! Bitswap is a data exchange protocol for sharing blocks of content-addressed data.
4//! It allows peers to request blocks they need and provide blocks they have.
5
6use cid::Cid;
7use ipfrs_core::error::Result;
8use libp2p::PeerId;
9use std::collections::{HashMap, HashSet};
10use std::sync::Arc;
11use tokio::sync::{mpsc, RwLock};
12use tracing::{debug, info};
13
14/// Bitswap protocol handler
15pub struct Bitswap {
16    /// Blocks we want (want list)
17    want_list: Arc<RwLock<HashSet<Cid>>>,
18    /// Blocks we have (have list)
19    have_list: Arc<RwLock<HashSet<Cid>>>,
20    /// Pending block requests per peer
21    pending_requests: Arc<RwLock<HashMap<PeerId, HashSet<Cid>>>>,
22    /// Block exchange events
23    event_tx: mpsc::Sender<BitswapEvent>,
24    event_rx: Option<mpsc::Receiver<BitswapEvent>>,
25}
26
27/// Bitswap events
28#[derive(Debug, Clone)]
29pub enum BitswapEvent {
30    /// Block received from peer
31    BlockReceived {
32        cid: Cid,
33        data: Vec<u8>,
34        from: PeerId,
35    },
36    /// Block sent to peer
37    BlockSent { cid: Cid, to: PeerId },
38    /// Block request received
39    BlockRequested { cid: Cid, from: PeerId },
40    /// Peer doesn't have requested block
41    BlockNotFound { cid: Cid, peer: PeerId },
42}
43
44/// Bitswap message types
45#[derive(Debug, Clone)]
46pub enum BitswapMessage {
47    /// Want a block
48    Want(Cid),
49    /// Have a block
50    Have(Cid),
51    /// Provide block data
52    Block { cid: Cid, data: Vec<u8> },
53    /// Don't have requested block
54    DontHave(Cid),
55}
56
57impl Bitswap {
58    /// Create a new Bitswap instance
59    pub fn new() -> Self {
60        let (event_tx, event_rx) = mpsc::channel(1024);
61
62        Self {
63            want_list: Arc::new(RwLock::new(HashSet::new())),
64            have_list: Arc::new(RwLock::new(HashSet::new())),
65            pending_requests: Arc::new(RwLock::new(HashMap::new())),
66            event_tx,
67            event_rx: Some(event_rx),
68        }
69    }
70
71    /// Add a CID to our want list
72    pub async fn want(&self, cid: Cid) -> Result<()> {
73        let mut want_list = self.want_list.write().await;
74        want_list.insert(cid);
75        debug!("Added to want list: {}", cid);
76        Ok(())
77    }
78
79    /// Remove a CID from our want list
80    pub async fn cancel_want(&self, cid: &Cid) -> Result<()> {
81        let mut want_list = self.want_list.write().await;
82        want_list.remove(cid);
83        debug!("Removed from want list: {}", cid);
84        Ok(())
85    }
86
87    /// Add a CID to our have list
88    pub async fn have(&self, cid: Cid) -> Result<()> {
89        let mut have_list = self.have_list.write().await;
90        have_list.insert(cid);
91        debug!("Added to have list: {}", cid);
92        Ok(())
93    }
94
95    /// Check if we want a CID
96    pub async fn wants(&self, cid: &Cid) -> bool {
97        let want_list = self.want_list.read().await;
98        want_list.contains(cid)
99    }
100
101    /// Check if we have a CID
102    pub async fn has(&self, cid: &Cid) -> bool {
103        let have_list = self.have_list.read().await;
104        have_list.contains(cid)
105    }
106
107    /// Get all CIDs we want
108    pub async fn get_want_list(&self) -> HashSet<Cid> {
109        let want_list = self.want_list.read().await;
110        want_list.clone()
111    }
112
113    /// Get all CIDs we have
114    pub async fn get_have_list(&self) -> HashSet<Cid> {
115        let have_list = self.have_list.read().await;
116        have_list.clone()
117    }
118
119    /// Request a block from a specific peer
120    pub async fn request_block(&self, cid: Cid, peer: PeerId) -> Result<()> {
121        let mut pending = self.pending_requests.write().await;
122        pending.entry(peer).or_insert_with(HashSet::new).insert(cid);
123        debug!("Requesting block {} from peer {}", cid, peer);
124        Ok(())
125    }
126
127    /// Handle incoming Bitswap message
128    pub async fn handle_message(
129        &self,
130        message: BitswapMessage,
131        from: PeerId,
132    ) -> Result<Option<BitswapMessage>> {
133        match message {
134            BitswapMessage::Want(cid) => {
135                // Peer wants a block
136                debug!("Peer {} wants block {}", from, cid);
137                let _ = self
138                    .event_tx
139                    .send(BitswapEvent::BlockRequested { cid, from })
140                    .await;
141
142                // Check if we have it
143                if self.has(&cid).await {
144                    // We'll send the block data through the event system
145                    // The actual block retrieval happens outside Bitswap
146                    Ok(Some(BitswapMessage::Have(cid)))
147                } else {
148                    Ok(Some(BitswapMessage::DontHave(cid)))
149                }
150            }
151            BitswapMessage::Have(cid) => {
152                // Peer has a block we might want
153                debug!("Peer {} has block {}", from, cid);
154
155                // If we want it, request it
156                if self.wants(&cid).await {
157                    self.request_block(cid, from).await?;
158                }
159                Ok(None)
160            }
161            BitswapMessage::Block { cid, data } => {
162                // Received block data
163                info!(
164                    "Received block {} ({} bytes) from peer {}",
165                    cid,
166                    data.len(),
167                    from
168                );
169
170                // Remove from want list
171                self.cancel_want(&cid).await?;
172
173                // Remove from pending requests
174                let mut pending = self.pending_requests.write().await;
175                if let Some(peer_requests) = pending.get_mut(&from) {
176                    peer_requests.remove(&cid);
177                }
178
179                // Emit event
180                let _ = self
181                    .event_tx
182                    .send(BitswapEvent::BlockReceived { cid, data, from })
183                    .await;
184
185                Ok(None)
186            }
187            BitswapMessage::DontHave(cid) => {
188                // Peer doesn't have the block
189                debug!("Peer {} doesn't have block {}", from, cid);
190
191                let _ = self
192                    .event_tx
193                    .send(BitswapEvent::BlockNotFound { cid, peer: from })
194                    .await;
195
196                Ok(None)
197            }
198        }
199    }
200
201    /// Send a block to a peer
202    pub async fn send_block(&self, cid: Cid, data: Vec<u8>, to: PeerId) -> Result<BitswapMessage> {
203        debug!(
204            "Sending block {} ({} bytes) to peer {}",
205            cid,
206            data.len(),
207            to
208        );
209
210        let _ = self
211            .event_tx
212            .send(BitswapEvent::BlockSent { cid, to })
213            .await;
214
215        Ok(BitswapMessage::Block { cid, data })
216    }
217
218    /// Get pending requests for a peer
219    pub async fn get_pending_requests(&self, peer: &PeerId) -> HashSet<Cid> {
220        let pending = self.pending_requests.read().await;
221        pending.get(peer).cloned().unwrap_or_default()
222    }
223
224    /// Get Bitswap statistics
225    pub async fn stats(&self) -> BitswapStats {
226        let want_list = self.want_list.read().await;
227        let have_list = self.have_list.read().await;
228        let pending = self.pending_requests.read().await;
229
230        BitswapStats {
231            want_list_size: want_list.len(),
232            have_list_size: have_list.len(),
233            pending_requests: pending.values().map(|s| s.len()).sum(),
234            peers_with_pending_requests: pending.len(),
235        }
236    }
237
238    /// Take the event receiver
239    pub fn take_event_receiver(&mut self) -> Option<mpsc::Receiver<BitswapEvent>> {
240        self.event_rx.take()
241    }
242}
243
244impl Default for Bitswap {
245    fn default() -> Self {
246        Self::new()
247    }
248}
249
250/// Bitswap statistics
251#[derive(Debug, Clone, Default, serde::Serialize)]
252pub struct BitswapStats {
253    /// Number of blocks we want
254    pub want_list_size: usize,
255    /// Number of blocks we have
256    pub have_list_size: usize,
257    /// Total pending block requests
258    pub pending_requests: usize,
259    /// Number of peers with pending requests
260    pub peers_with_pending_requests: usize,
261}
262
263#[cfg(test)]
264mod tests {
265    use super::*;
266    use multihash_codetable::{Code, MultihashDigest};
267
268    fn test_cid() -> Cid {
269        let hash = Code::Sha2_256.digest(b"test data");
270        Cid::new_v1(0x55, hash)
271    }
272
273    fn test_peer_id() -> PeerId {
274        PeerId::random()
275    }
276
277    #[tokio::test]
278    async fn test_bitswap_creation() {
279        let bitswap = Bitswap::new();
280        let stats = bitswap.stats().await;
281
282        assert_eq!(stats.want_list_size, 0);
283        assert_eq!(stats.have_list_size, 0);
284        assert_eq!(stats.pending_requests, 0);
285    }
286
287    #[tokio::test]
288    async fn test_want_list() {
289        let bitswap = Bitswap::new();
290        let cid = test_cid();
291
292        // Add to want list
293        bitswap.want(cid).await.unwrap();
294        assert!(bitswap.wants(&cid).await);
295
296        let want_list = bitswap.get_want_list().await;
297        assert_eq!(want_list.len(), 1);
298        assert!(want_list.contains(&cid));
299
300        // Cancel want
301        bitswap.cancel_want(&cid).await.unwrap();
302        assert!(!bitswap.wants(&cid).await);
303    }
304
305    #[tokio::test]
306    async fn test_have_list() {
307        let bitswap = Bitswap::new();
308        let cid = test_cid();
309
310        // Add to have list
311        bitswap.have(cid).await.unwrap();
312        assert!(bitswap.has(&cid).await);
313
314        let have_list = bitswap.get_have_list().await;
315        assert_eq!(have_list.len(), 1);
316        assert!(have_list.contains(&cid));
317    }
318
319    #[tokio::test]
320    async fn test_request_block() {
321        let bitswap = Bitswap::new();
322        let cid = test_cid();
323        let peer = test_peer_id();
324
325        bitswap.request_block(cid, peer).await.unwrap();
326
327        let pending = bitswap.get_pending_requests(&peer).await;
328        assert_eq!(pending.len(), 1);
329        assert!(pending.contains(&cid));
330    }
331
332    #[tokio::test]
333    async fn test_handle_want_message_have_block() {
334        let bitswap = Bitswap::new();
335        let cid = test_cid();
336        let peer = test_peer_id();
337
338        // Add to have list
339        bitswap.have(cid).await.unwrap();
340
341        // Handle want message
342        let response = bitswap
343            .handle_message(BitswapMessage::Want(cid), peer)
344            .await
345            .unwrap();
346
347        assert!(response.is_some());
348        match response.unwrap() {
349            BitswapMessage::Have(received_cid) => assert_eq!(received_cid, cid),
350            _ => panic!("Expected Have message"),
351        }
352    }
353
354    #[tokio::test]
355    async fn test_handle_want_message_dont_have() {
356        let bitswap = Bitswap::new();
357        let cid = test_cid();
358        let peer = test_peer_id();
359
360        // Don't add to have list
361        let response = bitswap
362            .handle_message(BitswapMessage::Want(cid), peer)
363            .await
364            .unwrap();
365
366        assert!(response.is_some());
367        match response.unwrap() {
368            BitswapMessage::DontHave(received_cid) => assert_eq!(received_cid, cid),
369            _ => panic!("Expected DontHave message"),
370        }
371    }
372
373    #[tokio::test]
374    async fn test_handle_have_message() {
375        let bitswap = Bitswap::new();
376        let cid = test_cid();
377        let peer = test_peer_id();
378
379        // Add to want list
380        bitswap.want(cid).await.unwrap();
381
382        // Handle have message
383        let response = bitswap
384            .handle_message(BitswapMessage::Have(cid), peer)
385            .await
386            .unwrap();
387
388        assert!(response.is_none());
389
390        // Should have created a pending request
391        let pending = bitswap.get_pending_requests(&peer).await;
392        assert_eq!(pending.len(), 1);
393        assert!(pending.contains(&cid));
394    }
395
396    #[tokio::test]
397    async fn test_handle_block_message() {
398        let bitswap = Bitswap::new();
399        let cid = test_cid();
400        let peer = test_peer_id();
401        let data = b"test block data".to_vec();
402
403        // Add to want list and pending requests
404        bitswap.want(cid).await.unwrap();
405        bitswap.request_block(cid, peer).await.unwrap();
406
407        // Handle block message
408        let response = bitswap
409            .handle_message(
410                BitswapMessage::Block {
411                    cid,
412                    data: data.clone(),
413                },
414                peer,
415            )
416            .await
417            .unwrap();
418
419        assert!(response.is_none());
420
421        // Should have removed from want list
422        assert!(!bitswap.wants(&cid).await);
423
424        // Should have removed from pending requests
425        let pending = bitswap.get_pending_requests(&peer).await;
426        assert_eq!(pending.len(), 0);
427    }
428
429    #[tokio::test]
430    async fn test_send_block() {
431        let bitswap = Bitswap::new();
432        let cid = test_cid();
433        let peer = test_peer_id();
434        let data = b"test block data".to_vec();
435
436        let message = bitswap.send_block(cid, data.clone(), peer).await.unwrap();
437
438        match message {
439            BitswapMessage::Block {
440                cid: received_cid,
441                data: received_data,
442            } => {
443                assert_eq!(received_cid, cid);
444                assert_eq!(received_data, data);
445            }
446            _ => panic!("Expected Block message"),
447        }
448    }
449
450    #[tokio::test]
451    async fn test_bitswap_stats() {
452        let bitswap = Bitswap::new();
453        let cid1 = test_cid();
454        let cid2 = test_cid();
455        let peer = test_peer_id();
456
457        bitswap.want(cid1).await.unwrap();
458        bitswap.have(cid2).await.unwrap();
459        bitswap.request_block(cid1, peer).await.unwrap();
460
461        let stats = bitswap.stats().await;
462
463        assert_eq!(stats.want_list_size, 1);
464        assert_eq!(stats.have_list_size, 1);
465        assert_eq!(stats.pending_requests, 1);
466        assert_eq!(stats.peers_with_pending_requests, 1);
467    }
468}