helia_bitswap/
network_new.rs

1//! Network layer for Bitswap protocol
2//! Based on @helia/bitswap/src/network.ts
3
4use crate::{
5    constants::*,
6    coordinator::OutboundMessage,
7    pb::BitswapMessage as PbBitswapMessage,
8    utils::{merge_messages, split_message, QueuedBitswapMessage},
9    Result,
10};
11use bytes::Bytes;
12use cid::Cid;
13use helia_interface::HeliaError;
14use libp2p::PeerId;
15use prost::Message;
16use std::{
17    collections::{HashMap, VecDeque},
18    io::Cursor,
19    sync::Arc,
20    time::Duration,
21};
22use tokio::sync::{mpsc, RwLock};
23use tracing::{debug, info, trace};
24
25/// Bitswap message event detail
26#[derive(Debug, Clone)]
27pub struct BitswapMessageEvent {
28    pub peer: PeerId,
29    pub message: PbBitswapMessage,
30}
31
32/// Network events
33#[derive(Debug, Clone)]
34pub enum NetworkEvent {
35    /// Bitswap message received
36    BitswapMessage(BitswapMessageEvent),
37    /// Peer connected
38    PeerConnected(PeerId),
39    /// Peer disconnected
40    PeerDisconnected(PeerId),
41}
42
43/// Network component for Bitswap
44pub struct Network {
45    /// Supported protocol versions
46    protocols: Vec<String>,
47    /// Whether the network is running
48    running: bool,
49    /// Maximum inbound streams
50    max_inbound_streams: usize,
51    /// Maximum outbound streams
52    max_outbound_streams: usize,
53    /// Message receive timeout
54    message_receive_timeout: Duration,
55    /// Run on limited connections
56    run_on_limited_connections: bool,
57    /// Maximum incoming message size
58    max_incoming_message_size: usize,
59    /// Maximum outgoing message size
60    max_outgoing_message_size: usize,
61    /// Message send queue
62    send_queue: Arc<RwLock<HashMap<PeerId, VecDeque<QueuedBitswapMessage>>>>,
63    /// Event sender
64    event_tx: mpsc::UnboundedSender<NetworkEvent>,
65    /// Event receiver
66    event_rx: Arc<RwLock<mpsc::UnboundedReceiver<NetworkEvent>>>,
67    /// Outbound swarm sender shared with coordinator
68    outbound_tx: Arc<RwLock<Option<mpsc::UnboundedSender<OutboundMessage>>>>,
69}
70
71/// Network initialization parameters
72#[derive(Debug, Clone)]
73pub struct NetworkInit {
74    pub protocols: Option<Vec<String>>,
75    pub max_inbound_streams: Option<usize>,
76    pub max_outbound_streams: Option<usize>,
77    pub message_receive_timeout: Option<Duration>,
78    pub message_send_concurrency: Option<usize>,
79    pub run_on_limited_connections: Option<bool>,
80    pub max_outgoing_message_size: Option<usize>,
81    pub max_incoming_message_size: Option<usize>,
82}
83
84impl Default for NetworkInit {
85    fn default() -> Self {
86        Self {
87            protocols: None,
88            max_inbound_streams: Some(DEFAULT_MAX_INBOUND_STREAMS),
89            max_outbound_streams: Some(DEFAULT_MAX_OUTBOUND_STREAMS),
90            message_receive_timeout: Some(Duration::from_millis(DEFAULT_MESSAGE_RECEIVE_TIMEOUT)),
91            message_send_concurrency: Some(DEFAULT_MESSAGE_SEND_CONCURRENCY),
92            run_on_limited_connections: Some(DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS),
93            max_outgoing_message_size: Some(DEFAULT_MAX_OUTGOING_MESSAGE_SIZE),
94            max_incoming_message_size: Some(DEFAULT_MAX_INCOMING_MESSAGE_SIZE),
95        }
96    }
97}
98
99impl Network {
100    /// Create a new network
101    pub fn new(
102        init: NetworkInit,
103        outbound_tx: Arc<RwLock<Option<mpsc::UnboundedSender<OutboundMessage>>>>,
104    ) -> Self {
105        let (event_tx, event_rx) = mpsc::unbounded_channel();
106
107        Self {
108            protocols: init.protocols.unwrap_or_else(|| {
109                vec![
110                    BITSWAP_120.to_string(),
111                    BITSWAP_110.to_string(),
112                    BITSWAP_100.to_string(),
113                ]
114            }),
115            running: false,
116            max_inbound_streams: init
117                .max_inbound_streams
118                .unwrap_or(DEFAULT_MAX_INBOUND_STREAMS),
119            max_outbound_streams: init
120                .max_outbound_streams
121                .unwrap_or(DEFAULT_MAX_OUTBOUND_STREAMS),
122            message_receive_timeout: init
123                .message_receive_timeout
124                .unwrap_or(Duration::from_millis(DEFAULT_MESSAGE_RECEIVE_TIMEOUT)),
125            run_on_limited_connections: init
126                .run_on_limited_connections
127                .unwrap_or(DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS),
128            max_incoming_message_size: init
129                .max_incoming_message_size
130                .unwrap_or(DEFAULT_MAX_INCOMING_MESSAGE_SIZE),
131            max_outgoing_message_size: init
132                .max_outgoing_message_size
133                .unwrap_or(DEFAULT_MAX_OUTGOING_MESSAGE_SIZE),
134            send_queue: Arc::new(RwLock::new(HashMap::new())),
135            event_tx,
136            event_rx: Arc::new(RwLock::new(event_rx)),
137            outbound_tx,
138        }
139    }
140
141    /// Set (or replace) the outbound sender
142    pub async fn set_outbound_sender(&self, sender: mpsc::UnboundedSender<OutboundMessage>) {
143        let mut guard = self.outbound_tx.write().await;
144        *guard = Some(sender);
145    }
146
147    /// Start the network
148    pub async fn start(&mut self) -> Result<()> {
149        if self.running {
150            return Ok(());
151        }
152
153        info!(
154            "Starting Bitswap network with protocols: {:?}",
155            self.protocols
156        );
157        self.running = true;
158
159        Ok(())
160    }
161
162    /// Stop the network
163    pub async fn stop(&mut self) -> Result<()> {
164        if !self.running {
165            return Ok(());
166        }
167
168        info!("Stopping Bitswap network");
169        self.running = false;
170
171        // Clear send queue
172        self.send_queue.write().await.clear();
173
174        Ok(())
175    }
176
177    /// Check if network is running
178    pub fn is_running(&self) -> bool {
179        self.running
180    }
181
182    /// Handle incoming stream with bitswap message
183    pub async fn handle_incoming_stream(&self, peer: PeerId, data: Bytes) -> Result<()> {
184        if !self.running {
185            return Err(HeliaError::network("Network is not running"));
186        }
187
188        trace!("Handling incoming stream from {}", peer);
189
190        // Decode message
191        let message = PbBitswapMessage::decode(&mut Cursor::new(&data))
192            .map_err(|e| HeliaError::network(format!("Failed to decode message: {}", e)))?;
193
194        debug!(
195            "Received message from {} with {} structured blocks, {} raw blocks, {} wantlist entries",
196            peer,
197            message.blocks.len(),
198            message.raw_blocks.len(),
199            message
200                .wantlist
201                .as_ref()
202                .map(|w| w.entries.len())
203                .unwrap_or(0)
204        );
205
206        // Send event
207        let _ = self
208            .event_tx
209            .send(NetworkEvent::BitswapMessage(BitswapMessageEvent {
210                peer,
211                message,
212            }));
213
214        Ok(())
215    }
216
217    /// Send a message to a peer
218    pub async fn send_message(&self, peer: PeerId, message: QueuedBitswapMessage) -> Result<()> {
219        if !self.running {
220            debug!("Network facade not marked running; queuing message anyway");
221        }
222
223        // Check if there's already a queued message for this peer
224        let mut queue = self.send_queue.write().await;
225        let peer_queue = queue.entry(peer).or_insert_with(VecDeque::new);
226
227        // Merge with existing message if present
228        if let Some(existing) = peer_queue.pop_back() {
229            let merged = merge_messages(existing, message);
230            peer_queue.push_back(merged);
231        } else {
232            peer_queue.push_back(message);
233        }
234
235        // Get the message to send
236        let message_to_send = peer_queue
237            .pop_front()
238            .ok_or_else(|| HeliaError::network("No message to send"))?;
239
240        drop(queue);
241
242        debug!("Sending message to {}", peer);
243
244        // Split message if too large
245        let messages = split_message(message_to_send, self.max_outgoing_message_size);
246
247        let sender = {
248            let guard = self.outbound_tx.read().await;
249            guard.clone()
250        };
251
252        let outbound =
253            sender.ok_or_else(|| HeliaError::network("Outbound message channel not connected"))?;
254
255        for msg in messages {
256            outbound
257                .send(OutboundMessage { peer, message: msg })
258                .map_err(|e| {
259                    HeliaError::network(format!("Failed to queue outbound message: {}", e))
260                })?;
261        }
262
263        Ok(())
264    }
265
266    /// Find providers for a CID
267    pub async fn find_providers(&self, _cid: &Cid) -> Result<Vec<PeerId>> {
268        // This will be implemented when integrated with libp2p routing
269        debug!("Finding providers (not yet implemented)");
270        Ok(Vec::new())
271    }
272
273    /// Find and connect to providers
274    pub async fn find_and_connect(&self, cid: &Cid) -> Result<()> {
275        debug!("Finding and connecting to providers for {}", cid);
276
277        // This will be implemented when integrated with libp2p
278        // For now, just log
279        Ok(())
280    }
281
282    /// Connect to a peer
283    pub async fn connect_to(&self, peer: PeerId) -> Result<()> {
284        if !self.running {
285            return Err(HeliaError::network("Network is not running"));
286        }
287
288        debug!("Connecting to peer {}", peer);
289
290        // This will be implemented when integrated with libp2p
291        Ok(())
292    }
293
294    /// Get next event
295    pub async fn next_event(&self) -> Option<NetworkEvent> {
296        self.event_rx.write().await.recv().await
297    }
298
299    /// Dispatch an event
300    pub fn dispatch_event(&self, event: NetworkEvent) {
301        let _ = self.event_tx.send(event);
302    }
303
304    /// Get supported protocols
305    pub fn protocols(&self) -> &[String] {
306        &self.protocols
307    }
308
309    /// Get max outgoing message size
310    pub fn max_outgoing_message_size(&self) -> usize {
311        self.max_outgoing_message_size
312    }
313
314    /// Get max incoming message size
315    pub fn max_incoming_message_size(&self) -> usize {
316        self.max_incoming_message_size
317    }
318}
319
320#[cfg(test)]
321mod tests {
322    use super::*;
323
324    #[tokio::test]
325    async fn test_network_start_stop() {
326        let (tx, _rx) = mpsc::unbounded_channel();
327        let sender_slot = Arc::new(RwLock::new(Some(tx)));
328        let mut network = Network::new(NetworkInit::default(), sender_slot);
329        assert!(!network.is_running());
330
331        network.start().await.unwrap();
332        assert!(network.is_running());
333
334        network.stop().await.unwrap();
335        assert!(!network.is_running());
336    }
337
338    #[tokio::test]
339    async fn test_network_protocols() {
340        let sender_slot = Arc::new(RwLock::new(None));
341        let network = Network::new(NetworkInit::default(), sender_slot);
342        let protocols = network.protocols();
343
344        assert!(protocols.contains(&BITSWAP_120.to_string()));
345        assert!(protocols.contains(&BITSWAP_110.to_string()));
346        assert!(protocols.contains(&BITSWAP_100.to_string()));
347    }
348
349    #[tokio::test]
350    async fn test_send_message() {
351        let (tx, mut rx) = mpsc::unbounded_channel();
352        let sender_slot = Arc::new(RwLock::new(Some(tx)));
353        let mut network = Network::new(NetworkInit::default(), sender_slot);
354        network.start().await.unwrap();
355
356        let peer = PeerId::random();
357        let message = QueuedBitswapMessage::new();
358
359        // Should be able to send message
360        let result = network.send_message(peer, message).await;
361        assert!(result.is_ok());
362
363        // Sending an empty message still queues an outbound packet (default empty)
364        assert!(rx.try_recv().is_ok());
365    }
366}