1use std::sync::Arc;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use cid::Cid;
8use multihash_codetable::{Code as MultihashCode, MultihashDigest};
9use std::convert::TryFrom;
10use futures::stream;
11use futures::StreamExt;
12use helia_bitswap::BlockPresenceType;
13use libp2p::{
14 kad,
15 swarm::{
16 dial_opts::{DialOpts, PeerCondition},
17 SwarmEvent,
18 },
19 Swarm,
20};
21use tokio::sync::{Mutex, RwLock};
22use tokio::task::JoinHandle;
23use trust_dns_resolver::TokioAsyncResolver;
24use unsigned_varint::decode as varint_decode;
25
26use helia_interface::pins::Pin as HeliaPin;
27use helia_interface::*;
28use tokio::sync::broadcast;
29
30use crate::libp2p_behaviour::HeliaBehaviourEvent;
31use crate::{
32 create_swarm, BlockstoreWithBitswap, HeliaBehaviour, HeliaConfig, SledBlockstore,
33 SledDatastore, TracingLogger,
34};
35use helia_bitswap::{
36 network_new::{BitswapMessageEvent, NetworkEvent},
37 Bitswap, BitswapConfig, BitswapEvent,
38};
39
40pub struct HeliaImpl {
42 libp2p: Arc<Mutex<Swarm<HeliaBehaviour>>>,
43 blockstore: Arc<dyn Blocks>,
44 datastore: Arc<SledDatastore>,
45 pins: Arc<SimplePins>,
46 logger: Arc<TracingLogger>,
47 routing: Arc<DummyRouting>,
48 dns: TokioAsyncResolver,
49 metrics: Option<Arc<dyn Metrics>>,
50 started: Arc<RwLock<bool>>,
51 event_loop_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
52 bitswap: Arc<Bitswap>,
53 outbound_rx: Arc<
54 Mutex<
55 Option<
56 tokio::sync::mpsc::UnboundedReceiver<helia_bitswap::coordinator::OutboundMessage>,
57 >,
58 >,
59 >,
60 event_tx: broadcast::Sender<HeliaEvent>,
62}
63
64impl HeliaImpl {
65 pub async fn new(mut config: HeliaConfig) -> Result<Self, HeliaError> {
66 let local_blockstore = Arc::new(SledBlockstore::new(config.blockstore)?);
68 let datastore = Arc::new(SledDatastore::new(config.datastore)?);
69 let pins = Arc::new(SimplePins::new(datastore.clone()));
70 let logger = Arc::new(TracingLogger::new(config.logger));
71 let routing = Arc::new(DummyRouting::new());
72
73 let libp2p = if let Some(swarm) = config.libp2p.take() {
75 swarm
76 } else {
77 let swarm = create_swarm().await.map_err(|e| {
78 HeliaError::network(format!("Failed to create libp2p swarm: {}", e))
79 })?;
80 Arc::new(Mutex::new(swarm))
81 };
82
83 let dns = config.dns.unwrap_or_else(|| {
84 TokioAsyncResolver::tokio_from_system_conf().expect("Failed to create DNS resolver")
85 });
86
87 let bitswap_config = BitswapConfig::default();
89 let mut bitswap = Bitswap::new(local_blockstore.clone() as Arc<dyn Blocks>, bitswap_config)
90 .await
91 .map_err(|e| HeliaError::network(format!("Failed to create Bitswap: {}", e)))?;
92
93 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::unbounded_channel();
95 bitswap.set_outbound_sender(outbound_tx).await;
96 logger.info("Bitswap outbound message channel created");
97
98 let bitswap = Arc::new(bitswap);
99
100 {
103 let mut swarm_guard = libp2p.lock().await;
104 swarm_guard
105 .behaviour_mut()
106 .bitswap
107 .set_coordinator(bitswap.clone());
108 logger.info("Bitswap coordinator connected to NetworkBehaviour");
109 }
110
111 let blockstore: Arc<dyn Blocks> = Arc::new(BlockstoreWithBitswap::new(
113 local_blockstore,
114 bitswap.clone(),
115 ));
116
117 logger.info("Helia node initialized with Bitswap P2P support");
118
119 let (event_tx, _) = broadcast::channel(100);
121
122 Ok(Self {
123 libp2p,
124 blockstore,
125 datastore,
126 pins,
127 logger,
128 routing,
129 dns,
130 metrics: config.metrics,
131 started: Arc::new(RwLock::new(false)),
132 event_loop_handle: Arc::new(Mutex::new(None)),
133 bitswap,
134 outbound_rx: Arc::new(Mutex::new(Some(outbound_rx))),
135 event_tx,
136 })
137 }
138}
139
140#[async_trait]
141impl Helia for HeliaImpl {
142 fn blockstore(&self) -> &dyn Blocks {
143 self.blockstore.as_ref()
144 }
145
146 fn datastore(&self) -> &dyn Datastore {
147 self.datastore.as_ref()
148 }
149
150 fn pins(&self) -> &dyn Pins {
151 self.pins.as_ref()
152 }
153
154 fn logger(&self) -> &dyn ComponentLogger {
155 self.logger.as_ref()
156 }
157
158 fn routing(&self) -> &dyn Routing {
159 self.routing.as_ref()
160 }
161
162 fn dns(&self) -> &TokioAsyncResolver {
163 &self.dns
164 }
165
166 fn metrics(&self) -> Option<&dyn Metrics> {
167 self.metrics.as_ref().map(|m| m.as_ref())
168 }
169
170 fn subscribe_events(&self) -> HeliaEventReceiver {
171 self.event_tx.subscribe()
172 }
173
174 async fn start(&self) -> Result<(), HeliaError> {
175 let mut started = self.started.write().await;
176 if *started {
177 return Ok(());
178 }
179
180 self.bitswap
182 .start()
183 .await
184 .map_err(|e| HeliaError::network(format!("Failed to start Bitswap: {}", e)))?;
185 self.logger.info("Bitswap coordinator started");
186
187 let mut swarm = self.libp2p.lock().await;
189 swarm
190 .listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
191 .map_err(|e| HeliaError::network(format!("Failed to start listening: {}", e)))?;
192 drop(swarm); let swarm_clone = self.libp2p.clone();
196 let blockstore_clone = self.blockstore.clone();
197 let logger_clone = self.logger.clone();
198 let bitswap_clone = self.bitswap.clone();
199
200 let outbound_rx = self
202 .outbound_rx
203 .lock()
204 .await
205 .take()
206 .ok_or_else(|| HeliaError::other("Bitswap outbound channel already taken"))?;
207
208 let handle = tokio::spawn(async move {
209 run_swarm_event_loop(
210 swarm_clone,
211 blockstore_clone,
212 logger_clone,
213 bitswap_clone,
214 outbound_rx,
215 )
216 .await;
217 });
218
219 *self.event_loop_handle.lock().await = Some(handle);
220
221 self.logger.info("Helia node started");
222 *started = true;
223
224 let _ = self.event_tx.send(HeliaEvent::Start);
226
227 Ok(())
228 }
229
230 async fn stop(&self) -> Result<(), HeliaError> {
231 let mut started = self.started.write().await;
232 if !*started {
233 return Ok(());
234 }
235
236 if let Some(handle) = self.event_loop_handle.lock().await.take() {
238 handle.abort();
239 }
240
241 self.bitswap
243 .stop()
244 .await
245 .map_err(|e| HeliaError::network(format!("Failed to stop Bitswap: {}", e)))?;
246 self.logger.info("Bitswap coordinator stopped");
247
248 self.logger.info("Helia node stopped");
249 *started = false;
250
251 let _ = self.event_tx.send(HeliaEvent::Stop);
253
254 Ok(())
255 }
256 async fn gc(&self, _options: Option<GcOptions>) -> Result<(), HeliaError> {
257 let _ = self.event_tx.send(HeliaEvent::GcStarted);
259
260 self.logger.info("Garbage collection not yet implemented");
262
263 let _ = self.event_tx.send(HeliaEvent::GcCompleted);
265
266 Ok(())
267 }
268
269 async fn get_codec(&self, code: u64) -> Result<Box<dyn Codec>, HeliaError> {
270 Err(HeliaError::CodecNotFound { code })
272 }
273
274 async fn get_hasher(&self, code: u64) -> Result<Box<dyn Hasher>, HeliaError> {
275 Err(HeliaError::HasherNotFound { code })
277 }
278}
279
280#[async_trait]
281impl HeliaWithLibp2p<HeliaBehaviour> for HeliaImpl {
282 fn libp2p(&self) -> Arc<Mutex<Swarm<HeliaBehaviour>>> {
283 self.libp2p.clone()
284 }
285}
286
287pub struct DummyRouting;
331
332impl DummyRouting {
333 pub fn new() -> Self {
334 Self
335 }
336}
337
338#[async_trait]
339impl Routing for DummyRouting {
340 async fn find_providers(
341 &self,
342 _cid: &Cid,
343 _options: Option<FindProvidersOptions>,
344 ) -> Result<AwaitIterable<Provider>, HeliaError> {
345 Err(HeliaError::routing("Routing not yet implemented"))
346 }
347
348 async fn provide(
349 &self,
350 _cid: &Cid,
351 _options: Option<ProvideOptions>,
352 ) -> Result<(), HeliaError> {
353 Err(HeliaError::routing("Routing not yet implemented"))
354 }
355
356 async fn find_peers(
357 &self,
358 _peer_id: &libp2p::PeerId,
359 _options: Option<FindPeersOptions>,
360 ) -> Result<AwaitIterable<PeerInfo>, HeliaError> {
361 Err(HeliaError::routing("Routing not yet implemented"))
362 }
363
364 async fn get(
365 &self,
366 _key: &[u8],
367 _options: Option<GetOptions>,
368 ) -> Result<Option<RoutingRecord>, HeliaError> {
369 Err(HeliaError::routing("Routing not yet implemented"))
370 }
371
372 async fn put(
373 &self,
374 _key: &[u8],
375 _value: &[u8],
376 _options: Option<PutOptions>,
377 ) -> Result<(), HeliaError> {
378 Err(HeliaError::routing("Routing not yet implemented"))
379 }
380}
381
382pub struct SimplePins {
384 datastore: Arc<dyn Datastore>,
385}
386
387impl SimplePins {
388 pub fn new(datastore: Arc<dyn Datastore>) -> Self {
389 Self { datastore }
390 }
391
392 fn pin_key(&self, cid: &Cid) -> Vec<u8> {
393 format!("pin:{}", cid).into_bytes()
394 }
395
396 fn pin_to_bytes(&self, pin: &HeliaPin) -> Result<Bytes, HeliaError> {
397 serde_json::to_vec(pin)
398 .map(Bytes::from)
399 .map_err(|e| HeliaError::other(format!("Failed to serialize pin: {}", e)))
400 }
401
402 fn bytes_to_pin(&self, data: &[u8]) -> Result<HeliaPin, HeliaError> {
403 serde_json::from_slice(data)
404 .map_err(|e| HeliaError::other(format!("Failed to deserialize pin: {}", e)))
405 }
406}
407
408#[async_trait]
409impl Pins for SimplePins {
410 async fn add(&self, cid: &Cid, options: Option<AddOptions>) -> Result<(), HeliaError> {
411 let options = options.unwrap_or_default();
412
413 let pin = HeliaPin {
414 cid: *cid,
415 depth: options.depth.unwrap_or(u64::MAX), metadata: options.metadata,
417 };
418
419 let key = self.pin_key(cid);
420 let value = self.pin_to_bytes(&pin)?;
421
422 self.datastore.put(&key, value).await?;
423 Ok(())
424 }
425
426 async fn rm(&self, cid: &Cid, _options: Option<RmOptions>) -> Result<(), HeliaError> {
427 let key = self.pin_key(cid);
428 self.datastore.delete(&key).await?;
429 Ok(())
430 }
431
432 async fn ls(&self, options: Option<LsOptions>) -> Result<AwaitIterable<HeliaPin>, HeliaError> {
433 let options = options.unwrap_or_default();
434
435 if let Some(filter_cid) = options.cid {
437 let key = self.pin_key(&filter_cid);
438 match self.datastore.get(&key).await? {
439 Some(data) => {
440 let pin = self.bytes_to_pin(&data)?;
441 Ok(Box::pin(stream::iter(vec![pin])))
442 }
443 None => Ok(Box::pin(stream::iter(vec![]))),
444 }
445 } else {
446 let mut pins = Vec::new();
448 let mut query_stream = self.datastore.query(Some(b"pin:")).await?;
449
450 use futures::StreamExt;
451 while let Some(data) = query_stream.next().await {
452 match self.bytes_to_pin(&data) {
453 Ok(pin) => pins.push(pin),
454 Err(_) => continue, }
456 }
457
458 Ok(Box::pin(stream::iter(pins)))
459 }
460 }
461
462 async fn is_pinned(
463 &self,
464 cid: &Cid,
465 _options: Option<IsPinnedOptions>,
466 ) -> Result<bool, HeliaError> {
467 let key = self.pin_key(cid);
468 self.datastore.has(&key).await
469 }
470}
471
472async fn run_swarm_event_loop(
474 swarm: Arc<Mutex<Swarm<HeliaBehaviour>>>,
475 blockstore: Arc<dyn Blocks>,
476 logger: Arc<TracingLogger>,
477 bitswap: Arc<Bitswap>,
478 mut outbound_rx: tokio::sync::mpsc::UnboundedReceiver<
479 helia_bitswap::coordinator::OutboundMessage,
480 >,
481) {
482 loop {
483 tokio::select! {
484 event = async {
486 let mut swarm_guard = swarm.lock().await;
487 swarm_guard.select_next_some().await
488 } => {
489 match event {
490 SwarmEvent::Behaviour(behaviour_event) => {
491 match behaviour_event {
494 HeliaBehaviourEvent::Bitswap(bitswap_event) => {
495 handle_bitswap_event(bitswap_event, blockstore.clone(), bitswap.clone(), logger.clone()).await;
496 }
497 HeliaBehaviourEvent::Identify(identify_event) => {
498 logger.debug(&format!("Identify event: {:?}", identify_event));
499 }
500 HeliaBehaviourEvent::Kademlia(kad_event) => {
501 use libp2p::kad::QueryResult;
502
503 match kad_event {
504 kad::Event::OutboundQueryProgressed { result, .. } => {
505 match result {
506 QueryResult::GetProviders(Ok(ok)) => {
507 logger.info(&format!("Kademlia: provider query result {:?}", ok));
508 }
509 QueryResult::GetProviders(Err(err)) => {
510 logger.warn(&format!("Kademlia: provider query error: {:?}", err));
511 }
512 QueryResult::GetClosestPeers(Ok(ok)) => {
513 logger.info(&format!("Kademlia: closest peers result {:?}", ok));
514 }
515 QueryResult::GetClosestPeers(Err(err)) => {
516 logger.warn(&format!("Kademlia: closest peers query error: {:?}", err));
517 }
518 other => {
519 logger.debug(&format!("Kademlia query result: {:?}", other));
520 }
521 }
522 }
523 kad::Event::RoutingUpdated { peer, addresses, .. } => {
524 for addr in addresses.iter() {
525 logger.info(&format!(
526 "Kademlia: routing updated for peer {} at {}",
527 peer, addr
528 ));
529 }
530 }
531 kad::Event::RoutablePeer { peer, address } => {
532 logger.info(&format!(
533 "Kademlia: routable peer {} via {}",
534 peer, address
535 ));
536 }
537 kad::Event::PendingRoutablePeer { peer, address } => {
538 logger.info(&format!(
539 "Kademlia: pending routable peer {} via {}",
540 peer, address
541 ));
542 }
543 other => {
544 logger.debug(&format!("Kademlia event: {:?}", other));
545 }
546 }
547 }
548 HeliaBehaviourEvent::Gossipsub(gossip_event) => {
549 logger.debug(&format!("Gossipsub event: {:?}", gossip_event));
550 }
551 HeliaBehaviourEvent::Mdns(mdns_event) => {
552 use libp2p::mdns;
553 match mdns_event {
554 mdns::Event::Discovered(list) => {
555 for (peer_id, multiaddr) in list {
556 logger.info(&format!("mDNS discovered peer: {} at {}", peer_id, multiaddr));
557 let mut swarm_guard = swarm.lock().await;
559 if let Err(e) = swarm_guard.dial(multiaddr.clone()) {
560 logger.warn(&format!("Failed to dial discovered peer {}: {}", peer_id, e));
561 } else {
562 logger.info(&format!("Dialing discovered peer: {}", peer_id));
563 }
564 }
565 }
566 mdns::Event::Expired(list) => {
567 for (peer_id, _multiaddr) in list {
568 logger.info(&format!("mDNS peer expired: {}", peer_id));
569 }
570 }
571 }
572 }
573 _ => {
574 logger.debug(&format!("Other behaviour event: {:?}", behaviour_event));
576 }
577 }
578 }
579 SwarmEvent::NewListenAddr { address, .. } => {
580 logger.info(&format!("Listening on {}", address));
581 }
582 SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => {
583 logger.info(&format!("Connection established with peer: {} at {}", peer_id, endpoint.get_remote_address()));
584 bitswap.add_peer(peer_id).await;
586 bitswap
587 .wantlist()
588 .dispatch_event(NetworkEvent::PeerConnected(peer_id));
589 }
590 SwarmEvent::ConnectionClosed { peer_id, cause, .. } => {
591 logger.info(&format!("Connection closed with peer: {} (cause: {:?})", peer_id, cause));
592 bitswap.remove_peer(&peer_id).await;
594 bitswap
595 .wantlist()
596 .dispatch_event(NetworkEvent::PeerDisconnected(peer_id));
597 }
598 SwarmEvent::IncomingConnection { local_addr, send_back_addr, .. } => {
599 logger.debug(&format!("Incoming connection from {} to {}", send_back_addr, local_addr));
600 }
601 SwarmEvent::IncomingConnectionError { local_addr, send_back_addr, error, .. } => {
602 logger.warn(&format!("Incoming connection error from {} to {}: {}", send_back_addr, local_addr, error));
603 }
604 SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
605 if let Some(peer_id) = peer_id {
606 logger.warn(&format!("Outgoing connection error to {}: {}", peer_id, error));
607 } else {
608 logger.warn(&format!("Outgoing connection error: {}", error));
609 }
610 }
611 _ => {
612 }
614 }
615 }
616
617 Some(outbound_msg) = outbound_rx.recv() => {
619 logger.debug(&format!("Sending Bitswap message to peer {} via swarm", outbound_msg.peer));
620 let mut swarm_guard = swarm.lock().await;
621 swarm_guard.behaviour_mut().bitswap.send_message(outbound_msg.peer, outbound_msg.message);
622 }
623 }
624 }
625}
626
627async fn handle_bitswap_event(
629 event: BitswapEvent,
630 blockstore: Arc<dyn Blocks>,
631 bitswap: Arc<Bitswap>,
632 logger: Arc<TracingLogger>,
633) {
634 match event {
635 BitswapEvent::MessageReceived { peer, message } => {
636 logger.info(&format!("Received Bitswap message from peer: {}", peer));
637 logger.debug(&format!(
638 "Bitswap payload summary -> structured blocks: {}, raw blocks: {}, presences: {}, wantlist entries: {}",
639 message.blocks.len(),
640 message.raw_blocks.len(),
641 message.block_presences.len(),
642 message
643 .wantlist
644 .as_ref()
645 .map(|w| w.entries.len())
646 .unwrap_or(0)
647 ));
648
649 bitswap
651 .wantlist()
652 .dispatch_event(NetworkEvent::BitswapMessage(BitswapMessageEvent {
653 peer,
654 message: message.clone(),
655 }));
656
657 if !message.blocks.is_empty() {
659 logger.info(&format!(
660 "Received {} blocks from peer {}",
661 message.blocks.len(),
662 peer
663 ));
664
665 let wantlist = bitswap.wantlist();
666
667 for block in &message.blocks {
668 logger.debug(&format!(
669 "Block received - prefix_len: {}, data_len: {}",
670 block.prefix.len(),
671 block.data.len()
672 ));
673
674 match reconstruct_cid_from_block(&block.prefix, &block.data) {
679 Ok(cid) => {
680 logger.info(&format!("Storing received block: {}", cid));
681
682 if let Err(e) = blockstore
684 .put(&cid, Bytes::from(block.data.clone()), None)
685 .await
686 {
687 logger.warn(&format!(
688 "Failed to store received block {}: {}",
689 cid, e
690 ));
691 } else {
692 logger.info(&format!("✅ Successfully stored block: {}", cid));
693
694 bitswap.notify_block_received(&cid);
697 logger.debug(&format!(
698 "Notified coordinator of block arrival: {}",
699 cid
700 ));
701
702 if let Err(e) = wantlist.received_block(&cid).await {
703 logger.warn(&format!(
704 "Failed to notify wantlist for {}: {}",
705 cid, e
706 ));
707 }
708 }
709 }
710 Err(e) => {
711 logger.warn(&format!("Failed to decode CID from block prefix: {}", e));
712 }
713 }
714 }
715 }
716
717 if !message.raw_blocks.is_empty() {
718 logger.info(&format!(
719 "Received {} legacy raw block(s) from {}",
720 message.raw_blocks.len(),
721 peer
722 ));
723 }
724
725 if message.blocks.is_empty() && message.raw_blocks.is_empty() {
726 logger.warn("Bitswap message contained no blocks");
727 }
728
729 if let Some(wantlist) = &message.wantlist {
731 logger.debug(&format!(
732 "Received wantlist with {} entries (full: {})",
733 wantlist.entries.len(),
734 wantlist.full
735 ));
736
737 }
740
741 if !message.block_presences.is_empty() {
743 logger.info(&format!(
744 "Received {} block presence notification(s) from {}",
745 message.block_presences.len(),
746 peer
747 ));
748
749 for presence in &message.block_presences {
750 let cid_display = match Cid::try_from(presence.cid.as_slice()) {
751 Ok(cid) => cid.to_string(),
752 Err(_) => "<invalid cid>".to_string(),
753 };
754
755 let status = match presence.r#type {
756 x if x == BlockPresenceType::HaveBlock as i32 => "HAVE",
757 x if x == BlockPresenceType::DoNotHaveBlock as i32 => "DONT_HAVE",
758 _ => "UNKNOWN",
759 };
760
761 logger.info(&format!(" Presence: {} reports {}", cid_display, status));
762 }
763 }
764 }
765 BitswapEvent::MessageSent { peer } => {
766 logger.debug(&format!(
767 "Successfully sent Bitswap message to peer: {}",
768 peer
769 ));
770 }
771 BitswapEvent::SendError { peer, error } => {
772 logger.warn(&format!(
773 "Failed to send Bitswap message to peer {}: {}",
774 peer, error
775 ));
776 }
777 }
778}
779
780fn reconstruct_cid_from_block(prefix: &[u8], data: &[u8]) -> Result<cid::Cid, HeliaError> {
785 let (version_val, remaining) = varint_decode::u64(prefix)
786 .map_err(|e| HeliaError::network(format!("Failed to decode CID version from prefix: {}", e)))?;
787
788 let (codec_val, remaining) = varint_decode::u64(remaining)
789 .map_err(|e| HeliaError::network(format!("Failed to decode codec from prefix: {}", e)))?;
790
791 let (mh_code_val, remaining) = varint_decode::u64(remaining)
792 .map_err(|e| HeliaError::network(format!("Failed to decode multihash code from prefix: {}", e)))?;
793
794 let (mh_len_val, _remaining) = varint_decode::u64(remaining)
795 .map_err(|e| HeliaError::network(format!("Failed to decode multihash length from prefix: {}", e)))?;
796
797 let code = MultihashCode::try_from(mh_code_val).map_err(|_| {
798 HeliaError::network(format!("Unsupported multihash code in prefix: {}", mh_code_val))
799 })?;
800
801 let multihash = code.digest(data);
802 let expected_len = usize::try_from(mh_len_val).map_err(|_| {
803 HeliaError::network(format!("Multihash length {} does not fit in usize", mh_len_val))
804 })?;
805
806 if multihash.digest().len() != expected_len {
807 return Err(HeliaError::network(format!(
808 "Multihash length mismatch: expected {}, got {}",
809 expected_len,
810 multihash.digest().len()
811 )));
812 }
813
814 match version_val {
815 0 => cid::Cid::new_v0(multihash)
816 .map_err(|e| HeliaError::network(format!("Failed to construct CIDv0: {}", e))),
817 1 => Ok(cid::Cid::new_v1(codec_val, multihash)),
818 v => Err(HeliaError::network(format!(
819 "Unsupported CID version in prefix: {}",
820 v
821 ))),
822 }
823}
824
825#[cfg(test)]
826mod tests {
827 use super::*;
828 use multihash_codetable::{Code as MultihashCode, MultihashDigest};
829 use unsigned_varint::encode;
830
831 fn push_varint(target: &mut Vec<u8>, value: u64) {
832 let mut buf = encode::u64_buffer();
833 target.extend_from_slice(encode::u64(value, &mut buf));
834 }
835
836 #[test]
837 fn reconstructs_cid_v1_with_sha2_256() {
838 let data = b"hello world";
839 let digest = MultihashCode::Sha2_256.digest(data);
840 let codec_val = 0x55; let mut prefix = Vec::new();
843 push_varint(&mut prefix, 1); push_varint(&mut prefix, codec_val);
845 push_varint(&mut prefix, u64::from(MultihashCode::Sha2_256));
846 push_varint(&mut prefix, digest.digest().len() as u64);
847
848 let reconstructed = reconstruct_cid_from_block(&prefix, data).expect("cid reconstruction");
849 let expected = Cid::new_v1(codec_val, digest.clone());
850
851 assert_eq!(reconstructed, expected);
852 }
853
854 #[test]
855 fn fails_on_mismatched_digest_length() {
856 let data = b"hello world";
857 let digest = MultihashCode::Sha2_256.digest(data);
858 let codec_val = 0x55; let mut prefix = Vec::new();
861 push_varint(&mut prefix, 1); push_varint(&mut prefix, codec_val);
863 push_varint(&mut prefix, u64::from(MultihashCode::Sha2_256));
864 push_varint(&mut prefix, (digest.digest().len() as u64) - 1); let err = reconstruct_cid_from_block(&prefix, data).expect_err("length mismatch should fail");
867
868 assert!(matches!(err, HeliaError::Network { .. }));
869 }
870}