1use crate::{
5 Command, CommandHelper, Config, Error, Event as NetworkEvent, MachineSpec,
6 Monitor, MonitorMessage, NodeType, ResolvedSpec,
7 behaviour::{Behaviour, Event as BehaviourEvent, ReqResMessage},
8 metrics::NetworkMetrics,
9 resolve_spec,
10 service::NetworkService,
11 transport::build_transport,
12 utils::{
13 Action, Due, IDENTIFY_PROTOCOL, LimitsConfig, MessagesHelper,
14 NetworkState, REQRES_PROTOCOL, RetryKind, RetryState, ScheduleType,
15 convert_addresses, convert_boot_nodes, peer_id_to_ed25519_pubkey_bytes,
16 },
17};
18
19use std::{
20 collections::{BinaryHeap, HashSet},
21 fmt::Debug,
22 num::{NonZeroU8, NonZeroUsize},
23 pin::Pin,
24 sync::Arc,
25 time::Duration,
26};
27
28use ave_actors::ActorRef;
29use ave_common::identity::KeyPair;
30
31use libp2p::{
32 Multiaddr, PeerId, StreamProtocol, Swarm, identify,
33 identity::{Keypair, ed25519},
34 request_response::{self, ResponseChannel},
35 swarm::{self, DialError, SwarmEvent, dial_opts::DialOpts},
36};
37
38use futures::StreamExt;
39use serde::Serialize;
40use tokio::{
41 sync::mpsc,
42 time::{Instant, Sleep, sleep, sleep_until},
43};
44use tokio_util::sync::CancellationToken;
45use tracing::{debug, error, info, trace, warn};
46
47use bytes::Bytes;
48use std::collections::{HashMap, VecDeque};
49
/// `tracing` target attached to every log statement emitted by this module.
const TARGET: &str = "ave::network::worker";

/// Upper bound on the number of messages held in a single peer's pending
/// queue (inbound or outbound); older messages are evicted to stay below it.
const MAX_PENDING_MESSAGES_PER_PEER: usize = 100;
55
56#[derive(Default)]
60struct PendingQueue {
61 messages: VecDeque<PendingMessage>,
62 pending_bytes: usize,
63}
64
65struct PendingMessage {
66 payload: Bytes,
67 enqueued_at: Instant,
68}
69
70impl PendingQueue {
71 fn contains(&self, message: &Bytes) -> bool {
72 self.messages.iter().any(|x| x.payload == *message)
73 }
74
75 fn pop_front(&mut self) -> Option<PendingMessage> {
76 let popped = self.messages.pop_front()?;
77 self.pending_bytes =
78 self.pending_bytes.saturating_sub(popped.payload.len());
79 Some(popped)
80 }
81
82 fn push_back(&mut self, message: Bytes) {
83 self.pending_bytes += message.len();
84 self.messages.push_back(PendingMessage {
85 payload: message,
86 enqueued_at: Instant::now(),
87 });
88 }
89
90 fn drain(&mut self) -> impl Iterator<Item = PendingMessage> + '_ {
91 self.pending_bytes = 0;
92 self.messages.drain(..)
93 }
94
95 fn is_empty(&self) -> bool {
96 self.messages.is_empty()
97 }
98
99 fn len(&self) -> usize {
100 self.messages.len()
101 }
102
103 const fn bytes_len(&self) -> usize {
104 self.pending_bytes
105 }
106}
107
108pub struct NetworkWorker<T>
113where
114 T: Debug + Serialize,
115{
116 local_peer_id: PeerId,
118
119 service: NetworkService,
121
122 swarm: Swarm<Behaviour>,
124
125 state: NetworkState,
127
128 command_receiver: mpsc::Receiver<Command>,
130
131 helper_sender: Option<mpsc::Sender<CommandHelper<T>>>,
133
134 monitor: Option<ActorRef<Monitor>>,
136
137 graceful_token: CancellationToken,
139 crash_token: CancellationToken,
140
141 node_type: NodeType,
143
144 boot_nodes: HashMap<PeerId, Vec<Multiaddr>>,
146
147 retry_boot_nodes: HashMap<PeerId, Vec<Multiaddr>>,
149
150 pending_outbound_messages: HashMap<PeerId, PendingQueue>,
152
153 pending_inbound_messages: HashMap<PeerId, PendingQueue>,
154
155 response_channels:
157 HashMap<PeerId, VecDeque<ResponseChannel<ReqResMessage>>>,
158
159 successful_dials: u64,
161
162 peer_identify: HashSet<PeerId>,
163
164 retry_by_peer: HashMap<PeerId, RetryState>,
165
166 retry_queue: BinaryHeap<Due>,
167
168 retry_timer: Option<Pin<Box<Sleep>>>,
169
170 peer_action: HashMap<PeerId, Action>,
171
172 max_app_message_bytes: usize,
173 max_pending_outbound_bytes_per_peer: usize,
174 max_pending_inbound_bytes_per_peer: usize,
175 max_pending_outbound_bytes_total: usize,
176 max_pending_inbound_bytes_total: usize,
177
178 metrics: Option<Arc<NetworkMetrics>>,
179}
180
181impl<T: Debug + Serialize> NetworkWorker<T> {
182 pub fn new(
184 keys: &KeyPair,
185 config: Config,
186 monitor: Option<ActorRef<Monitor>>,
187 graceful_token: CancellationToken,
188 crash_token: CancellationToken,
189 machine_spec: Option<MachineSpec>,
190 metrics: Option<Arc<NetworkMetrics>>,
191 ) -> Result<Self, Error> {
192 info!(target: TARGET, "network initialising");
194 let (command_sender, command_receiver) = mpsc::channel(512);
195
196 let key = match keys {
197 KeyPair::Ed25519(ed25519_signer) => {
198 let sk_bytes = ed25519_signer
199 .secret_key_bytes()
200 .map_err(|e| Error::KeyExtraction(e.to_string()))?;
201
202 let sk = ed25519::SecretKey::try_from_bytes(sk_bytes)
203 .map_err(|e| Error::KeyExtraction(e.to_string()))?;
204
205 let kp = ed25519::Keypair::from(sk);
206 Keypair::from(kp)
207 }
208 };
209
210 let local_peer_id = key.public().to_peer_id();
212
213 let boot_nodes = convert_boot_nodes(&config.boot_nodes);
214
215 let addresses = convert_addresses(&config.listen_addresses)?;
217
218 let external_addresses = convert_addresses(&config.external_addresses)?;
220
221 let node_type = config.node_type.clone();
222
223 let ResolvedSpec { ram_mb, cpu_cores } = resolve_spec(machine_spec);
225
226 let limits = LimitsConfig::build(ram_mb, cpu_cores);
227 let max_app_message_bytes = config.max_app_message_bytes;
228 let max_pending_outbound_bytes_per_peer =
229 config.max_pending_outbound_bytes_per_peer;
230 let max_pending_inbound_bytes_per_peer =
231 config.max_pending_inbound_bytes_per_peer;
232 let max_pending_outbound_bytes_total =
233 config.max_pending_outbound_bytes_total;
234 let max_pending_inbound_bytes_total =
235 config.max_pending_inbound_bytes_total;
236
237 let transport = build_transport(&key, limits.clone())?;
239
240 let behaviour = Behaviour::new(
241 &key.public(),
242 config,
243 graceful_token.clone(),
244 crash_token.clone(),
245 limits,
246 metrics.clone(),
247 );
248
249 let mut swarm = Swarm::new(
251 transport,
252 behaviour,
253 local_peer_id,
254 swarm::Config::with_tokio_executor()
255 .with_idle_connection_timeout(Duration::from_secs(90))
256 .with_max_negotiating_inbound_streams(32)
257 .with_notify_handler_buffer_size(
258 NonZeroUsize::new(32).expect("32 > 0"),
259 )
260 .with_per_connection_event_buffer_size(16)
261 .with_dial_concurrency_factor(
262 NonZeroU8::new(2).expect("2 > 0"),
263 ),
264 );
265
266 let service = NetworkService::new(command_sender);
267
268 if addresses.is_empty() {
269 swarm
271 .listen_on(
272 "/ip4/0.0.0.0/tcp/0"
273 .parse::<Multiaddr>()
274 .map_err(|e| Error::InvalidAddress(e.to_string()))?,
275 )
276 .map_err(|e| Error::Listen(format!("0.0.0.0:0: {e}")))?;
277 info!(target: TARGET, "listening on all interfaces");
278 } else {
279 for addr in addresses.iter() {
281 info!(target: TARGET, addr = %addr, "listening on address");
282 swarm
283 .listen_on(addr.clone())
284 .map_err(|e| Error::Listen(format!("{addr}: {e}")))?;
285 }
286 }
287
288 if !external_addresses.is_empty() {
289 for addr in external_addresses.iter() {
290 debug!(target: TARGET, addr = %addr, "external address registered");
291 swarm.add_external_address(addr.clone());
292 }
293 }
294
295 info!(target: TARGET, peer_id = %local_peer_id, "local peer id");
296
297 let worker = Self {
298 local_peer_id,
299 service,
300 swarm,
301 state: NetworkState::Start,
302 command_receiver,
303 helper_sender: None,
304 monitor,
305 graceful_token,
306 crash_token,
307 node_type,
308 boot_nodes,
309 retry_boot_nodes: HashMap::new(),
310 pending_outbound_messages: HashMap::default(),
311 pending_inbound_messages: HashMap::default(),
312 response_channels: HashMap::default(),
313 successful_dials: 0,
314 peer_identify: HashSet::new(),
315 retry_by_peer: HashMap::new(),
316 retry_queue: BinaryHeap::new(),
317 retry_timer: None,
318 peer_action: HashMap::new(),
319 max_app_message_bytes,
320 max_pending_outbound_bytes_per_peer,
321 max_pending_inbound_bytes_per_peer,
322 max_pending_outbound_bytes_total,
323 max_pending_inbound_bytes_total,
324 metrics,
325 };
326
327 if let Some(metrics) = worker.metric_handle() {
328 metrics.set_state_current(&worker.state);
329 }
330 worker.refresh_runtime_metrics();
331
332 Ok(worker)
333 }
334
335 fn metric_handle(&self) -> Option<&NetworkMetrics> {
336 self.metrics.as_deref()
337 }
338
339 fn pending_outbound_messages_len(&self) -> usize {
340 self.pending_outbound_messages
341 .values()
342 .map(PendingQueue::len)
343 .sum()
344 }
345
346 fn pending_outbound_bytes_len(&self) -> usize {
347 self.pending_outbound_messages
348 .values()
349 .map(PendingQueue::bytes_len)
350 .sum()
351 }
352
353 fn pending_inbound_messages_len(&self) -> usize {
354 self.pending_inbound_messages
355 .values()
356 .map(PendingQueue::len)
357 .sum()
358 }
359
360 fn pending_inbound_bytes_len(&self) -> usize {
361 self.pending_inbound_messages
362 .values()
363 .map(PendingQueue::bytes_len)
364 .sum()
365 }
366
367 fn pending_response_channels_len(&self) -> usize {
368 self.response_channels.values().map(VecDeque::len).sum()
369 }
370
371 fn refresh_runtime_metrics(&self) {
372 let Some(metrics) = self.metric_handle() else {
373 return;
374 };
375
376 metrics.set_retry_queue_len(self.retry_queue.len() as i64);
377 metrics.set_pending_outbound_peers(
378 self.pending_outbound_messages.len() as i64,
379 );
380 metrics.set_pending_outbound_messages(
381 self.pending_outbound_messages_len() as i64,
382 );
383 metrics.set_pending_outbound_bytes(
384 self.pending_outbound_bytes_len() as i64
385 );
386 metrics.set_pending_inbound_peers(
387 self.pending_inbound_messages.len() as i64
388 );
389 metrics.set_pending_inbound_messages(
390 self.pending_inbound_messages_len() as i64,
391 );
392 metrics
393 .set_pending_inbound_bytes(self.pending_inbound_bytes_len() as i64);
394 metrics.set_identified_peers(self.peer_identify.len() as i64);
395 metrics.set_response_channels_pending(
396 self.pending_response_channels_len() as i64,
397 );
398 }
399
400 fn observe_pending_message_age(&self, enqueued_at: Instant) {
401 if let Some(metrics) = self.metric_handle() {
402 metrics.observe_pending_message_age_seconds(
403 enqueued_at.elapsed().as_secs_f64(),
404 );
405 }
406 }
407
408 fn drop_pending_outbound_messages(&mut self, peer_id: &PeerId) -> usize {
409 let Some(mut queue) = self.pending_outbound_messages.remove(peer_id)
410 else {
411 return 0;
412 };
413
414 let mut dropped = 0usize;
415 for message in queue.drain() {
416 dropped += 1;
417 self.observe_pending_message_age(message.enqueued_at);
418 }
419 dropped
420 }
421
422 fn drop_pending_inbound_messages(&mut self, peer_id: &PeerId) {
423 if let Some(mut queue) = self.pending_inbound_messages.remove(peer_id) {
424 for message in queue.drain() {
425 self.observe_pending_message_age(message.enqueued_at);
426 }
427 }
428 }
429
430 fn observe_identify_error(
431 &self,
432 error: &swarm::StreamUpgradeError<identify::UpgradeError>,
433 ) {
434 let kind = match error {
435 swarm::StreamUpgradeError::Timeout => "timeout",
436 swarm::StreamUpgradeError::Io(_) => "io",
437 swarm::StreamUpgradeError::NegotiationFailed => "negotiation",
438 swarm::StreamUpgradeError::Apply(_) => "other",
439 };
440
441 if let Some(metrics) = self.metric_handle() {
442 metrics.observe_identify_error(kind);
443 }
444 }
445
446 fn schedule_retry(&mut self, peer: PeerId, schedule_type: ScheduleType) {
447 if self.peer_action.contains_key(&peer) {
448 return;
449 }
450
451 let (kind, addrs) = match schedule_type {
452 ScheduleType::Discover => (RetryKind::Discover, vec![]),
453 ScheduleType::Dial(multiaddrs) => (RetryKind::Dial, multiaddrs),
454 };
455
456 let now = Instant::now();
457 let base = Duration::from_millis(250);
458 let cap = Duration::from_secs(30);
459
460 let entry = self.retry_by_peer.entry(peer).or_insert(RetryState {
461 attempts: 0,
462 when: now,
463 kind,
464 addrs: vec![],
465 });
466
467 let when = if matches!(
468 (entry.kind, kind),
469 (RetryKind::Discover, RetryKind::Dial)
470 ) {
471 now
472 } else {
473 if entry.attempts >= 8 {
474 self.clear_pending_messages(&peer);
475 return;
476 }
477
478 let exp = 1u32 << entry.attempts.min(7);
481 let mut delay = base * exp;
482 if delay > cap {
483 delay = cap;
484 }
485
486 let hash = peer
489 .to_bytes()
490 .iter()
491 .fold(0u32, |acc, &b| acc.wrapping_add(b as u32));
492 let j = 80 + (hash % 41);
493 delay = delay * j / 100;
494
495 now + delay
496 };
497
498 entry.when = when;
499 entry.kind = kind;
500 entry.addrs = addrs;
501
502 self.peer_action.insert(peer, Action::from(kind));
503
504 self.retry_queue.push(Due(peer, entry.when));
505 self.arm_retry_timer();
506 self.refresh_runtime_metrics();
507 }
508
509 fn arm_retry_timer(&mut self) {
510 if let Some(next) = self.retry_queue.peek() {
511 match &mut self.retry_timer {
512 Some(timer) => timer.as_mut().reset(next.1),
513 None => self.retry_timer = Some(Box::pin(sleep_until(next.1))),
514 }
515 }
516 }
517
518 fn drain_due_retries(
519 &mut self,
520 ) -> Vec<(PeerId, RetryKind, Vec<Multiaddr>)> {
521 let now = Instant::now();
522 let mut out = Vec::new();
523 while let Some(Due(peer, when)) = self.retry_queue.peek().cloned() {
524 if when > now {
525 break;
526 }
527
528 self.retry_queue.pop();
529 if let Some(retry) = self.retry_by_peer.get(&peer).cloned()
535 && retry.when == when
536 {
537 self.retry_by_peer
538 .entry(peer)
539 .and_modify(|x| x.attempts += 1);
540 out.push((peer, retry.kind, retry.addrs));
541 }
542 }
543
544 if self.retry_queue.is_empty() {
545 self.retry_timer = None;
546 } else {
547 self.arm_retry_timer();
548 }
549 self.refresh_runtime_metrics();
550 out
551 }
552
553 pub fn add_helper_sender(
555 &mut self,
556 helper_sender: mpsc::Sender<CommandHelper<T>>,
557 ) {
558 self.helper_sender = Some(helper_sender);
559 }
560
561 pub const fn local_peer_id(&self) -> PeerId {
563 self.local_peer_id
564 }
565
566 fn send_message(
568 &mut self,
569 peer: PeerId,
570 message: Bytes,
571 ) -> Result<(), Error> {
572 if message.len() > self.max_app_message_bytes {
573 warn!(
574 target: TARGET,
575 peer_id = %peer,
576 size = message.len(),
577 max = self.max_app_message_bytes,
578 "outbound payload rejected: message too large",
579 );
580 if let Some(metrics) = self.metric_handle() {
581 metrics.inc_oversized_outbound_drop();
582 }
583 self.refresh_runtime_metrics();
584 return Err(Error::MessageTooLarge {
585 size: message.len(),
586 max: self.max_app_message_bytes,
587 });
588 }
589
590 if let Some(mut responses) = self.response_channels.remove(&peer) {
591 while let Some(response_channel) = responses.pop_front() {
592 match self
593 .swarm
594 .behaviour_mut()
595 .send_response(response_channel, message.clone())
596 {
597 Ok(()) => {
598 if !responses.is_empty() {
599 self.response_channels.insert(peer, responses);
600 }
601 self.refresh_runtime_metrics();
602 return Ok(());
603 }
604 Err(e) => {
605 debug!(target: TARGET, peer_id = %peer, error = %e, "failed to send response: channel may already be consumed");
606 }
607 }
608 }
609 }
610
611 self.add_pending_outbound_message(peer, message);
612
613 if self.swarm.behaviour_mut().is_known_peer(&peer) {
614 if let Some(Action::Identified(..)) = self.peer_action.get(&peer) {
615 self.send_pending_outbound_messages(peer);
616 } else {
617 self.schedule_retry(peer, ScheduleType::Dial(vec![]));
618 }
619 } else {
620 self.schedule_retry(peer, ScheduleType::Discover);
621 }
622
623 Ok(())
624 }
625
626 fn add_pending_outbound_message(&mut self, peer: PeerId, message: Bytes) {
630 let message_len = message.len();
631 let mut dropped_count = 0u64;
632 let mut dropped_bytes_limit_peer = 0u64;
633 let mut dropped_bytes_limit_global = 0u64;
634 let mut dropped_messages = Vec::new();
635 let mut duplicate = false;
636 let mut total_pending_bytes = self.pending_outbound_bytes_len();
637 let per_peer_limit = self.max_pending_outbound_bytes_per_peer;
638 let global_limit = self.max_pending_outbound_bytes_total;
639
640 {
641 let queue = self.pending_outbound_messages.entry(peer).or_default();
642 if queue.contains(&message) {
643 duplicate = true;
644 } else {
645 while queue.len() >= MAX_PENDING_MESSAGES_PER_PEER {
646 if let Some(evicted) = queue.pop_front() {
647 dropped_count += 1;
648 total_pending_bytes = total_pending_bytes
649 .saturating_sub(evicted.payload.len());
650 dropped_messages.push(evicted);
651 } else {
652 break;
653 }
654 }
655
656 if per_peer_limit > 0 {
657 while queue.bytes_len() + message_len > per_peer_limit {
658 if let Some(evicted) = queue.pop_front() {
659 dropped_bytes_limit_peer += 1;
660 total_pending_bytes = total_pending_bytes
661 .saturating_sub(evicted.payload.len());
662 dropped_messages.push(evicted);
663 } else {
664 break;
665 }
666 }
667 }
668
669 if per_peer_limit > 0
670 && queue.bytes_len() + message_len > per_peer_limit
671 {
672 dropped_bytes_limit_peer += 1;
673 } else if global_limit > 0
674 && total_pending_bytes.saturating_add(message_len)
675 > global_limit
676 {
677 dropped_bytes_limit_global += 1;
678 } else {
679 queue.push_back(message);
680 }
681 }
682 }
683
684 if duplicate {
685 self.refresh_runtime_metrics();
686 return;
687 }
688
689 for evicted in dropped_messages {
690 self.observe_pending_message_age(evicted.enqueued_at);
691 }
692
693 if dropped_count > 0 {
694 warn!(
695 target: TARGET,
696 peer_id = %peer,
697 dropped = dropped_count,
698 max_messages = MAX_PENDING_MESSAGES_PER_PEER,
699 "outbound queue count limit reached; oldest messages evicted",
700 );
701 }
702
703 if dropped_bytes_limit_peer > 0 {
704 warn!(
705 target: TARGET,
706 peer_id = %peer,
707 dropped = dropped_bytes_limit_peer,
708 message_bytes = message_len,
709 max_queue_bytes = per_peer_limit,
710 "outbound queue bytes limit reached; messages evicted/dropped",
711 );
712 }
713
714 if dropped_bytes_limit_global > 0 {
715 warn!(
716 target: TARGET,
717 peer_id = %peer,
718 dropped = dropped_bytes_limit_global,
719 message_bytes = message_len,
720 max_queue_bytes_total = global_limit,
721 "outbound global queue bytes limit reached; messages dropped",
722 );
723 }
724
725 if let Some(metrics) = self.metric_handle() {
726 metrics.inc_outbound_queue_drop_by(dropped_count);
727 metrics.inc_outbound_queue_bytes_drop_per_peer_by(
728 dropped_bytes_limit_peer,
729 );
730 metrics.inc_outbound_queue_bytes_drop_global_by(
731 dropped_bytes_limit_global,
732 );
733 }
734
735 self.refresh_runtime_metrics();
736 }
737
738 fn add_pending_inbound_message(&mut self, peer: PeerId, message: Bytes) {
739 let message_len = message.len();
740 let mut dropped_count = 0u64;
741 let mut dropped_bytes_limit_peer = 0u64;
742 let mut dropped_bytes_limit_global = 0u64;
743 let mut dropped_messages = Vec::new();
744 let mut duplicate = false;
745 let mut total_pending_bytes = self.pending_inbound_bytes_len();
746 let per_peer_limit = self.max_pending_inbound_bytes_per_peer;
747 let global_limit = self.max_pending_inbound_bytes_total;
748
749 {
750 let queue = self.pending_inbound_messages.entry(peer).or_default();
751 if queue.contains(&message) {
752 duplicate = true;
753 } else {
754 while queue.len() >= MAX_PENDING_MESSAGES_PER_PEER {
755 if let Some(evicted) = queue.pop_front() {
756 dropped_count += 1;
757 total_pending_bytes = total_pending_bytes
758 .saturating_sub(evicted.payload.len());
759 dropped_messages.push(evicted);
760 } else {
761 break;
762 }
763 }
764
765 if per_peer_limit > 0 {
766 while queue.bytes_len() + message_len > per_peer_limit {
767 if let Some(evicted) = queue.pop_front() {
768 dropped_bytes_limit_peer += 1;
769 total_pending_bytes = total_pending_bytes
770 .saturating_sub(evicted.payload.len());
771 dropped_messages.push(evicted);
772 } else {
773 break;
774 }
775 }
776 }
777
778 if per_peer_limit > 0
779 && queue.bytes_len() + message_len > per_peer_limit
780 {
781 dropped_bytes_limit_peer += 1;
782 } else if global_limit > 0
783 && total_pending_bytes.saturating_add(message_len)
784 > global_limit
785 {
786 dropped_bytes_limit_global += 1;
787 } else {
788 queue.push_back(message);
789 }
790 }
791 }
792
793 if duplicate {
794 self.refresh_runtime_metrics();
795 return;
796 }
797
798 for evicted in dropped_messages {
799 self.observe_pending_message_age(evicted.enqueued_at);
800 }
801
802 if dropped_count > 0 {
803 warn!(
804 target: TARGET,
805 peer_id = %peer,
806 dropped = dropped_count,
807 max_messages = MAX_PENDING_MESSAGES_PER_PEER,
808 "inbound queue count limit reached; oldest messages evicted",
809 );
810 }
811
812 if dropped_bytes_limit_peer > 0 {
813 warn!(
814 target: TARGET,
815 peer_id = %peer,
816 dropped = dropped_bytes_limit_peer,
817 message_bytes = message_len,
818 max_queue_bytes = per_peer_limit,
819 "inbound queue bytes limit reached; messages evicted/dropped",
820 );
821 }
822
823 if dropped_bytes_limit_global > 0 {
824 warn!(
825 target: TARGET,
826 peer_id = %peer,
827 dropped = dropped_bytes_limit_global,
828 message_bytes = message_len,
829 max_queue_bytes_total = global_limit,
830 "inbound global queue bytes limit reached; messages dropped",
831 );
832 }
833
834 if let Some(metrics) = self.metric_handle() {
835 metrics.inc_inbound_queue_drop_by(dropped_count);
836 metrics.inc_inbound_queue_bytes_drop_per_peer_by(
837 dropped_bytes_limit_peer,
838 );
839 metrics.inc_inbound_queue_bytes_drop_global_by(
840 dropped_bytes_limit_global,
841 );
842 }
843
844 self.refresh_runtime_metrics();
845 }
846
847 fn add_ephemeral_response(
849 &mut self,
850 peer: PeerId,
851 response_channel: ResponseChannel<ReqResMessage>,
852 ) {
853 self.response_channels
854 .entry(peer)
855 .or_default()
856 .push_back(response_channel);
857 self.refresh_runtime_metrics();
858 }
859
860 fn send_pending_outbound_messages(&mut self, peer: PeerId) {
862 if let Some(mut queue) = self.pending_outbound_messages.remove(&peer) {
863 for message in queue.drain() {
864 self.observe_pending_message_age(message.enqueued_at);
865 self.swarm
866 .behaviour_mut()
867 .send_message(&peer, message.payload);
868 }
869 }
870
871 self.retry_by_peer.remove(&peer);
872 self.refresh_runtime_metrics();
873 }
874
875 pub fn service(&self) -> NetworkService {
877 self.service.clone()
878 }
879
880 async fn change_state(&mut self, state: NetworkState) {
882 trace!(target: TARGET, state = ?state, "state changed");
883 self.state = state.clone();
884 if let Some(metrics) = self.metric_handle() {
885 metrics.observe_state_transition(&state);
886 }
887 self.send_event(NetworkEvent::StateChanged(state)).await;
888 }
889
890 #[allow(clippy::needless_pass_by_ref_mut)]
892 async fn send_event(&mut self, event: NetworkEvent) {
893 if let Some(monitor) = self.monitor.clone()
894 && let Err(e) = monitor.tell(MonitorMessage::Network(event)).await
895 {
896 error!(target: TARGET, error = %e, "failed to forward event to monitor");
897 self.crash_token.cancel();
898 }
899 }
900
901 pub async fn run(&mut self) {
903 let bootstrap_start = Instant::now();
904
905 if let Err(error) = self.run_connection().await {
907 if let Some(metrics) = self.metric_handle() {
908 metrics.observe_bootstrap_duration_seconds(
909 bootstrap_start.elapsed().as_secs_f64(),
910 );
911 }
912 error!(target: TARGET, error = %error, "bootstrap connection failed");
913 self.send_event(NetworkEvent::Error(error)).await;
914 self.crash_token.cancel();
916 return;
917 }
918 if let Some(metrics) = self.metric_handle() {
919 metrics.observe_bootstrap_duration_seconds(
920 bootstrap_start.elapsed().as_secs_f64(),
921 );
922 }
923
924 if self.state != NetworkState::Running {
925 self.change_state(NetworkState::Running).await;
926 }
927
928 self.swarm.behaviour_mut().finish_prerouting_state();
930 self.run_main().await;
932 }
933
934 pub async fn run_connection(&mut self) -> Result<(), Error> {
936 if self.node_type == NodeType::Bootstrap && self.boot_nodes.is_empty() {
938 self.change_state(NetworkState::Running).await;
939 Ok(())
940 } else {
941 self.change_state(NetworkState::Dial).await;
942
943 let mut retrys: u8 = 0;
944
945 let dialing_round_timeout = sleep(Duration::from_secs(0));
950 tokio::pin!(dialing_round_timeout);
951 let mut dialing_timeout_active = false;
952
953 loop {
954 match self.state {
955 NetworkState::Dial => {
956 if self.boot_nodes.is_empty() {
958 error!(target: TARGET, "no bootstrap nodes available");
959 self.send_event(NetworkEvent::Error(
960 Error::NoBootstrapNode,
961 ))
962 .await;
963 self.change_state(NetworkState::Disconnected).await;
964 } else {
965 let copy_boot_nodes = self.boot_nodes.clone();
966
967 for (peer, addresses) in copy_boot_nodes {
968 if let Some(metrics) = self.metric_handle() {
969 metrics.inc_dial_attempt_bootstrap();
970 }
971 if let Err(e) = self.swarm.dial(
972 DialOpts::peer_id(peer)
973 .addresses(addresses.clone())
974 .build(),
975 ) {
976 let (add_to_retry, new_addresses) = self
977 .handle_dial_error(e, &peer, true)
978 .unwrap_or((false, vec![]));
979 self.boot_nodes.remove(&peer);
980 if add_to_retry {
981 if new_addresses.is_empty() {
982 self.retry_boot_nodes
983 .insert(peer, addresses);
984 } else {
985 self.retry_boot_nodes
986 .insert(peer, new_addresses);
987 }
988 }
989 }
990 }
991 if !self.boot_nodes.is_empty() {
992 self.change_state(NetworkState::Dialing).await;
993 dialing_round_timeout.as_mut().reset(
994 Instant::now() + Duration::from_secs(15),
995 );
996 dialing_timeout_active = true;
997 } else {
998 warn!(target: TARGET, "all bootstrap dials failed");
999 self.change_state(NetworkState::Disconnected)
1000 .await;
1001 }
1002 }
1003 }
1004 NetworkState::Dialing => {
1005 if self.boot_nodes.is_empty()
1007 && self.successful_dials == 0
1008 && !self.retry_boot_nodes.is_empty()
1009 && retrys < 3
1010 {
1011 retrys += 1;
1012 let wait = Duration::from_secs(1u64 << retrys); debug!(target: TARGET, attempt = retrys, wait_secs = wait.as_secs(), "retrying bootstrap dials");
1014
1015 let backoff = sleep(wait);
1016 tokio::pin!(backoff);
1017 loop {
1018 tokio::select! {
1019 _ = &mut backoff => break,
1020 event = self.swarm.select_next_some() => {
1021 self.handle_connection_events(event).await;
1022 }
1023 _ = self.graceful_token.cancelled() => {
1024 return Err(Error::Cancelled);
1025 }
1026 _ = self.crash_token.cancelled() => {
1027 return Err(Error::Cancelled);
1028 }
1029 }
1030 }
1031
1032 dialing_timeout_active = false;
1033 self.boot_nodes.clone_from(&self.retry_boot_nodes);
1034 self.retry_boot_nodes.clear();
1035 self.change_state(NetworkState::Dial).await;
1036 }
1037 else if self.boot_nodes.is_empty()
1039 && self.successful_dials == 0
1040 {
1041 self.change_state(NetworkState::Disconnected).await;
1042 } else if self.boot_nodes.is_empty() {
1044 return Ok(());
1045 }
1046 }
1047 NetworkState::Running => {
1048 return Ok(());
1049 }
1050 NetworkState::Disconnected => {
1051 return Err(Error::NoBootstrapNode);
1052 }
1053 _ => {}
1054 }
1055 if self.state != NetworkState::Disconnected {
1056 tokio::select! {
1057 event = self.swarm.select_next_some() => {
1058 self.handle_connection_events(event).await;
1059 }
1060 _ = self.graceful_token.cancelled() => {
1061 return Err(Error::Cancelled);
1062 }
1063 _ = self.crash_token.cancelled() => {
1064 return Err(Error::Cancelled);
1065 }
1066 _ = &mut dialing_round_timeout, if dialing_timeout_active => {
1067 warn!(
1068 target: TARGET,
1069 remaining = self.boot_nodes.len(),
1070 "bootstrap round timed out waiting for Identify; \
1071 moving remaining peers to retry queue"
1072 );
1073 for (peer, addrs) in self.boot_nodes.drain() {
1074 self.retry_boot_nodes.insert(peer, addrs);
1075 }
1076 dialing_timeout_active = false;
1077 }
1078 }
1079 }
1080 }
1081 }
1082 }
1083
1084 fn collect_retryable_transport_addresses(
1085 &self,
1086 items: Vec<(Multiaddr, libp2p::TransportError<std::io::Error>)>,
1087 trace_unreachable: bool,
1088 ) -> Vec<Multiaddr> {
1089 let mut new_addresses = vec![];
1090 for (address, error) in items {
1091 if trace_unreachable {
1092 trace!(target: TARGET, addr = %address, err = ?error, "address unreachable");
1093 }
1094
1095 if let libp2p::TransportError::Other(e) = error {
1096 match e.kind() {
1097 std::io::ErrorKind::ConnectionRefused
1098 | std::io::ErrorKind::TimedOut
1099 | std::io::ErrorKind::ConnectionAborted
1100 | std::io::ErrorKind::NotConnected
1101 | std::io::ErrorKind::BrokenPipe
1102 | std::io::ErrorKind::Interrupted
1103 | std::io::ErrorKind::HostUnreachable
1104 | std::io::ErrorKind::NetworkUnreachable => {
1105 new_addresses.push(address);
1106 }
1107 _ => {}
1108 }
1109 };
1110 }
1111
1112 new_addresses
1113 }
1114
1115 fn handle_dial_error(
1120 &mut self,
1121 e: DialError,
1122 peer_id: &PeerId,
1123 bootstrap_flow: bool,
1124 ) -> Option<(bool, Vec<Multiaddr>)> {
1125 match e {
1126 DialError::LocalPeerId { .. } => {
1127 if let Some(metrics) = self.metric_handle() {
1128 metrics.observe_dial_failure("local_peer_id");
1129 }
1130 if bootstrap_flow {
1131 warn!(target: TARGET, peer_id = %peer_id, "dial rejected: connected peer-id matches local peer");
1132 return Some((false, vec![]));
1133 }
1134
1135 self.retry_by_peer.remove(peer_id);
1136 self.clear_pending_messages(peer_id);
1137 self.swarm
1138 .behaviour_mut()
1139 .clean_hard_peer_to_remove(peer_id);
1140 None
1141 }
1142 DialError::NoAddresses => {
1143 if let Some(metrics) = self.metric_handle() {
1144 metrics.observe_dial_failure("no_addresses");
1145 }
1146 if bootstrap_flow {
1147 debug!(target: TARGET, peer_id = %peer_id, "dial skipped: no addresses");
1148 }
1149 Some((false, vec![]))
1150 }
1151 DialError::DialPeerConditionFalse(_) => {
1152 if let Some(metrics) = self.metric_handle() {
1153 metrics.observe_dial_failure("peer_condition");
1154 }
1155 if bootstrap_flow {
1156 debug!(target: TARGET, peer_id = %peer_id, "dial skipped: peer condition not met");
1157 return Some((false, vec![]));
1158 }
1159
1160 None
1161 }
1162 DialError::Denied { cause } => {
1163 if let Some(metrics) = self.metric_handle() {
1164 metrics.observe_dial_failure("denied");
1165 }
1166 if bootstrap_flow {
1167 debug!(target: TARGET, peer_id = %peer_id, cause = %cause, "dial denied by behaviour");
1168 }
1169 Some((false, vec![]))
1170 }
1171 DialError::Aborted => {
1172 if let Some(metrics) = self.metric_handle() {
1173 metrics.observe_dial_failure("aborted");
1174 }
1175 if bootstrap_flow {
1176 debug!(target: TARGET, peer_id = %peer_id, "dial aborted, will retry");
1177 }
1178 Some((true, vec![]))
1179 }
1180 DialError::WrongPeerId { obtained, .. } => {
1181 if let Some(metrics) = self.metric_handle() {
1182 metrics.observe_dial_failure("wrong_peer_id");
1183 }
1184 if bootstrap_flow {
1185 warn!(target: TARGET, expected = %peer_id, obtained = %obtained, "dial failed: peer identity mismatch");
1186 return Some((false, vec![]));
1187 }
1188
1189 self.retry_by_peer.remove(peer_id);
1190 self.clear_pending_messages(peer_id);
1191 self.swarm
1192 .behaviour_mut()
1193 .clean_hard_peer_to_remove(peer_id);
1194 None
1195 }
1196 DialError::Transport(items) => {
1197 if let Some(metrics) = self.metric_handle() {
1198 metrics.observe_dial_failure("transport");
1199 }
1200 if bootstrap_flow {
1201 debug!(target: TARGET, peer_id = %peer_id, "transport dial failed");
1202 }
1203
1204 let new_addresses = self.collect_retryable_transport_addresses(
1205 items,
1206 bootstrap_flow,
1207 );
1208 if !new_addresses.is_empty() {
1209 Some((true, new_addresses))
1210 } else {
1211 Some((false, vec![]))
1212 }
1213 }
1214 }
1215 }
1216
1217 async fn handle_connection_events(
1219 &mut self,
1220 event: SwarmEvent<BehaviourEvent>,
1221 ) {
1222 match event {
1223 SwarmEvent::ConnectionClosed { peer_id, .. } => {
1224 self.boot_nodes.remove(&peer_id);
1225 }
1226 SwarmEvent::OutgoingConnectionError {
1227 peer_id: Some(peer_id),
1228 error,
1229 ..
1230 } => {
1231 let (add_to_retry, new_addresses) = self
1232 .handle_dial_error(error, &peer_id, true)
1233 .unwrap_or((false, vec![]));
1234
1235 if let Some(addresses) = self.boot_nodes.remove(&peer_id)
1236 && add_to_retry
1237 {
1238 if new_addresses.is_empty() {
1239 self.retry_boot_nodes.insert(peer_id, addresses);
1240 } else {
1241 self.retry_boot_nodes.insert(peer_id, new_addresses);
1242 }
1243 }
1244 }
1245 SwarmEvent::Behaviour(BehaviourEvent::Identified {
1246 peer_id,
1247 info,
1248 connection_id,
1249 }) => {
1250 if !self
1251 .check_protocols(&info.protocol_version, &info.protocols)
1252 {
1253 warn!(target: TARGET, peer_id = %peer_id, protocol_version = %info.protocol_version, "peer uses incompatible protocols; closing connection");
1254
1255 self.swarm
1256 .behaviour_mut()
1257 .close_connections(&peer_id, Some(connection_id));
1258 } else {
1259 self.peer_action
1260 .insert(peer_id, Action::Identified(connection_id));
1261
1262 let mut any_address_is_valid = false;
1263 for addr in info.listen_addrs {
1264 if self
1265 .swarm
1266 .behaviour_mut()
1267 .add_self_reported_address(&peer_id, &addr)
1268 {
1269 any_address_is_valid = true;
1270 }
1271 }
1272
1273 if any_address_is_valid {
1274 if self.boot_nodes.remove(&peer_id).is_some() {
1275 self.successful_dials += 1;
1276 }
1277 self.peer_identify.insert(peer_id);
1278 } else {
1279 warn!(target: TARGET, peer_id = %peer_id, "bootstrap peer has no valid addresses");
1280
1281 self.swarm
1282 .behaviour_mut()
1283 .close_connections(&peer_id, Some(connection_id));
1284 }
1285 }
1286 }
1287 SwarmEvent::Behaviour(BehaviourEvent::IdentifyError {
1288 peer_id,
1289 error,
1290 }) => {
1291 self.observe_identify_error(&error);
1292 match error {
1293 swarm::StreamUpgradeError::Timeout => {
1294 }
1296 _ => {
1297 debug!(target: TARGET, peer_id = %peer_id, error = %error, "identify hard failure during bootstrap; queuing for retry");
1300 if let Some(addrs) = self.boot_nodes.remove(&peer_id) {
1301 self.retry_boot_nodes.insert(peer_id, addrs);
1302 }
1303 }
1304 }
1305 }
1306 _ => {}
1307 }
1308 self.refresh_runtime_metrics();
1309 }
1310
1311 fn clear_pending_messages(&mut self, peer_id: &PeerId) {
1312 warn!(target: TARGET, peer_id = %peer_id, "max dial attempts reached; dropping pending messages");
1313
1314 let dropped = self.drop_pending_outbound_messages(peer_id);
1315 if let Some(metrics) = self.metric_handle() {
1316 metrics.inc_max_retries_drop_by(dropped as u64);
1317 }
1318 self.peer_action.remove(peer_id);
1319 self.retry_by_peer.remove(peer_id);
1320 self.refresh_runtime_metrics();
1321 }
1322
1323 fn check_protocols(
1324 &self,
1325 protocol_version: &str,
1326 protocols: &[StreamProtocol],
1327 ) -> bool {
1328 let supp_protocols: HashSet<StreamProtocol> = protocols
1329 .iter()
1330 .cloned()
1331 .collect::<HashSet<StreamProtocol>>();
1332
1333 protocol_version == IDENTIFY_PROTOCOL
1334 && supp_protocols.contains(&StreamProtocol::new(REQRES_PROTOCOL))
1335 }
1336
1337 pub async fn run_main(&mut self) {
1339 info!(target: TARGET, "network worker started");
1340
1341 loop {
1342 tokio::select! {
1343 command = self.command_receiver.recv() => {
1344 match command {
1345 Some(command) => self.handle_command(command).await,
1346 None => break,
1347 }
1348 }
1349 event = self.swarm.select_next_some() => {
1350 self.handle_event(event).await;
1352 }
1353 _ = async {
1354 if let Some(t) = &mut self.retry_timer {
1355 t.as_mut().await;
1356 }
1357 }, if self.retry_timer.is_some() => {
1358 for (peer, kind, addrs) in self.drain_due_retries() {
1359 if let Some(action) = self.peer_action.get(&peer) {
1360 match (action, kind) {
1361 (Action::Discover, RetryKind::Discover) => {
1362 self.swarm.behaviour_mut().discover(&peer);
1363 },
1364 (Action::Dial, RetryKind::Dial) => {
1365 if let Some(metrics) = self.metric_handle() {
1366 metrics.inc_dial_attempt_runtime();
1367 }
1368 if let Err(error) = self.swarm.dial(
1369 DialOpts::peer_id(peer)
1370 .addresses(addrs)
1371 .extend_addresses_through_behaviour()
1372 .build()
1373 ) && let Some((retry, new_address)) =
1374 self.handle_dial_error(error, &peer, false) {
1375
1376 self.peer_action.remove(&peer);
1377 if retry {
1378 let addr = new_address
1379 .iter()
1380 .filter(|x| {
1381 !self
1382 .swarm
1383 .behaviour()
1384 .is_invalid_address(x)
1385 })
1386 .cloned()
1387 .collect::<Vec<Multiaddr>>();
1388
1389 if addr.is_empty() {
1390 self.schedule_retry(peer, ScheduleType::Discover);
1391 } else {
1392 self.schedule_retry(peer, ScheduleType::Dial(addr.clone()));
1393 }
1394 } else {
1395 self.schedule_retry(peer, ScheduleType::Discover);
1396 }
1397 };
1398 },
1399 _ => {}
1400 }
1401 };
1402 }
1403 },
1404 _ = self.graceful_token.cancelled() => {
1405 break;
1406 }
1407 _ = self.crash_token.cancelled() => {
1408 break;
1409 }
1410 }
1411 }
1412 }
1413
1414 async fn handle_command(&mut self, command: Command) {
1415 match command {
1416 Command::SendMessage { peer, message } => {
1417 if let Err(error) = self.send_message(peer, message) {
1418 error!(target: TARGET, error = %error, "failed to deliver message");
1419 self.send_event(NetworkEvent::Error(error)).await;
1420 }
1421 }
1422 }
1423 }
1424
1425 #[allow(clippy::needless_pass_by_ref_mut)]
1426 async fn message_to_helper(
1427 &mut self,
1428 message: MessagesHelper,
1429 peer_id: &PeerId,
1430 ) {
1431 let sender = match peer_id_to_ed25519_pubkey_bytes(peer_id) {
1432 Ok(public_key) => public_key,
1433 Err(e) => {
1434 warn!(target: TARGET, error = %e, "cannot resolve public key from peer id");
1435 return;
1436 }
1437 };
1438
1439 'Send: {
1440 if let Some(helper_sender) = self.helper_sender.as_ref() {
1441 match message {
1442 MessagesHelper::Single(items) => {
1443 if helper_sender
1444 .send(CommandHelper::ReceivedMessage {
1445 sender,
1446 message: items,
1447 })
1448 .await
1449 .is_err()
1450 {
1451 break 'Send;
1452 }
1453 }
1454 MessagesHelper::Vec(items) => {
1455 for item in items {
1456 if helper_sender
1457 .send(CommandHelper::ReceivedMessage {
1458 sender,
1459 message: item,
1460 })
1461 .await
1462 .is_err()
1463 {
1464 break 'Send;
1465 }
1466 }
1467 }
1468 }
1469
1470 return;
1471 }
1472 }
1473
1474 error!(target: TARGET, "helper channel closed; shutting down");
1475 self.crash_token.cancel();
1476 }
1477
1478 async fn handle_event(&mut self, event: SwarmEvent<BehaviourEvent>) {
1479 match event {
1480 SwarmEvent::Behaviour(event) => {
1481 match event {
1482 BehaviourEvent::Identified {
1483 peer_id,
1484 info,
1485 connection_id,
1486 } => {
1487 if !self.check_protocols(
1488 &info.protocol_version,
1489 &info.protocols,
1490 ) {
1491 warn!(
1492 target: TARGET,
1493 peer_id = %peer_id,
1494 protocol_version = %info.protocol_version,
1495 protocols = ?info.protocols,
1496 "peer uses incompatible protocols; closing connection"
1497 );
1498
1499 self.clear_pending_messages(&peer_id);
1500
1501 self.swarm
1502 .behaviour_mut()
1503 .clean_hard_peer_to_remove(&peer_id);
1504
1505 self.swarm.behaviour_mut().close_connections(
1506 &peer_id,
1507 Some(connection_id),
1508 );
1509 } else {
1510 self.peer_action.insert(
1511 peer_id,
1512 Action::Identified(connection_id),
1513 );
1514
1515 self.swarm
1516 .behaviour_mut()
1517 .clean_peer_to_remove(&peer_id);
1518 for addr in info.listen_addrs {
1519 self.swarm
1520 .behaviour_mut()
1521 .add_self_reported_address(&peer_id, &addr);
1522 }
1523
1524 self.peer_identify.insert(peer_id);
1525
1526 if let Some(mut queue) =
1527 self.pending_inbound_messages.remove(&peer_id)
1528 {
1529 let mut buffered = VecDeque::new();
1530 for message in queue.drain() {
1531 self.observe_pending_message_age(
1532 message.enqueued_at,
1533 );
1534 buffered.push_back(message.payload);
1535 }
1536 self.message_to_helper(
1537 MessagesHelper::Vec(buffered),
1538 &peer_id,
1539 )
1540 .await;
1541 };
1542
1543 self.send_pending_outbound_messages(peer_id);
1544 }
1545 }
1546 BehaviourEvent::IdentifyError { peer_id, error } => {
1547 self.observe_identify_error(&error);
1548 debug!(target: TARGET, peer_id = %peer_id, error = %error, "identify error");
1549
1550 match error {
1551 swarm::StreamUpgradeError::Timeout => {
1552 }
1555 swarm::StreamUpgradeError::Apply(..)
1556 | swarm::StreamUpgradeError::NegotiationFailed
1557 | swarm::StreamUpgradeError::Io(..) => {
1558 self.drop_pending_outbound_messages(&peer_id);
1563 self.retry_by_peer.remove(&peer_id);
1564 self.response_channels.remove(&peer_id);
1565 self.drop_pending_inbound_messages(&peer_id);
1566 }
1567 }
1568
1569 self.swarm
1570 .behaviour_mut()
1571 .close_connections(&peer_id, None);
1572 }
1573 BehaviourEvent::ReqresMessage { peer_id, message } => {
1574 let (message_data, is_request, response_channel) =
1575 match message {
1576 request_response::Message::Request {
1577 request,
1578 channel,
1579 ..
1580 } => (request.0, true, Some(channel)),
1581 request_response::Message::Response {
1582 response,
1583 ..
1584 } => (response.0, false, None),
1585 };
1586
1587 if message_data.len() > self.max_app_message_bytes {
1588 warn!(
1589 target: TARGET,
1590 peer_id = %peer_id,
1591 size = message_data.len(),
1592 max = self.max_app_message_bytes,
1593 "inbound payload dropped: message too large",
1594 );
1595 if let Some(metrics) = self.metric_handle() {
1596 metrics.inc_oversized_inbound_drop();
1597 }
1598 self.swarm
1599 .behaviour_mut()
1600 .close_connections(&peer_id, None);
1601 self.refresh_runtime_metrics();
1602 return;
1603 }
1604
1605 if is_request {
1606 if let Some(metrics) = self.metric_handle() {
1607 metrics.inc_reqres_request_received();
1608 }
1609 trace!(target: TARGET, peer_id = %peer_id, "request received");
1610 if let Some(channel) = response_channel {
1611 self.add_ephemeral_response(peer_id, channel);
1612 }
1613 } else {
1614 if let Some(metrics) = self.metric_handle() {
1615 metrics.inc_reqres_response_received();
1616 }
1617 trace!(target: TARGET, peer_id = %peer_id, "response received");
1618 }
1619
1620 if self.peer_identify.contains(&peer_id) {
1621 self.message_to_helper(
1622 MessagesHelper::Single(message_data),
1623 &peer_id,
1624 )
1625 .await;
1626 } else {
1627 self.add_pending_inbound_message(
1628 peer_id,
1629 message_data,
1630 );
1631 }
1632 }
1633 BehaviourEvent::ReqresFailure {
1634 peer_id,
1635 direction,
1636 kind,
1637 } => {
1638 if let Some(metrics) = self.metric_handle() {
1639 metrics.observe_reqres_failure(
1640 direction.as_metric_label(),
1641 kind.as_metric_label(),
1642 );
1643 }
1644 debug!(
1645 target: TARGET,
1646 peer_id = %peer_id,
1647 direction = direction.as_metric_label(),
1648 kind = kind.as_metric_label(),
1649 "request-response failure"
1650 );
1651 }
1652 BehaviourEvent::ClosestPeer { peer_id, info } => {
1653 if matches!(
1654 self.peer_action.get(&peer_id),
1655 Some(Action::Discover)
1656 ) {
1657 self.peer_action.remove(&peer_id);
1658 if let Some(info) = info {
1659 let addr = info
1660 .addrs
1661 .iter()
1662 .filter(|x| {
1663 !self
1664 .swarm
1665 .behaviour()
1666 .is_invalid_address(x)
1667 })
1668 .cloned()
1669 .collect::<Vec<Multiaddr>>();
1670
1671 if addr.is_empty() {
1672 self.schedule_retry(
1673 peer_id,
1674 ScheduleType::Discover,
1675 );
1676 } else {
1677 self.schedule_retry(
1678 peer_id,
1679 ScheduleType::Dial(addr),
1680 );
1681 }
1682 } else {
1683 self.schedule_retry(
1684 peer_id,
1685 ScheduleType::Discover,
1686 );
1687 };
1688 }
1689 }
1690 BehaviourEvent::Dummy => {
1691 }
1693 }
1694 }
1695 SwarmEvent::OutgoingConnectionError {
1696 error,
1697 peer_id: Some(peer_id),
1698 ..
1699 } => {
1700 if matches!(self.peer_action.get(&peer_id), Some(Action::Dial))
1701 {
1702 self.peer_action.remove(&peer_id);
1703
1704 self.swarm.behaviour_mut().add_peer_to_remove(&peer_id);
1705
1706 if let Some((retry, new_address)) =
1707 self.handle_dial_error(error, &peer_id, false)
1708 {
1709 if retry {
1710 let addr = new_address
1711 .iter()
1712 .filter(|x| {
1713 !self
1714 .swarm
1715 .behaviour()
1716 .is_invalid_address(x)
1717 })
1718 .cloned()
1719 .collect::<Vec<Multiaddr>>();
1720
1721 if addr.is_empty() {
1722 self.schedule_retry(
1723 peer_id,
1724 ScheduleType::Discover,
1725 );
1726 } else {
1727 self.schedule_retry(
1728 peer_id,
1729 ScheduleType::Dial(addr),
1730 );
1731 }
1732 } else {
1733 self.schedule_retry(
1734 peer_id,
1735 ScheduleType::Discover,
1736 );
1737 }
1738 };
1739 }
1740 }
1741 SwarmEvent::ConnectionClosed {
1742 peer_id,
1743 connection_id,
1744 num_established,
1745 ..
1746 } => {
1747 if num_established == 0 {
1748 if let Some(Action::Identified(id)) =
1749 self.peer_action.get(&peer_id)
1750 && connection_id == *id
1751 {
1752 self.peer_action.remove(&peer_id);
1753
1754 self.peer_identify.remove(&peer_id);
1755 self.drop_pending_inbound_messages(&peer_id);
1756 self.response_channels.remove(&peer_id);
1757
1758 self.retry_by_peer.remove(&peer_id);
1759
1760 if self
1761 .pending_outbound_messages
1762 .get(&peer_id)
1763 .is_some_and(|q| !q.is_empty())
1764 {
1765 self.schedule_retry(
1766 peer_id,
1767 ScheduleType::Dial(vec![]),
1768 );
1769 }
1770 } else if let Some(Action::Dial | Action::Discover) =
1771 self.peer_action.get(&peer_id)
1772 {
1773 self.peer_action.remove(&peer_id);
1774 self.retry_by_peer.remove(&peer_id);
1775 self.drop_pending_inbound_messages(&peer_id);
1776 self.response_channels.remove(&peer_id);
1777 self.peer_identify.remove(&peer_id);
1778
1779 if self
1780 .pending_outbound_messages
1781 .get(&peer_id)
1782 .is_some_and(|q| !q.is_empty())
1783 {
1784 self.schedule_retry(
1785 peer_id,
1786 ScheduleType::Discover,
1787 );
1788 }
1789 }
1790 }
1791 }
1792 SwarmEvent::IncomingConnectionError { .. } => {
1793 }
1798 SwarmEvent::ExpiredListenAddr { address, .. } => {
1799 warn!(target: TARGET, addr = %address, "listen address expired");
1800 }
1801 SwarmEvent::ListenerError { error, .. } => {
1802 error!(target: TARGET, error = %error, "listener error");
1803 }
1804 SwarmEvent::ConnectionEstablished {
1805 peer_id,
1806 connection_id,
1807 num_established,
1808 ..
1809 } => {
1810 if num_established.get() > 1 {
1811 debug!(target: TARGET, peer_id = %peer_id, "duplicate connection detected; closing excess");
1812 self.swarm
1813 .behaviour_mut()
1814 .close_connections(&peer_id, Some(connection_id));
1815 }
1816 }
1817 SwarmEvent::IncomingConnection { .. }
1818 | SwarmEvent::ListenerClosed { .. }
1819 | SwarmEvent::Dialing { .. }
1820 | SwarmEvent::NewExternalAddrCandidate { .. }
1821 | SwarmEvent::ExternalAddrConfirmed { .. }
1822 | SwarmEvent::ExternalAddrExpired { .. }
1823 | SwarmEvent::NewExternalAddrOfPeer { .. }
1824 | SwarmEvent::NewListenAddr { .. } => {
1825 }
1827 _ => {}
1828 }
1829 self.refresh_runtime_metrics();
1830 }
1831}
1832
#[cfg(test)]
mod tests {

    use crate::routing::RoutingNode;

    use super::*;
    use libp2p::core::{ConnectedPoint, Endpoint, transport::PortUse};
    use libp2p::identity::Keypair as Libp2pKeypair;
    use libp2p::swarm::ConnectionId;
    use prometheus_client::{encoding::text::encode, registry::Registry};
    use serde::Deserialize;
    use test_log::test;

    use ave_common::identity::{KeyPair, keys::Ed25519Signer};

    use serial_test::serial;

    /// Placeholder message type used to instantiate `NetworkWorker` in tests.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Dummy;

    /// Returns the value of the first encoded metric line that starts with
    /// `name`, or `0.0` when no such line exists or its value does not parse.
    ///
    /// NOTE: this is a prefix match, so `name` should be specific enough not
    /// to collide with a longer metric name that appears earlier.
    fn metric_value(metrics: &str, name: &str) -> f64 {
        metrics
            .lines()
            .find_map(|line| {
                if line.starts_with(name) {
                    line.split_whitespace().nth(1)?.parse::<f64>().ok()
                } else {
                    None
                }
            })
            .unwrap_or(0.0)
    }

    /// Builds a worker with a fresh ed25519 identity, listening on
    /// `memory_addr`, with no helper sender and no metrics.
    fn build_worker(
        boot_nodes: Vec<RoutingNode>,
        random_walk: bool,
        node_type: NodeType,
        graceful_token: CancellationToken,
        crash_token: CancellationToken,
        memory_addr: String,
    ) -> NetworkWorker<Dummy> {
        let config = create_config(
            boot_nodes,
            random_walk,
            node_type,
            vec![memory_addr],
        );
        let keys = KeyPair::Ed25519(Ed25519Signer::generate().unwrap());

        NetworkWorker::new(
            &keys,
            config,
            None,
            graceful_token,
            crash_token,
            None,
            None,
        )
        .unwrap()
    }

    /// Creates a network `Config` with a discovery limit of 50 and the given
    /// boot nodes, node type and listen addresses. Every other field uses its
    /// default.
    fn create_config(
        boot_nodes: Vec<RoutingNode>,
        random_walk: bool,
        node_type: NodeType,
        listen_addresses: Vec<String>,
    ) -> Config {
        let config = crate::routing::Config::default()
            .with_discovery_limit(50)
            .with_dht_random_walk(random_walk);

        Config {
            boot_nodes,
            node_type,
            routing: config,
            external_addresses: vec![],
            listen_addresses,
            ..Default::default()
        }
    }

    /// Builds an `Identified` behaviour event advertising protocols that are
    /// compatible with this node.
    fn build_identified_event(
        peer_id: PeerId,
        public_key: libp2p::identity::PublicKey,
        connection_id: ConnectionId,
    ) -> SwarmEvent<BehaviourEvent> {
        SwarmEvent::Behaviour(BehaviourEvent::Identified {
            peer_id,
            info: Box::new(identify::Info {
                public_key,
                protocol_version: IDENTIFY_PROTOCOL.to_owned(),
                agent_version: "test-agent".to_owned(),
                listen_addrs: vec!["/memory/9999".parse().expect("multiaddr")],
                protocols: vec![StreamProtocol::new(REQRES_PROTOCOL)],
                observed_addr: "/memory/9998".parse().expect("multiaddr"),
                signed_peer_record: None,
            }),
            connection_id,
        })
    }

    /// A dialer endpoint used to synthesize connection events.
    fn test_endpoint() -> ConnectedPoint {
        ConnectedPoint::Dialer {
            address: "/memory/9997".parse().expect("multiaddr"),
            role_override: Endpoint::Dialer,
            port_use: PortUse::New,
        }
    }

    /// A per-peer byte cap of 16 keeps the first two 12/4-byte messages
    /// (12 + 4 = 16) and drops the second 12-byte message.
    #[test]
    fn outbound_queue_respects_bytes_limit_and_updates_metrics() {
        let mut config = create_config(
            vec![],
            false,
            NodeType::Addressable,
            vec!["/memory/3100".to_owned()],
        );
        config.max_pending_outbound_bytes_per_peer = 16;

        let keys = KeyPair::Ed25519(Ed25519Signer::generate().unwrap());
        let mut registry = Registry::default();
        let metrics = crate::metrics::register(&mut registry);
        let mut worker: NetworkWorker<Dummy> = NetworkWorker::new(
            &keys,
            config,
            None,
            CancellationToken::new(),
            CancellationToken::new(),
            None,
            Some(metrics),
        )
        .expect("worker");

        let peer = PeerId::random();
        worker.add_pending_outbound_message(
            peer,
            Bytes::from_static(b"aaaaaaaaaaaa"), // 12 bytes
        );
        worker.add_pending_outbound_message(
            peer,
            Bytes::from_static(b"bbbbbbbbbbbb"), // 12 bytes
        );
        worker.add_pending_outbound_message(
            peer,
            Bytes::from_static(b"cccc"), // 4 bytes
        );

        let queue = worker
            .pending_outbound_messages
            .get(&peer)
            .expect("queue exists");
        assert_eq!(queue.len(), 2);
        assert_eq!(queue.bytes_len(), 16);

        let mut text = String::new();
        encode(&mut text, &registry).expect("encode metrics");

        assert_eq!(
            metric_value(
                &text,
                "network_messages_dropped_outbound_queue_bytes_limit_total"
            ),
            1.0
        );
        assert_eq!(
            metric_value(
                &text,
                "network_messages_dropped_outbound_queue_bytes_limit_per_peer_total"
            ),
            1.0
        );
        assert_eq!(
            metric_value(
                &text,
                "network_messages_dropped_outbound_queue_bytes_limit_global_total"
            ),
            0.0
        );
        assert_eq!(metric_value(&text, "network_pending_outbound_bytes"), 16.0);
        assert!(
            metric_value(&text, "network_pending_message_age_seconds_count")
                >= 1.0
        );
    }

    /// A byte limit of zero disables byte-based drops for both directions.
    #[test]
    fn zero_pending_bytes_limits_disable_byte_drops() {
        let mut config = create_config(
            vec![],
            false,
            NodeType::Addressable,
            vec!["/memory/3101".to_owned()],
        );
        config.max_pending_outbound_bytes_per_peer = 0;
        config.max_pending_inbound_bytes_per_peer = 0;
        config.max_pending_outbound_bytes_total = 0;
        config.max_pending_inbound_bytes_total = 0;

        let keys = KeyPair::Ed25519(Ed25519Signer::generate().unwrap());
        let mut registry = Registry::default();
        let metrics = crate::metrics::register(&mut registry);
        let mut worker: NetworkWorker<Dummy> = NetworkWorker::new(
            &keys,
            config,
            None,
            CancellationToken::new(),
            CancellationToken::new(),
            None,
            Some(metrics),
        )
        .expect("worker");

        let peer = PeerId::random();
        for i in 0..3u8 {
            let payload = Bytes::from(vec![i + 1; 12]);
            worker.add_pending_outbound_message(peer, payload);
        }
        for i in 0..3u8 {
            let payload = Bytes::from(vec![i + 7; 12]);
            worker.add_pending_inbound_message(peer, payload);
        }

        let outbound = worker
            .pending_outbound_messages
            .get(&peer)
            .expect("outbound queue exists");
        let inbound = worker
            .pending_inbound_messages
            .get(&peer)
            .expect("inbound queue exists");
        assert_eq!(outbound.len(), 3);
        assert_eq!(outbound.bytes_len(), 36);
        assert_eq!(inbound.len(), 3);
        assert_eq!(inbound.bytes_len(), 36);

        let mut text = String::new();
        encode(&mut text, &registry).expect("encode metrics");
        assert_eq!(
            metric_value(
                &text,
                "network_messages_dropped_outbound_queue_bytes_limit_total"
            ),
            0.0
        );
        assert_eq!(
            metric_value(
                &text,
                "network_messages_dropped_outbound_queue_bytes_limit_per_peer_total"
            ),
            0.0
        );
        assert_eq!(
            metric_value(
                &text,
                "network_messages_dropped_outbound_queue_bytes_limit_global_total"
            ),
            0.0
        );
        assert_eq!(
            metric_value(
                &text,
                "network_messages_dropped_inbound_queue_bytes_limit_total"
            ),
            0.0
        );
        assert_eq!(
            metric_value(
                &text,
                "network_messages_dropped_inbound_queue_bytes_limit_per_peer_total"
            ),
            0.0
        );
        assert_eq!(
            metric_value(
                &text,
                "network_messages_dropped_inbound_queue_bytes_limit_global_total"
            ),
            0.0
        );
    }

    /// The global outbound byte cap (20) applies across peers: the second
    /// peer's 12-byte message would exceed it (12 + 12 = 24) and is dropped.
    #[test]
    fn outbound_global_bytes_limit_applies_across_peers() {
        let mut config = create_config(
            vec![],
            false,
            NodeType::Addressable,
            vec!["/memory/3102".to_owned()],
        );
        config.max_pending_outbound_bytes_per_peer = 0;
        config.max_pending_outbound_bytes_total = 20;

        let keys = KeyPair::Ed25519(Ed25519Signer::generate().unwrap());
        let mut registry = Registry::default();
        let metrics = crate::metrics::register(&mut registry);
        let mut worker: NetworkWorker<Dummy> = NetworkWorker::new(
            &keys,
            config,
            None,
            CancellationToken::new(),
            CancellationToken::new(),
            None,
            Some(metrics),
        )
        .expect("worker");

        let peer_a = PeerId::random();
        let peer_b = PeerId::random();

        worker.add_pending_outbound_message(
            peer_a,
            Bytes::from_static(b"aaaaaaaaaaaa"), // 12 bytes
        );
        worker.add_pending_outbound_message(
            peer_b,
            Bytes::from_static(b"bbbbbbbbbbbb"), // 12 bytes
        );

        assert_eq!(worker.pending_outbound_bytes_len(), 12);
        assert_eq!(
            worker
                .pending_outbound_messages
                .get(&peer_a)
                .expect("peer_a queue")
                .len(),
            1
        );
        assert_eq!(
            worker
                .pending_outbound_messages
                .get(&peer_b)
                .map_or(0, PendingQueue::len),
            0
        );

        let mut text = String::new();
        encode(&mut text, &registry).expect("encode metrics");
        assert_eq!(
            metric_value(
                &text,
                "network_messages_dropped_outbound_queue_bytes_limit_total"
            ),
            1.0
        );
        assert_eq!(
            metric_value(
                &text,
                "network_messages_dropped_outbound_queue_bytes_limit_per_peer_total"
            ),
            0.0
        );
        assert_eq!(
            metric_value(
                &text,
                "network_messages_dropped_outbound_queue_bytes_limit_global_total"
            ),
            1.0
        );
    }

    /// Inbound messages buffered before identify are delivered to the helper
    /// in arrival order once the peer is identified.
    #[test(tokio::test)]
    #[serial]
    async fn pending_inbound_messages_keep_arrival_order_after_identify() {
        let mut worker = build_worker(
            vec![],
            false,
            NodeType::Addressable,
            CancellationToken::new(),
            CancellationToken::new(),
            "/memory/3200".to_owned(),
        );
        let remote_keys = Libp2pKeypair::generate_ed25519();
        let remote_peer = remote_keys.public().to_peer_id();

        worker.add_pending_inbound_message(
            remote_peer,
            Bytes::from_static(b"msg-2"),
        );
        worker.add_pending_inbound_message(
            remote_peer,
            Bytes::from_static(b"msg-1"),
        );
        worker.add_pending_inbound_message(
            remote_peer,
            Bytes::from_static(b"msg-3"),
        );

        let (helper_sender, mut helper_rx) = mpsc::channel(8);
        worker.add_helper_sender(helper_sender);

        worker
            .handle_event(build_identified_event(
                remote_peer,
                remote_keys.public(),
                ConnectionId::new_unchecked(11),
            ))
            .await;

        let mut received = Vec::new();
        for _ in 0..3 {
            let command = tokio::time::timeout(
                Duration::from_millis(200),
                helper_rx.recv(),
            )
            .await
            .expect("helper receive timeout")
            .expect("helper channel closed");
            let CommandHelper::ReceivedMessage { message, .. } = command else {
                panic!("unexpected helper command")
            };
            received.push(message);
        }

        assert_eq!(
            received,
            vec![
                Bytes::from_static(b"msg-2"),
                Bytes::from_static(b"msg-1"),
                Bytes::from_static(b"msg-3"),
            ]
        );
        assert!(!worker.pending_inbound_messages.contains_key(&remote_peer));
    }

    /// Closing the identified connection with queued outbound messages
    /// schedules a dial retry. Re-identifying on a new connection flushes
    /// the queue and clears the retry.
    #[test(tokio::test)]
    #[serial]
    async fn flapping_connection_retries_then_flushes_outbound_queue() {
        let mut worker = build_worker(
            vec![],
            false,
            NodeType::Addressable,
            CancellationToken::new(),
            CancellationToken::new(),
            "/memory/3201".to_owned(),
        );
        let remote_keys = Libp2pKeypair::generate_ed25519();
        let remote_peer = remote_keys.public().to_peer_id();
        let first_connection = ConnectionId::new_unchecked(21);
        let second_connection = ConnectionId::new_unchecked(22);

        worker.add_pending_outbound_message(
            remote_peer,
            Bytes::from_static(b"needs-redelivery"),
        );
        worker
            .peer_action
            .insert(remote_peer, Action::Identified(first_connection));
        worker.peer_identify.insert(remote_peer);

        worker
            .handle_event(SwarmEvent::ConnectionClosed {
                peer_id: remote_peer,
                connection_id: first_connection,
                endpoint: test_endpoint(),
                num_established: 0,
                cause: None,
            })
            .await;

        assert!(worker.retry_by_peer.contains_key(&remote_peer));
        assert!(matches!(
            worker.retry_by_peer.get(&remote_peer).map(|s| s.kind),
            Some(RetryKind::Dial)
        ));
        assert!(
            worker
                .pending_outbound_messages
                .get(&remote_peer)
                .is_some_and(|q| !q.is_empty())
        );

        worker
            .handle_event(build_identified_event(
                remote_peer,
                remote_keys.public(),
                second_connection,
            ))
            .await;

        assert!(!worker.pending_outbound_messages.contains_key(&remote_peer));
        assert!(!worker.retry_by_peer.contains_key(&remote_peer));
        assert!(matches!(
            worker.peer_action.get(&remote_peer),
            Some(Action::Identified(id)) if *id == second_connection
        ));
    }

    /// An identify timeout during bootstrap keeps the boot node in place
    /// instead of queuing it for retry.
    #[test(tokio::test)]
    #[serial]
    async fn bootstrap_identify_timeout_keeps_bootnode_until_close() {
        let remote_keys = Libp2pKeypair::generate_ed25519();
        let remote_peer = remote_keys.public().to_peer_id();
        let boot_node = RoutingNode {
            peer_id: remote_peer.to_string(),
            address: vec!["/memory/3300".to_owned()],
        };

        let mut worker = build_worker(
            vec![boot_node],
            false,
            NodeType::Addressable,
            CancellationToken::new(),
            CancellationToken::new(),
            "/memory/3301".to_owned(),
        );

        assert!(worker.boot_nodes.contains_key(&remote_peer));
        assert!(!worker.retry_boot_nodes.contains_key(&remote_peer));

        worker
            .handle_connection_events(SwarmEvent::Behaviour(
                BehaviourEvent::IdentifyError {
                    peer_id: remote_peer,
                    error: swarm::StreamUpgradeError::Timeout,
                },
            ))
            .await;

        assert!(worker.boot_nodes.contains_key(&remote_peer));
        assert!(!worker.retry_boot_nodes.contains_key(&remote_peer));
    }

    /// Without boot nodes the worker cannot join the network.
    #[test(tokio::test)]
    #[serial]
    async fn test_no_boot_nodes() {
        let boot_nodes = vec![];

        let node_addr = "/memory/3000";
        let mut node = build_worker(
            boot_nodes.clone(),
            false,
            NodeType::Addressable,
            CancellationToken::new(),
            CancellationToken::new(),
            node_addr.to_owned(),
        );
        if let Err(e) = node.run_connection().await {
            assert_eq!(
                e.to_string(),
                "cannot connect to the ave network: no reachable bootstrap node"
            );
        };

        assert_eq!(node.state, NetworkState::Disconnected);
    }

    /// An unreachable boot node leaves the worker disconnected.
    #[test(tokio::test)]
    #[serial]
    async fn test_fake_boot_node() {
        let mut boot_nodes = vec![];

        let fake_boot_peer = PeerId::random();
        let fake_boot_addr = "/memory/3001";
        let fake_node = RoutingNode {
            peer_id: fake_boot_peer.to_string(),
            address: vec![fake_boot_addr.to_owned()],
        };
        boot_nodes.push(fake_node);

        let node_addr = "/memory/3002";
        let mut node = build_worker(
            boot_nodes.clone(),
            false,
            NodeType::Addressable,
            CancellationToken::new(),
            CancellationToken::new(),
            node_addr.to_owned(),
        );

        if let Err(e) = node.run_connection().await {
            assert_eq!(
                e.to_string(),
                "cannot connect to the ave network: no reachable bootstrap node"
            );
        };

        assert_eq!(node.state, NetworkState::Disconnected);
    }

    /// An ephemeral node connects successfully to a running bootstrap node.
    #[test(tokio::test)]
    #[serial]
    async fn test_connect() {
        let mut boot_nodes = vec![];

        let boot_addr = "/memory/3003";
        let mut boot = build_worker(
            boot_nodes.clone(),
            false,
            NodeType::Bootstrap,
            CancellationToken::new(),
            CancellationToken::new(),
            boot_addr.to_owned(),
        );

        let boot_node = RoutingNode {
            peer_id: boot.local_peer_id().to_string(),
            address: vec![boot_addr.to_owned()],
        };

        boot_nodes.push(boot_node);

        let node_addr = "/memory/3004";
        let mut node = build_worker(
            boot_nodes,
            false,
            NodeType::Ephemeral,
            CancellationToken::new(),
            CancellationToken::new(),
            node_addr.to_owned(),
        );

        tokio::spawn(async move {
            boot.run_main().await;
        });

        node.run_connection().await.unwrap();
    }
}