helia_utils/
helia.rs

1//! Main Helia implementation
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use cid::Cid;
8use multihash_codetable::{Code as MultihashCode, MultihashDigest};
9use std::convert::TryFrom;
10use futures::stream;
11use futures::StreamExt;
12use helia_bitswap::BlockPresenceType;
13use libp2p::{
14    kad,
15    swarm::{
16        dial_opts::{DialOpts, PeerCondition},
17        SwarmEvent,
18    },
19    Swarm,
20};
21use tokio::sync::{Mutex, RwLock};
22use tokio::task::JoinHandle;
23use trust_dns_resolver::TokioAsyncResolver;
24use unsigned_varint::decode as varint_decode;
25
26use helia_interface::pins::Pin as HeliaPin;
27use helia_interface::*;
28use tokio::sync::broadcast;
29
30use crate::libp2p_behaviour::HeliaBehaviourEvent;
31use crate::{
32    create_swarm, BlockstoreWithBitswap, HeliaBehaviour, HeliaConfig, SledBlockstore,
33    SledDatastore, TracingLogger,
34};
35use helia_bitswap::{
36    network_new::{BitswapMessageEvent, NetworkEvent},
37    Bitswap, BitswapConfig, BitswapEvent,
38};
39
40/// Main implementation of the Helia trait
41pub struct HeliaImpl {
42    libp2p: Arc<Mutex<Swarm<HeliaBehaviour>>>,
43    blockstore: Arc<dyn Blocks>,
44    datastore: Arc<SledDatastore>,
45    pins: Arc<SimplePins>,
46    logger: Arc<TracingLogger>,
47    routing: Arc<DummyRouting>,
48    dns: TokioAsyncResolver,
49    metrics: Option<Arc<dyn Metrics>>,
50    started: Arc<RwLock<bool>>,
51    event_loop_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
52    bitswap: Arc<Bitswap>,
53    outbound_rx: Arc<
54        Mutex<
55            Option<
56                tokio::sync::mpsc::UnboundedReceiver<helia_bitswap::coordinator::OutboundMessage>,
57            >,
58        >,
59    >,
60    /// Event broadcaster for Helia events
61    event_tx: broadcast::Sender<HeliaEvent>,
62}
63
64impl HeliaImpl {
65    pub async fn new(mut config: HeliaConfig) -> Result<Self, HeliaError> {
66        // Create base infrastructure
67        let local_blockstore = Arc::new(SledBlockstore::new(config.blockstore)?);
68        let datastore = Arc::new(SledDatastore::new(config.datastore)?);
69        let pins = Arc::new(SimplePins::new(datastore.clone()));
70        let logger = Arc::new(TracingLogger::new(config.logger));
71        let routing = Arc::new(DummyRouting::new());
72
73        // Use provided libp2p swarm or create a new one
74        let libp2p = if let Some(swarm) = config.libp2p.take() {
75            swarm
76        } else {
77            let swarm = create_swarm().await.map_err(|e| {
78                HeliaError::network(format!("Failed to create libp2p swarm: {}", e))
79            })?;
80            Arc::new(Mutex::new(swarm))
81        };
82
83        let dns = config.dns.unwrap_or_else(|| {
84            TokioAsyncResolver::tokio_from_system_conf().expect("Failed to create DNS resolver")
85        });
86
87        // Create Bitswap coordinator
88        let bitswap_config = BitswapConfig::default();
89        let mut bitswap = Bitswap::new(local_blockstore.clone() as Arc<dyn Blocks>, bitswap_config)
90            .await
91            .map_err(|e| HeliaError::network(format!("Failed to create Bitswap: {}", e)))?;
92
93        // Create channel for outbound Bitswap messages
94        let (outbound_tx, outbound_rx) = tokio::sync::mpsc::unbounded_channel();
95        bitswap.set_outbound_sender(outbound_tx).await;
96        logger.info("Bitswap outbound message channel created");
97
98        let bitswap = Arc::new(bitswap);
99
100        // Connect Bitswap coordinator to the NetworkBehaviour
101        // This allows the behaviour to respond to incoming WANT requests
102        {
103            let mut swarm_guard = libp2p.lock().await;
104            swarm_guard
105                .behaviour_mut()
106                .bitswap
107                .set_coordinator(bitswap.clone());
108            logger.info("Bitswap coordinator connected to NetworkBehaviour");
109        }
110
111        // Wrap blockstore with Bitswap integration for network retrieval
112        let blockstore: Arc<dyn Blocks> = Arc::new(BlockstoreWithBitswap::new(
113            local_blockstore,
114            bitswap.clone(),
115        ));
116
117        logger.info("Helia node initialized with Bitswap P2P support");
118
119        // Create event broadcaster with a buffer size of 100
120        let (event_tx, _) = broadcast::channel(100);
121
122        Ok(Self {
123            libp2p,
124            blockstore,
125            datastore,
126            pins,
127            logger,
128            routing,
129            dns,
130            metrics: config.metrics,
131            started: Arc::new(RwLock::new(false)),
132            event_loop_handle: Arc::new(Mutex::new(None)),
133            bitswap,
134            outbound_rx: Arc::new(Mutex::new(Some(outbound_rx))),
135            event_tx,
136        })
137    }
138}
139
140#[async_trait]
141impl Helia for HeliaImpl {
142    fn blockstore(&self) -> &dyn Blocks {
143        self.blockstore.as_ref()
144    }
145
146    fn datastore(&self) -> &dyn Datastore {
147        self.datastore.as_ref()
148    }
149
150    fn pins(&self) -> &dyn Pins {
151        self.pins.as_ref()
152    }
153
154    fn logger(&self) -> &dyn ComponentLogger {
155        self.logger.as_ref()
156    }
157
158    fn routing(&self) -> &dyn Routing {
159        self.routing.as_ref()
160    }
161
162    fn dns(&self) -> &TokioAsyncResolver {
163        &self.dns
164    }
165
166    fn metrics(&self) -> Option<&dyn Metrics> {
167        self.metrics.as_ref().map(|m| m.as_ref())
168    }
169
170    fn subscribe_events(&self) -> HeliaEventReceiver {
171        self.event_tx.subscribe()
172    }
173
174    async fn start(&self) -> Result<(), HeliaError> {
175        let mut started = self.started.write().await;
176        if *started {
177            return Ok(());
178        }
179
180        // Start Bitswap coordinator
181        self.bitswap
182            .start()
183            .await
184            .map_err(|e| HeliaError::network(format!("Failed to start Bitswap: {}", e)))?;
185        self.logger.info("Bitswap coordinator started");
186
187        // Start libp2p swarm
188        let mut swarm = self.libp2p.lock().await;
189        swarm
190            .listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
191            .map_err(|e| HeliaError::network(format!("Failed to start listening: {}", e)))?;
192        drop(swarm); // Release lock before spawning event loop
193
194        // Start swarm event loop
195        let swarm_clone = self.libp2p.clone();
196        let blockstore_clone = self.blockstore.clone();
197        let logger_clone = self.logger.clone();
198        let bitswap_clone = self.bitswap.clone();
199
200        // Take the outbound_rx channel (only available once)
201        let outbound_rx = self
202            .outbound_rx
203            .lock()
204            .await
205            .take()
206            .ok_or_else(|| HeliaError::other("Bitswap outbound channel already taken"))?;
207
208        let handle = tokio::spawn(async move {
209            run_swarm_event_loop(
210                swarm_clone,
211                blockstore_clone,
212                logger_clone,
213                bitswap_clone,
214                outbound_rx,
215            )
216            .await;
217        });
218
219        *self.event_loop_handle.lock().await = Some(handle);
220
221        self.logger.info("Helia node started");
222        *started = true;
223        
224        // Emit start event (ignore errors if no subscribers)
225        let _ = self.event_tx.send(HeliaEvent::Start);
226        
227        Ok(())
228    }
229
230    async fn stop(&self) -> Result<(), HeliaError> {
231        let mut started = self.started.write().await;
232        if !*started {
233            return Ok(());
234        }
235
236        // Stop event loop
237        if let Some(handle) = self.event_loop_handle.lock().await.take() {
238            handle.abort();
239        }
240
241        // Stop Bitswap coordinator
242        self.bitswap
243            .stop()
244            .await
245            .map_err(|e| HeliaError::network(format!("Failed to stop Bitswap: {}", e)))?;
246        self.logger.info("Bitswap coordinator stopped");
247
248        self.logger.info("Helia node stopped");
249        *started = false;
250        
251        // Emit stop event (ignore errors if no subscribers)
252        let _ = self.event_tx.send(HeliaEvent::Stop);
253        
254        Ok(())
255    }
256    async fn gc(&self, _options: Option<GcOptions>) -> Result<(), HeliaError> {
257        // Emit GC started event
258        let _ = self.event_tx.send(HeliaEvent::GcStarted);
259        
260        // TODO: Implement garbage collection
261        self.logger.info("Garbage collection not yet implemented");
262        
263        // Emit GC completed event
264        let _ = self.event_tx.send(HeliaEvent::GcCompleted);
265        
266        Ok(())
267    }
268
269    async fn get_codec(&self, code: u64) -> Result<Box<dyn Codec>, HeliaError> {
270        // TODO: Implement codec loading
271        Err(HeliaError::CodecNotFound { code })
272    }
273
274    async fn get_hasher(&self, code: u64) -> Result<Box<dyn Hasher>, HeliaError> {
275        // TODO: Implement hasher loading
276        Err(HeliaError::HasherNotFound { code })
277    }
278}
279
280#[async_trait]
281impl HeliaWithLibp2p<HeliaBehaviour> for HeliaImpl {
282    fn libp2p(&self) -> Arc<Mutex<Swarm<HeliaBehaviour>>> {
283        self.libp2p.clone()
284    }
285}
286
287/*
288/// Dummy libp2p implementation for now - DEPRECATED: Now using real libp2p
289pub struct DummyLibp2p {
290    started: Arc<RwLock<bool>>,
291}
292
293impl DummyLibp2p {
294    pub fn new() -> Self {
295        Self {
296            started: Arc::new(RwLock::new(false)),
297        }
298    }
299}
300
301#[async_trait]
302impl Libp2p for DummyLibp2p {
303    fn is_started(&self) -> bool {
304        false
305    }
306
307    fn peer_id(&self) -> libp2p::PeerId {
308        libp2p::PeerId::random()
309    }
310
311    fn listeners(&self) -> Vec<libp2p::Multiaddr> {
312        vec![]
313    }
314
315    async fn start(&self) -> Result<(), HeliaError> {
316        let mut started = self.started.write().await;
317        *started = true;
318        Ok(())
319    }
320
321    async fn stop(&self) -> Result<(), HeliaError> {
322        let mut started = self.started.write().await;
323        *started = false;
324        Ok(())
325    }
326}
327*/
328
329/// Dummy routing implementation
330pub struct DummyRouting;
331
332impl DummyRouting {
333    pub fn new() -> Self {
334        Self
335    }
336}
337
338#[async_trait]
339impl Routing for DummyRouting {
340    async fn find_providers(
341        &self,
342        _cid: &Cid,
343        _options: Option<FindProvidersOptions>,
344    ) -> Result<AwaitIterable<Provider>, HeliaError> {
345        Err(HeliaError::routing("Routing not yet implemented"))
346    }
347
348    async fn provide(
349        &self,
350        _cid: &Cid,
351        _options: Option<ProvideOptions>,
352    ) -> Result<(), HeliaError> {
353        Err(HeliaError::routing("Routing not yet implemented"))
354    }
355
356    async fn find_peers(
357        &self,
358        _peer_id: &libp2p::PeerId,
359        _options: Option<FindPeersOptions>,
360    ) -> Result<AwaitIterable<PeerInfo>, HeliaError> {
361        Err(HeliaError::routing("Routing not yet implemented"))
362    }
363
364    async fn get(
365        &self,
366        _key: &[u8],
367        _options: Option<GetOptions>,
368    ) -> Result<Option<RoutingRecord>, HeliaError> {
369        Err(HeliaError::routing("Routing not yet implemented"))
370    }
371
372    async fn put(
373        &self,
374        _key: &[u8],
375        _value: &[u8],
376        _options: Option<PutOptions>,
377    ) -> Result<(), HeliaError> {
378        Err(HeliaError::routing("Routing not yet implemented"))
379    }
380}
381
382/// Simple pins implementation  
383pub struct SimplePins {
384    datastore: Arc<dyn Datastore>,
385}
386
387impl SimplePins {
388    pub fn new(datastore: Arc<dyn Datastore>) -> Self {
389        Self { datastore }
390    }
391
392    fn pin_key(&self, cid: &Cid) -> Vec<u8> {
393        format!("pin:{}", cid).into_bytes()
394    }
395
396    fn pin_to_bytes(&self, pin: &HeliaPin) -> Result<Bytes, HeliaError> {
397        serde_json::to_vec(pin)
398            .map(Bytes::from)
399            .map_err(|e| HeliaError::other(format!("Failed to serialize pin: {}", e)))
400    }
401
402    fn bytes_to_pin(&self, data: &[u8]) -> Result<HeliaPin, HeliaError> {
403        serde_json::from_slice(data)
404            .map_err(|e| HeliaError::other(format!("Failed to deserialize pin: {}", e)))
405    }
406}
407
408#[async_trait]
409impl Pins for SimplePins {
410    async fn add(&self, cid: &Cid, options: Option<AddOptions>) -> Result<(), HeliaError> {
411        let options = options.unwrap_or_default();
412
413        let pin = HeliaPin {
414            cid: *cid,
415            depth: options.depth.unwrap_or(u64::MAX), // Default to recursive (infinite depth)
416            metadata: options.metadata,
417        };
418
419        let key = self.pin_key(cid);
420        let value = self.pin_to_bytes(&pin)?;
421
422        self.datastore.put(&key, value).await?;
423        Ok(())
424    }
425
426    async fn rm(&self, cid: &Cid, _options: Option<RmOptions>) -> Result<(), HeliaError> {
427        let key = self.pin_key(cid);
428        self.datastore.delete(&key).await?;
429        Ok(())
430    }
431
432    async fn ls(&self, options: Option<LsOptions>) -> Result<AwaitIterable<HeliaPin>, HeliaError> {
433        let options = options.unwrap_or_default();
434
435        // If filtering by specific CID
436        if let Some(filter_cid) = options.cid {
437            let key = self.pin_key(&filter_cid);
438            match self.datastore.get(&key).await? {
439                Some(data) => {
440                    let pin = self.bytes_to_pin(&data)?;
441                    Ok(Box::pin(stream::iter(vec![pin])))
442                }
443                None => Ok(Box::pin(stream::iter(vec![]))),
444            }
445        } else {
446            // List all pins - get all entries with "pin:" prefix
447            let mut pins = Vec::new();
448            let mut query_stream = self.datastore.query(Some(b"pin:")).await?;
449
450            use futures::StreamExt;
451            while let Some(data) = query_stream.next().await {
452                match self.bytes_to_pin(&data) {
453                    Ok(pin) => pins.push(pin),
454                    Err(_) => continue, // Skip invalid pin entries
455                }
456            }
457
458            Ok(Box::pin(stream::iter(pins)))
459        }
460    }
461
462    async fn is_pinned(
463        &self,
464        cid: &Cid,
465        _options: Option<IsPinnedOptions>,
466    ) -> Result<bool, HeliaError> {
467        let key = self.pin_key(cid);
468        self.datastore.has(&key).await
469    }
470}
471
472/// Run the libp2p swarm event loop
473async fn run_swarm_event_loop(
474    swarm: Arc<Mutex<Swarm<HeliaBehaviour>>>,
475    blockstore: Arc<dyn Blocks>,
476    logger: Arc<TracingLogger>,
477    bitswap: Arc<Bitswap>,
478    mut outbound_rx: tokio::sync::mpsc::UnboundedReceiver<
479        helia_bitswap::coordinator::OutboundMessage,
480    >,
481) {
482    loop {
483        tokio::select! {
484            // Handle swarm events
485            event = async {
486                let mut swarm_guard = swarm.lock().await;
487                swarm_guard.select_next_some().await
488            } => {
489                match event {
490                    SwarmEvent::Behaviour(behaviour_event) => {
491                        // Handle different behaviour events
492                        // The NetworkBehaviour derive macro generates HeliaBehaviourEvent
493                        match behaviour_event {
494                            HeliaBehaviourEvent::Bitswap(bitswap_event) => {
495                                handle_bitswap_event(bitswap_event, blockstore.clone(), bitswap.clone(), logger.clone()).await;
496                            }
497                            HeliaBehaviourEvent::Identify(identify_event) => {
498                                logger.debug(&format!("Identify event: {:?}", identify_event));
499                            }
500                            HeliaBehaviourEvent::Kademlia(kad_event) => {
501                                use libp2p::kad::QueryResult;
502
503                                match kad_event {
504                                    kad::Event::OutboundQueryProgressed { result, .. } => {
505                                        match result {
506                                            QueryResult::GetProviders(Ok(ok)) => {
507                                                logger.info(&format!("Kademlia: provider query result {:?}", ok));
508                                            }
509                                            QueryResult::GetProviders(Err(err)) => {
510                                                logger.warn(&format!("Kademlia: provider query error: {:?}", err));
511                                            }
512                                            QueryResult::GetClosestPeers(Ok(ok)) => {
513                                                logger.info(&format!("Kademlia: closest peers result {:?}", ok));
514                                            }
515                                            QueryResult::GetClosestPeers(Err(err)) => {
516                                                logger.warn(&format!("Kademlia: closest peers query error: {:?}", err));
517                                            }
518                                            other => {
519                                                logger.debug(&format!("Kademlia query result: {:?}", other));
520                                            }
521                                        }
522                                    }
523                                    kad::Event::RoutingUpdated { peer, addresses, .. } => {
524                                        for addr in addresses.iter() {
525                                            logger.info(&format!(
526                                                "Kademlia: routing updated for peer {} at {}",
527                                                peer, addr
528                                            ));
529                                        }
530                                    }
531                                    kad::Event::RoutablePeer { peer, address } => {
532                                        logger.info(&format!(
533                                            "Kademlia: routable peer {} via {}",
534                                            peer, address
535                                        ));
536                                    }
537                                    kad::Event::PendingRoutablePeer { peer, address } => {
538                                        logger.info(&format!(
539                                            "Kademlia: pending routable peer {} via {}",
540                                            peer, address
541                                        ));
542                                    }
543                                    other => {
544                                        logger.debug(&format!("Kademlia event: {:?}", other));
545                                    }
546                                }
547                            }
548                    HeliaBehaviourEvent::Gossipsub(gossip_event) => {
549                        logger.debug(&format!("Gossipsub event: {:?}", gossip_event));
550                    }
551                    HeliaBehaviourEvent::Mdns(mdns_event) => {
552                        use libp2p::mdns;
553                        match mdns_event {
554                            mdns::Event::Discovered(list) => {
555                                for (peer_id, multiaddr) in list {
556                                                                       logger.info(&format!("mDNS discovered peer: {} at {}", peer_id, multiaddr));
557                                    // Dial the discovered peer to establish connection
558                                    let mut swarm_guard = swarm.lock().await;
559                                    if let Err(e) = swarm_guard.dial(multiaddr.clone()) {
560                                        logger.warn(&format!("Failed to dial discovered peer {}: {}", peer_id, e));
561                                    } else {
562                                        logger.info(&format!("Dialing discovered peer: {}", peer_id));
563                                    }
564                                }
565                            }
566                            mdns::Event::Expired(list) => {
567                                for (peer_id, _multiaddr) in list {
568                                    logger.info(&format!("mDNS peer expired: {}", peer_id));
569                                }
570                            }
571                        }
572                    }
573                    _ => {
574                        // Handle other protocol events
575                        logger.debug(&format!("Other behaviour event: {:?}", behaviour_event));
576                    }
577                }
578            }
579            SwarmEvent::NewListenAddr { address, .. } => {
580                logger.info(&format!("Listening on {}", address));
581            }
582            SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => {
583                logger.info(&format!("Connection established with peer: {} at {}", peer_id, endpoint.get_remote_address()));
584                // Notify Bitswap coordinator of new peer
585                bitswap.add_peer(peer_id).await;
586                bitswap
587                    .wantlist()
588                    .dispatch_event(NetworkEvent::PeerConnected(peer_id));
589            }
590            SwarmEvent::ConnectionClosed { peer_id, cause, .. } => {
591                logger.info(&format!("Connection closed with peer: {} (cause: {:?})", peer_id, cause));
592                // Notify Bitswap coordinator of disconnected peer
593                bitswap.remove_peer(&peer_id).await;
594                bitswap
595                    .wantlist()
596                    .dispatch_event(NetworkEvent::PeerDisconnected(peer_id));
597            }
598            SwarmEvent::IncomingConnection { local_addr, send_back_addr, .. } => {
599                logger.debug(&format!("Incoming connection from {} to {}", send_back_addr, local_addr));
600            }
601            SwarmEvent::IncomingConnectionError { local_addr, send_back_addr, error, .. } => {
602                logger.warn(&format!("Incoming connection error from {} to {}: {}", send_back_addr, local_addr, error));
603            }
604            SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
605                if let Some(peer_id) = peer_id {
606                    logger.warn(&format!("Outgoing connection error to {}: {}", peer_id, error));
607                } else {
608                    logger.warn(&format!("Outgoing connection error: {}", error));
609                }
610            }
611            _ => {
612                // Handle other events as needed
613            }
614        }
615            }
616
617            // Handle outbound Bitswap messages from coordinator
618            Some(outbound_msg) = outbound_rx.recv() => {
619                logger.debug(&format!("Sending Bitswap message to peer {} via swarm", outbound_msg.peer));
620                let mut swarm_guard = swarm.lock().await;
621                swarm_guard.behaviour_mut().bitswap.send_message(outbound_msg.peer, outbound_msg.message);
622            }
623        }
624    }
625}
626
627/// Handle Bitswap events (MessageReceived, MessageSent, SendError)
628async fn handle_bitswap_event(
629    event: BitswapEvent,
630    blockstore: Arc<dyn Blocks>,
631    bitswap: Arc<Bitswap>,
632    logger: Arc<TracingLogger>,
633) {
634    match event {
635        BitswapEvent::MessageReceived { peer, message } => {
636            logger.info(&format!("Received Bitswap message from peer: {}", peer));
637            logger.debug(&format!(
638                "Bitswap payload summary -> structured blocks: {}, raw blocks: {}, presences: {}, wantlist entries: {}",
639                message.blocks.len(),
640                message.raw_blocks.len(),
641                message.block_presences.len(),
642                message
643                    .wantlist
644                    .as_ref()
645                    .map(|w| w.entries.len())
646                    .unwrap_or(0)
647            ));
648
649            // Forward message to Bitswap wantlist for responder handling
650            bitswap
651                .wantlist()
652                .dispatch_event(NetworkEvent::BitswapMessage(BitswapMessageEvent {
653                    peer,
654                    message: message.clone(),
655                }));
656
657            // Store any blocks that were received
658            if !message.blocks.is_empty() {
659                logger.info(&format!(
660                    "Received {} blocks from peer {}",
661                    message.blocks.len(),
662                    peer
663                ));
664
665                let wantlist = bitswap.wantlist();
666
667                for block in &message.blocks {
668                    logger.debug(&format!(
669                        "Block received - prefix_len: {}, data_len: {}",
670                        block.prefix.len(),
671                        block.data.len()
672                    ));
673
674                    // Decode CID from prefix and data
675                    // The prefix contains: [version, codec, ...]
676                    // For now, we'll reconstruct the CID from the block data
677                    // In Bitswap, the full CID can be reconstructed by hashing the data
678                    match reconstruct_cid_from_block(&block.prefix, &block.data) {
679                        Ok(cid) => {
680                            logger.info(&format!("Storing received block: {}", cid));
681
682                            // Store in blockstore
683                            if let Err(e) = blockstore
684                                .put(&cid, Bytes::from(block.data.clone()), None)
685                                .await
686                            {
687                                logger.warn(&format!(
688                                    "Failed to store received block {}: {}",
689                                    cid, e
690                                ));
691                            } else {
692                                logger.info(&format!("✅ Successfully stored block: {}", cid));
693
694                                // **OPTIMIZATION**: Immediately notify bitswap coordinator
695                                // This wakes up any waiting want() calls (event-driven, not polling)
696                                bitswap.notify_block_received(&cid);
697                                logger.debug(&format!(
698                                    "Notified coordinator of block arrival: {}",
699                                    cid
700                                ));
701
702                                if let Err(e) = wantlist.received_block(&cid).await {
703                                    logger.warn(&format!(
704                                        "Failed to notify wantlist for {}: {}",
705                                        cid, e
706                                    ));
707                                }
708                            }
709                        }
710                        Err(e) => {
711                            logger.warn(&format!("Failed to decode CID from block prefix: {}", e));
712                        }
713                    }
714                }
715            }
716
717            if !message.raw_blocks.is_empty() {
718                logger.info(&format!(
719                    "Received {} legacy raw block(s) from {}",
720                    message.raw_blocks.len(),
721                    peer
722                ));
723            }
724
725            if message.blocks.is_empty() && message.raw_blocks.is_empty() {
726                logger.warn("Bitswap message contained no blocks");
727            }
728
729            // Log wantlist if present
730            if let Some(wantlist) = &message.wantlist {
731                logger.debug(&format!(
732                    "Received wantlist with {} entries (full: {})",
733                    wantlist.entries.len(),
734                    wantlist.full
735                ));
736
737                // TODO: Process wantlist and send blocks if we have them
738                // This will be implemented when we connect the coordinator
739            }
740
741            // Log block presences if present
742            if !message.block_presences.is_empty() {
743                logger.info(&format!(
744                    "Received {} block presence notification(s) from {}",
745                    message.block_presences.len(),
746                    peer
747                ));
748
749                for presence in &message.block_presences {
750                    let cid_display = match Cid::try_from(presence.cid.as_slice()) {
751                        Ok(cid) => cid.to_string(),
752                        Err(_) => "<invalid cid>".to_string(),
753                    };
754
755                    let status = match presence.r#type {
756                        x if x == BlockPresenceType::HaveBlock as i32 => "HAVE",
757                        x if x == BlockPresenceType::DoNotHaveBlock as i32 => "DONT_HAVE",
758                        _ => "UNKNOWN",
759                    };
760
761                    logger.info(&format!("   Presence: {} reports {}", cid_display, status));
762                }
763            }
764        }
765        BitswapEvent::MessageSent { peer } => {
766            logger.debug(&format!(
767                "Successfully sent Bitswap message to peer: {}",
768                peer
769            ));
770        }
771        BitswapEvent::SendError { peer, error } => {
772            logger.warn(&format!(
773                "Failed to send Bitswap message to peer {}: {}",
774                peer, error
775            ));
776        }
777    }
778}
779
780/// Reconstruct CID from Bitswap block prefix and data
781///
782/// In our implementation, the prefix contains the full CID bytes,
783/// which allows us to get the exact CID without needing to re-hash.
784fn reconstruct_cid_from_block(prefix: &[u8], data: &[u8]) -> Result<cid::Cid, HeliaError> {
785    let (version_val, remaining) = varint_decode::u64(prefix)
786        .map_err(|e| HeliaError::network(format!("Failed to decode CID version from prefix: {}", e)))?;
787
788    let (codec_val, remaining) = varint_decode::u64(remaining)
789        .map_err(|e| HeliaError::network(format!("Failed to decode codec from prefix: {}", e)))?;
790
791    let (mh_code_val, remaining) = varint_decode::u64(remaining)
792        .map_err(|e| HeliaError::network(format!("Failed to decode multihash code from prefix: {}", e)))?;
793
794    let (mh_len_val, _remaining) = varint_decode::u64(remaining)
795        .map_err(|e| HeliaError::network(format!("Failed to decode multihash length from prefix: {}", e)))?;
796
797    let code = MultihashCode::try_from(mh_code_val).map_err(|_| {
798        HeliaError::network(format!("Unsupported multihash code in prefix: {}", mh_code_val))
799    })?;
800
801    let multihash = code.digest(data);
802    let expected_len = usize::try_from(mh_len_val).map_err(|_| {
803        HeliaError::network(format!("Multihash length {} does not fit in usize", mh_len_val))
804    })?;
805
806    if multihash.digest().len() != expected_len {
807        return Err(HeliaError::network(format!(
808            "Multihash length mismatch: expected {}, got {}",
809            expected_len,
810            multihash.digest().len()
811        )));
812    }
813
814    match version_val {
815        0 => cid::Cid::new_v0(multihash)
816            .map_err(|e| HeliaError::network(format!("Failed to construct CIDv0: {}", e))),
817        1 => Ok(cid::Cid::new_v1(codec_val, multihash)),
818        v => Err(HeliaError::network(format!(
819            "Unsupported CID version in prefix: {}",
820            v
821        ))),
822    }
823}
824
825#[cfg(test)]
826mod tests {
827    use super::*;
828    use multihash_codetable::{Code as MultihashCode, MultihashDigest};
829    use unsigned_varint::encode;
830
831    fn push_varint(target: &mut Vec<u8>, value: u64) {
832        let mut buf = encode::u64_buffer();
833        target.extend_from_slice(encode::u64(value, &mut buf));
834    }
835
836    #[test]
837    fn reconstructs_cid_v1_with_sha2_256() {
838        let data = b"hello world";
839    let digest = MultihashCode::Sha2_256.digest(data);
840        let codec_val = 0x55; // raw codec
841
842        let mut prefix = Vec::new();
843        push_varint(&mut prefix, 1); // CIDv1
844        push_varint(&mut prefix, codec_val);
845        push_varint(&mut prefix, u64::from(MultihashCode::Sha2_256));
846        push_varint(&mut prefix, digest.digest().len() as u64);
847
848        let reconstructed = reconstruct_cid_from_block(&prefix, data).expect("cid reconstruction");
849        let expected = Cid::new_v1(codec_val, digest.clone());
850
851        assert_eq!(reconstructed, expected);
852    }
853
854    #[test]
855    fn fails_on_mismatched_digest_length() {
856        let data = b"hello world";
857    let digest = MultihashCode::Sha2_256.digest(data);
858        let codec_val = 0x55; // raw codec
859
860        let mut prefix = Vec::new();
861        push_varint(&mut prefix, 1); // CIDv1
862        push_varint(&mut prefix, codec_val);
863        push_varint(&mut prefix, u64::from(MultihashCode::Sha2_256));
864        push_varint(&mut prefix, (digest.digest().len() as u64) - 1); // incorrect length
865
866        let err = reconstruct_cid_from_block(&prefix, data).expect_err("length mismatch should fail");
867
868        assert!(matches!(err, HeliaError::Network { .. }));
869    }
870}