1use crate::{
5 Command, CommandHelper, Config, Error, Event as NetworkEvent, MachineSpec,
6 Monitor, MonitorMessage, NodeType, ResolvedSpec,
7 behaviour::{Behaviour, Event as BehaviourEvent, ReqResMessage},
8 metrics::NetworkMetrics,
9 resolve_spec,
10 service::NetworkService,
11 transport::build_transport,
12 utils::{
13 Action, Due, IDENTIFY_PROTOCOL, LimitsConfig, MessagesHelper,
14 NetworkState, REQRES_PROTOCOL, RetryKind, RetryState, ScheduleType,
15 convert_addresses, convert_boot_nodes, peer_id_to_ed25519_pubkey_bytes,
16 },
17};
18
19use std::{
20 collections::{BinaryHeap, HashSet},
21 fmt::Debug,
22 num::{NonZeroU8, NonZeroUsize},
23 pin::Pin,
24 sync::Arc,
25 time::Duration,
26};
27
28use ave_actors::ActorRef;
29use ave_common::identity::KeyPair;
30
31use libp2p::{
32 Multiaddr, PeerId, StreamProtocol, Swarm, identify,
33 identity::{Keypair, ed25519},
34 request_response::{self, ResponseChannel},
35 swarm::{self, DialError, SwarmEvent, dial_opts::DialOpts},
36};
37
38use futures::StreamExt;
39use serde::Serialize;
40use tokio::{
41 sync::mpsc,
42 time::{Instant, Sleep, sleep, sleep_until},
43};
44use tokio_util::sync::CancellationToken;
45use tracing::{debug, error, info, trace, warn};
46
47use bytes::Bytes;
48use std::collections::{HashMap, VecDeque};
49
/// `tracing` target attached to every log event emitted from this module.
const TARGET: &str = "ave::network::worker";

/// Maximum number of messages kept in one peer's pending queue; the enqueue
/// paths evict the oldest entries while a queue holds this many messages.
const MAX_PENDING_MESSAGES_PER_PEER: usize = 100;
55
56#[derive(Default)]
60struct PendingQueue {
61 messages: VecDeque<PendingMessage>,
62 pending_bytes: usize,
63}
64
65struct PendingMessage {
66 payload: Bytes,
67 enqueued_at: Instant,
68}
69
70impl PendingQueue {
71 fn contains(&self, message: &Bytes) -> bool {
72 self.messages.iter().any(|x| x.payload == *message)
73 }
74
75 fn pop_front(&mut self) -> Option<PendingMessage> {
76 let popped = self.messages.pop_front()?;
77 self.pending_bytes =
78 self.pending_bytes.saturating_sub(popped.payload.len());
79 Some(popped)
80 }
81
82 fn push_back(&mut self, message: Bytes) {
83 self.pending_bytes += message.len();
84 self.messages.push_back(PendingMessage {
85 payload: message,
86 enqueued_at: Instant::now(),
87 });
88 }
89
90 fn drain(&mut self) -> impl Iterator<Item = PendingMessage> + '_ {
91 self.pending_bytes = 0;
92 self.messages.drain(..)
93 }
94
95 fn is_empty(&self) -> bool {
96 self.messages.is_empty()
97 }
98
99 fn len(&self) -> usize {
100 self.messages.len()
101 }
102
103 const fn bytes_len(&self) -> usize {
104 self.pending_bytes
105 }
106}
107
/// Event-loop state for the network layer: owns the libp2p swarm, the
/// command channel, the per-peer pending queues and the retry bookkeeping.
pub struct NetworkWorker<T>
where
    T: Debug + Serialize,
{
    /// Peer id derived from the node's public key in `new`.
    local_peer_id: PeerId,

    /// Service handle built around the command sender; `service()` returns
    /// clones of it.
    service: NetworkService,

    /// The libp2p swarm driving `Behaviour`.
    swarm: Swarm<Behaviour>,

    /// Current network state; updated through `change_state`.
    state: NetworkState,

    /// Receiving half of the command channel created in `new`.
    command_receiver: mpsc::Receiver<Command>,

    /// Optional helper channel, installed with `add_helper_sender`.
    helper_sender: Option<mpsc::Sender<CommandHelper<T>>>,

    /// Optional monitor actor that receives network events (`send_event`).
    monitor: Option<ActorRef<Monitor>>,

    /// Token whose cancellation aborts the bootstrap loops with
    /// `Error::Cancelled`.
    graceful_token: CancellationToken,
    /// Token cancelled by this worker when bootstrap fails or an event
    /// cannot be forwarded to the monitor; also observed by the bootstrap
    /// loops.
    crash_token: CancellationToken,

    /// Node role taken from the configuration.
    node_type: NodeType,

    /// When `true`, outbound messages are dropped, retries are not
    /// scheduled and bootstrap dialing is skipped.
    safe_mode: bool,

    /// Bootstrap peers (with addresses) still pending in the current dial
    /// round; entries are removed as peers are identified, fail, or are
    /// moved to `retry_boot_nodes`.
    boot_nodes: HashMap<PeerId, Vec<Multiaddr>>,

    /// Bootstrap peers set aside to be dialled again in a later round.
    retry_boot_nodes: HashMap<PeerId, Vec<Multiaddr>>,

    /// Outbound payloads queued per peer until they can be sent.
    pending_outbound_messages: HashMap<PeerId, PendingQueue>,

    /// Inbound payloads queued per peer.
    pending_inbound_messages: HashMap<PeerId, PendingQueue>,

    /// Response channels queued per peer; `send_message` tries them, oldest
    /// first, before falling back to the outbound queue.
    response_channels:
        HashMap<PeerId, VecDeque<ResponseChannel<ReqResMessage>>>,

    /// Number of bootstrap peers identified during bootstrap.
    successful_dials: u64,

    /// Peers recorded after an Identify exchange that yielded at least one
    /// valid address.
    peer_identify: HashSet<PeerId>,

    /// Retry state (attempts, deadline, kind, addresses) per peer.
    retry_by_peer: HashMap<PeerId, RetryState>,

    /// Heap of scheduled retry deadlines; entries whose deadline no longer
    /// matches `retry_by_peer` are skipped when drained.
    retry_queue: BinaryHeap<Due>,

    /// Timer armed for the deadline at the top of `retry_queue`, if any.
    retry_timer: Option<Pin<Box<Sleep>>>,

    /// Action currently recorded for each peer (a scheduled retry kind or
    /// `Identified`).
    peer_action: HashMap<PeerId, Action>,

    /// Largest application payload accepted by `send_message`.
    max_app_message_bytes: usize,
    /// Byte budget of one peer's outbound queue (`0` disables the check).
    max_pending_outbound_bytes_per_peer: usize,
    /// Byte budget of one peer's inbound queue (`0` disables the check).
    max_pending_inbound_bytes_per_peer: usize,
    /// Byte budget across all outbound queues (`0` disables the check).
    max_pending_outbound_bytes_total: usize,
    /// Byte budget across all inbound queues (`0` disables the check).
    max_pending_inbound_bytes_total: usize,

    /// Optional metrics sink; when `None` no metrics are recorded.
    metrics: Option<Arc<NetworkMetrics>>,
}
183
/// Runtime collaborators handed to [`NetworkWorker::new`] alongside the
/// static configuration.
pub struct NetworkWorkerRuntime {
    /// Optional monitor actor that will receive network events.
    pub monitor: Option<ActorRef<Monitor>>,
    /// Cancellation token stored as the worker's graceful token.
    pub graceful_token: CancellationToken,
    /// Cancellation token stored as the worker's crash token.
    pub crash_token: CancellationToken,
    /// Optional machine description passed to `resolve_spec` to obtain the
    /// RAM/CPU figures used for building limits.
    pub machine_spec: Option<MachineSpec>,
    /// Optional metrics sink shared with the behaviour.
    pub metrics: Option<Arc<NetworkMetrics>>,
}
197
198impl<T: Debug + Serialize> NetworkWorker<T> {
199 pub fn new(
201 keys: &KeyPair,
202 config: Config,
203 safe_mode: bool,
204 runtime: NetworkWorkerRuntime,
205 ) -> Result<Self, Error> {
206 info!(target: TARGET, "network initialising");
208 let (command_sender, command_receiver) = mpsc::channel(512);
209
210 let key = match keys {
211 KeyPair::Ed25519(ed25519_signer) => {
212 let sk_bytes = ed25519_signer
213 .secret_key_bytes()
214 .map_err(|e| Error::KeyExtraction(e.to_string()))?;
215
216 let sk = ed25519::SecretKey::try_from_bytes(sk_bytes)
217 .map_err(|e| Error::KeyExtraction(e.to_string()))?;
218
219 let kp = ed25519::Keypair::from(sk);
220 Keypair::from(kp)
221 }
222 };
223
224 let local_peer_id = key.public().to_peer_id();
226
227 let boot_nodes = convert_boot_nodes(&config.boot_nodes);
228
229 let addresses = convert_addresses(&config.listen_addresses)?;
231
232 let external_addresses = convert_addresses(&config.external_addresses)?;
234
235 let node_type = config.node_type.clone();
236 let ResolvedSpec { ram_mb, cpu_cores } =
238 resolve_spec(runtime.machine_spec);
239
240 let limits = LimitsConfig::build(ram_mb, cpu_cores);
241 let max_app_message_bytes = config.max_app_message_bytes;
242 let max_pending_outbound_bytes_per_peer =
243 config.max_pending_outbound_bytes_per_peer;
244 let max_pending_inbound_bytes_per_peer =
245 config.max_pending_inbound_bytes_per_peer;
246 let max_pending_outbound_bytes_total =
247 config.max_pending_outbound_bytes_total;
248 let max_pending_inbound_bytes_total =
249 config.max_pending_inbound_bytes_total;
250
251 let transport = build_transport(&key, limits.clone())?;
253
254 let behaviour = Behaviour::new(
255 &key.public(),
256 config,
257 runtime.graceful_token.clone(),
258 runtime.crash_token.clone(),
259 limits,
260 runtime.metrics.clone(),
261 );
262
263 let mut swarm = Swarm::new(
265 transport,
266 behaviour,
267 local_peer_id,
268 swarm::Config::with_tokio_executor()
269 .with_idle_connection_timeout(Duration::from_secs(90))
270 .with_max_negotiating_inbound_streams(32)
271 .with_notify_handler_buffer_size(
272 NonZeroUsize::new(32).expect("32 > 0"),
273 )
274 .with_per_connection_event_buffer_size(16)
275 .with_dial_concurrency_factor(
276 NonZeroU8::new(2).expect("2 > 0"),
277 ),
278 );
279
280 let service = NetworkService::new(command_sender);
281
282 if addresses.is_empty() {
283 swarm
285 .listen_on(
286 "/ip4/0.0.0.0/tcp/0"
287 .parse::<Multiaddr>()
288 .map_err(|e| Error::InvalidAddress(e.to_string()))?,
289 )
290 .map_err(|e| Error::Listen(format!("0.0.0.0:0: {e}")))?;
291 info!(target: TARGET, "listening on all interfaces");
292 } else {
293 for addr in addresses.iter() {
295 info!(target: TARGET, addr = %addr, "listening on address");
296 swarm
297 .listen_on(addr.clone())
298 .map_err(|e| Error::Listen(format!("{addr}: {e}")))?;
299 }
300 }
301
302 if !external_addresses.is_empty() {
303 for addr in external_addresses.iter() {
304 debug!(target: TARGET, addr = %addr, "external address registered");
305 swarm.add_external_address(addr.clone());
306 }
307 }
308
309 info!(target: TARGET, peer_id = %local_peer_id, "local peer id");
310
311 let worker = Self {
312 local_peer_id,
313 service,
314 swarm,
315 state: NetworkState::Start,
316 command_receiver,
317 helper_sender: None,
318 monitor: runtime.monitor,
319 graceful_token: runtime.graceful_token,
320 crash_token: runtime.crash_token,
321 node_type,
322 safe_mode,
323 boot_nodes,
324 retry_boot_nodes: HashMap::new(),
325 pending_outbound_messages: HashMap::default(),
326 pending_inbound_messages: HashMap::default(),
327 response_channels: HashMap::default(),
328 successful_dials: 0,
329 peer_identify: HashSet::new(),
330 retry_by_peer: HashMap::new(),
331 retry_queue: BinaryHeap::new(),
332 retry_timer: None,
333 peer_action: HashMap::new(),
334 max_app_message_bytes,
335 max_pending_outbound_bytes_per_peer,
336 max_pending_inbound_bytes_per_peer,
337 max_pending_outbound_bytes_total,
338 max_pending_inbound_bytes_total,
339 metrics: runtime.metrics,
340 };
341
342 if let Some(metrics) = worker.metric_handle() {
343 metrics.set_state_current(&worker.state);
344 }
345 worker.refresh_runtime_metrics();
346
347 Ok(worker)
348 }
349
350 fn metric_handle(&self) -> Option<&NetworkMetrics> {
351 self.metrics.as_deref()
352 }
353
    /// Returns the `safe_mode` flag this worker was constructed with.
    const fn is_safe_mode(&self) -> bool {
        self.safe_mode
    }
357
358 fn pending_outbound_messages_len(&self) -> usize {
359 self.pending_outbound_messages
360 .values()
361 .map(PendingQueue::len)
362 .sum()
363 }
364
365 fn pending_outbound_bytes_len(&self) -> usize {
366 self.pending_outbound_messages
367 .values()
368 .map(PendingQueue::bytes_len)
369 .sum()
370 }
371
372 fn pending_inbound_messages_len(&self) -> usize {
373 self.pending_inbound_messages
374 .values()
375 .map(PendingQueue::len)
376 .sum()
377 }
378
379 fn pending_inbound_bytes_len(&self) -> usize {
380 self.pending_inbound_messages
381 .values()
382 .map(PendingQueue::bytes_len)
383 .sum()
384 }
385
386 fn pending_response_channels_len(&self) -> usize {
387 self.response_channels.values().map(VecDeque::len).sum()
388 }
389
390 fn refresh_runtime_metrics(&self) {
391 let Some(metrics) = self.metric_handle() else {
392 return;
393 };
394
395 metrics.set_retry_queue_len(self.retry_queue.len() as i64);
396 metrics.set_pending_outbound_peers(
397 self.pending_outbound_messages.len() as i64,
398 );
399 metrics.set_pending_outbound_messages(
400 self.pending_outbound_messages_len() as i64,
401 );
402 metrics.set_pending_outbound_bytes(
403 self.pending_outbound_bytes_len() as i64
404 );
405 metrics.set_pending_inbound_peers(
406 self.pending_inbound_messages.len() as i64
407 );
408 metrics.set_pending_inbound_messages(
409 self.pending_inbound_messages_len() as i64,
410 );
411 metrics
412 .set_pending_inbound_bytes(self.pending_inbound_bytes_len() as i64);
413 metrics.set_identified_peers(self.peer_identify.len() as i64);
414 metrics.set_response_channels_pending(
415 self.pending_response_channels_len() as i64,
416 );
417 }
418
419 fn observe_pending_message_age(&self, enqueued_at: Instant) {
420 if let Some(metrics) = self.metric_handle() {
421 metrics.observe_pending_message_age_seconds(
422 enqueued_at.elapsed().as_secs_f64(),
423 );
424 }
425 }
426
427 fn drop_pending_outbound_messages(&mut self, peer_id: &PeerId) -> usize {
428 let Some(mut queue) = self.pending_outbound_messages.remove(peer_id)
429 else {
430 return 0;
431 };
432
433 let mut dropped = 0usize;
434 for message in queue.drain() {
435 dropped += 1;
436 self.observe_pending_message_age(message.enqueued_at);
437 }
438 dropped
439 }
440
441 fn drop_pending_inbound_messages(&mut self, peer_id: &PeerId) {
442 if let Some(mut queue) = self.pending_inbound_messages.remove(peer_id) {
443 for message in queue.drain() {
444 self.observe_pending_message_age(message.enqueued_at);
445 }
446 }
447 }
448
449 fn observe_identify_error(
450 &self,
451 error: &swarm::StreamUpgradeError<identify::UpgradeError>,
452 ) {
453 let kind = match error {
454 swarm::StreamUpgradeError::Timeout => "timeout",
455 swarm::StreamUpgradeError::Io(_) => "io",
456 swarm::StreamUpgradeError::NegotiationFailed => "negotiation",
457 swarm::StreamUpgradeError::Apply(_) => "other",
458 };
459
460 if let Some(metrics) = self.metric_handle() {
461 metrics.observe_identify_error(kind);
462 }
463 }
464
465 fn schedule_retry(&mut self, peer: PeerId, schedule_type: ScheduleType) {
466 if self.is_safe_mode() {
467 self.peer_action.remove(&peer);
468 self.retry_by_peer.remove(&peer);
469 self.refresh_runtime_metrics();
470 return;
471 }
472
473 if self.peer_action.contains_key(&peer) {
474 return;
475 }
476
477 let (kind, addrs) = match schedule_type {
478 ScheduleType::Discover => (RetryKind::Discover, vec![]),
479 ScheduleType::Dial(multiaddrs) => (RetryKind::Dial, multiaddrs),
480 };
481
482 let now = Instant::now();
483 let base = Duration::from_millis(250);
484 let cap = Duration::from_secs(30);
485
486 let entry = self.retry_by_peer.entry(peer).or_insert(RetryState {
487 attempts: 0,
488 when: now,
489 kind,
490 addrs: vec![],
491 });
492
493 let when = if matches!(
494 (entry.kind, kind),
495 (RetryKind::Discover, RetryKind::Dial)
496 ) {
497 now
498 } else {
499 if entry.attempts >= 8 {
500 self.clear_pending_messages(&peer);
501 return;
502 }
503
504 let exp = 1u32 << entry.attempts.min(7);
507 let mut delay = base * exp;
508 if delay > cap {
509 delay = cap;
510 }
511
512 let hash = peer
515 .to_bytes()
516 .iter()
517 .fold(0u32, |acc, &b| acc.wrapping_add(b as u32));
518 let j = 80 + (hash % 41);
519 delay = delay * j / 100;
520
521 now + delay
522 };
523
524 entry.when = when;
525 entry.kind = kind;
526 entry.addrs = addrs;
527
528 self.peer_action.insert(peer, Action::from(kind));
529
530 self.retry_queue.push(Due(peer, entry.when));
531 self.arm_retry_timer();
532 self.refresh_runtime_metrics();
533 }
534
535 fn arm_retry_timer(&mut self) {
536 if let Some(next) = self.retry_queue.peek() {
537 match &mut self.retry_timer {
538 Some(timer) => timer.as_mut().reset(next.1),
539 None => self.retry_timer = Some(Box::pin(sleep_until(next.1))),
540 }
541 }
542 }
543
544 fn drain_due_retries(
545 &mut self,
546 ) -> Vec<(PeerId, RetryKind, Vec<Multiaddr>)> {
547 let now = Instant::now();
548 let mut out = Vec::new();
549 while let Some(Due(peer, when)) = self.retry_queue.peek().cloned() {
550 if when > now {
551 break;
552 }
553
554 self.retry_queue.pop();
555 if let Some(retry) = self.retry_by_peer.get(&peer).cloned()
561 && retry.when == when
562 {
563 self.retry_by_peer
564 .entry(peer)
565 .and_modify(|x| x.attempts += 1);
566 out.push((peer, retry.kind, retry.addrs));
567 }
568 }
569
570 if self.retry_queue.is_empty() {
571 self.retry_timer = None;
572 } else {
573 self.arm_retry_timer();
574 }
575 self.refresh_runtime_metrics();
576 out
577 }
578
    /// Installs the helper channel, replacing any sender set previously.
    pub fn add_helper_sender(
        &mut self,
        helper_sender: mpsc::Sender<CommandHelper<T>>,
    ) {
        self.helper_sender = Some(helper_sender);
    }
586
    /// Returns the peer id derived from this node's key pair.
    pub const fn local_peer_id(&self) -> PeerId {
        self.local_peer_id
    }
591
592 fn send_message(
594 &mut self,
595 peer: PeerId,
596 message: Bytes,
597 ) -> Result<(), Error> {
598 if self.is_safe_mode() {
599 debug!(
600 target: TARGET,
601 peer_id = %peer,
602 size = message.len(),
603 "safe mode active; dropping outbound message"
604 );
605 return Ok(());
606 }
607
608 if message.len() > self.max_app_message_bytes {
609 warn!(
610 target: TARGET,
611 peer_id = %peer,
612 size = message.len(),
613 max = self.max_app_message_bytes,
614 "outbound payload rejected: message too large",
615 );
616 if let Some(metrics) = self.metric_handle() {
617 metrics.inc_oversized_outbound_drop();
618 }
619 self.refresh_runtime_metrics();
620 return Err(Error::MessageTooLarge {
621 size: message.len(),
622 max: self.max_app_message_bytes,
623 });
624 }
625
626 if let Some(mut responses) = self.response_channels.remove(&peer) {
627 while let Some(response_channel) = responses.pop_front() {
628 match self
629 .swarm
630 .behaviour_mut()
631 .send_response(response_channel, message.clone())
632 {
633 Ok(()) => {
634 if !responses.is_empty() {
635 self.response_channels.insert(peer, responses);
636 }
637 self.refresh_runtime_metrics();
638 return Ok(());
639 }
640 Err(e) => {
641 debug!(target: TARGET, peer_id = %peer, error = %e, "failed to send response: channel may already be consumed");
642 }
643 }
644 }
645 }
646
647 self.add_pending_outbound_message(peer, message);
648
649 if self.swarm.behaviour_mut().is_known_peer(&peer) {
650 if let Some(Action::Identified(..)) = self.peer_action.get(&peer) {
651 self.send_pending_outbound_messages(peer);
652 } else {
653 self.schedule_retry(peer, ScheduleType::Dial(vec![]));
654 }
655 } else {
656 self.schedule_retry(peer, ScheduleType::Discover);
657 }
658
659 Ok(())
660 }
661
662 fn add_pending_outbound_message(&mut self, peer: PeerId, message: Bytes) {
666 let message_len = message.len();
667 let mut dropped_count = 0u64;
668 let mut dropped_bytes_limit_peer = 0u64;
669 let mut dropped_bytes_limit_global = 0u64;
670 let mut dropped_messages = Vec::new();
671 let mut duplicate = false;
672 let mut total_pending_bytes = self.pending_outbound_bytes_len();
673 let per_peer_limit = self.max_pending_outbound_bytes_per_peer;
674 let global_limit = self.max_pending_outbound_bytes_total;
675
676 {
677 let queue = self.pending_outbound_messages.entry(peer).or_default();
678 if queue.contains(&message) {
679 duplicate = true;
680 } else {
681 while queue.len() >= MAX_PENDING_MESSAGES_PER_PEER {
682 if let Some(evicted) = queue.pop_front() {
683 dropped_count += 1;
684 total_pending_bytes = total_pending_bytes
685 .saturating_sub(evicted.payload.len());
686 dropped_messages.push(evicted);
687 } else {
688 break;
689 }
690 }
691
692 if per_peer_limit > 0 {
693 while queue.bytes_len() + message_len > per_peer_limit {
694 if let Some(evicted) = queue.pop_front() {
695 dropped_bytes_limit_peer += 1;
696 total_pending_bytes = total_pending_bytes
697 .saturating_sub(evicted.payload.len());
698 dropped_messages.push(evicted);
699 } else {
700 break;
701 }
702 }
703 }
704
705 if per_peer_limit > 0
706 && queue.bytes_len() + message_len > per_peer_limit
707 {
708 dropped_bytes_limit_peer += 1;
709 } else if global_limit > 0
710 && total_pending_bytes.saturating_add(message_len)
711 > global_limit
712 {
713 dropped_bytes_limit_global += 1;
714 } else {
715 queue.push_back(message);
716 }
717 }
718 }
719
720 if duplicate {
721 self.refresh_runtime_metrics();
722 return;
723 }
724
725 for evicted in dropped_messages {
726 self.observe_pending_message_age(evicted.enqueued_at);
727 }
728
729 if dropped_count > 0 {
730 warn!(
731 target: TARGET,
732 peer_id = %peer,
733 dropped = dropped_count,
734 max_messages = MAX_PENDING_MESSAGES_PER_PEER,
735 "outbound queue count limit reached; oldest messages evicted",
736 );
737 }
738
739 if dropped_bytes_limit_peer > 0 {
740 warn!(
741 target: TARGET,
742 peer_id = %peer,
743 dropped = dropped_bytes_limit_peer,
744 message_bytes = message_len,
745 max_queue_bytes = per_peer_limit,
746 "outbound queue bytes limit reached; messages evicted/dropped",
747 );
748 }
749
750 if dropped_bytes_limit_global > 0 {
751 warn!(
752 target: TARGET,
753 peer_id = %peer,
754 dropped = dropped_bytes_limit_global,
755 message_bytes = message_len,
756 max_queue_bytes_total = global_limit,
757 "outbound global queue bytes limit reached; messages dropped",
758 );
759 }
760
761 if let Some(metrics) = self.metric_handle() {
762 metrics.inc_outbound_queue_drop_by(dropped_count);
763 metrics.inc_outbound_queue_bytes_drop_per_peer_by(
764 dropped_bytes_limit_peer,
765 );
766 metrics.inc_outbound_queue_bytes_drop_global_by(
767 dropped_bytes_limit_global,
768 );
769 }
770
771 self.refresh_runtime_metrics();
772 }
773
774 fn add_pending_inbound_message(&mut self, peer: PeerId, message: Bytes) {
775 let message_len = message.len();
776 let mut dropped_count = 0u64;
777 let mut dropped_bytes_limit_peer = 0u64;
778 let mut dropped_bytes_limit_global = 0u64;
779 let mut dropped_messages = Vec::new();
780 let mut duplicate = false;
781 let mut total_pending_bytes = self.pending_inbound_bytes_len();
782 let per_peer_limit = self.max_pending_inbound_bytes_per_peer;
783 let global_limit = self.max_pending_inbound_bytes_total;
784
785 {
786 let queue = self.pending_inbound_messages.entry(peer).or_default();
787 if queue.contains(&message) {
788 duplicate = true;
789 } else {
790 while queue.len() >= MAX_PENDING_MESSAGES_PER_PEER {
791 if let Some(evicted) = queue.pop_front() {
792 dropped_count += 1;
793 total_pending_bytes = total_pending_bytes
794 .saturating_sub(evicted.payload.len());
795 dropped_messages.push(evicted);
796 } else {
797 break;
798 }
799 }
800
801 if per_peer_limit > 0 {
802 while queue.bytes_len() + message_len > per_peer_limit {
803 if let Some(evicted) = queue.pop_front() {
804 dropped_bytes_limit_peer += 1;
805 total_pending_bytes = total_pending_bytes
806 .saturating_sub(evicted.payload.len());
807 dropped_messages.push(evicted);
808 } else {
809 break;
810 }
811 }
812 }
813
814 if per_peer_limit > 0
815 && queue.bytes_len() + message_len > per_peer_limit
816 {
817 dropped_bytes_limit_peer += 1;
818 } else if global_limit > 0
819 && total_pending_bytes.saturating_add(message_len)
820 > global_limit
821 {
822 dropped_bytes_limit_global += 1;
823 } else {
824 queue.push_back(message);
825 }
826 }
827 }
828
829 if duplicate {
830 self.refresh_runtime_metrics();
831 return;
832 }
833
834 for evicted in dropped_messages {
835 self.observe_pending_message_age(evicted.enqueued_at);
836 }
837
838 if dropped_count > 0 {
839 warn!(
840 target: TARGET,
841 peer_id = %peer,
842 dropped = dropped_count,
843 max_messages = MAX_PENDING_MESSAGES_PER_PEER,
844 "inbound queue count limit reached; oldest messages evicted",
845 );
846 }
847
848 if dropped_bytes_limit_peer > 0 {
849 warn!(
850 target: TARGET,
851 peer_id = %peer,
852 dropped = dropped_bytes_limit_peer,
853 message_bytes = message_len,
854 max_queue_bytes = per_peer_limit,
855 "inbound queue bytes limit reached; messages evicted/dropped",
856 );
857 }
858
859 if dropped_bytes_limit_global > 0 {
860 warn!(
861 target: TARGET,
862 peer_id = %peer,
863 dropped = dropped_bytes_limit_global,
864 message_bytes = message_len,
865 max_queue_bytes_total = global_limit,
866 "inbound global queue bytes limit reached; messages dropped",
867 );
868 }
869
870 if let Some(metrics) = self.metric_handle() {
871 metrics.inc_inbound_queue_drop_by(dropped_count);
872 metrics.inc_inbound_queue_bytes_drop_per_peer_by(
873 dropped_bytes_limit_peer,
874 );
875 metrics.inc_inbound_queue_bytes_drop_global_by(
876 dropped_bytes_limit_global,
877 );
878 }
879
880 self.refresh_runtime_metrics();
881 }
882
883 fn add_ephemeral_response(
885 &mut self,
886 peer: PeerId,
887 response_channel: ResponseChannel<ReqResMessage>,
888 ) {
889 self.response_channels
890 .entry(peer)
891 .or_default()
892 .push_back(response_channel);
893 self.refresh_runtime_metrics();
894 }
895
896 fn send_pending_outbound_messages(&mut self, peer: PeerId) {
898 if let Some(mut queue) = self.pending_outbound_messages.remove(&peer) {
899 for message in queue.drain() {
900 self.observe_pending_message_age(message.enqueued_at);
901 self.swarm
902 .behaviour_mut()
903 .send_message(&peer, message.payload);
904 }
905 }
906
907 self.retry_by_peer.remove(&peer);
908 self.refresh_runtime_metrics();
909 }
910
    /// Returns a clone of the service handle created in `new`.
    pub fn service(&self) -> NetworkService {
        self.service.clone()
    }
915
916 async fn change_state(&mut self, state: NetworkState) {
918 trace!(target: TARGET, state = ?state, "state changed");
919 self.state = state.clone();
920 if let Some(metrics) = self.metric_handle() {
921 metrics.observe_state_transition(&state);
922 }
923 self.send_event(NetworkEvent::StateChanged(state)).await;
924 }
925
926 async fn send_event(&mut self, event: NetworkEvent) {
928 if let Some(monitor) = self.monitor.as_mut()
929 && let Err(e) = monitor.tell(MonitorMessage::Network(event)).await
930 {
931 error!(target: TARGET, error = %e, "failed to forward event to monitor");
932 self.crash_token.cancel();
933 }
934 }
935
936 pub async fn run(&mut self) {
938 let bootstrap_start = Instant::now();
939
940 if let Err(error) = self.run_connection().await {
942 if let Some(metrics) = self.metric_handle() {
943 metrics.observe_bootstrap_duration_seconds(
944 "failure",
945 bootstrap_start.elapsed().as_secs_f64(),
946 );
947 }
948 error!(target: TARGET, error = %error, "bootstrap connection failed");
949 self.send_event(NetworkEvent::Error(error)).await;
950 self.crash_token.cancel();
952 return;
953 }
954 if let Some(metrics) = self.metric_handle() {
955 metrics.observe_bootstrap_duration_seconds(
956 "success",
957 bootstrap_start.elapsed().as_secs_f64(),
958 );
959 }
960
961 if self.state != NetworkState::Running {
962 self.change_state(NetworkState::Running).await;
963 }
964
965 if !self.is_safe_mode() {
967 self.swarm.behaviour_mut().finish_prerouting_state();
968 }
969 self.run_main().await;
971 }
972
    /// Bootstrap state machine: dials the configured boot nodes and waits
    /// until at least one is identified.
    ///
    /// Returns `Ok(())` immediately in safe mode, and for a `Bootstrap` node
    /// with no boot nodes configured (both switch to `Running`).
    ///
    /// # Errors
    ///
    /// * `Error::NoBootstrapNode` once the state becomes `Disconnected`.
    /// * `Error::Cancelled` when the graceful or crash token is cancelled
    ///   while waiting.
    pub async fn run_connection(&mut self) -> Result<(), Error> {
        if self.is_safe_mode() {
            self.change_state(NetworkState::Running).await;
            return Ok(());
        }

        if self.node_type == NodeType::Bootstrap && self.boot_nodes.is_empty() {
            self.change_state(NetworkState::Running).await;
            Ok(())
        } else {
            self.change_state(NetworkState::Dial).await;

            // Number of retry rounds started so far (at most 3).
            let mut retrys: u8 = 0;

            // Per-round deadline; created already elapsed and only polled
            // while `dialing_timeout_active` is set.
            let dialing_round_timeout = sleep(Duration::from_secs(0));
            tokio::pin!(dialing_round_timeout);
            let mut dialing_timeout_active = false;

            loop {
                match self.state {
                    NetworkState::Dial => {
                        if self.boot_nodes.is_empty() {
                            error!(target: TARGET, "no bootstrap nodes available");
                            self.send_event(NetworkEvent::Error(
                                Error::NoBootstrapNode,
                            ))
                            .await;
                            self.change_state(NetworkState::Disconnected).await;
                        } else {
                            // Iterate over a copy: `boot_nodes` is mutated
                            // inside the loop.
                            let copy_boot_nodes = self.boot_nodes.clone();

                            for (peer, addresses) in copy_boot_nodes {
                                if let Some(metrics) = self.metric_handle() {
                                    metrics.inc_dial_attempt_bootstrap();
                                }
                                if let Err(e) = self.swarm.dial(
                                    DialOpts::peer_id(peer)
                                        .addresses(addresses.clone())
                                        .build(),
                                ) {
                                    // The dial was refused synchronously:
                                    // drop the peer from this round and keep
                                    // it for a retry only if the error is
                                    // classified as retryable.
                                    let (add_to_retry, new_addresses) = self
                                        .handle_dial_error(e, &peer, true)
                                        .unwrap_or((false, vec![]));
                                    self.boot_nodes.remove(&peer);
                                    if add_to_retry {
                                        if new_addresses.is_empty() {
                                            self.retry_boot_nodes
                                                .insert(peer, addresses);
                                        } else {
                                            self.retry_boot_nodes
                                                .insert(peer, new_addresses);
                                        }
                                    }
                                }
                            }
                            if !self.boot_nodes.is_empty() {
                                // At least one dial is in flight: wait up to
                                // 15 seconds for this round.
                                self.change_state(NetworkState::Dialing).await;
                                dialing_round_timeout.as_mut().reset(
                                    Instant::now() + Duration::from_secs(15),
                                );
                                dialing_timeout_active = true;
                            } else {
                                warn!(target: TARGET, "all bootstrap dials failed");
                                self.change_state(NetworkState::Disconnected)
                                    .await;
                            }
                        }
                    }
                    NetworkState::Dialing => {
                        if self.boot_nodes.is_empty()
                            && self.successful_dials == 0
                            && !self.retry_boot_nodes.is_empty()
                            && retrys < 3
                        {
                            // Nothing succeeded this round but some peers are
                            // retryable: back off 2, 4, then 8 seconds.
                            retrys += 1;
                            let wait = Duration::from_secs(1u64 << retrys);
                            debug!(target: TARGET, attempt = retrys, wait_secs = wait.as_secs(), "retrying bootstrap dials");

                            let backoff = sleep(wait);
                            tokio::pin!(backoff);
                            // Keep servicing swarm events during the backoff.
                            loop {
                                tokio::select! {
                                    _ = &mut backoff => break,
                                    event = self.swarm.select_next_some() => {
                                        self.handle_connection_events(event).await;
                                    }
                                    _ = self.graceful_token.cancelled() => {
                                        return Err(Error::Cancelled);
                                    }
                                    _ = self.crash_token.cancelled() => {
                                        return Err(Error::Cancelled);
                                    }
                                }
                            }

                            // Start a new round with the retryable peers.
                            dialing_timeout_active = false;
                            self.boot_nodes.clone_from(&self.retry_boot_nodes);
                            self.retry_boot_nodes.clear();
                            self.change_state(NetworkState::Dial).await;
                        }
                        // Round finished with no success and no retry left.
                        else if self.boot_nodes.is_empty()
                            && self.successful_dials == 0
                        {
                            self.change_state(NetworkState::Disconnected).await;
                        } else if self.boot_nodes.is_empty() {
                            // Round finished with at least one success.
                            return Ok(());
                        }
                    }
                    NetworkState::Running => {
                        return Ok(());
                    }
                    NetworkState::Disconnected => {
                        return Err(Error::NoBootstrapNode);
                    }
                    _ => {}
                }
                if self.state != NetworkState::Disconnected {
                    tokio::select! {
                        event = self.swarm.select_next_some() => {
                            self.handle_connection_events(event).await;
                        }
                        _ = self.graceful_token.cancelled() => {
                            return Err(Error::Cancelled);
                        }
                        _ = self.crash_token.cancelled() => {
                            return Err(Error::Cancelled);
                        }
                        // Round deadline: peers still pending are moved to
                        // the retry set.
                        _ = &mut dialing_round_timeout, if dialing_timeout_active => {
                            warn!(
                                target: TARGET,
                                remaining = self.boot_nodes.len(),
                                "bootstrap round timed out waiting for Identify; \
                                 moving remaining peers to retry queue"
                            );
                            for (peer, addrs) in self.boot_nodes.drain() {
                                self.retry_boot_nodes.insert(peer, addrs);
                            }
                            dialing_timeout_active = false;
                        }
                    }
                }
            }
        }
    }
1127
1128 fn collect_retryable_transport_addresses(
1129 &self,
1130 items: Vec<(Multiaddr, libp2p::TransportError<std::io::Error>)>,
1131 trace_unreachable: bool,
1132 ) -> Vec<Multiaddr> {
1133 let mut new_addresses = vec![];
1134 for (address, error) in items {
1135 if trace_unreachable {
1136 trace!(target: TARGET, addr = %address, err = ?error, "address unreachable");
1137 }
1138
1139 if let libp2p::TransportError::Other(e) = error {
1140 match e.kind() {
1141 std::io::ErrorKind::ConnectionRefused
1142 | std::io::ErrorKind::TimedOut
1143 | std::io::ErrorKind::ConnectionAborted
1144 | std::io::ErrorKind::NotConnected
1145 | std::io::ErrorKind::BrokenPipe
1146 | std::io::ErrorKind::Interrupted
1147 | std::io::ErrorKind::HostUnreachable
1148 | std::io::ErrorKind::NetworkUnreachable => {
1149 new_addresses.push(address);
1150 }
1151 _ => {}
1152 }
1153 };
1154 }
1155
1156 new_addresses
1157 }
1158
1159 fn handle_dial_error(
1164 &mut self,
1165 e: DialError,
1166 peer_id: &PeerId,
1167 bootstrap_flow: bool,
1168 ) -> Option<(bool, Vec<Multiaddr>)> {
1169 let phase = if bootstrap_flow {
1170 "bootstrap"
1171 } else {
1172 "runtime"
1173 };
1174 match e {
1175 DialError::LocalPeerId { .. } => {
1176 if let Some(metrics) = self.metric_handle() {
1177 metrics.observe_dial_failure(phase, "local_peer_id");
1178 }
1179 if bootstrap_flow {
1180 warn!(target: TARGET, peer_id = %peer_id, "dial rejected: connected peer-id matches local peer");
1181 return Some((false, vec![]));
1182 }
1183
1184 self.retry_by_peer.remove(peer_id);
1185 self.clear_pending_messages(peer_id);
1186 self.swarm
1187 .behaviour_mut()
1188 .clean_hard_peer_to_remove(peer_id);
1189 None
1190 }
1191 DialError::NoAddresses => {
1192 if let Some(metrics) = self.metric_handle() {
1193 metrics.observe_dial_failure(phase, "no_addresses");
1194 }
1195 if bootstrap_flow {
1196 debug!(target: TARGET, peer_id = %peer_id, "dial skipped: no addresses");
1197 }
1198 Some((false, vec![]))
1199 }
1200 DialError::DialPeerConditionFalse(_) => {
1201 if let Some(metrics) = self.metric_handle() {
1202 metrics.observe_dial_failure(phase, "peer_condition");
1203 }
1204 if bootstrap_flow {
1205 debug!(target: TARGET, peer_id = %peer_id, "dial skipped: peer condition not met");
1206 return Some((false, vec![]));
1207 }
1208
1209 None
1210 }
1211 DialError::Denied { cause } => {
1212 if let Some(metrics) = self.metric_handle() {
1213 metrics.observe_dial_failure(phase, "denied");
1214 }
1215 if bootstrap_flow {
1216 debug!(target: TARGET, peer_id = %peer_id, cause = %cause, "dial denied by behaviour");
1217 }
1218 Some((false, vec![]))
1219 }
1220 DialError::Aborted => {
1221 if let Some(metrics) = self.metric_handle() {
1222 metrics.observe_dial_failure(phase, "aborted");
1223 }
1224 if bootstrap_flow {
1225 debug!(target: TARGET, peer_id = %peer_id, "dial aborted, will retry");
1226 }
1227 Some((true, vec![]))
1228 }
1229 DialError::WrongPeerId { obtained, .. } => {
1230 if let Some(metrics) = self.metric_handle() {
1231 metrics.observe_dial_failure(phase, "wrong_peer_id");
1232 }
1233 if bootstrap_flow {
1234 warn!(target: TARGET, expected = %peer_id, obtained = %obtained, "dial failed: peer identity mismatch");
1235 return Some((false, vec![]));
1236 }
1237
1238 self.retry_by_peer.remove(peer_id);
1239 self.clear_pending_messages(peer_id);
1240 self.swarm
1241 .behaviour_mut()
1242 .clean_hard_peer_to_remove(peer_id);
1243 None
1244 }
1245 DialError::Transport(items) => {
1246 if let Some(metrics) = self.metric_handle() {
1247 metrics.observe_dial_failure(phase, "transport");
1248 }
1249 if bootstrap_flow {
1250 debug!(target: TARGET, peer_id = %peer_id, "transport dial failed");
1251 }
1252
1253 let new_addresses = self.collect_retryable_transport_addresses(
1254 items,
1255 bootstrap_flow,
1256 );
1257 if !new_addresses.is_empty() {
1258 Some((true, new_addresses))
1259 } else {
1260 Some((false, vec![]))
1261 }
1262 }
1263 }
1264 }
1265
1266 async fn handle_connection_events(
1268 &mut self,
1269 event: SwarmEvent<BehaviourEvent>,
1270 ) {
1271 match event {
1272 SwarmEvent::ConnectionClosed { peer_id, .. } => {
1273 self.boot_nodes.remove(&peer_id);
1274 }
1275 SwarmEvent::OutgoingConnectionError {
1276 peer_id: Some(peer_id),
1277 error,
1278 ..
1279 } => {
1280 let (add_to_retry, new_addresses) = self
1281 .handle_dial_error(error, &peer_id, true)
1282 .unwrap_or((false, vec![]));
1283
1284 if let Some(addresses) = self.boot_nodes.remove(&peer_id)
1285 && add_to_retry
1286 {
1287 if new_addresses.is_empty() {
1288 self.retry_boot_nodes.insert(peer_id, addresses);
1289 } else {
1290 self.retry_boot_nodes.insert(peer_id, new_addresses);
1291 }
1292 }
1293 }
1294 SwarmEvent::Behaviour(BehaviourEvent::Identified {
1295 peer_id,
1296 info,
1297 connection_id,
1298 }) => {
1299 if !self
1300 .check_protocols(&info.protocol_version, &info.protocols)
1301 {
1302 warn!(target: TARGET, peer_id = %peer_id, protocol_version = %info.protocol_version, "peer uses incompatible protocols; closing connection");
1303
1304 self.swarm
1305 .behaviour_mut()
1306 .close_connections(&peer_id, Some(connection_id));
1307 } else {
1308 self.peer_action
1309 .insert(peer_id, Action::Identified(connection_id));
1310
1311 let mut any_address_is_valid = false;
1312 for addr in info.listen_addrs {
1313 if self
1314 .swarm
1315 .behaviour_mut()
1316 .add_self_reported_address(&peer_id, &addr)
1317 {
1318 any_address_is_valid = true;
1319 }
1320 }
1321
1322 if any_address_is_valid {
1323 if self.boot_nodes.remove(&peer_id).is_some() {
1324 self.successful_dials += 1;
1325 }
1326 self.peer_identify.insert(peer_id);
1327 } else {
1328 warn!(target: TARGET, peer_id = %peer_id, "bootstrap peer has no valid addresses");
1329
1330 self.swarm
1331 .behaviour_mut()
1332 .close_connections(&peer_id, Some(connection_id));
1333 }
1334 }
1335 }
1336 SwarmEvent::Behaviour(BehaviourEvent::IdentifyError {
1337 peer_id,
1338 error,
1339 }) => {
1340 self.observe_identify_error(&error);
1341 match error {
1342 swarm::StreamUpgradeError::Timeout => {
1343 }
1345 _ => {
1346 debug!(target: TARGET, peer_id = %peer_id, error = %error, "identify hard failure during bootstrap; queuing for retry");
1349 if let Some(addrs) = self.boot_nodes.remove(&peer_id) {
1350 self.retry_boot_nodes.insert(peer_id, addrs);
1351 }
1352 }
1353 }
1354 }
1355 _ => {}
1356 }
1357 self.refresh_runtime_metrics();
1358 }
1359
1360 fn clear_pending_messages(&mut self, peer_id: &PeerId) {
1361 warn!(target: TARGET, peer_id = %peer_id, "max dial attempts reached; dropping pending messages");
1362
1363 let dropped = self.drop_pending_outbound_messages(peer_id);
1364 if let Some(metrics) = self.metric_handle() {
1365 metrics.inc_max_retries_drop_by(dropped as u64);
1366 }
1367 self.peer_action.remove(peer_id);
1368 self.retry_by_peer.remove(peer_id);
1369 self.refresh_runtime_metrics();
1370 }
1371
1372 fn check_protocols(
1373 &self,
1374 protocol_version: &str,
1375 protocols: &[StreamProtocol],
1376 ) -> bool {
1377 let supp_protocols: HashSet<StreamProtocol> = protocols
1378 .iter()
1379 .cloned()
1380 .collect::<HashSet<StreamProtocol>>();
1381
1382 protocol_version == IDENTIFY_PROTOCOL
1383 && supp_protocols.contains(&StreamProtocol::new(REQRES_PROTOCOL))
1384 }
1385
1386 pub async fn run_main(&mut self) {
1388 info!(target: TARGET, "network worker started");
1389
1390 loop {
1391 tokio::select! {
1392 command = self.command_receiver.recv() => {
1393 match command {
1394 Some(command) => self.handle_command(command).await,
1395 None => break,
1396 }
1397 }
1398 event = self.swarm.select_next_some() => {
1399 self.handle_event(event).await;
1401 }
1402 _ = async {
1403 if let Some(t) = &mut self.retry_timer {
1404 t.as_mut().await;
1405 }
1406 }, if self.retry_timer.is_some() => {
1407 for (peer, kind, addrs) in self.drain_due_retries() {
1408 if let Some(action) = self.peer_action.get(&peer) {
1409 match (action, kind) {
1410 (Action::Discover, RetryKind::Discover) => {
1411 self.swarm.behaviour_mut().discover(&peer);
1412 },
1413 (Action::Dial, RetryKind::Dial) => {
1414 if let Some(metrics) = self.metric_handle() {
1415 metrics.inc_dial_attempt_runtime();
1416 }
1417 if let Err(error) = self.swarm.dial(
1418 DialOpts::peer_id(peer)
1419 .addresses(addrs)
1420 .extend_addresses_through_behaviour()
1421 .build()
1422 ) && let Some((retry, new_address)) =
1423 self.handle_dial_error(error, &peer, false) {
1424
1425 self.peer_action.remove(&peer);
1426 if retry {
1427 let addr = new_address
1428 .iter()
1429 .filter(|x| {
1430 !self
1431 .swarm
1432 .behaviour()
1433 .is_invalid_address(x)
1434 })
1435 .cloned()
1436 .collect::<Vec<Multiaddr>>();
1437
1438 if addr.is_empty() {
1439 self.schedule_retry(peer, ScheduleType::Discover);
1440 } else {
1441 self.schedule_retry(peer, ScheduleType::Dial(addr.clone()));
1442 }
1443 } else {
1444 self.schedule_retry(peer, ScheduleType::Discover);
1445 }
1446 };
1447 },
1448 _ => {}
1449 }
1450 };
1451 }
1452 },
1453 _ = self.graceful_token.cancelled() => {
1454 break;
1455 }
1456 _ = self.crash_token.cancelled() => {
1457 break;
1458 }
1459 }
1460 }
1461 }
1462
1463 async fn handle_command(&mut self, command: Command) {
1464 match command {
1465 Command::SendMessage { peer, message } => {
1466 if self.is_safe_mode() {
1467 debug!(
1468 target: TARGET,
1469 peer_id = %peer,
1470 size = message.len(),
1471 "safe mode active; ignoring send command"
1472 );
1473 return;
1474 }
1475 if let Err(error) = self.send_message(peer, message) {
1476 error!(target: TARGET, error = %error, "failed to deliver message");
1477 self.send_event(NetworkEvent::Error(error)).await;
1478 }
1479 }
1480 }
1481 }
1482
1483 async fn message_to_helper(
1484 &mut self,
1485 message: MessagesHelper,
1486 peer_id: &PeerId,
1487 ) {
1488 let sender = match peer_id_to_ed25519_pubkey_bytes(peer_id) {
1489 Ok(public_key) => public_key,
1490 Err(e) => {
1491 warn!(target: TARGET, error = %e, "cannot resolve public key from peer id");
1492 return;
1493 }
1494 };
1495
1496 'Send: {
1497 if let Some(helper_sender) = self.helper_sender.as_mut() {
1498 match message {
1499 MessagesHelper::Single(items) => {
1500 if helper_sender
1501 .send(CommandHelper::ReceivedMessage {
1502 sender,
1503 message: items,
1504 })
1505 .await
1506 .is_err()
1507 {
1508 break 'Send;
1509 }
1510 }
1511 MessagesHelper::Vec(items) => {
1512 for item in items {
1513 if helper_sender
1514 .send(CommandHelper::ReceivedMessage {
1515 sender,
1516 message: item,
1517 })
1518 .await
1519 .is_err()
1520 {
1521 break 'Send;
1522 }
1523 }
1524 }
1525 }
1526
1527 return;
1528 }
1529 }
1530
1531 error!(target: TARGET, "helper channel closed; shutting down");
1532 self.crash_token.cancel();
1533 }
1534
    /// Handles a swarm event while the worker is in its main loop.
    ///
    /// While `is_safe_mode()` is true, connections belonging to identified
    /// peers, newly established connections and peers sending
    /// request-response traffic are closed, and nothing else is processed.
    /// Otherwise the event drives the per-peer state kept in `peer_action`,
    /// `peer_identify`, the pending inbound/outbound queues and the retry
    /// schedule. Runtime metrics are refreshed before returning on every
    /// path.
    async fn handle_event(&mut self, event: SwarmEvent<BehaviourEvent>) {
        if self.is_safe_mode() {
            match event {
                // Inbound request-response traffic: close every connection
                // to the sender.
                SwarmEvent::Behaviour(BehaviourEvent::ReqresMessage {
                    peer_id,
                    ..
                }) => {
                    debug!(
                        target: TARGET,
                        peer_id = %peer_id,
                        "safe mode active; dropping inbound reqres message"
                    );
                    self.swarm
                        .behaviour_mut()
                        .close_connections(&peer_id, None);
                }
                // A peer finished identify: close that specific connection.
                SwarmEvent::Behaviour(BehaviourEvent::Identified {
                    peer_id,
                    connection_id,
                    ..
                }) => {
                    debug!(
                        target: TARGET,
                        peer_id = %peer_id,
                        "safe mode active; closing identified peer"
                    );
                    self.swarm
                        .behaviour_mut()
                        .close_connections(&peer_id, Some(connection_id));
                }
                SwarmEvent::Behaviour(BehaviourEvent::IdentifyError {
                    peer_id,
                    ..
                }) => {
                    self.swarm
                        .behaviour_mut()
                        .close_connections(&peer_id, None);
                }
                SwarmEvent::ConnectionEstablished {
                    peer_id,
                    connection_id,
                    ..
                } => {
                    debug!(
                        target: TARGET,
                        peer_id = %peer_id,
                        "safe mode active; closing established connection"
                    );
                    self.swarm
                        .behaviour_mut()
                        .close_connections(&peer_id, Some(connection_id));
                }
                // Explicitly listed so it is visible which events are
                // deliberately ignored in safe mode.
                SwarmEvent::OutgoingConnectionError { .. }
                | SwarmEvent::ConnectionClosed { .. }
                | SwarmEvent::IncomingConnectionError { .. }
                | SwarmEvent::IncomingConnection { .. }
                | SwarmEvent::ListenerClosed { .. }
                | SwarmEvent::Dialing { .. }
                | SwarmEvent::NewExternalAddrCandidate { .. }
                | SwarmEvent::ExternalAddrConfirmed { .. }
                | SwarmEvent::ExternalAddrExpired { .. }
                | SwarmEvent::NewExternalAddrOfPeer { .. }
                | SwarmEvent::NewListenAddr { .. }
                | SwarmEvent::ExpiredListenAddr { .. }
                | SwarmEvent::ListenerError { .. }
                | SwarmEvent::Behaviour(BehaviourEvent::ReqresFailure {
                    ..
                })
                | SwarmEvent::Behaviour(BehaviourEvent::ClosestPeer {
                    ..
                })
                | SwarmEvent::Behaviour(BehaviourEvent::Dummy) => {}
                _ => {}
            }
            self.refresh_runtime_metrics();
            return;
        }

        match event {
            SwarmEvent::Behaviour(event) => {
                match event {
                    BehaviourEvent::Identified {
                        peer_id,
                        info,
                        connection_id,
                    } => {
                        if !self.check_protocols(
                            &info.protocol_version,
                            &info.protocols,
                        ) {
                            warn!(
                                target: TARGET,
                                peer_id = %peer_id,
                                protocol_version = %info.protocol_version,
                                protocols = ?info.protocols,
                                "peer uses incompatible protocols; closing connection"
                            );

                            // Incompatible peer: discard queued outbound
                            // messages and its retry/action state, then
                            // close the connection it identified on.
                            self.clear_pending_messages(&peer_id);

                            self.swarm
                                .behaviour_mut()
                                .clean_hard_peer_to_remove(&peer_id);

                            self.swarm.behaviour_mut().close_connections(
                                &peer_id,
                                Some(connection_id),
                            );
                        } else {
                            // Remember which connection the peer was
                            // identified on; `ConnectionClosed` compares
                            // against this id.
                            self.peer_action.insert(
                                peer_id,
                                Action::Identified(connection_id),
                            );

                            self.swarm
                                .behaviour_mut()
                                .clean_peer_to_remove(&peer_id);
                            for addr in info.listen_addrs {
                                self.swarm
                                    .behaviour_mut()
                                    .add_self_reported_address(&peer_id, &addr);
                            }

                            self.peer_identify.insert(peer_id);

                            // Deliver, in queue order, the inbound messages
                            // that arrived before the peer was identified.
                            if let Some(mut queue) =
                                self.pending_inbound_messages.remove(&peer_id)
                            {
                                let mut buffered = VecDeque::new();
                                for message in queue.drain() {
                                    self.observe_pending_message_age(
                                        message.enqueued_at,
                                    );
                                    buffered.push_back(message.payload);
                                }
                                self.message_to_helper(
                                    MessagesHelper::Vec(buffered),
                                    &peer_id,
                                )
                                .await;
                            };

                            self.send_pending_outbound_messages(peer_id);
                        }
                    }
                    BehaviourEvent::IdentifyError { peer_id, error } => {
                        self.observe_identify_error(&error);
                        debug!(target: TARGET, peer_id = %peer_id, error = %error, "identify error");

                        match error {
                            // A timeout keeps all per-peer state; only the
                            // connections are closed below.
                            swarm::StreamUpgradeError::Timeout => {
                            }
                            // Any other identify failure also discards the
                            // peer's queues, retry state and response
                            // channels.
                            swarm::StreamUpgradeError::Apply(..)
                            | swarm::StreamUpgradeError::NegotiationFailed
                            | swarm::StreamUpgradeError::Io(..) => {
                                self.drop_pending_outbound_messages(&peer_id);
                                self.retry_by_peer.remove(&peer_id);
                                self.response_channels.remove(&peer_id);
                                self.drop_pending_inbound_messages(&peer_id);
                            }
                        }

                        self.swarm
                            .behaviour_mut()
                            .close_connections(&peer_id, None);
                    }
                    BehaviourEvent::ReqresMessage { peer_id, message } => {
                        // Normalise requests and responses into
                        // (payload, is_request, optional response channel).
                        let (message_data, is_request, response_channel) =
                            match message {
                                request_response::Message::Request {
                                    request,
                                    channel,
                                    ..
                                } => (request.0, true, Some(channel)),
                                request_response::Message::Response {
                                    response,
                                    ..
                                } => (response.0, false, None),
                            };

                        // Oversized payloads are dropped and every
                        // connection to the sender is closed.
                        if message_data.len() > self.max_app_message_bytes {
                            warn!(
                                target: TARGET,
                                peer_id = %peer_id,
                                size = message_data.len(),
                                max = self.max_app_message_bytes,
                                "inbound payload dropped: message too large",
                            );
                            if let Some(metrics) = self.metric_handle() {
                                metrics.inc_oversized_inbound_drop();
                            }
                            self.swarm
                                .behaviour_mut()
                                .close_connections(&peer_id, None);
                            self.refresh_runtime_metrics();
                            return;
                        }

                        if is_request {
                            if let Some(metrics) = self.metric_handle() {
                                metrics.inc_reqres_request_received();
                            }
                            trace!(target: TARGET, peer_id = %peer_id, "request received");
                            if let Some(channel) = response_channel {
                                self.add_ephemeral_response(peer_id, channel);
                            }
                        } else {
                            if let Some(metrics) = self.metric_handle() {
                                metrics.inc_reqres_response_received();
                            }
                            trace!(target: TARGET, peer_id = %peer_id, "response received");
                        }

                        // Payloads from peers that are not yet identified
                        // are queued; the `Identified` branch above drains
                        // that queue.
                        if self.peer_identify.contains(&peer_id) {
                            self.message_to_helper(
                                MessagesHelper::Single(message_data),
                                &peer_id,
                            )
                            .await;
                        } else {
                            self.add_pending_inbound_message(
                                peer_id,
                                message_data,
                            );
                        }
                    }
                    BehaviourEvent::ReqresFailure {
                        peer_id,
                        direction,
                        kind,
                    } => {
                        if let Some(metrics) = self.metric_handle() {
                            metrics.observe_reqres_failure(
                                direction.as_metric_label(),
                                kind.as_metric_label(),
                            );
                        }
                        debug!(
                            target: TARGET,
                            peer_id = %peer_id,
                            direction = direction.as_metric_label(),
                            kind = kind.as_metric_label(),
                            "request-response failure"
                        );
                    }
                    BehaviourEvent::ClosestPeer { peer_id, info } => {
                        // Only relevant while a discovery is outstanding for
                        // this peer.
                        if matches!(
                            self.peer_action.get(&peer_id),
                            Some(Action::Discover)
                        ) {
                            self.peer_action.remove(&peer_id);
                            if let Some(info) = info {
                                // Keep only addresses the behaviour does not
                                // consider invalid.
                                let addr = info
                                    .addrs
                                    .iter()
                                    .filter(|x| {
                                        !self
                                            .swarm
                                            .behaviour()
                                            .is_invalid_address(x)
                                    })
                                    .cloned()
                                    .collect::<Vec<Multiaddr>>();

                                // No usable address: discover again;
                                // otherwise schedule a dial to them.
                                if addr.is_empty() {
                                    self.schedule_retry(
                                        peer_id,
                                        ScheduleType::Discover,
                                    );
                                } else {
                                    self.schedule_retry(
                                        peer_id,
                                        ScheduleType::Dial(addr),
                                    );
                                }
                            } else {
                                self.schedule_retry(
                                    peer_id,
                                    ScheduleType::Discover,
                                );
                            };
                        }
                    }
                    BehaviourEvent::Dummy => {
                    }
                }
            }
            SwarmEvent::OutgoingConnectionError {
                error,
                peer_id: Some(peer_id),
                ..
            } => {
                // Only dial failures for peers this worker is currently
                // dialing are handled.
                if matches!(self.peer_action.get(&peer_id), Some(Action::Dial))
                {
                    self.peer_action.remove(&peer_id);

                    self.swarm.behaviour_mut().add_peer_to_remove(&peer_id);

                    // `None` from the dial-error handler means nothing is
                    // rescheduled here.
                    if let Some((retry, new_address)) =
                        self.handle_dial_error(error, &peer_id, false)
                    {
                        if retry {
                            let addr = new_address
                                .iter()
                                .filter(|x| {
                                    !self
                                        .swarm
                                        .behaviour()
                                        .is_invalid_address(x)
                                })
                                .cloned()
                                .collect::<Vec<Multiaddr>>();

                            if addr.is_empty() {
                                self.schedule_retry(
                                    peer_id,
                                    ScheduleType::Discover,
                                );
                            } else {
                                self.schedule_retry(
                                    peer_id,
                                    ScheduleType::Dial(addr),
                                );
                            }
                        } else {
                            self.schedule_retry(
                                peer_id,
                                ScheduleType::Discover,
                            );
                        }
                    };
                }
            }
            // Matches only when the peer's last connection has closed
            // (`num_established: 0`).
            SwarmEvent::ConnectionClosed {
                peer_id,
                connection_id,
                num_established: 0,
                ..
            } => {
                // The connection the peer was identified on is gone: reset
                // its state and, if messages are still queued for it,
                // schedule a new dial.
                if let Some(Action::Identified(id)) =
                    self.peer_action.get(&peer_id)
                    && connection_id == *id
                {
                    self.peer_action.remove(&peer_id);

                    self.peer_identify.remove(&peer_id);
                    self.drop_pending_inbound_messages(&peer_id);
                    self.response_channels.remove(&peer_id);

                    self.retry_by_peer.remove(&peer_id);

                    if self
                        .pending_outbound_messages
                        .get(&peer_id)
                        .is_some_and(|q| !q.is_empty())
                    {
                        self.schedule_retry(
                            peer_id,
                            ScheduleType::Dial(vec![]),
                        );
                    }
                } else if let Some(Action::Dial | Action::Discover) =
                    self.peer_action.get(&peer_id)
                {
                    // Closed while still dialing or discovering: reset the
                    // same state, but restart from discovery if messages
                    // are still queued.
                    self.peer_action.remove(&peer_id);
                    self.retry_by_peer.remove(&peer_id);
                    self.drop_pending_inbound_messages(&peer_id);
                    self.response_channels.remove(&peer_id);
                    self.peer_identify.remove(&peer_id);

                    if self
                        .pending_outbound_messages
                        .get(&peer_id)
                        .is_some_and(|q| !q.is_empty())
                    {
                        self.schedule_retry(peer_id, ScheduleType::Discover);
                    }
                }
            }
            SwarmEvent::IncomingConnectionError { .. } => {
            }
            SwarmEvent::ExpiredListenAddr { address, .. } => {
                warn!(target: TARGET, addr = %address, "listen address expired");
            }
            SwarmEvent::ListenerError { error, .. } => {
                error!(target: TARGET, error = %error, "listener error");
            }
            // More than one connection to the same peer: close the one that
            // was just established.
            SwarmEvent::ConnectionEstablished {
                peer_id,
                connection_id,
                num_established,
                ..
            } if num_established.get() > 1 => {
                debug!(target: TARGET, peer_id = %peer_id, "duplicate connection detected; closing excess");
                self.swarm
                    .behaviour_mut()
                    .close_connections(&peer_id, Some(connection_id));
            }
            // Events that need no action in the main loop.
            SwarmEvent::IncomingConnection { .. }
            | SwarmEvent::ListenerClosed { .. }
            | SwarmEvent::Dialing { .. }
            | SwarmEvent::NewExternalAddrCandidate { .. }
            | SwarmEvent::ExternalAddrConfirmed { .. }
            | SwarmEvent::ExternalAddrExpired { .. }
            | SwarmEvent::NewExternalAddrOfPeer { .. }
            | SwarmEvent::NewListenAddr { .. } => {
            }
            _ => {}
        }
        self.refresh_runtime_metrics();
    }
1958}
1959
1960#[cfg(test)]
1961mod tests {
1962
1963 use crate::routing::RoutingNode;
1964
1965 use super::*;
1966 use libp2p::core::{ConnectedPoint, Endpoint, transport::PortUse};
1967 use libp2p::identity::Keypair as Libp2pKeypair;
1968 use libp2p::swarm::ConnectionId;
1969 use prometheus_client::{encoding::text::encode, registry::Registry};
1970 use serde::Deserialize;
1971 use test_log::test;
1972
1973 use ave_common::identity::{KeyPair, keys::Ed25519Signer};
1974
1975 use serial_test::serial;
1976
    /// Unit marker type used as the generic parameter of `NetworkWorker` in
    /// these tests.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Dummy;
1979
1980 fn metric_value(metrics: &str, name: &str) -> f64 {
1981 metrics
1982 .lines()
1983 .find_map(|line| {
1984 if line.starts_with(name) {
1985 line.split_whitespace().nth(1)?.parse::<f64>().ok()
1986 } else {
1987 None
1988 }
1989 })
1990 .unwrap_or(0.0)
1991 }
1992
1993 fn build_worker(
1995 boot_nodes: Vec<RoutingNode>,
1996 random_walk: bool,
1997 node_type: NodeType,
1998 graceful_token: CancellationToken,
1999 crash_token: CancellationToken,
2000 memory_addr: String,
2001 ) -> NetworkWorker<Dummy> {
2002 let config = create_config(
2003 boot_nodes,
2004 random_walk,
2005 node_type,
2006 vec![memory_addr],
2007 );
2008 let keys = KeyPair::Ed25519(Ed25519Signer::generate().unwrap());
2009
2010 NetworkWorker::new(
2011 &keys,
2012 config,
2013 false,
2014 NetworkWorkerRuntime {
2015 monitor: None,
2016 graceful_token,
2017 crash_token,
2018 machine_spec: None,
2019 metrics: None,
2020 },
2021 )
2022 .unwrap()
2023 }
2024
2025 fn create_config(
2027 boot_nodes: Vec<RoutingNode>,
2028 random_walk: bool,
2029 node_type: NodeType,
2030 listen_addresses: Vec<String>,
2031 ) -> Config {
2032 let config = crate::routing::Config::default()
2033 .with_discovery_limit(50)
2034 .with_dht_random_walk(random_walk);
2035
2036 Config {
2037 boot_nodes,
2038 node_type,
2039 routing: config,
2040 external_addresses: vec![],
2041 listen_addresses,
2042 ..Default::default()
2043 }
2044 }
2045
2046 fn build_identified_event(
2047 peer_id: PeerId,
2048 public_key: libp2p::identity::PublicKey,
2049 connection_id: ConnectionId,
2050 ) -> SwarmEvent<BehaviourEvent> {
2051 SwarmEvent::Behaviour(BehaviourEvent::Identified {
2052 peer_id,
2053 info: Box::new(identify::Info {
2054 public_key,
2055 protocol_version: IDENTIFY_PROTOCOL.to_owned(),
2056 agent_version: "test-agent".to_owned(),
2057 listen_addrs: vec!["/memory/9999".parse().expect("multiaddr")],
2058 protocols: vec![StreamProtocol::new(REQRES_PROTOCOL)],
2059 observed_addr: "/memory/9998".parse().expect("multiaddr"),
2060 signed_peer_record: None,
2061 }),
2062 connection_id,
2063 })
2064 }
2065
2066 fn test_endpoint() -> ConnectedPoint {
2067 ConnectedPoint::Dialer {
2068 address: "/memory/9997".parse().expect("multiaddr"),
2069 role_override: Endpoint::Dialer,
2070 port_use: PortUse::New,
2071 }
2072 }
2073
2074 #[test]
2075 fn outbound_queue_respects_bytes_limit_and_updates_metrics() {
2076 let mut config = create_config(
2077 vec![],
2078 false,
2079 NodeType::Addressable,
2080 vec!["/memory/3100".to_owned()],
2081 );
2082 config.max_pending_outbound_bytes_per_peer = 16;
2083
2084 let keys = KeyPair::Ed25519(Ed25519Signer::generate().unwrap());
2085 let mut registry = Registry::default();
2086 let metrics = crate::metrics::register(&mut registry);
2087 let mut worker: NetworkWorker<Dummy> = NetworkWorker::new(
2088 &keys,
2089 config,
2090 false,
2091 NetworkWorkerRuntime {
2092 monitor: None,
2093 graceful_token: CancellationToken::new(),
2094 crash_token: CancellationToken::new(),
2095 machine_spec: None,
2096 metrics: Some(metrics),
2097 },
2098 )
2099 .expect("worker");
2100
2101 let peer = PeerId::random();
2102 worker.add_pending_outbound_message(
2103 peer,
2104 Bytes::from_static(b"aaaaaaaaaaaa"), );
2106 worker.add_pending_outbound_message(
2107 peer,
2108 Bytes::from_static(b"bbbbbbbbbbbb"), );
2110 worker.add_pending_outbound_message(
2111 peer,
2112 Bytes::from_static(b"cccc"), );
2114
2115 let queue = worker
2116 .pending_outbound_messages
2117 .get(&peer)
2118 .expect("queue exists");
2119 assert_eq!(queue.len(), 2);
2120 assert_eq!(queue.bytes_len(), 16);
2121
2122 let mut text = String::new();
2123 encode(&mut text, ®istry).expect("encode metrics");
2124
2125 assert_eq!(
2126 metric_value(
2127 &text,
2128 "network_messages_dropped_total{direction=\"outbound\",reason=\"queue_bytes_limit_per_peer\"}"
2129 ),
2130 1.0
2131 );
2132 assert_eq!(
2133 metric_value(
2134 &text,
2135 "network_messages_dropped_total{direction=\"outbound\",reason=\"queue_bytes_limit_global\"}"
2136 ),
2137 0.0
2138 );
2139 assert_eq!(metric_value(&text, "network_pending_outbound_bytes"), 16.0);
2140 assert!(
2141 metric_value(&text, "network_pending_message_age_seconds_count")
2142 >= 1.0
2143 );
2144 }
2145
2146 #[test]
2147 fn zero_pending_bytes_limits_disable_byte_drops() {
2148 let mut config = create_config(
2149 vec![],
2150 false,
2151 NodeType::Addressable,
2152 vec!["/memory/3101".to_owned()],
2153 );
2154 config.max_pending_outbound_bytes_per_peer = 0;
2155 config.max_pending_inbound_bytes_per_peer = 0;
2156 config.max_pending_outbound_bytes_total = 0;
2157 config.max_pending_inbound_bytes_total = 0;
2158
2159 let keys = KeyPair::Ed25519(Ed25519Signer::generate().unwrap());
2160 let mut registry = Registry::default();
2161 let metrics = crate::metrics::register(&mut registry);
2162 let mut worker: NetworkWorker<Dummy> = NetworkWorker::new(
2163 &keys,
2164 config,
2165 false,
2166 NetworkWorkerRuntime {
2167 monitor: None,
2168 graceful_token: CancellationToken::new(),
2169 crash_token: CancellationToken::new(),
2170 machine_spec: None,
2171 metrics: Some(metrics),
2172 },
2173 )
2174 .expect("worker");
2175
2176 let peer = PeerId::random();
2177 for i in 0..3u8 {
2178 let payload = Bytes::from(vec![i + 1; 12]);
2179 worker.add_pending_outbound_message(peer, payload);
2180 }
2181 for i in 0..3u8 {
2182 let payload = Bytes::from(vec![i + 7; 12]);
2183 worker.add_pending_inbound_message(peer, payload);
2184 }
2185
2186 let outbound = worker
2187 .pending_outbound_messages
2188 .get(&peer)
2189 .expect("outbound queue exists");
2190 let inbound = worker
2191 .pending_inbound_messages
2192 .get(&peer)
2193 .expect("inbound queue exists");
2194 assert_eq!(outbound.len(), 3);
2195 assert_eq!(outbound.bytes_len(), 36);
2196 assert_eq!(inbound.len(), 3);
2197 assert_eq!(inbound.bytes_len(), 36);
2198
2199 let mut text = String::new();
2200 encode(&mut text, ®istry).expect("encode metrics");
2201 assert_eq!(
2202 metric_value(
2203 &text,
2204 "network_messages_dropped_total{direction=\"outbound\",reason=\"queue_bytes_limit_per_peer\"}"
2205 ),
2206 0.0
2207 );
2208 assert_eq!(
2209 metric_value(
2210 &text,
2211 "network_messages_dropped_total{direction=\"outbound\",reason=\"queue_bytes_limit_global\"}"
2212 ),
2213 0.0
2214 );
2215 assert_eq!(
2216 metric_value(
2217 &text,
2218 "network_messages_dropped_total{direction=\"inbound\",reason=\"queue_bytes_limit_per_peer\"}"
2219 ),
2220 0.0
2221 );
2222 assert_eq!(
2223 metric_value(
2224 &text,
2225 "network_messages_dropped_total{direction=\"inbound\",reason=\"queue_bytes_limit_global\"}"
2226 ),
2227 0.0
2228 );
2229 }
2230
2231 #[test]
2232 fn outbound_global_bytes_limit_applies_across_peers() {
2233 let mut config = create_config(
2234 vec![],
2235 false,
2236 NodeType::Addressable,
2237 vec!["/memory/3102".to_owned()],
2238 );
2239 config.max_pending_outbound_bytes_per_peer = 0;
2240 config.max_pending_outbound_bytes_total = 20;
2241
2242 let keys = KeyPair::Ed25519(Ed25519Signer::generate().unwrap());
2243 let mut registry = Registry::default();
2244 let metrics = crate::metrics::register(&mut registry);
2245 let mut worker: NetworkWorker<Dummy> = NetworkWorker::new(
2246 &keys,
2247 config,
2248 false,
2249 NetworkWorkerRuntime {
2250 monitor: None,
2251 graceful_token: CancellationToken::new(),
2252 crash_token: CancellationToken::new(),
2253 machine_spec: None,
2254 metrics: Some(metrics),
2255 },
2256 )
2257 .expect("worker");
2258
2259 let peer_a = PeerId::random();
2260 let peer_b = PeerId::random();
2261
2262 worker.add_pending_outbound_message(
2263 peer_a,
2264 Bytes::from_static(b"aaaaaaaaaaaa"), );
2266 worker.add_pending_outbound_message(
2267 peer_b,
2268 Bytes::from_static(b"bbbbbbbbbbbb"), );
2270
2271 assert_eq!(worker.pending_outbound_bytes_len(), 12);
2272 assert_eq!(
2273 worker
2274 .pending_outbound_messages
2275 .get(&peer_a)
2276 .expect("peer_a queue")
2277 .len(),
2278 1
2279 );
2280 assert_eq!(
2281 worker
2282 .pending_outbound_messages
2283 .get(&peer_b)
2284 .map_or(0, PendingQueue::len),
2285 0
2286 );
2287
2288 let mut text = String::new();
2289 encode(&mut text, ®istry).expect("encode metrics");
2290 assert_eq!(
2291 metric_value(
2292 &text,
2293 "network_messages_dropped_total{direction=\"outbound\",reason=\"queue_bytes_limit_per_peer\"}"
2294 ),
2295 0.0
2296 );
2297 assert_eq!(
2298 metric_value(
2299 &text,
2300 "network_messages_dropped_total{direction=\"outbound\",reason=\"queue_bytes_limit_global\"}"
2301 ),
2302 1.0
2303 );
2304 }
2305
2306 #[test]
2307 fn dial_failures_include_phase_label() {
2308 let mut registry = Registry::default();
2309 let metrics = crate::metrics::register(&mut registry);
2310
2311 metrics.observe_dial_failure("bootstrap", "transport");
2312 metrics.observe_dial_failure("runtime", "denied");
2313
2314 let mut text = String::new();
2315 encode(&mut text, ®istry).expect("encode metrics");
2316
2317 assert_eq!(
2318 metric_value(
2319 &text,
2320 "network_dial_failures_total{phase=\"bootstrap\",kind=\"transport\"}"
2321 ),
2322 1.0
2323 );
2324 assert_eq!(
2325 metric_value(
2326 &text,
2327 "network_dial_failures_total{phase=\"runtime\",kind=\"denied\"}"
2328 ),
2329 1.0
2330 );
2331 }
2332
2333 #[test]
2334 fn bootstrap_duration_is_labeled_by_result() {
2335 let mut registry = Registry::default();
2336 let metrics = crate::metrics::register(&mut registry);
2337
2338 metrics.observe_bootstrap_duration_seconds("success", 0.5);
2339 metrics.observe_bootstrap_duration_seconds("failure", 1.25);
2340
2341 let mut text = String::new();
2342 encode(&mut text, ®istry).expect("encode metrics");
2343
2344 assert_eq!(
2345 metric_value(
2346 &text,
2347 "network_bootstrap_duration_seconds_count{result=\"success\"}"
2348 ),
2349 1.0
2350 );
2351 assert_eq!(
2352 metric_value(
2353 &text,
2354 "network_bootstrap_duration_seconds_count{result=\"failure\"}"
2355 ),
2356 1.0
2357 );
2358 }
2359
2360 #[test]
2361 fn network_state_is_one_hot() {
2362 let mut registry = Registry::default();
2363 let metrics = crate::metrics::register(&mut registry);
2364
2365 metrics.set_state_current(&NetworkState::Running);
2366
2367 let mut text = String::new();
2368 encode(&mut text, ®istry).expect("encode metrics");
2369
2370 assert_eq!(
2371 metric_value(&text, "network_state{state=\"running\"}"),
2372 1.0
2373 );
2374 assert_eq!(metric_value(&text, "network_state{state=\"start\"}"), 0.0);
2375 assert_eq!(metric_value(&text, "network_state{state=\"dial\"}"), 0.0);
2376 assert_eq!(
2377 metric_value(&text, "network_state{state=\"dialing\"}"),
2378 0.0
2379 );
2380 assert_eq!(
2381 metric_value(&text, "network_state{state=\"disconnected\"}"),
2382 0.0
2383 );
2384 }
2385
2386 #[test(tokio::test)]
2387 #[serial]
2388 async fn pending_inbound_messages_keep_arrival_order_after_identify() {
2389 let mut worker = build_worker(
2390 vec![],
2391 false,
2392 NodeType::Addressable,
2393 CancellationToken::new(),
2394 CancellationToken::new(),
2395 "/memory/3200".to_owned(),
2396 );
2397 let remote_keys = Libp2pKeypair::generate_ed25519();
2398 let remote_peer = remote_keys.public().to_peer_id();
2399
2400 worker.add_pending_inbound_message(
2401 remote_peer,
2402 Bytes::from_static(b"msg-2"),
2403 );
2404 worker.add_pending_inbound_message(
2405 remote_peer,
2406 Bytes::from_static(b"msg-1"),
2407 );
2408 worker.add_pending_inbound_message(
2409 remote_peer,
2410 Bytes::from_static(b"msg-3"),
2411 );
2412
2413 let (helper_sender, mut helper_rx) = mpsc::channel(8);
2414 worker.add_helper_sender(helper_sender);
2415
2416 worker
2417 .handle_event(build_identified_event(
2418 remote_peer,
2419 remote_keys.public(),
2420 ConnectionId::new_unchecked(11),
2421 ))
2422 .await;
2423
2424 let mut received = Vec::new();
2425 for _ in 0..3 {
2426 let command = tokio::time::timeout(
2427 Duration::from_millis(200),
2428 helper_rx.recv(),
2429 )
2430 .await
2431 .expect("helper receive timeout")
2432 .expect("helper channel closed");
2433 let CommandHelper::ReceivedMessage { message, .. } = command else {
2434 panic!("unexpected helper command")
2435 };
2436 received.push(message);
2437 }
2438
2439 assert_eq!(
2440 received,
2441 vec![
2442 Bytes::from_static(b"msg-2"),
2443 Bytes::from_static(b"msg-1"),
2444 Bytes::from_static(b"msg-3"),
2445 ]
2446 );
2447 assert!(!worker.pending_inbound_messages.contains_key(&remote_peer));
2448 }
2449
2450 #[test(tokio::test)]
2451 #[serial]
2452 async fn flapping_connection_retries_then_flushes_outbound_queue() {
2453 let mut worker = build_worker(
2454 vec![],
2455 false,
2456 NodeType::Addressable,
2457 CancellationToken::new(),
2458 CancellationToken::new(),
2459 "/memory/3201".to_owned(),
2460 );
2461 let remote_keys = Libp2pKeypair::generate_ed25519();
2462 let remote_peer = remote_keys.public().to_peer_id();
2463 let first_connection = ConnectionId::new_unchecked(21);
2464 let second_connection = ConnectionId::new_unchecked(22);
2465
2466 worker.add_pending_outbound_message(
2467 remote_peer,
2468 Bytes::from_static(b"needs-redelivery"),
2469 );
2470 worker
2471 .peer_action
2472 .insert(remote_peer, Action::Identified(first_connection));
2473 worker.peer_identify.insert(remote_peer);
2474
2475 worker
2476 .handle_event(SwarmEvent::ConnectionClosed {
2477 peer_id: remote_peer,
2478 connection_id: first_connection,
2479 endpoint: test_endpoint(),
2480 num_established: 0,
2481 cause: None,
2482 })
2483 .await;
2484
2485 assert!(worker.retry_by_peer.contains_key(&remote_peer));
2486 assert!(matches!(
2487 worker.retry_by_peer.get(&remote_peer).map(|s| s.kind),
2488 Some(RetryKind::Dial)
2489 ));
2490 assert!(
2491 worker
2492 .pending_outbound_messages
2493 .get(&remote_peer)
2494 .is_some_and(|q| !q.is_empty())
2495 );
2496
2497 worker
2498 .handle_event(build_identified_event(
2499 remote_peer,
2500 remote_keys.public(),
2501 second_connection,
2502 ))
2503 .await;
2504
2505 assert!(!worker.pending_outbound_messages.contains_key(&remote_peer));
2506 assert!(!worker.retry_by_peer.contains_key(&remote_peer));
2507 assert!(matches!(
2508 worker.peer_action.get(&remote_peer),
2509 Some(Action::Identified(id)) if *id == second_connection
2510 ));
2511 }
2512
2513 #[test(tokio::test)]
2514 #[serial]
2515 async fn bootstrap_identify_timeout_keeps_bootnode_until_close() {
2516 let remote_keys = Libp2pKeypair::generate_ed25519();
2517 let remote_peer = remote_keys.public().to_peer_id();
2518 let boot_node = RoutingNode {
2519 peer_id: remote_peer.to_string(),
2520 address: vec!["/memory/3300".to_owned()],
2521 };
2522
2523 let mut worker = build_worker(
2524 vec![boot_node],
2525 false,
2526 NodeType::Addressable,
2527 CancellationToken::new(),
2528 CancellationToken::new(),
2529 "/memory/3301".to_owned(),
2530 );
2531
2532 assert!(worker.boot_nodes.contains_key(&remote_peer));
2533 assert!(!worker.retry_boot_nodes.contains_key(&remote_peer));
2534
2535 worker
2536 .handle_connection_events(SwarmEvent::Behaviour(
2537 BehaviourEvent::IdentifyError {
2538 peer_id: remote_peer,
2539 error: swarm::StreamUpgradeError::Timeout,
2540 },
2541 ))
2542 .await;
2543
2544 assert!(worker.boot_nodes.contains_key(&remote_peer));
2545 assert!(!worker.retry_boot_nodes.contains_key(&remote_peer));
2546 }
2547
2548 #[test(tokio::test)]
2549 #[serial]
2550 async fn test_no_boot_nodes() {
2551 let boot_nodes = vec![];
2552
2553 let node_addr = "/memory/3000";
2555 let mut node = build_worker(
2556 boot_nodes.clone(),
2557 false,
2558 NodeType::Addressable,
2559 CancellationToken::new(),
2560 CancellationToken::new(),
2561 node_addr.to_owned(),
2562 );
2563 if let Err(e) = node.run_connection().await {
2564 assert_eq!(
2565 e.to_string(),
2566 "cannot connect to the ave network: no reachable bootstrap node"
2567 );
2568 };
2569
2570 assert_eq!(node.state, NetworkState::Disconnected);
2571 }
2572
2573 #[test(tokio::test)]
2574 #[serial]
2575 async fn test_fake_boot_node() {
2576 let mut boot_nodes = vec![];
2577
2578 let fake_boot_peer = PeerId::random();
2580 let fake_boot_addr = "/memory/3001";
2581 let fake_node = RoutingNode {
2582 peer_id: fake_boot_peer.to_string(),
2583 address: vec![fake_boot_addr.to_owned()],
2584 };
2585 boot_nodes.push(fake_node);
2586
2587 let node_addr = "/memory/3002";
2589 let mut node = build_worker(
2590 boot_nodes.clone(),
2591 false,
2592 NodeType::Addressable,
2593 CancellationToken::new(),
2594 CancellationToken::new(),
2595 node_addr.to_owned(),
2596 );
2597
2598 if let Err(e) = node.run_connection().await {
2599 assert_eq!(
2600 e.to_string(),
2601 "cannot connect to the ave network: no reachable bootstrap node"
2602 );
2603 };
2604
2605 assert_eq!(node.state, NetworkState::Disconnected);
2606 }
2607
2608 #[test(tokio::test)]
2609 #[serial]
2610 async fn test_connect() {
2611 let mut boot_nodes = vec![];
2612
2613 let boot_addr = "/memory/3003";
2615 let mut boot = build_worker(
2616 boot_nodes.clone(),
2617 false,
2618 NodeType::Bootstrap,
2619 CancellationToken::new(),
2620 CancellationToken::new(),
2621 boot_addr.to_owned(),
2622 );
2623
2624 let boot_node = RoutingNode {
2625 peer_id: boot.local_peer_id().to_string(),
2626 address: vec![boot_addr.to_owned()],
2627 };
2628
2629 boot_nodes.push(boot_node);
2630
2631 let node_addr = "/memory/3004";
2633 let mut node = build_worker(
2634 boot_nodes,
2635 false,
2636 NodeType::Ephemeral,
2637 CancellationToken::new(),
2638 CancellationToken::new(),
2639 node_addr.to_owned(),
2640 );
2641
2642 tokio::spawn(async move {
2644 boot.run_main().await;
2645 });
2646
2647 node.run_connection().await.unwrap();
2649 }
2650}