1use crate::{
5 constants::*,
6 network_new::{Network, NetworkInit},
7 pb,
8 wantlist_new::WantList,
9 Result,
10};
11use bytes::Bytes;
12use cid::Cid;
13use helia_interface::{Blocks, HeliaError};
14use libp2p::PeerId;
15use std::{collections::HashMap, sync::Arc, time::Duration};
16use tokio::sync::RwLock;
17use tracing::{debug, info, trace, warn};
18
19#[derive(Debug, Clone, Default)]
21pub struct BitswapStats {
22 pub blocks_sent: u64,
24 pub blocks_received: u64,
26 pub data_sent: u64,
28 pub data_received: u64,
30 pub dup_blocks_received: u64,
32 pub dup_data_received: u64,
34 pub messages_received: u64,
36 pub blocks_sent_by_peer: HashMap<PeerId, u64>,
38 pub blocks_received_by_peer: HashMap<PeerId, u64>,
40}
41
42#[derive(Debug, Clone)]
44pub struct WantOptions {
45 pub timeout: Option<Duration>,
47 pub priority: i32,
49 pub accept_block_presence: bool,
51 pub peer: Option<PeerId>,
53}
54
55impl Default for WantOptions {
56 fn default() -> Self {
57 Self {
58 timeout: Some(Duration::from_millis(DEFAULT_WANT_TIMEOUT)),
59 priority: DEFAULT_PRIORITY,
60 accept_block_presence: true,
61 peer: None,
62 }
63 }
64}
65
66#[derive(Debug, Clone, Default)]
68pub struct NotifyOptions {
69 pub broadcast: bool,
71}
72
73#[derive(Debug, Clone)]
75pub struct BitswapConfig {
76 pub network: NetworkInit,
78}
79
80impl Default for BitswapConfig {
81 fn default() -> Self {
82 Self {
83 network: NetworkInit::default(),
84 }
85 }
86}
87
88#[derive(Debug, Clone)]
124pub struct OutboundMessage {
125 pub peer: PeerId,
126 pub message: pb::BitswapMessage,
127}
128
129pub struct Bitswap {
130 network: Arc<RwLock<Network>>,
132 wantlist: Arc<WantList>,
134 pub(crate) blockstore: Arc<dyn Blocks>,
136 stats: Arc<RwLock<BitswapStats>>,
138 running: Arc<RwLock<bool>>,
140 config: BitswapConfig,
142 outbound_tx: Option<tokio::sync::mpsc::UnboundedSender<OutboundMessage>>,
144 outbound_sender_slot: Arc<RwLock<Option<tokio::sync::mpsc::UnboundedSender<OutboundMessage>>>>,
146 connected_peers: Arc<RwLock<Vec<PeerId>>>,
148 block_notify_tx: tokio::sync::broadcast::Sender<Cid>,
150}
151
152impl Bitswap {
153 pub async fn new(blockstore: Arc<dyn Blocks>, config: BitswapConfig) -> Result<Self> {
155 info!("Creating Bitswap coordinator");
156
157 let outbound_sender_slot = Arc::new(RwLock::new(None));
159
160 let network = Arc::new(RwLock::new(Network::new(
162 config.network.clone(),
163 outbound_sender_slot.clone(),
164 )));
165
166 let network_for_wantlist = Arc::new(Network::new(
168 config.network.clone(),
169 outbound_sender_slot.clone(),
170 ));
171 let wantlist = Arc::new(WantList::new(network_for_wantlist));
172
173 let (block_notify_tx, _) = tokio::sync::broadcast::channel(1000);
175
176 Ok(Self {
177 network,
178 wantlist,
179 blockstore,
180 stats: Arc::new(RwLock::new(BitswapStats::default())),
181 running: Arc::new(RwLock::new(false)),
182 config,
183 outbound_tx: None,
184 outbound_sender_slot,
185 connected_peers: Arc::new(RwLock::new(Vec::new())),
186 block_notify_tx,
187 })
188 }
189
190 pub async fn set_outbound_sender(
192 &mut self,
193 tx: tokio::sync::mpsc::UnboundedSender<OutboundMessage>,
194 ) {
195 self.outbound_tx = Some(tx.clone());
196
197 {
198 let mut slot = self.outbound_sender_slot.write().await;
199 *slot = Some(tx.clone());
200 }
201
202 info!("Bitswap coordinator connected to swarm message channel");
205 }
206
207 pub async fn add_peer(&self, peer: PeerId) {
209 let mut peers = self.connected_peers.write().await;
210 if !peers.contains(&peer) {
211 peers.push(peer);
212 info!("Bitswap: Added peer {}", peer);
213 }
214 }
215
216 pub async fn remove_peer(&self, peer: &PeerId) {
218 let mut peers = self.connected_peers.write().await;
219 peers.retain(|p| p != peer);
220 info!("Bitswap: Removed peer {}", peer);
221 }
222
223 pub async fn get_connected_peers(&self) -> Vec<PeerId> {
225 self.connected_peers.read().await.clone()
226 }
227
228 fn send_via_swarm(&self, peer: PeerId, message: pb::BitswapMessage) -> Result<()> {
230 if let Some(tx) = &self.outbound_tx {
231 tx.send(OutboundMessage { peer, message }).map_err(|e| {
232 HeliaError::network(format!("Failed to queue outbound message: {}", e))
233 })?;
234 Ok(())
235 } else {
236 Err(HeliaError::network(
237 "Outbound message channel not connected to swarm",
238 ))
239 }
240 }
241
242 pub fn broadcast_want_via_swarm(
244 &self,
245 cid: &Cid,
246 priority: i32,
247 peers: Vec<PeerId>,
248 ) -> Result<()> {
249 if peers.is_empty() {
250 debug!("No peers to send WANT to");
251 return Ok(());
252 }
253
254 let wantlist_entry = pb::WantlistEntry {
256 cid: cid.to_bytes(),
257 priority,
258 cancel: false,
259 want_type: pb::WantType::WantBlock as i32,
260 send_dont_have: true,
261 };
262
263 let message = pb::BitswapMessage {
264 wantlist: Some(pb::Wantlist {
265 entries: vec![wantlist_entry],
266 full: false,
267 }),
268 raw_blocks: Vec::new(),
269 blocks: Vec::new(),
270 block_presences: Vec::new(),
271 pending_bytes: 0,
272 };
273
274 for peer in peers {
276 debug!("Sending WANT for {} to peer {} via swarm", cid, peer);
277 if let Err(e) = self.send_via_swarm(peer, message.clone()) {
278 warn!("Failed to send WANT to peer {}: {}", peer, e);
279 }
280 }
281
282 Ok(())
283 }
284
285 pub async fn start(&self) -> Result<()> {
287 let mut running = self.running.write().await;
288 if *running {
289 return Ok(());
290 }
291
292 info!("Starting Bitswap coordinator");
293
294 self.network.write().await.start().await?;
296
297 self.wantlist.start();
299
300 *running = true;
301 info!("Bitswap coordinator started");
302 Ok(())
303 }
304
305 pub async fn stop(&self) -> Result<()> {
307 let mut running = self.running.write().await;
308 if !*running {
309 return Ok(());
310 }
311
312 info!("Stopping Bitswap coordinator");
313
314 self.wantlist.stop().await;
316
317 self.network.write().await.stop().await?;
319
320 *running = false;
321 info!("Bitswap coordinator stopped");
322 Ok(())
323 }
324
325 pub async fn want(&self, cid: &Cid, options: WantOptions) -> Result<Bytes> {
342 debug!("Wanting block: {}", cid);
343
344 if let Ok(block) = self.blockstore.get(cid, None).await {
346 debug!("Block {} found in local blockstore", cid);
347 return Ok(block);
348 }
349
350 let peers = self.get_connected_peers().await;
352 if peers.is_empty() {
353 debug!(
354 "No connected peers currently available for {} - will wait for providers",
355 cid
356 );
357 } else {
358 info!(
359 "Sending WANT for {} to {} peers via swarm",
360 cid,
361 peers.len()
362 );
363 self.broadcast_want_via_swarm(cid, options.priority, peers)?;
364 }
365
366 let mut block_rx = self.block_notify_tx.subscribe();
368 let target_cid = cid.clone();
369
370 let timeout = options.timeout.unwrap_or(Duration::from_secs(30));
372
373 tokio::select! {
375 _ = tokio::time::sleep(timeout) => {
376 debug!("Timeout waiting for block {}", target_cid);
377 Err(HeliaError::Timeout)
378 }
379 result = async {
380 loop {
381 match block_rx.recv().await {
383 Ok(received_cid) => {
384 if received_cid == target_cid {
385 match self.blockstore.get(&target_cid, None).await {
387 Ok(block) => {
388 debug!("Block {} received from network", target_cid);
389
390 let mut stats = self.stats.write().await;
392 stats.blocks_received += 1;
393 stats.data_received += block.len() as u64;
394
395 return Ok(block);
396 }
397 Err(e) => {
398 warn!("Block {} notified but not in blockstore: {}", target_cid, e);
400 }
401 }
402 }
403 }
405 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
406 if let Ok(block) = self.blockstore.get(&target_cid, None).await {
408 debug!("Block {} found in blockstore after channel lag", target_cid);
409
410 let mut stats = self.stats.write().await;
411 stats.blocks_received += 1;
412 stats.data_received += block.len() as u64;
413
414 return Ok(block);
415 }
416 }
418 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
419 return Err(HeliaError::network("Block notification channel closed"));
420 }
421 }
422 }
423 } => result
424 }
425 }
426
427 pub async fn notify_new_blocks(
438 &self,
439 blocks: Vec<(Cid, Bytes)>,
440 _options: NotifyOptions,
441 ) -> Result<()> {
442 if blocks.is_empty() {
443 return Ok(());
444 }
445
446 debug!("Notifying {} new blocks", blocks.len());
447
448 for (cid, block) in &blocks {
450 self.blockstore.put(cid, block.clone(), None).await?;
451 }
452
453 for (cid, _block) in &blocks {
455 self.wantlist.received_block(cid).await?;
456
457 let _ = self.block_notify_tx.send(cid.clone());
460 trace!("Broadcasted block notification for {}", cid);
461 }
462
463 Ok(())
464 }
465
466 pub fn notify_block_received(&self, cid: &Cid) {
471 let _ = self.block_notify_tx.send(cid.clone());
473 trace!("Broadcasted block notification for {}", cid);
474 }
475
476 pub async fn stats(&self) -> BitswapStats {
478 self.stats.read().await.clone()
479 }
480
481 pub fn wantlist(&self) -> Arc<WantList> {
483 self.wantlist.clone()
484 }
485
486 pub async fn is_running(&self) -> bool {
488 *self.running.read().await
489 }
490}
491
492#[cfg(test)]
493mod tests {
494 use super::*;
495 use helia_utils::blockstore::SledBlockstore;
496 use helia_utils::BlockstoreConfig;
497
498 #[tokio::test]
499 async fn test_bitswap_creation() {
500 let blockstore = Arc::new(SledBlockstore::new(BlockstoreConfig::default()).unwrap());
501 let config = BitswapConfig::default();
502 let bitswap = Bitswap::new(blockstore, config).await;
503 assert!(bitswap.is_ok());
504 }
505
506 #[tokio::test]
507 async fn test_bitswap_start_stop() {
508 let blockstore = Arc::new(SledBlockstore::new(BlockstoreConfig::default()).unwrap());
509 let config = BitswapConfig::default();
510 let bitswap = Bitswap::new(blockstore, config).await.unwrap();
511
512 assert!(!bitswap.is_running().await);
513
514 bitswap.start().await.unwrap();
515 assert!(bitswap.is_running().await);
516
517 bitswap.stop().await.unwrap();
518 assert!(!bitswap.is_running().await);
519 }
520
521 #[tokio::test]
522 async fn test_bitswap_stats() {
523 let blockstore = Arc::new(SledBlockstore::new(BlockstoreConfig::default()).unwrap());
524 let config = BitswapConfig::default();
525 let bitswap = Bitswap::new(blockstore, config).await.unwrap();
526
527 let stats = bitswap.stats().await;
528 assert_eq!(stats.blocks_sent, 0);
529 assert_eq!(stats.blocks_received, 0);
530 }
531}