helia_bitswap/
coordinator.rs

1//! Bitswap coordinator - High-level API for block exchange
2//! Based on @helia/bitswap/src/index.ts
3
4use crate::{
5    constants::*,
6    network_new::{Network, NetworkInit},
7    pb,
8    wantlist_new::WantList,
9    Result,
10};
11use bytes::Bytes;
12use cid::Cid;
13use helia_interface::{Blocks, HeliaError};
14use libp2p::PeerId;
15use std::{collections::HashMap, sync::Arc, time::Duration};
16use tokio::sync::RwLock;
17use tracing::{debug, info, trace, warn};
18
19/// Bitswap statistics
20#[derive(Debug, Clone, Default)]
21pub struct BitswapStats {
22    /// Total blocks sent
23    pub blocks_sent: u64,
24    /// Total blocks received
25    pub blocks_received: u64,
26    /// Total data sent (bytes)
27    pub data_sent: u64,
28    /// Total data received (bytes)
29    pub data_received: u64,
30    /// Total duplicate blocks received
31    pub dup_blocks_received: u64,
32    /// Total duplicate data received (bytes)
33    pub dup_data_received: u64,
34    /// Messages received
35    pub messages_received: u64,
36    /// Blocks sent by peer
37    pub blocks_sent_by_peer: HashMap<PeerId, u64>,
38    /// Blocks received by peer
39    pub blocks_received_by_peer: HashMap<PeerId, u64>,
40}
41
42/// Options for wanting a block
43#[derive(Debug, Clone)]
44pub struct WantOptions {
45    /// Timeout for the want operation
46    pub timeout: Option<Duration>,
47    /// Priority (higher = more important)
48    pub priority: i32,
49    /// Whether to accept block presence messages
50    pub accept_block_presence: bool,
51    /// Specific peer to request from (for session-based requests)
52    pub peer: Option<PeerId>,
53}
54
55impl Default for WantOptions {
56    fn default() -> Self {
57        Self {
58            timeout: Some(Duration::from_millis(DEFAULT_WANT_TIMEOUT)),
59            priority: DEFAULT_PRIORITY,
60            accept_block_presence: true,
61            peer: None,
62        }
63    }
64}
65
/// Settings accepted when announcing newly available blocks.
///
/// The derived `Default` leaves `broadcast` switched off.
#[derive(Debug, Clone, Default)]
pub struct NotifyOptions {
    /// Set to `true` to request an announcement to every connected peer.
    pub broadcast: bool,
}
72
73/// Bitswap configuration
74#[derive(Debug, Clone)]
75pub struct BitswapConfig {
76    /// Network configuration
77    pub network: NetworkInit,
78}
79
80impl Default for BitswapConfig {
81    fn default() -> Self {
82        Self {
83            network: NetworkInit::default(),
84        }
85    }
86}
87
88/// Main Bitswap coordinator
89///
90/// Provides high-level API for block exchange via the Bitswap protocol.
91///
92/// # Example
93///
94/// ```no_run
95/// use helia_bitswap::{Bitswap, BitswapConfig, WantOptions};
96/// use bytes::Bytes;
97/// use cid::Cid;
98/// use std::sync::Arc;
99///
100/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
101/// // Create blockstore (example - use appropriate implementation)
102/// # use helia_interface::Blocks;
103/// # let blockstore: Arc<dyn Blocks> = unimplemented!();
104/// let bitswap = Bitswap::new(blockstore, BitswapConfig::default()).await?;
105///
106/// // Start the bitswap node
107/// bitswap.start().await?;
108///
109/// // Want a block
110/// let cid: Cid = "QmHash".parse()?;
111/// let block = bitswap.want(&cid, WantOptions::default()).await?;
112///
113/// // Notify that we have new blocks
114/// let blocks = vec![(cid, block)];
115/// bitswap.notify_new_blocks(blocks, Default::default()).await?;
116///
117/// // Stop the bitswap node
118/// bitswap.stop().await?;
119/// # Ok(())
120/// # }
121/// ```
122/// Outbound message request to be sent via the swarm
123#[derive(Debug, Clone)]
124pub struct OutboundMessage {
125    pub peer: PeerId,
126    pub message: pb::BitswapMessage,
127}
128
129pub struct Bitswap {
130    /// Network layer (deprecated - kept for compatibility)
131    network: Arc<RwLock<Network>>,
132    /// WantList manager
133    wantlist: Arc<WantList>,
134    /// Blockstore for local block storage
135    pub(crate) blockstore: Arc<dyn Blocks>,
136    /// Statistics
137    stats: Arc<RwLock<BitswapStats>>,
138    /// Running flag
139    running: Arc<RwLock<bool>>,
140    /// Configuration
141    config: BitswapConfig,
142    /// Channel for sending messages via the swarm
143    outbound_tx: Option<tokio::sync::mpsc::UnboundedSender<OutboundMessage>>,
144    /// Shared outbound sender slot used by Network facade
145    outbound_sender_slot: Arc<RwLock<Option<tokio::sync::mpsc::UnboundedSender<OutboundMessage>>>>,
146    /// Connected peers
147    connected_peers: Arc<RwLock<Vec<PeerId>>>,
148    /// Block notification broadcast channel (for event-driven want resolution)
149    block_notify_tx: tokio::sync::broadcast::Sender<Cid>,
150}
151
152impl Bitswap {
153    /// Create a new Bitswap coordinator
154    pub async fn new(blockstore: Arc<dyn Blocks>, config: BitswapConfig) -> Result<Self> {
155        info!("Creating Bitswap coordinator");
156
157        // Shared outbound sender slot between coordinator and network facade
158        let outbound_sender_slot = Arc::new(RwLock::new(None));
159
160        // Create network (kept for compatibility, but messages should go through swarm)
161        let network = Arc::new(RwLock::new(Network::new(
162            config.network.clone(),
163            outbound_sender_slot.clone(),
164        )));
165
166        // Create wantlist with a separate Network instance sharing the same sender slot
167        let network_for_wantlist = Arc::new(Network::new(
168            config.network.clone(),
169            outbound_sender_slot.clone(),
170        ));
171        let wantlist = Arc::new(WantList::new(network_for_wantlist));
172
173        // Create block notification channel (capacity of 1000 pending notifications)
174        let (block_notify_tx, _) = tokio::sync::broadcast::channel(1000);
175
176        Ok(Self {
177            network,
178            wantlist,
179            blockstore,
180            stats: Arc::new(RwLock::new(BitswapStats::default())),
181            running: Arc::new(RwLock::new(false)),
182            config,
183            outbound_tx: None,
184            outbound_sender_slot,
185            connected_peers: Arc::new(RwLock::new(Vec::new())),
186            block_notify_tx,
187        })
188    }
189
190    /// Set the outbound message sender (connected to swarm)
191    pub async fn set_outbound_sender(
192        &mut self,
193        tx: tokio::sync::mpsc::UnboundedSender<OutboundMessage>,
194    ) {
195        self.outbound_tx = Some(tx.clone());
196
197        {
198            let mut slot = self.outbound_sender_slot.write().await;
199            *slot = Some(tx.clone());
200        }
201
202        // Wantlist network shares the same slot, so it will see the sender automatically
203
204        info!("Bitswap coordinator connected to swarm message channel");
205    }
206
207    /// Add a connected peer
208    pub async fn add_peer(&self, peer: PeerId) {
209        let mut peers = self.connected_peers.write().await;
210        if !peers.contains(&peer) {
211            peers.push(peer);
212            info!("Bitswap: Added peer {}", peer);
213        }
214    }
215
216    /// Remove a disconnected peer
217    pub async fn remove_peer(&self, peer: &PeerId) {
218        let mut peers = self.connected_peers.write().await;
219        peers.retain(|p| p != peer);
220        info!("Bitswap: Removed peer {}", peer);
221    }
222
223    /// Get connected peers
224    pub async fn get_connected_peers(&self) -> Vec<PeerId> {
225        self.connected_peers.read().await.clone()
226    }
227
228    /// Send a message via the swarm
229    fn send_via_swarm(&self, peer: PeerId, message: pb::BitswapMessage) -> Result<()> {
230        if let Some(tx) = &self.outbound_tx {
231            tx.send(OutboundMessage { peer, message }).map_err(|e| {
232                HeliaError::network(format!("Failed to queue outbound message: {}", e))
233            })?;
234            Ok(())
235        } else {
236            Err(HeliaError::network(
237                "Outbound message channel not connected to swarm",
238            ))
239        }
240    }
241
242    /// Broadcast WANT for a block to connected peers via swarm
243    pub fn broadcast_want_via_swarm(
244        &self,
245        cid: &Cid,
246        priority: i32,
247        peers: Vec<PeerId>,
248    ) -> Result<()> {
249        if peers.is_empty() {
250            debug!("No peers to send WANT to");
251            return Ok(());
252        }
253
254        // Build wantlist message
255        let wantlist_entry = pb::WantlistEntry {
256            cid: cid.to_bytes(),
257            priority,
258            cancel: false,
259            want_type: pb::WantType::WantBlock as i32,
260            send_dont_have: true,
261        };
262
263        let message = pb::BitswapMessage {
264            wantlist: Some(pb::Wantlist {
265                entries: vec![wantlist_entry],
266                full: false,
267            }),
268            raw_blocks: Vec::new(),
269            blocks: Vec::new(),
270            block_presences: Vec::new(),
271            pending_bytes: 0,
272        };
273
274        // Send to all peers
275        for peer in peers {
276            debug!("Sending WANT for {} to peer {} via swarm", cid, peer);
277            if let Err(e) = self.send_via_swarm(peer, message.clone()) {
278                warn!("Failed to send WANT to peer {}: {}", peer, e);
279            }
280        }
281
282        Ok(())
283    }
284
285    /// Start the Bitswap coordinator
286    pub async fn start(&self) -> Result<()> {
287        let mut running = self.running.write().await;
288        if *running {
289            return Ok(());
290        }
291
292        info!("Starting Bitswap coordinator");
293
294        // Start network
295        self.network.write().await.start().await?;
296
297        // Start wantlist
298        self.wantlist.start();
299
300        *running = true;
301        info!("Bitswap coordinator started");
302        Ok(())
303    }
304
305    /// Stop the Bitswap coordinator
306    pub async fn stop(&self) -> Result<()> {
307        let mut running = self.running.write().await;
308        if !*running {
309            return Ok(());
310        }
311
312        info!("Stopping Bitswap coordinator");
313
314        // Stop wantlist
315        self.wantlist.stop().await;
316
317        // Stop network
318        self.network.write().await.stop().await?;
319
320        *running = false;
321        info!("Bitswap coordinator stopped");
322        Ok(())
323    }
324
325    /// Want a block
326    ///
327    /// Requests a block from the network. This will:
328    /// 1. Check local blockstore first
329    /// 2. If not found locally, add to wantlist
330    /// 3. Send want messages to connected peers
331    /// 4. Wait for block to arrive or timeout (EVENT-DRIVEN, not polling)
332    ///
333    /// # Arguments
334    ///
335    /// * `cid` - CID of the block to retrieve
336    /// * `options` - Want options (timeout, priority, etc.)
337    ///
338    /// # Returns
339    ///
340    /// The block data if found, or an error if timeout or not found
341    pub async fn want(&self, cid: &Cid, options: WantOptions) -> Result<Bytes> {
342        debug!("Wanting block: {}", cid);
343
344        // Check if we already have it
345        if let Ok(block) = self.blockstore.get(cid, None).await {
346            debug!("Block {} found in local blockstore", cid);
347            return Ok(block);
348        }
349
350        // Send WANT via swarm to connected peers
351        let peers = self.get_connected_peers().await;
352        if peers.is_empty() {
353            debug!(
354                "No connected peers currently available for {} - will wait for providers",
355                cid
356            );
357        } else {
358            info!(
359                "Sending WANT for {} to {} peers via swarm",
360                cid,
361                peers.len()
362            );
363            self.broadcast_want_via_swarm(cid, options.priority, peers)?;
364        }
365
366        // Subscribe to block notifications BEFORE sending want
367        let mut block_rx = self.block_notify_tx.subscribe();
368        let target_cid = cid.clone();
369
370        // Wait for the block to arrive with timeout (EVENT-DRIVEN)
371        let timeout = options.timeout.unwrap_or(Duration::from_secs(30));
372
373        // Use tokio::select to wait for either block notification or timeout
374        tokio::select! {
375            _ = tokio::time::sleep(timeout) => {
376                debug!("Timeout waiting for block {}", target_cid);
377                Err(HeliaError::Timeout)
378            }
379            result = async {
380                loop {
381                    // Wait for block notification
382                    match block_rx.recv().await {
383                        Ok(received_cid) => {
384                            if received_cid == target_cid {
385                                // This is our block! Try to get it from blockstore
386                                match self.blockstore.get(&target_cid, None).await {
387                                    Ok(block) => {
388                                        debug!("Block {} received from network", target_cid);
389
390                                        // Update stats
391                                        let mut stats = self.stats.write().await;
392                                        stats.blocks_received += 1;
393                                        stats.data_received += block.len() as u64;
394
395                                        return Ok(block);
396                                    }
397                                    Err(e) => {
398                                        // Block was notified but not in blockstore? Strange, keep waiting
399                                        warn!("Block {} notified but not in blockstore: {}", target_cid, e);
400                                    }
401                                }
402                            }
403                            // Not our block, keep waiting
404                        }
405                        Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
406                            // Channel lagged, check if block arrived while we were catching up
407                            if let Ok(block) = self.blockstore.get(&target_cid, None).await {
408                                debug!("Block {} found in blockstore after channel lag", target_cid);
409
410                                let mut stats = self.stats.write().await;
411                                stats.blocks_received += 1;
412                                stats.data_received += block.len() as u64;
413
414                                return Ok(block);
415                            }
416                            // Not found, continue waiting
417                        }
418                        Err(tokio::sync::broadcast::error::RecvError::Closed) => {
419                            return Err(HeliaError::network("Block notification channel closed"));
420                        }
421                    }
422                }
423            } => result
424        }
425    }
426
427    /// Notify that we have new blocks
428    ///
429    /// Announces to connected peers that we have these blocks.
430    /// This allows peers that are waiting for these blocks to request them.
431    /// Also broadcasts internally to wake up any local want() calls.
432    ///
433    /// # Arguments
434    ///
435    /// * `blocks` - Vector of (CID, block data) pairs
436    /// * `options` - Notify options
437    pub async fn notify_new_blocks(
438        &self,
439        blocks: Vec<(Cid, Bytes)>,
440        _options: NotifyOptions,
441    ) -> Result<()> {
442        if blocks.is_empty() {
443            return Ok(());
444        }
445
446        debug!("Notifying {} new blocks", blocks.len());
447
448        // Store blocks in blockstore
449        for (cid, block) in &blocks {
450            self.blockstore.put(cid, block.clone(), None).await?;
451        }
452
453        // Notify wantlist about the blocks
454        for (cid, _block) in &blocks {
455            self.wantlist.received_block(cid).await?;
456
457            // Broadcast block arrival to wake up waiting want() calls
458            // Ignore send errors (no receivers is fine)
459            let _ = self.block_notify_tx.send(cid.clone());
460            trace!("Broadcasted block notification for {}", cid);
461        }
462
463        Ok(())
464    }
465
466    /// Notify a single block arrival (called from event loop when block received)
467    ///
468    /// This is the KEY optimization from JS Helia - immediately notify waiting
469    /// requests instead of having them poll.
470    pub fn notify_block_received(&self, cid: &Cid) {
471        // Broadcast block arrival
472        let _ = self.block_notify_tx.send(cid.clone());
473        trace!("Broadcasted block notification for {}", cid);
474    }
475
476    /// Get current statistics
477    pub async fn stats(&self) -> BitswapStats {
478        self.stats.read().await.clone()
479    }
480
481    /// Get the wantlist
482    pub fn wantlist(&self) -> Arc<WantList> {
483        self.wantlist.clone()
484    }
485
486    /// Check if bitswap is running
487    pub async fn is_running(&self) -> bool {
488        *self.running.read().await
489    }
490}
491
#[cfg(test)]
mod tests {
    use super::*;
    use helia_utils::blockstore::SledBlockstore;
    use helia_utils::BlockstoreConfig;

    /// Builds a coordinator over a default-configured sled blockstore.
    async fn new_bitswap() -> Result<Bitswap> {
        let blockstore = Arc::new(SledBlockstore::new(BlockstoreConfig::default()).unwrap());
        let config = BitswapConfig::default();
        Bitswap::new(blockstore, config).await
    }

    /// Construction succeeds with default settings.
    #[tokio::test]
    async fn test_bitswap_creation() {
        let created = new_bitswap().await;
        assert!(created.is_ok());
    }

    /// The running flag follows start() and stop().
    #[tokio::test]
    async fn test_bitswap_start_stop() {
        let bitswap = new_bitswap().await.unwrap();

        assert!(!bitswap.is_running().await);

        bitswap.start().await.unwrap();
        assert!(bitswap.is_running().await);

        bitswap.stop().await.unwrap();
        assert!(!bitswap.is_running().await);
    }

    /// A fresh coordinator reports zeroed block counters.
    #[tokio::test]
    async fn test_bitswap_stats() {
        let bitswap = new_bitswap().await.unwrap();

        let snapshot = bitswap.stats().await;
        assert_eq!(snapshot.blocks_sent, 0);
        assert_eq!(snapshot.blocks_received, 0);
    }
}
531}