1use cid::Cid;
7use ipfrs_core::error::Result;
8use libp2p::PeerId;
9use std::collections::{HashMap, HashSet};
10use std::sync::Arc;
11use tokio::sync::{mpsc, RwLock};
12use tracing::{debug, info};
13
14pub struct Bitswap {
16 want_list: Arc<RwLock<HashSet<Cid>>>,
18 have_list: Arc<RwLock<HashSet<Cid>>>,
20 pending_requests: Arc<RwLock<HashMap<PeerId, HashSet<Cid>>>>,
22 event_tx: mpsc::Sender<BitswapEvent>,
24 event_rx: Option<mpsc::Receiver<BitswapEvent>>,
25}
26
27#[derive(Debug, Clone)]
29pub enum BitswapEvent {
30 BlockReceived {
32 cid: Cid,
33 data: Vec<u8>,
34 from: PeerId,
35 },
36 BlockSent { cid: Cid, to: PeerId },
38 BlockRequested { cid: Cid, from: PeerId },
40 BlockNotFound { cid: Cid, peer: PeerId },
42}
43
44#[derive(Debug, Clone)]
46pub enum BitswapMessage {
47 Want(Cid),
49 Have(Cid),
51 Block { cid: Cid, data: Vec<u8> },
53 DontHave(Cid),
55}
56
57impl Bitswap {
58 pub fn new() -> Self {
60 let (event_tx, event_rx) = mpsc::channel(1024);
61
62 Self {
63 want_list: Arc::new(RwLock::new(HashSet::new())),
64 have_list: Arc::new(RwLock::new(HashSet::new())),
65 pending_requests: Arc::new(RwLock::new(HashMap::new())),
66 event_tx,
67 event_rx: Some(event_rx),
68 }
69 }
70
71 pub async fn want(&self, cid: Cid) -> Result<()> {
73 let mut want_list = self.want_list.write().await;
74 want_list.insert(cid);
75 debug!("Added to want list: {}", cid);
76 Ok(())
77 }
78
79 pub async fn cancel_want(&self, cid: &Cid) -> Result<()> {
81 let mut want_list = self.want_list.write().await;
82 want_list.remove(cid);
83 debug!("Removed from want list: {}", cid);
84 Ok(())
85 }
86
87 pub async fn have(&self, cid: Cid) -> Result<()> {
89 let mut have_list = self.have_list.write().await;
90 have_list.insert(cid);
91 debug!("Added to have list: {}", cid);
92 Ok(())
93 }
94
95 pub async fn wants(&self, cid: &Cid) -> bool {
97 let want_list = self.want_list.read().await;
98 want_list.contains(cid)
99 }
100
101 pub async fn has(&self, cid: &Cid) -> bool {
103 let have_list = self.have_list.read().await;
104 have_list.contains(cid)
105 }
106
107 pub async fn get_want_list(&self) -> HashSet<Cid> {
109 let want_list = self.want_list.read().await;
110 want_list.clone()
111 }
112
113 pub async fn get_have_list(&self) -> HashSet<Cid> {
115 let have_list = self.have_list.read().await;
116 have_list.clone()
117 }
118
119 pub async fn request_block(&self, cid: Cid, peer: PeerId) -> Result<()> {
121 let mut pending = self.pending_requests.write().await;
122 pending.entry(peer).or_insert_with(HashSet::new).insert(cid);
123 debug!("Requesting block {} from peer {}", cid, peer);
124 Ok(())
125 }
126
127 pub async fn handle_message(
129 &self,
130 message: BitswapMessage,
131 from: PeerId,
132 ) -> Result<Option<BitswapMessage>> {
133 match message {
134 BitswapMessage::Want(cid) => {
135 debug!("Peer {} wants block {}", from, cid);
137 let _ = self
138 .event_tx
139 .send(BitswapEvent::BlockRequested { cid, from })
140 .await;
141
142 if self.has(&cid).await {
144 Ok(Some(BitswapMessage::Have(cid)))
147 } else {
148 Ok(Some(BitswapMessage::DontHave(cid)))
149 }
150 }
151 BitswapMessage::Have(cid) => {
152 debug!("Peer {} has block {}", from, cid);
154
155 if self.wants(&cid).await {
157 self.request_block(cid, from).await?;
158 }
159 Ok(None)
160 }
161 BitswapMessage::Block { cid, data } => {
162 info!(
164 "Received block {} ({} bytes) from peer {}",
165 cid,
166 data.len(),
167 from
168 );
169
170 self.cancel_want(&cid).await?;
172
173 let mut pending = self.pending_requests.write().await;
175 if let Some(peer_requests) = pending.get_mut(&from) {
176 peer_requests.remove(&cid);
177 }
178
179 let _ = self
181 .event_tx
182 .send(BitswapEvent::BlockReceived { cid, data, from })
183 .await;
184
185 Ok(None)
186 }
187 BitswapMessage::DontHave(cid) => {
188 debug!("Peer {} doesn't have block {}", from, cid);
190
191 let _ = self
192 .event_tx
193 .send(BitswapEvent::BlockNotFound { cid, peer: from })
194 .await;
195
196 Ok(None)
197 }
198 }
199 }
200
201 pub async fn send_block(&self, cid: Cid, data: Vec<u8>, to: PeerId) -> Result<BitswapMessage> {
203 debug!(
204 "Sending block {} ({} bytes) to peer {}",
205 cid,
206 data.len(),
207 to
208 );
209
210 let _ = self
211 .event_tx
212 .send(BitswapEvent::BlockSent { cid, to })
213 .await;
214
215 Ok(BitswapMessage::Block { cid, data })
216 }
217
218 pub async fn get_pending_requests(&self, peer: &PeerId) -> HashSet<Cid> {
220 let pending = self.pending_requests.read().await;
221 pending.get(peer).cloned().unwrap_or_default()
222 }
223
224 pub async fn stats(&self) -> BitswapStats {
226 let want_list = self.want_list.read().await;
227 let have_list = self.have_list.read().await;
228 let pending = self.pending_requests.read().await;
229
230 BitswapStats {
231 want_list_size: want_list.len(),
232 have_list_size: have_list.len(),
233 pending_requests: pending.values().map(|s| s.len()).sum(),
234 peers_with_pending_requests: pending.len(),
235 }
236 }
237
238 pub fn take_event_receiver(&mut self) -> Option<mpsc::Receiver<BitswapEvent>> {
240 self.event_rx.take()
241 }
242}
243
244impl Default for Bitswap {
245 fn default() -> Self {
246 Self::new()
247 }
248}
249
250#[derive(Debug, Clone, Default, serde::Serialize)]
252pub struct BitswapStats {
253 pub want_list_size: usize,
255 pub have_list_size: usize,
257 pub pending_requests: usize,
259 pub peers_with_pending_requests: usize,
261}
262
#[cfg(test)]
mod tests {
    use super::*;
    use multihash_codetable::{Code, MultihashDigest};

    /// Builds a CIDv1 with codec `0x55` over the SHA2-256 digest of `payload`.
    fn cid_for(payload: &[u8]) -> Cid {
        Cid::new_v1(0x55, Code::Sha2_256.digest(payload))
    }

    /// Fixed CID shared by the tests that need only one block.
    fn test_cid() -> Cid {
        cid_for(b"test data")
    }

    /// Fresh random peer identity.
    fn test_peer_id() -> PeerId {
        PeerId::random()
    }

    #[tokio::test]
    async fn test_bitswap_creation() {
        let bitswap = Bitswap::new();
        let stats = bitswap.stats().await;

        assert_eq!(stats.want_list_size, 0);
        assert_eq!(stats.have_list_size, 0);
        assert_eq!(stats.pending_requests, 0);
        assert_eq!(stats.peers_with_pending_requests, 0);
    }

    #[tokio::test]
    async fn test_want_list() {
        let bitswap = Bitswap::new();
        let cid = test_cid();

        bitswap.want(cid).await.unwrap();
        assert!(bitswap.wants(&cid).await);

        let want_list = bitswap.get_want_list().await;
        assert_eq!(want_list.len(), 1);
        assert!(want_list.contains(&cid));

        bitswap.cancel_want(&cid).await.unwrap();
        assert!(!bitswap.wants(&cid).await);
    }

    #[tokio::test]
    async fn test_have_list() {
        let bitswap = Bitswap::new();
        let cid = test_cid();

        bitswap.have(cid).await.unwrap();
        assert!(bitswap.has(&cid).await);

        let have_list = bitswap.get_have_list().await;
        assert_eq!(have_list.len(), 1);
        assert!(have_list.contains(&cid));
    }

    #[tokio::test]
    async fn test_request_block() {
        let bitswap = Bitswap::new();
        let cid = test_cid();
        let peer = test_peer_id();

        bitswap.request_block(cid, peer).await.unwrap();

        let pending = bitswap.get_pending_requests(&peer).await;
        assert_eq!(pending.len(), 1);
        assert!(pending.contains(&cid));
    }

    #[tokio::test]
    async fn test_handle_want_message_have_block() {
        let bitswap = Bitswap::new();
        let cid = test_cid();
        let peer = test_peer_id();

        bitswap.have(cid).await.unwrap();

        let response = bitswap
            .handle_message(BitswapMessage::Want(cid), peer)
            .await
            .unwrap();

        match response {
            Some(BitswapMessage::Have(received_cid)) => assert_eq!(received_cid, cid),
            other => panic!("Expected Have message, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_handle_want_message_dont_have() {
        let bitswap = Bitswap::new();
        let cid = test_cid();
        let peer = test_peer_id();

        let response = bitswap
            .handle_message(BitswapMessage::Want(cid), peer)
            .await
            .unwrap();

        match response {
            Some(BitswapMessage::DontHave(received_cid)) => assert_eq!(received_cid, cid),
            other => panic!("Expected DontHave message, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_handle_have_message() {
        let bitswap = Bitswap::new();
        let cid = test_cid();
        let peer = test_peer_id();

        bitswap.want(cid).await.unwrap();

        let response = bitswap
            .handle_message(BitswapMessage::Have(cid), peer)
            .await
            .unwrap();

        assert!(response.is_none());

        let pending = bitswap.get_pending_requests(&peer).await;
        assert_eq!(pending.len(), 1);
        assert!(pending.contains(&cid));
    }

    #[tokio::test]
    async fn test_handle_block_message() {
        let bitswap = Bitswap::new();
        let cid = test_cid();
        let peer = test_peer_id();
        let data = b"test block data".to_vec();

        bitswap.want(cid).await.unwrap();
        bitswap.request_block(cid, peer).await.unwrap();

        let response = bitswap
            .handle_message(
                BitswapMessage::Block {
                    cid,
                    data: data.clone(),
                },
                peer,
            )
            .await
            .unwrap();

        assert!(response.is_none());

        // The block satisfies both the want and the pending request.
        assert!(!bitswap.wants(&cid).await);

        let pending = bitswap.get_pending_requests(&peer).await;
        assert!(pending.is_empty());
    }

    #[tokio::test]
    async fn test_block_received_event() {
        let mut bitswap = Bitswap::new();
        let cid = test_cid();
        let peer = test_peer_id();
        let data = b"test block data".to_vec();

        let mut events = bitswap
            .take_event_receiver()
            .expect("receiver is available on the first call");
        // The receiver can be handed out only once.
        assert!(bitswap.take_event_receiver().is_none());

        bitswap
            .handle_message(
                BitswapMessage::Block {
                    cid,
                    data: data.clone(),
                },
                peer,
            )
            .await
            .unwrap();

        match events.recv().await {
            Some(BitswapEvent::BlockReceived {
                cid: received_cid,
                data: received_data,
                from,
            }) => {
                assert_eq!(received_cid, cid);
                assert_eq!(received_data, data);
                assert_eq!(from, peer);
            }
            other => panic!("Expected BlockReceived event, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_send_block() {
        let bitswap = Bitswap::new();
        let cid = test_cid();
        let peer = test_peer_id();
        let data = b"test block data".to_vec();

        let message = bitswap.send_block(cid, data.clone(), peer).await.unwrap();

        match message {
            BitswapMessage::Block {
                cid: received_cid,
                data: received_data,
            } => {
                assert_eq!(received_cid, cid);
                assert_eq!(received_data, data);
            }
            other => panic!("Expected Block message, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_bitswap_stats() {
        let bitswap = Bitswap::new();
        // Distinct CIDs so the want and have lists are checked independently.
        let cid1 = cid_for(b"wanted block");
        let cid2 = cid_for(b"stored block");
        assert_ne!(cid1, cid2);
        let peer = test_peer_id();

        bitswap.want(cid1).await.unwrap();
        bitswap.have(cid2).await.unwrap();
        bitswap.request_block(cid1, peer).await.unwrap();

        assert!(bitswap.wants(&cid1).await);
        assert!(!bitswap.wants(&cid2).await);
        assert!(bitswap.has(&cid2).await);
        assert!(!bitswap.has(&cid1).await);

        let stats = bitswap.stats().await;

        assert_eq!(stats.want_list_size, 1);
        assert_eq!(stats.have_list_size, 1);
        assert_eq!(stats.pending_requests, 1);
        assert_eq!(stats.peers_with_pending_requests, 1);
    }
}