1use crate::{
5 constants::*,
6 coordinator::OutboundMessage,
7 pb::BitswapMessage as PbBitswapMessage,
8 utils::{merge_messages, split_message, QueuedBitswapMessage},
9 Result,
10};
11use bytes::Bytes;
12use cid::Cid;
13use helia_interface::HeliaError;
14use libp2p::PeerId;
15use prost::Message;
16use std::{
17 collections::{HashMap, VecDeque},
18 io::Cursor,
19 sync::Arc,
20 time::Duration,
21};
22use tokio::sync::{mpsc, RwLock};
23use tracing::{debug, info, trace};
24
25#[derive(Debug, Clone)]
27pub struct BitswapMessageEvent {
28 pub peer: PeerId,
29 pub message: PbBitswapMessage,
30}
31
32#[derive(Debug, Clone)]
34pub enum NetworkEvent {
35 BitswapMessage(BitswapMessageEvent),
37 PeerConnected(PeerId),
39 PeerDisconnected(PeerId),
41}
42
43pub struct Network {
45 protocols: Vec<String>,
47 running: bool,
49 max_inbound_streams: usize,
51 max_outbound_streams: usize,
53 message_receive_timeout: Duration,
55 run_on_limited_connections: bool,
57 max_incoming_message_size: usize,
59 max_outgoing_message_size: usize,
61 send_queue: Arc<RwLock<HashMap<PeerId, VecDeque<QueuedBitswapMessage>>>>,
63 event_tx: mpsc::UnboundedSender<NetworkEvent>,
65 event_rx: Arc<RwLock<mpsc::UnboundedReceiver<NetworkEvent>>>,
67 outbound_tx: Arc<RwLock<Option<mpsc::UnboundedSender<OutboundMessage>>>>,
69}
70
/// Optional settings used to construct a [`Network`].
///
/// Every field is an `Option`; `Network::new` substitutes a default for the fields it
/// reads when they are `None`.
#[derive(Clone, Debug)]
pub struct NetworkInit {
    /// Protocol identifiers to use instead of the built-in list.
    pub protocols: Option<Vec<String>>,
    /// Inbound stream limit.
    pub max_inbound_streams: Option<usize>,
    /// Outbound stream limit.
    pub max_outbound_streams: Option<usize>,
    /// Timeout applied while receiving a message.
    pub message_receive_timeout: Option<Duration>,
    /// Send concurrency setting.
    /// NOTE(review): `Network::new` in this file does not read this field — confirm intent.
    pub message_send_concurrency: Option<usize>,
    /// Whether to operate over limited connections.
    pub run_on_limited_connections: Option<bool>,
    /// Size limit for outgoing messages.
    pub max_outgoing_message_size: Option<usize>,
    /// Size limit for incoming messages.
    pub max_incoming_message_size: Option<usize>,
}
83
84impl Default for NetworkInit {
85 fn default() -> Self {
86 Self {
87 protocols: None,
88 max_inbound_streams: Some(DEFAULT_MAX_INBOUND_STREAMS),
89 max_outbound_streams: Some(DEFAULT_MAX_OUTBOUND_STREAMS),
90 message_receive_timeout: Some(Duration::from_millis(DEFAULT_MESSAGE_RECEIVE_TIMEOUT)),
91 message_send_concurrency: Some(DEFAULT_MESSAGE_SEND_CONCURRENCY),
92 run_on_limited_connections: Some(DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS),
93 max_outgoing_message_size: Some(DEFAULT_MAX_OUTGOING_MESSAGE_SIZE),
94 max_incoming_message_size: Some(DEFAULT_MAX_INCOMING_MESSAGE_SIZE),
95 }
96 }
97}
98
99impl Network {
100 pub fn new(
102 init: NetworkInit,
103 outbound_tx: Arc<RwLock<Option<mpsc::UnboundedSender<OutboundMessage>>>>,
104 ) -> Self {
105 let (event_tx, event_rx) = mpsc::unbounded_channel();
106
107 Self {
108 protocols: init.protocols.unwrap_or_else(|| {
109 vec![
110 BITSWAP_120.to_string(),
111 BITSWAP_110.to_string(),
112 BITSWAP_100.to_string(),
113 ]
114 }),
115 running: false,
116 max_inbound_streams: init
117 .max_inbound_streams
118 .unwrap_or(DEFAULT_MAX_INBOUND_STREAMS),
119 max_outbound_streams: init
120 .max_outbound_streams
121 .unwrap_or(DEFAULT_MAX_OUTBOUND_STREAMS),
122 message_receive_timeout: init
123 .message_receive_timeout
124 .unwrap_or(Duration::from_millis(DEFAULT_MESSAGE_RECEIVE_TIMEOUT)),
125 run_on_limited_connections: init
126 .run_on_limited_connections
127 .unwrap_or(DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS),
128 max_incoming_message_size: init
129 .max_incoming_message_size
130 .unwrap_or(DEFAULT_MAX_INCOMING_MESSAGE_SIZE),
131 max_outgoing_message_size: init
132 .max_outgoing_message_size
133 .unwrap_or(DEFAULT_MAX_OUTGOING_MESSAGE_SIZE),
134 send_queue: Arc::new(RwLock::new(HashMap::new())),
135 event_tx,
136 event_rx: Arc::new(RwLock::new(event_rx)),
137 outbound_tx,
138 }
139 }
140
141 pub async fn set_outbound_sender(&self, sender: mpsc::UnboundedSender<OutboundMessage>) {
143 let mut guard = self.outbound_tx.write().await;
144 *guard = Some(sender);
145 }
146
147 pub async fn start(&mut self) -> Result<()> {
149 if self.running {
150 return Ok(());
151 }
152
153 info!(
154 "Starting Bitswap network with protocols: {:?}",
155 self.protocols
156 );
157 self.running = true;
158
159 Ok(())
160 }
161
162 pub async fn stop(&mut self) -> Result<()> {
164 if !self.running {
165 return Ok(());
166 }
167
168 info!("Stopping Bitswap network");
169 self.running = false;
170
171 self.send_queue.write().await.clear();
173
174 Ok(())
175 }
176
177 pub fn is_running(&self) -> bool {
179 self.running
180 }
181
182 pub async fn handle_incoming_stream(&self, peer: PeerId, data: Bytes) -> Result<()> {
184 if !self.running {
185 return Err(HeliaError::network("Network is not running"));
186 }
187
188 trace!("Handling incoming stream from {}", peer);
189
190 let message = PbBitswapMessage::decode(&mut Cursor::new(&data))
192 .map_err(|e| HeliaError::network(format!("Failed to decode message: {}", e)))?;
193
194 debug!(
195 "Received message from {} with {} structured blocks, {} raw blocks, {} wantlist entries",
196 peer,
197 message.blocks.len(),
198 message.raw_blocks.len(),
199 message
200 .wantlist
201 .as_ref()
202 .map(|w| w.entries.len())
203 .unwrap_or(0)
204 );
205
206 let _ = self
208 .event_tx
209 .send(NetworkEvent::BitswapMessage(BitswapMessageEvent {
210 peer,
211 message,
212 }));
213
214 Ok(())
215 }
216
217 pub async fn send_message(&self, peer: PeerId, message: QueuedBitswapMessage) -> Result<()> {
219 if !self.running {
220 debug!("Network facade not marked running; queuing message anyway");
221 }
222
223 let mut queue = self.send_queue.write().await;
225 let peer_queue = queue.entry(peer).or_insert_with(VecDeque::new);
226
227 if let Some(existing) = peer_queue.pop_back() {
229 let merged = merge_messages(existing, message);
230 peer_queue.push_back(merged);
231 } else {
232 peer_queue.push_back(message);
233 }
234
235 let message_to_send = peer_queue
237 .pop_front()
238 .ok_or_else(|| HeliaError::network("No message to send"))?;
239
240 drop(queue);
241
242 debug!("Sending message to {}", peer);
243
244 let messages = split_message(message_to_send, self.max_outgoing_message_size);
246
247 let sender = {
248 let guard = self.outbound_tx.read().await;
249 guard.clone()
250 };
251
252 let outbound =
253 sender.ok_or_else(|| HeliaError::network("Outbound message channel not connected"))?;
254
255 for msg in messages {
256 outbound
257 .send(OutboundMessage { peer, message: msg })
258 .map_err(|e| {
259 HeliaError::network(format!("Failed to queue outbound message: {}", e))
260 })?;
261 }
262
263 Ok(())
264 }
265
266 pub async fn find_providers(&self, _cid: &Cid) -> Result<Vec<PeerId>> {
268 debug!("Finding providers (not yet implemented)");
270 Ok(Vec::new())
271 }
272
273 pub async fn find_and_connect(&self, cid: &Cid) -> Result<()> {
275 debug!("Finding and connecting to providers for {}", cid);
276
277 Ok(())
280 }
281
282 pub async fn connect_to(&self, peer: PeerId) -> Result<()> {
284 if !self.running {
285 return Err(HeliaError::network("Network is not running"));
286 }
287
288 debug!("Connecting to peer {}", peer);
289
290 Ok(())
292 }
293
294 pub async fn next_event(&self) -> Option<NetworkEvent> {
296 self.event_rx.write().await.recv().await
297 }
298
299 pub fn dispatch_event(&self, event: NetworkEvent) {
301 let _ = self.event_tx.send(event);
302 }
303
304 pub fn protocols(&self) -> &[String] {
306 &self.protocols
307 }
308
309 pub fn max_outgoing_message_size(&self) -> usize {
311 self.max_outgoing_message_size
312 }
313
314 pub fn max_incoming_message_size(&self) -> usize {
316 self.max_incoming_message_size
317 }
318}
319
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a default-configured network around the given outbound sender slot content.
    fn network_with(sender: Option<mpsc::UnboundedSender<OutboundMessage>>) -> Network {
        Network::new(NetworkInit::default(), Arc::new(RwLock::new(sender)))
    }

    /// `start()` and `stop()` toggle the running flag.
    #[tokio::test]
    async fn test_network_start_stop() {
        let (tx, _rx) = mpsc::unbounded_channel();
        let mut net = network_with(Some(tx));
        assert!(!net.is_running());

        net.start().await.unwrap();
        assert!(net.is_running());

        net.stop().await.unwrap();
        assert!(!net.is_running());
    }

    /// With no explicit protocol list, all three Bitswap protocol ids are present.
    #[tokio::test]
    async fn test_network_protocols() {
        let net = network_with(None);
        let advertised = net.protocols();

        for expected in [BITSWAP_120, BITSWAP_110, BITSWAP_100] {
            assert!(advertised.contains(&expected.to_string()));
        }
    }

    /// Sending a message on a running network delivers something to the outbound channel.
    #[tokio::test]
    async fn test_send_message() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let mut net = network_with(Some(tx));
        net.start().await.unwrap();

        let target = PeerId::random();
        let outcome = net.send_message(target, QueuedBitswapMessage::new()).await;
        assert!(outcome.is_ok());

        assert!(rx.try_recv().is_ok());
    }
}