1use crate::{
5 Command, CommandHelper, Config, Error, Event as NetworkEvent, MachineSpec,
6 Monitor, MonitorMessage, NodeType, ResolvedSpec,
7 behaviour::{Behaviour, Event as BehaviourEvent, ReqResMessage},
8 metrics::NetworkMetrics,
9 resolve_spec,
10 service::NetworkService,
11 transport::build_transport,
12 utils::{
13 Action, Due, IDENTIFY_PROTOCOL, LimitsConfig, MessagesHelper,
14 NetworkState, REQRES_PROTOCOL, RetryKind, RetryState, ScheduleType,
15 convert_addresses, convert_boot_nodes, peer_id_to_ed25519_pubkey_bytes,
16 },
17};
18
19use std::{
20 collections::{BinaryHeap, HashSet},
21 fmt::Debug,
22 num::{NonZeroU8, NonZeroUsize},
23 pin::Pin,
24 sync::Arc,
25 time::Duration,
26};
27
28use ave_actors::ActorRef;
29use ave_common::identity::KeyPair;
30
31use libp2p::{
32 Multiaddr, PeerId, StreamProtocol, Swarm, identify,
33 identity::{Keypair, ed25519},
34 request_response::{self, ResponseChannel},
35 swarm::{self, DialError, SwarmEvent, dial_opts::DialOpts},
36};
37
38use futures::StreamExt;
39use serde::Serialize;
40use tokio::{
41 sync::mpsc,
42 time::{Instant, Sleep, sleep, sleep_until},
43};
44use tokio_util::sync::CancellationToken;
45use tracing::{debug, error, info, trace, warn};
46
47use bytes::Bytes;
48use std::collections::{HashMap, VecDeque};
49
/// Tracing target attached to the log records emitted by the network worker.
const TARGET: &str = "ave::network::worker";

/// Maximum number of messages kept in a single peer's pending queue; the
/// enqueue paths evict the oldest entries once this count is reached.
const MAX_PENDING_MESSAGES_PER_PEER: usize = 100;
55
/// FIFO of messages waiting for one peer, together with a running total of
/// the payload bytes it currently holds.
#[derive(Default)]
struct PendingQueue {
    /// Queued messages; new entries are appended at the back.
    messages: VecDeque<PendingMessage>,
    /// Sum of the payload lengths in `messages`, kept up to date by
    /// `push_back`, `pop_front` and `drain`.
    pending_bytes: usize,
}
64
/// A queued payload plus the instant at which it entered the queue.
struct PendingMessage {
    /// Raw message bytes.
    payload: Bytes,
    /// Enqueue time; used to report how long the message waited.
    enqueued_at: Instant,
}
69
70impl PendingQueue {
71 fn contains(&self, message: &Bytes) -> bool {
72 self.messages.iter().any(|x| x.payload == *message)
73 }
74
75 fn pop_front(&mut self) -> Option<PendingMessage> {
76 let popped = self.messages.pop_front()?;
77 self.pending_bytes =
78 self.pending_bytes.saturating_sub(popped.payload.len());
79 Some(popped)
80 }
81
82 fn push_back(&mut self, message: Bytes) {
83 self.pending_bytes += message.len();
84 self.messages.push_back(PendingMessage {
85 payload: message,
86 enqueued_at: Instant::now(),
87 });
88 }
89
90 fn drain(&mut self) -> impl Iterator<Item = PendingMessage> + '_ {
91 self.pending_bytes = 0;
92 self.messages.drain(..)
93 }
94
95 fn is_empty(&self) -> bool {
96 self.messages.is_empty()
97 }
98
99 fn len(&self) -> usize {
100 self.messages.len()
101 }
102
103 const fn bytes_len(&self) -> usize {
104 self.pending_bytes
105 }
106}
107
/// Owner of the libp2p swarm and of all per-peer networking state.
///
/// Holds the command channel, the pending message queues, the retry schedule
/// and the bootstrap bookkeeping. `T` is the type parameter of the
/// `CommandHelper` messages sent through `helper_sender`.
pub struct NetworkWorker<T>
where
    T: Debug + Serialize,
{
    /// Peer id derived from the local key pair in `new`.
    local_peer_id: PeerId,

    /// Cloneable handle built around the sending half of the command channel.
    service: NetworkService,

    /// libp2p swarm running the crate's `Behaviour`.
    swarm: Swarm<Behaviour>,

    /// Current lifecycle state; starts as `NetworkState::Start` and is
    /// updated through `change_state`.
    state: NetworkState,

    /// Receiving half of the command channel created in `new`.
    command_receiver: mpsc::Receiver<Command>,

    /// Sender towards the command helper; `None` until `add_helper_sender`
    /// is called.
    helper_sender: Option<mpsc::Sender<CommandHelper<T>>>,

    /// Optional monitor actor that is told about network events.
    monitor: Option<ActorRef<Monitor>>,

    /// Token whose cancellation aborts bootstrap with `Error::Cancelled`
    /// (presumably the graceful-shutdown signal; confirm with the caller).
    graceful_token: CancellationToken,
    /// Token this worker cancels when bootstrap fails or the monitor cannot
    /// be reached; its cancellation also aborts bootstrap.
    crash_token: CancellationToken,

    /// Role of this node, copied from the configuration.
    node_type: NodeType,

    /// When set, outbound messages are discarded, retries are not scheduled
    /// and bootstrap dialling is skipped.
    safe_mode: bool,

    /// Bootstrap peers of the current dial round that have neither been
    /// identified nor failed yet.
    boot_nodes: HashMap<PeerId, Vec<Multiaddr>>,

    /// Bootstrap peers set aside for the next dial round after a retryable
    /// failure or a round timeout.
    retry_boot_nodes: HashMap<PeerId, Vec<Multiaddr>>,

    /// Outbound payloads queued per peer until they can be sent.
    pending_outbound_messages: HashMap<PeerId, PendingQueue>,

    /// Inbound payloads queued per peer.
    pending_inbound_messages: HashMap<PeerId, PendingQueue>,

    /// Open request-response channels per peer; `send_message` answers
    /// through the oldest usable one before queueing a fresh outbound message.
    response_channels:
        HashMap<PeerId, VecDeque<ResponseChannel<ReqResMessage>>>,

    /// Number of bootstrap peers identified with at least one valid address.
    successful_dials: u64,

    /// Peers that completed identify with at least one valid self-reported
    /// address.
    peer_identify: HashSet<PeerId>,

    /// Retry bookkeeping (attempt count, due time, kind, addresses) per peer.
    retry_by_peer: HashMap<PeerId, RetryState>,

    /// Heap of `(peer, due time)` entries driving the retry timer
    /// (presumably ordered soonest-first by `Due`'s `Ord` impl; confirm).
    retry_queue: BinaryHeap<Due>,

    /// Sleep armed for the entry at the top of `retry_queue`; cleared once
    /// the queue has been drained.
    retry_timer: Option<Pin<Box<Sleep>>>,

    /// Action currently recorded for a peer; a peer with an entry here is
    /// not rescheduled by `schedule_retry`.
    peer_action: HashMap<PeerId, Action>,

    /// Largest application payload `send_message` accepts.
    max_app_message_bytes: usize,
    /// Byte budget of one peer's outbound queue; `0` disables the check.
    max_pending_outbound_bytes_per_peer: usize,
    /// Byte budget of one peer's inbound queue; `0` disables the check.
    max_pending_inbound_bytes_per_peer: usize,
    /// Byte budget across all outbound queues; `0` disables the check.
    max_pending_outbound_bytes_total: usize,
    /// Byte budget across all inbound queues; `0` disables the check.
    max_pending_inbound_bytes_total: usize,

    /// Optional metrics sink; every metrics call is skipped when `None`.
    metrics: Option<Arc<NetworkMetrics>>,
}
183
184impl<T: Debug + Serialize> NetworkWorker<T> {
185 pub fn new(
187 keys: &KeyPair,
188 config: Config,
189 monitor: Option<ActorRef<Monitor>>,
190 graceful_token: CancellationToken,
191 crash_token: CancellationToken,
192 machine_spec: Option<MachineSpec>,
193 metrics: Option<Arc<NetworkMetrics>>,
194 ) -> Result<Self, Error> {
195 info!(target: TARGET, "network initialising");
197 let (command_sender, command_receiver) = mpsc::channel(512);
198
199 let key = match keys {
200 KeyPair::Ed25519(ed25519_signer) => {
201 let sk_bytes = ed25519_signer
202 .secret_key_bytes()
203 .map_err(|e| Error::KeyExtraction(e.to_string()))?;
204
205 let sk = ed25519::SecretKey::try_from_bytes(sk_bytes)
206 .map_err(|e| Error::KeyExtraction(e.to_string()))?;
207
208 let kp = ed25519::Keypair::from(sk);
209 Keypair::from(kp)
210 }
211 };
212
213 let local_peer_id = key.public().to_peer_id();
215
216 let boot_nodes = convert_boot_nodes(&config.boot_nodes);
217
218 let addresses = convert_addresses(&config.listen_addresses)?;
220
221 let external_addresses = convert_addresses(&config.external_addresses)?;
223
224 let node_type = config.node_type.clone();
225 let safe_mode = config.safe_mode;
226
227 let ResolvedSpec { ram_mb, cpu_cores } = resolve_spec(machine_spec);
229
230 let limits = LimitsConfig::build(ram_mb, cpu_cores);
231 let max_app_message_bytes = config.max_app_message_bytes;
232 let max_pending_outbound_bytes_per_peer =
233 config.max_pending_outbound_bytes_per_peer;
234 let max_pending_inbound_bytes_per_peer =
235 config.max_pending_inbound_bytes_per_peer;
236 let max_pending_outbound_bytes_total =
237 config.max_pending_outbound_bytes_total;
238 let max_pending_inbound_bytes_total =
239 config.max_pending_inbound_bytes_total;
240
241 let transport = build_transport(&key, limits.clone())?;
243
244 let behaviour = Behaviour::new(
245 &key.public(),
246 config,
247 graceful_token.clone(),
248 crash_token.clone(),
249 limits,
250 metrics.clone(),
251 );
252
253 let mut swarm = Swarm::new(
255 transport,
256 behaviour,
257 local_peer_id,
258 swarm::Config::with_tokio_executor()
259 .with_idle_connection_timeout(Duration::from_secs(90))
260 .with_max_negotiating_inbound_streams(32)
261 .with_notify_handler_buffer_size(
262 NonZeroUsize::new(32).expect("32 > 0"),
263 )
264 .with_per_connection_event_buffer_size(16)
265 .with_dial_concurrency_factor(
266 NonZeroU8::new(2).expect("2 > 0"),
267 ),
268 );
269
270 let service = NetworkService::new(command_sender);
271
272 if addresses.is_empty() {
273 swarm
275 .listen_on(
276 "/ip4/0.0.0.0/tcp/0"
277 .parse::<Multiaddr>()
278 .map_err(|e| Error::InvalidAddress(e.to_string()))?,
279 )
280 .map_err(|e| Error::Listen(format!("0.0.0.0:0: {e}")))?;
281 info!(target: TARGET, "listening on all interfaces");
282 } else {
283 for addr in addresses.iter() {
285 info!(target: TARGET, addr = %addr, "listening on address");
286 swarm
287 .listen_on(addr.clone())
288 .map_err(|e| Error::Listen(format!("{addr}: {e}")))?;
289 }
290 }
291
292 if !external_addresses.is_empty() {
293 for addr in external_addresses.iter() {
294 debug!(target: TARGET, addr = %addr, "external address registered");
295 swarm.add_external_address(addr.clone());
296 }
297 }
298
299 info!(target: TARGET, peer_id = %local_peer_id, "local peer id");
300
301 let worker = Self {
302 local_peer_id,
303 service,
304 swarm,
305 state: NetworkState::Start,
306 command_receiver,
307 helper_sender: None,
308 monitor,
309 graceful_token,
310 crash_token,
311 node_type,
312 safe_mode,
313 boot_nodes,
314 retry_boot_nodes: HashMap::new(),
315 pending_outbound_messages: HashMap::default(),
316 pending_inbound_messages: HashMap::default(),
317 response_channels: HashMap::default(),
318 successful_dials: 0,
319 peer_identify: HashSet::new(),
320 retry_by_peer: HashMap::new(),
321 retry_queue: BinaryHeap::new(),
322 retry_timer: None,
323 peer_action: HashMap::new(),
324 max_app_message_bytes,
325 max_pending_outbound_bytes_per_peer,
326 max_pending_inbound_bytes_per_peer,
327 max_pending_outbound_bytes_total,
328 max_pending_inbound_bytes_total,
329 metrics,
330 };
331
332 if let Some(metrics) = worker.metric_handle() {
333 metrics.set_state_current(&worker.state);
334 }
335 worker.refresh_runtime_metrics();
336
337 Ok(worker)
338 }
339
/// Borrows the metrics sink, or returns `None` when metrics are disabled.
fn metric_handle(&self) -> Option<&NetworkMetrics> {
    self.metrics.as_deref()
}
343
/// Returns the `safe_mode` flag copied from the configuration at
/// construction time.
const fn is_safe_mode(&self) -> bool {
    self.safe_mode
}
347
348 fn pending_outbound_messages_len(&self) -> usize {
349 self.pending_outbound_messages
350 .values()
351 .map(PendingQueue::len)
352 .sum()
353 }
354
355 fn pending_outbound_bytes_len(&self) -> usize {
356 self.pending_outbound_messages
357 .values()
358 .map(PendingQueue::bytes_len)
359 .sum()
360 }
361
362 fn pending_inbound_messages_len(&self) -> usize {
363 self.pending_inbound_messages
364 .values()
365 .map(PendingQueue::len)
366 .sum()
367 }
368
369 fn pending_inbound_bytes_len(&self) -> usize {
370 self.pending_inbound_messages
371 .values()
372 .map(PendingQueue::bytes_len)
373 .sum()
374 }
375
376 fn pending_response_channels_len(&self) -> usize {
377 self.response_channels.values().map(VecDeque::len).sum()
378 }
379
380 fn refresh_runtime_metrics(&self) {
381 let Some(metrics) = self.metric_handle() else {
382 return;
383 };
384
385 metrics.set_retry_queue_len(self.retry_queue.len() as i64);
386 metrics.set_pending_outbound_peers(
387 self.pending_outbound_messages.len() as i64,
388 );
389 metrics.set_pending_outbound_messages(
390 self.pending_outbound_messages_len() as i64,
391 );
392 metrics.set_pending_outbound_bytes(
393 self.pending_outbound_bytes_len() as i64
394 );
395 metrics.set_pending_inbound_peers(
396 self.pending_inbound_messages.len() as i64
397 );
398 metrics.set_pending_inbound_messages(
399 self.pending_inbound_messages_len() as i64,
400 );
401 metrics
402 .set_pending_inbound_bytes(self.pending_inbound_bytes_len() as i64);
403 metrics.set_identified_peers(self.peer_identify.len() as i64);
404 metrics.set_response_channels_pending(
405 self.pending_response_channels_len() as i64,
406 );
407 }
408
409 fn observe_pending_message_age(&self, enqueued_at: Instant) {
410 if let Some(metrics) = self.metric_handle() {
411 metrics.observe_pending_message_age_seconds(
412 enqueued_at.elapsed().as_secs_f64(),
413 );
414 }
415 }
416
417 fn drop_pending_outbound_messages(&mut self, peer_id: &PeerId) -> usize {
418 let Some(mut queue) = self.pending_outbound_messages.remove(peer_id)
419 else {
420 return 0;
421 };
422
423 let mut dropped = 0usize;
424 for message in queue.drain() {
425 dropped += 1;
426 self.observe_pending_message_age(message.enqueued_at);
427 }
428 dropped
429 }
430
431 fn drop_pending_inbound_messages(&mut self, peer_id: &PeerId) {
432 if let Some(mut queue) = self.pending_inbound_messages.remove(peer_id) {
433 for message in queue.drain() {
434 self.observe_pending_message_age(message.enqueued_at);
435 }
436 }
437 }
438
439 fn observe_identify_error(
440 &self,
441 error: &swarm::StreamUpgradeError<identify::UpgradeError>,
442 ) {
443 let kind = match error {
444 swarm::StreamUpgradeError::Timeout => "timeout",
445 swarm::StreamUpgradeError::Io(_) => "io",
446 swarm::StreamUpgradeError::NegotiationFailed => "negotiation",
447 swarm::StreamUpgradeError::Apply(_) => "other",
448 };
449
450 if let Some(metrics) = self.metric_handle() {
451 metrics.observe_identify_error(kind);
452 }
453 }
454
/// Schedules a delayed discover or dial attempt for `peer`.
///
/// * In safe mode nothing is scheduled; the peer's recorded action and retry
///   state are removed instead.
/// * A peer that already has an entry in `peer_action` is left alone.
/// * Moving from a pending `Discover` to a `Dial` is due immediately.
/// * Otherwise the delay is `250 ms * 2^attempts` (exponent capped at 7, delay
///   capped at 30 s) scaled by a per-peer factor between 80 % and 120 %.
/// * Once `attempts` has reached 8 the peer's pending messages are cleared
///   and nothing is scheduled.
fn schedule_retry(&mut self, peer: PeerId, schedule_type: ScheduleType) {
    if self.is_safe_mode() {
        self.peer_action.remove(&peer);
        self.retry_by_peer.remove(&peer);
        self.refresh_runtime_metrics();
        return;
    }

    // An action is already recorded for this peer: do not stack another one.
    if self.peer_action.contains_key(&peer) {
        return;
    }

    let (kind, addrs) = match schedule_type {
        ScheduleType::Discover => (RetryKind::Discover, vec![]),
        ScheduleType::Dial(multiaddrs) => (RetryKind::Dial, multiaddrs),
    };

    let now = Instant::now();
    let base = Duration::from_millis(250);
    let cap = Duration::from_secs(30);

    // Reuse the existing retry state so `attempts` survives across calls.
    let entry = self.retry_by_peer.entry(peer).or_insert(RetryState {
        attempts: 0,
        when: now,
        kind,
        addrs: vec![],
    });

    let when = if matches!(
        (entry.kind, kind),
        (RetryKind::Discover, RetryKind::Dial)
    ) {
        // Discover -> Dial transition: run without back-off.
        now
    } else {
        if entry.attempts >= 8 {
            // Retry budget exhausted. The retry state itself is not removed
            // here, so the counter stays at its current value.
            self.clear_pending_messages(&peer);
            return;
        }

        let exp = 1u32 << entry.attempts.min(7);
        let mut delay = base * exp;
        if delay > cap {
            delay = cap;
        }

        // Deterministic jitter: byte-sum of the peer id modulo 41 gives
        // 0..=40, so `j` lies in 80..=120 (percent). It is applied after the
        // cap, so the final delay can exceed `cap` by up to 20 %.
        let hash = peer
            .to_bytes()
            .iter()
            .fold(0u32, |acc, &b| acc.wrapping_add(b as u32));
        let j = 80 + (hash % 41);
        delay = delay * j / 100;

        now + delay
    };

    entry.when = when;
    entry.kind = kind;
    entry.addrs = addrs;

    self.peer_action.insert(peer, Action::from(kind));

    // `drain_due_retries` only honours a heap entry whose time matches
    // `entry.when`, so older heap entries for this peer become stale.
    self.retry_queue.push(Due(peer, entry.when));
    self.arm_retry_timer();
    self.refresh_runtime_metrics();
}
524
525 fn arm_retry_timer(&mut self) {
526 if let Some(next) = self.retry_queue.peek() {
527 match &mut self.retry_timer {
528 Some(timer) => timer.as_mut().reset(next.1),
529 None => self.retry_timer = Some(Box::pin(sleep_until(next.1))),
530 }
531 }
532 }
533
534 fn drain_due_retries(
535 &mut self,
536 ) -> Vec<(PeerId, RetryKind, Vec<Multiaddr>)> {
537 let now = Instant::now();
538 let mut out = Vec::new();
539 while let Some(Due(peer, when)) = self.retry_queue.peek().cloned() {
540 if when > now {
541 break;
542 }
543
544 self.retry_queue.pop();
545 if let Some(retry) = self.retry_by_peer.get(&peer).cloned()
551 && retry.when == when
552 {
553 self.retry_by_peer
554 .entry(peer)
555 .and_modify(|x| x.attempts += 1);
556 out.push((peer, retry.kind, retry.addrs));
557 }
558 }
559
560 if self.retry_queue.is_empty() {
561 self.retry_timer = None;
562 } else {
563 self.arm_retry_timer();
564 }
565 self.refresh_runtime_metrics();
566 out
567 }
568
/// Installs the channel used to reach the command helper, replacing any
/// sender stored before.
pub fn add_helper_sender(
    &mut self,
    helper_sender: mpsc::Sender<CommandHelper<T>>,
) {
    self.helper_sender = Some(helper_sender);
}
576
/// Returns this node's peer id.
pub const fn local_peer_id(&self) -> PeerId {
    self.local_peer_id
}
581
/// Delivers `message` to `peer`, as a response when a response channel is
/// available and otherwise through the peer's outbound queue.
///
/// Order of checks:
/// 1. Safe mode: the message is dropped and `Ok(())` is returned.
/// 2. Payloads longer than `max_app_message_bytes` are rejected.
/// 3. Stored response channels for the peer are tried oldest-first; the
///    first successful `send_response` ends the call.
/// 4. Otherwise the message is queued. It is flushed immediately when the
///    peer is known and already identified; if not, a dial (known peer) or a
///    discover (unknown peer) retry is scheduled.
///
/// # Errors
///
/// Returns `Error::MessageTooLarge` when the payload exceeds
/// `max_app_message_bytes`.
fn send_message(
    &mut self,
    peer: PeerId,
    message: Bytes,
) -> Result<(), Error> {
    if self.is_safe_mode() {
        debug!(
            target: TARGET,
            peer_id = %peer,
            size = message.len(),
            "safe mode active; dropping outbound message"
        );
        return Ok(());
    }

    if message.len() > self.max_app_message_bytes {
        warn!(
            target: TARGET,
            peer_id = %peer,
            size = message.len(),
            max = self.max_app_message_bytes,
            "outbound payload rejected: message too large",
        );
        if let Some(metrics) = self.metric_handle() {
            metrics.inc_oversized_outbound_drop();
        }
        self.refresh_runtime_metrics();
        return Err(Error::MessageTooLarge {
            size: message.len(),
            max: self.max_app_message_bytes,
        });
    }

    // The channel list is taken out of the map while it is being consumed;
    // whatever is left after a successful send is put back.
    if let Some(mut responses) = self.response_channels.remove(&peer) {
        while let Some(response_channel) = responses.pop_front() {
            match self
                .swarm
                .behaviour_mut()
                .send_response(response_channel, message.clone())
            {
                Ok(()) => {
                    if !responses.is_empty() {
                        self.response_channels.insert(peer, responses);
                    }
                    self.refresh_runtime_metrics();
                    return Ok(());
                }
                Err(e) => {
                    // This channel is unusable; try the next one.
                    debug!(target: TARGET, peer_id = %peer, error = %e, "failed to send response: channel may already be consumed");
                }
            }
        }
    }

    // No response channel worked: fall back to a regular outbound message.
    self.add_pending_outbound_message(peer, message);

    if self.swarm.behaviour_mut().is_known_peer(&peer) {
        if let Some(Action::Identified(..)) = self.peer_action.get(&peer) {
            self.send_pending_outbound_messages(peer);
        } else {
            self.schedule_retry(peer, ScheduleType::Dial(vec![]));
        }
    } else {
        self.schedule_retry(peer, ScheduleType::Discover);
    }

    Ok(())
}
651
652 fn add_pending_outbound_message(&mut self, peer: PeerId, message: Bytes) {
656 let message_len = message.len();
657 let mut dropped_count = 0u64;
658 let mut dropped_bytes_limit_peer = 0u64;
659 let mut dropped_bytes_limit_global = 0u64;
660 let mut dropped_messages = Vec::new();
661 let mut duplicate = false;
662 let mut total_pending_bytes = self.pending_outbound_bytes_len();
663 let per_peer_limit = self.max_pending_outbound_bytes_per_peer;
664 let global_limit = self.max_pending_outbound_bytes_total;
665
666 {
667 let queue = self.pending_outbound_messages.entry(peer).or_default();
668 if queue.contains(&message) {
669 duplicate = true;
670 } else {
671 while queue.len() >= MAX_PENDING_MESSAGES_PER_PEER {
672 if let Some(evicted) = queue.pop_front() {
673 dropped_count += 1;
674 total_pending_bytes = total_pending_bytes
675 .saturating_sub(evicted.payload.len());
676 dropped_messages.push(evicted);
677 } else {
678 break;
679 }
680 }
681
682 if per_peer_limit > 0 {
683 while queue.bytes_len() + message_len > per_peer_limit {
684 if let Some(evicted) = queue.pop_front() {
685 dropped_bytes_limit_peer += 1;
686 total_pending_bytes = total_pending_bytes
687 .saturating_sub(evicted.payload.len());
688 dropped_messages.push(evicted);
689 } else {
690 break;
691 }
692 }
693 }
694
695 if per_peer_limit > 0
696 && queue.bytes_len() + message_len > per_peer_limit
697 {
698 dropped_bytes_limit_peer += 1;
699 } else if global_limit > 0
700 && total_pending_bytes.saturating_add(message_len)
701 > global_limit
702 {
703 dropped_bytes_limit_global += 1;
704 } else {
705 queue.push_back(message);
706 }
707 }
708 }
709
710 if duplicate {
711 self.refresh_runtime_metrics();
712 return;
713 }
714
715 for evicted in dropped_messages {
716 self.observe_pending_message_age(evicted.enqueued_at);
717 }
718
719 if dropped_count > 0 {
720 warn!(
721 target: TARGET,
722 peer_id = %peer,
723 dropped = dropped_count,
724 max_messages = MAX_PENDING_MESSAGES_PER_PEER,
725 "outbound queue count limit reached; oldest messages evicted",
726 );
727 }
728
729 if dropped_bytes_limit_peer > 0 {
730 warn!(
731 target: TARGET,
732 peer_id = %peer,
733 dropped = dropped_bytes_limit_peer,
734 message_bytes = message_len,
735 max_queue_bytes = per_peer_limit,
736 "outbound queue bytes limit reached; messages evicted/dropped",
737 );
738 }
739
740 if dropped_bytes_limit_global > 0 {
741 warn!(
742 target: TARGET,
743 peer_id = %peer,
744 dropped = dropped_bytes_limit_global,
745 message_bytes = message_len,
746 max_queue_bytes_total = global_limit,
747 "outbound global queue bytes limit reached; messages dropped",
748 );
749 }
750
751 if let Some(metrics) = self.metric_handle() {
752 metrics.inc_outbound_queue_drop_by(dropped_count);
753 metrics.inc_outbound_queue_bytes_drop_per_peer_by(
754 dropped_bytes_limit_peer,
755 );
756 metrics.inc_outbound_queue_bytes_drop_global_by(
757 dropped_bytes_limit_global,
758 );
759 }
760
761 self.refresh_runtime_metrics();
762 }
763
764 fn add_pending_inbound_message(&mut self, peer: PeerId, message: Bytes) {
765 let message_len = message.len();
766 let mut dropped_count = 0u64;
767 let mut dropped_bytes_limit_peer = 0u64;
768 let mut dropped_bytes_limit_global = 0u64;
769 let mut dropped_messages = Vec::new();
770 let mut duplicate = false;
771 let mut total_pending_bytes = self.pending_inbound_bytes_len();
772 let per_peer_limit = self.max_pending_inbound_bytes_per_peer;
773 let global_limit = self.max_pending_inbound_bytes_total;
774
775 {
776 let queue = self.pending_inbound_messages.entry(peer).or_default();
777 if queue.contains(&message) {
778 duplicate = true;
779 } else {
780 while queue.len() >= MAX_PENDING_MESSAGES_PER_PEER {
781 if let Some(evicted) = queue.pop_front() {
782 dropped_count += 1;
783 total_pending_bytes = total_pending_bytes
784 .saturating_sub(evicted.payload.len());
785 dropped_messages.push(evicted);
786 } else {
787 break;
788 }
789 }
790
791 if per_peer_limit > 0 {
792 while queue.bytes_len() + message_len > per_peer_limit {
793 if let Some(evicted) = queue.pop_front() {
794 dropped_bytes_limit_peer += 1;
795 total_pending_bytes = total_pending_bytes
796 .saturating_sub(evicted.payload.len());
797 dropped_messages.push(evicted);
798 } else {
799 break;
800 }
801 }
802 }
803
804 if per_peer_limit > 0
805 && queue.bytes_len() + message_len > per_peer_limit
806 {
807 dropped_bytes_limit_peer += 1;
808 } else if global_limit > 0
809 && total_pending_bytes.saturating_add(message_len)
810 > global_limit
811 {
812 dropped_bytes_limit_global += 1;
813 } else {
814 queue.push_back(message);
815 }
816 }
817 }
818
819 if duplicate {
820 self.refresh_runtime_metrics();
821 return;
822 }
823
824 for evicted in dropped_messages {
825 self.observe_pending_message_age(evicted.enqueued_at);
826 }
827
828 if dropped_count > 0 {
829 warn!(
830 target: TARGET,
831 peer_id = %peer,
832 dropped = dropped_count,
833 max_messages = MAX_PENDING_MESSAGES_PER_PEER,
834 "inbound queue count limit reached; oldest messages evicted",
835 );
836 }
837
838 if dropped_bytes_limit_peer > 0 {
839 warn!(
840 target: TARGET,
841 peer_id = %peer,
842 dropped = dropped_bytes_limit_peer,
843 message_bytes = message_len,
844 max_queue_bytes = per_peer_limit,
845 "inbound queue bytes limit reached; messages evicted/dropped",
846 );
847 }
848
849 if dropped_bytes_limit_global > 0 {
850 warn!(
851 target: TARGET,
852 peer_id = %peer,
853 dropped = dropped_bytes_limit_global,
854 message_bytes = message_len,
855 max_queue_bytes_total = global_limit,
856 "inbound global queue bytes limit reached; messages dropped",
857 );
858 }
859
860 if let Some(metrics) = self.metric_handle() {
861 metrics.inc_inbound_queue_drop_by(dropped_count);
862 metrics.inc_inbound_queue_bytes_drop_per_peer_by(
863 dropped_bytes_limit_peer,
864 );
865 metrics.inc_inbound_queue_bytes_drop_global_by(
866 dropped_bytes_limit_global,
867 );
868 }
869
870 self.refresh_runtime_metrics();
871 }
872
873 fn add_ephemeral_response(
875 &mut self,
876 peer: PeerId,
877 response_channel: ResponseChannel<ReqResMessage>,
878 ) {
879 self.response_channels
880 .entry(peer)
881 .or_default()
882 .push_back(response_channel);
883 self.refresh_runtime_metrics();
884 }
885
886 fn send_pending_outbound_messages(&mut self, peer: PeerId) {
888 if let Some(mut queue) = self.pending_outbound_messages.remove(&peer) {
889 for message in queue.drain() {
890 self.observe_pending_message_age(message.enqueued_at);
891 self.swarm
892 .behaviour_mut()
893 .send_message(&peer, message.payload);
894 }
895 }
896
897 self.retry_by_peer.remove(&peer);
898 self.refresh_runtime_metrics();
899 }
900
/// Returns a clone of the service handle created in `new`.
pub fn service(&self) -> NetworkService {
    self.service.clone()
}
905
906 async fn change_state(&mut self, state: NetworkState) {
908 trace!(target: TARGET, state = ?state, "state changed");
909 self.state = state.clone();
910 if let Some(metrics) = self.metric_handle() {
911 metrics.observe_state_transition(&state);
912 }
913 self.send_event(NetworkEvent::StateChanged(state)).await;
914 }
915
916 #[allow(clippy::needless_pass_by_ref_mut)]
918 async fn send_event(&mut self, event: NetworkEvent) {
919 if let Some(monitor) = self.monitor.clone()
920 && let Err(e) = monitor.tell(MonitorMessage::Network(event)).await
921 {
922 error!(target: TARGET, error = %e, "failed to forward event to monitor");
923 self.crash_token.cancel();
924 }
925 }
926
927 pub async fn run(&mut self) {
929 let bootstrap_start = Instant::now();
930
931 if let Err(error) = self.run_connection().await {
933 if let Some(metrics) = self.metric_handle() {
934 metrics.observe_bootstrap_duration_seconds(
935 "failure",
936 bootstrap_start.elapsed().as_secs_f64(),
937 );
938 }
939 error!(target: TARGET, error = %error, "bootstrap connection failed");
940 self.send_event(NetworkEvent::Error(error)).await;
941 self.crash_token.cancel();
943 return;
944 }
945 if let Some(metrics) = self.metric_handle() {
946 metrics.observe_bootstrap_duration_seconds(
947 "success",
948 bootstrap_start.elapsed().as_secs_f64(),
949 );
950 }
951
952 if self.state != NetworkState::Running {
953 self.change_state(NetworkState::Running).await;
954 }
955
956 if !self.is_safe_mode() {
958 self.swarm.behaviour_mut().finish_prerouting_state();
959 }
960 self.run_main().await;
962 }
963
/// Bootstrap phase: dials the configured boot nodes until at least one of
/// them has been identified, retrying failed rounds with back-off.
///
/// * Safe mode, or a `Bootstrap` node without boot nodes, goes straight to
///   `Running`.
/// * Otherwise the function loops over the `Dial` / `Dialing` states while
///   processing swarm events:
///   - `Dial` starts a dial to every entry of `boot_nodes` and arms a 15 s
///     round timeout;
///   - `Dialing` waits until `boot_nodes` is empty (entries leave it on
///     identification, failure, connection close or round timeout). With no
///     successful dial and retry candidates left it sleeps 2, 4 and then 8
///     seconds (at most three retries) before dialling the retry set again.
///
/// # Errors
///
/// * `Error::NoBootstrapNode` when the state machine ends in `Disconnected`.
/// * `Error::Cancelled` when either cancellation token fires.
pub async fn run_connection(&mut self) -> Result<(), Error> {
    if self.is_safe_mode() {
        self.change_state(NetworkState::Running).await;
        return Ok(());
    }

    if self.node_type == NodeType::Bootstrap && self.boot_nodes.is_empty() {
        self.change_state(NetworkState::Running).await;
        Ok(())
    } else {
        self.change_state(NetworkState::Dial).await;

        // Number of retry rounds performed so far (at most three).
        let mut retrys: u8 = 0;

        // Created with a zero duration; it is only polled while
        // `dialing_timeout_active` is set, after being reset to a real
        // deadline.
        let dialing_round_timeout = sleep(Duration::from_secs(0));
        tokio::pin!(dialing_round_timeout);
        let mut dialing_timeout_active = false;

        loop {
            match self.state {
                NetworkState::Dial => {
                    if self.boot_nodes.is_empty() {
                        error!(target: TARGET, "no bootstrap nodes available");
                        self.send_event(NetworkEvent::Error(
                            Error::NoBootstrapNode,
                        ))
                        .await;
                        self.change_state(NetworkState::Disconnected).await;
                    } else {
                        // Iterate over a copy because failed peers are
                        // removed from `boot_nodes` inside the loop.
                        let copy_boot_nodes = self.boot_nodes.clone();

                        for (peer, addresses) in copy_boot_nodes {
                            if let Some(metrics) = self.metric_handle() {
                                metrics.inc_dial_attempt_bootstrap();
                            }
                            if let Err(e) = self.swarm.dial(
                                DialOpts::peer_id(peer)
                                    .addresses(addresses.clone())
                                    .build(),
                            ) {
                                let (add_to_retry, new_addresses) = self
                                    .handle_dial_error(e, &peer, true)
                                    .unwrap_or((false, vec![]));
                                self.boot_nodes.remove(&peer);
                                if add_to_retry {
                                    // Prefer the addresses reported as
                                    // retryable; otherwise keep the
                                    // original ones.
                                    if new_addresses.is_empty() {
                                        self.retry_boot_nodes
                                            .insert(peer, addresses);
                                    } else {
                                        self.retry_boot_nodes
                                            .insert(peer, new_addresses);
                                    }
                                }
                            }
                        }
                        if !self.boot_nodes.is_empty() {
                            self.change_state(NetworkState::Dialing).await;
                            dialing_round_timeout.as_mut().reset(
                                Instant::now() + Duration::from_secs(15),
                            );
                            dialing_timeout_active = true;
                        } else {
                            warn!(target: TARGET, "all bootstrap dials failed");
                            self.change_state(NetworkState::Disconnected)
                                .await;
                        }
                    }
                }
                NetworkState::Dialing => {
                    if self.boot_nodes.is_empty()
                        && self.successful_dials == 0
                        && !self.retry_boot_nodes.is_empty()
                        && retrys < 3
                    {
                        // Round finished without success but some peers are
                        // retryable: back off, then dial them again.
                        retrys += 1;
                        let wait = Duration::from_secs(1u64 << retrys);
                        debug!(target: TARGET, attempt = retrys, wait_secs = wait.as_secs(), "retrying bootstrap dials");

                        let backoff = sleep(wait);
                        tokio::pin!(backoff);
                        // Keep draining swarm events during the back-off.
                        loop {
                            tokio::select! {
                                _ = &mut backoff => break,
                                event = self.swarm.select_next_some() => {
                                    self.handle_connection_events(event).await;
                                }
                                _ = self.graceful_token.cancelled() => {
                                    return Err(Error::Cancelled);
                                }
                                _ = self.crash_token.cancelled() => {
                                    return Err(Error::Cancelled);
                                }
                            }
                        }

                        dialing_timeout_active = false;
                        self.boot_nodes.clone_from(&self.retry_boot_nodes);
                        self.retry_boot_nodes.clear();
                        self.change_state(NetworkState::Dial).await;
                    }
                    else if self.boot_nodes.is_empty()
                        && self.successful_dials == 0
                    {
                        // Nothing succeeded and nothing is left to retry.
                        self.change_state(NetworkState::Disconnected).await;
                    } else if self.boot_nodes.is_empty() {
                        // Every boot node is resolved and at least one dial
                        // succeeded.
                        return Ok(());
                    }
                }
                NetworkState::Running => {
                    return Ok(());
                }
                NetworkState::Disconnected => {
                    return Err(Error::NoBootstrapNode);
                }
                _ => {}
            }
            // `Disconnected` is turned into an error on the next iteration,
            // so there is nothing to wait for in that state.
            if self.state != NetworkState::Disconnected {
                tokio::select! {
                    event = self.swarm.select_next_some() => {
                        self.handle_connection_events(event).await;
                    }
                    _ = self.graceful_token.cancelled() => {
                        return Err(Error::Cancelled);
                    }
                    _ = self.crash_token.cancelled() => {
                        return Err(Error::Cancelled);
                    }
                    _ = &mut dialing_round_timeout, if dialing_timeout_active => {
                        warn!(
                            target: TARGET,
                            remaining = self.boot_nodes.len(),
                            "bootstrap round timed out waiting for Identify; \
                             moving remaining peers to retry queue"
                        );
                        for (peer, addrs) in self.boot_nodes.drain() {
                            self.retry_boot_nodes.insert(peer, addrs);
                        }
                        dialing_timeout_active = false;
                    }
                }
            }
        }
    }
}
1118
1119 fn collect_retryable_transport_addresses(
1120 &self,
1121 items: Vec<(Multiaddr, libp2p::TransportError<std::io::Error>)>,
1122 trace_unreachable: bool,
1123 ) -> Vec<Multiaddr> {
1124 let mut new_addresses = vec![];
1125 for (address, error) in items {
1126 if trace_unreachable {
1127 trace!(target: TARGET, addr = %address, err = ?error, "address unreachable");
1128 }
1129
1130 if let libp2p::TransportError::Other(e) = error {
1131 match e.kind() {
1132 std::io::ErrorKind::ConnectionRefused
1133 | std::io::ErrorKind::TimedOut
1134 | std::io::ErrorKind::ConnectionAborted
1135 | std::io::ErrorKind::NotConnected
1136 | std::io::ErrorKind::BrokenPipe
1137 | std::io::ErrorKind::Interrupted
1138 | std::io::ErrorKind::HostUnreachable
1139 | std::io::ErrorKind::NetworkUnreachable => {
1140 new_addresses.push(address);
1141 }
1142 _ => {}
1143 }
1144 };
1145 }
1146
1147 new_addresses
1148 }
1149
1150 fn handle_dial_error(
1155 &mut self,
1156 e: DialError,
1157 peer_id: &PeerId,
1158 bootstrap_flow: bool,
1159 ) -> Option<(bool, Vec<Multiaddr>)> {
1160 let phase = if bootstrap_flow {
1161 "bootstrap"
1162 } else {
1163 "runtime"
1164 };
1165 match e {
1166 DialError::LocalPeerId { .. } => {
1167 if let Some(metrics) = self.metric_handle() {
1168 metrics.observe_dial_failure(phase, "local_peer_id");
1169 }
1170 if bootstrap_flow {
1171 warn!(target: TARGET, peer_id = %peer_id, "dial rejected: connected peer-id matches local peer");
1172 return Some((false, vec![]));
1173 }
1174
1175 self.retry_by_peer.remove(peer_id);
1176 self.clear_pending_messages(peer_id);
1177 self.swarm
1178 .behaviour_mut()
1179 .clean_hard_peer_to_remove(peer_id);
1180 None
1181 }
1182 DialError::NoAddresses => {
1183 if let Some(metrics) = self.metric_handle() {
1184 metrics.observe_dial_failure(phase, "no_addresses");
1185 }
1186 if bootstrap_flow {
1187 debug!(target: TARGET, peer_id = %peer_id, "dial skipped: no addresses");
1188 }
1189 Some((false, vec![]))
1190 }
1191 DialError::DialPeerConditionFalse(_) => {
1192 if let Some(metrics) = self.metric_handle() {
1193 metrics.observe_dial_failure(phase, "peer_condition");
1194 }
1195 if bootstrap_flow {
1196 debug!(target: TARGET, peer_id = %peer_id, "dial skipped: peer condition not met");
1197 return Some((false, vec![]));
1198 }
1199
1200 None
1201 }
1202 DialError::Denied { cause } => {
1203 if let Some(metrics) = self.metric_handle() {
1204 metrics.observe_dial_failure(phase, "denied");
1205 }
1206 if bootstrap_flow {
1207 debug!(target: TARGET, peer_id = %peer_id, cause = %cause, "dial denied by behaviour");
1208 }
1209 Some((false, vec![]))
1210 }
1211 DialError::Aborted => {
1212 if let Some(metrics) = self.metric_handle() {
1213 metrics.observe_dial_failure(phase, "aborted");
1214 }
1215 if bootstrap_flow {
1216 debug!(target: TARGET, peer_id = %peer_id, "dial aborted, will retry");
1217 }
1218 Some((true, vec![]))
1219 }
1220 DialError::WrongPeerId { obtained, .. } => {
1221 if let Some(metrics) = self.metric_handle() {
1222 metrics.observe_dial_failure(phase, "wrong_peer_id");
1223 }
1224 if bootstrap_flow {
1225 warn!(target: TARGET, expected = %peer_id, obtained = %obtained, "dial failed: peer identity mismatch");
1226 return Some((false, vec![]));
1227 }
1228
1229 self.retry_by_peer.remove(peer_id);
1230 self.clear_pending_messages(peer_id);
1231 self.swarm
1232 .behaviour_mut()
1233 .clean_hard_peer_to_remove(peer_id);
1234 None
1235 }
1236 DialError::Transport(items) => {
1237 if let Some(metrics) = self.metric_handle() {
1238 metrics.observe_dial_failure(phase, "transport");
1239 }
1240 if bootstrap_flow {
1241 debug!(target: TARGET, peer_id = %peer_id, "transport dial failed");
1242 }
1243
1244 let new_addresses = self.collect_retryable_transport_addresses(
1245 items,
1246 bootstrap_flow,
1247 );
1248 if !new_addresses.is_empty() {
1249 Some((true, new_addresses))
1250 } else {
1251 Some((false, vec![]))
1252 }
1253 }
1254 }
1255 }
1256
1257 async fn handle_connection_events(
1259 &mut self,
1260 event: SwarmEvent<BehaviourEvent>,
1261 ) {
1262 match event {
1263 SwarmEvent::ConnectionClosed { peer_id, .. } => {
1264 self.boot_nodes.remove(&peer_id);
1265 }
1266 SwarmEvent::OutgoingConnectionError {
1267 peer_id: Some(peer_id),
1268 error,
1269 ..
1270 } => {
1271 let (add_to_retry, new_addresses) = self
1272 .handle_dial_error(error, &peer_id, true)
1273 .unwrap_or((false, vec![]));
1274
1275 if let Some(addresses) = self.boot_nodes.remove(&peer_id)
1276 && add_to_retry
1277 {
1278 if new_addresses.is_empty() {
1279 self.retry_boot_nodes.insert(peer_id, addresses);
1280 } else {
1281 self.retry_boot_nodes.insert(peer_id, new_addresses);
1282 }
1283 }
1284 }
1285 SwarmEvent::Behaviour(BehaviourEvent::Identified {
1286 peer_id,
1287 info,
1288 connection_id,
1289 }) => {
1290 if !self
1291 .check_protocols(&info.protocol_version, &info.protocols)
1292 {
1293 warn!(target: TARGET, peer_id = %peer_id, protocol_version = %info.protocol_version, "peer uses incompatible protocols; closing connection");
1294
1295 self.swarm
1296 .behaviour_mut()
1297 .close_connections(&peer_id, Some(connection_id));
1298 } else {
1299 self.peer_action
1300 .insert(peer_id, Action::Identified(connection_id));
1301
1302 let mut any_address_is_valid = false;
1303 for addr in info.listen_addrs {
1304 if self
1305 .swarm
1306 .behaviour_mut()
1307 .add_self_reported_address(&peer_id, &addr)
1308 {
1309 any_address_is_valid = true;
1310 }
1311 }
1312
1313 if any_address_is_valid {
1314 if self.boot_nodes.remove(&peer_id).is_some() {
1315 self.successful_dials += 1;
1316 }
1317 self.peer_identify.insert(peer_id);
1318 } else {
1319 warn!(target: TARGET, peer_id = %peer_id, "bootstrap peer has no valid addresses");
1320
1321 self.swarm
1322 .behaviour_mut()
1323 .close_connections(&peer_id, Some(connection_id));
1324 }
1325 }
1326 }
1327 SwarmEvent::Behaviour(BehaviourEvent::IdentifyError {
1328 peer_id,
1329 error,
1330 }) => {
1331 self.observe_identify_error(&error);
1332 match error {
1333 swarm::StreamUpgradeError::Timeout => {
1334 }
1336 _ => {
1337 debug!(target: TARGET, peer_id = %peer_id, error = %error, "identify hard failure during bootstrap; queuing for retry");
1340 if let Some(addrs) = self.boot_nodes.remove(&peer_id) {
1341 self.retry_boot_nodes.insert(peer_id, addrs);
1342 }
1343 }
1344 }
1345 }
1346 _ => {}
1347 }
1348 self.refresh_runtime_metrics();
1349 }
1350
1351 fn clear_pending_messages(&mut self, peer_id: &PeerId) {
1352 warn!(target: TARGET, peer_id = %peer_id, "max dial attempts reached; dropping pending messages");
1353
1354 let dropped = self.drop_pending_outbound_messages(peer_id);
1355 if let Some(metrics) = self.metric_handle() {
1356 metrics.inc_max_retries_drop_by(dropped as u64);
1357 }
1358 self.peer_action.remove(peer_id);
1359 self.retry_by_peer.remove(peer_id);
1360 self.refresh_runtime_metrics();
1361 }
1362
1363 fn check_protocols(
1364 &self,
1365 protocol_version: &str,
1366 protocols: &[StreamProtocol],
1367 ) -> bool {
1368 let supp_protocols: HashSet<StreamProtocol> = protocols
1369 .iter()
1370 .cloned()
1371 .collect::<HashSet<StreamProtocol>>();
1372
1373 protocol_version == IDENTIFY_PROTOCOL
1374 && supp_protocols.contains(&StreamProtocol::new(REQRES_PROTOCOL))
1375 }
1376
1377 pub async fn run_main(&mut self) {
1379 info!(target: TARGET, "network worker started");
1380
1381 loop {
1382 tokio::select! {
1383 command = self.command_receiver.recv() => {
1384 match command {
1385 Some(command) => self.handle_command(command).await,
1386 None => break,
1387 }
1388 }
1389 event = self.swarm.select_next_some() => {
1390 self.handle_event(event).await;
1392 }
1393 _ = async {
1394 if let Some(t) = &mut self.retry_timer {
1395 t.as_mut().await;
1396 }
1397 }, if self.retry_timer.is_some() => {
1398 for (peer, kind, addrs) in self.drain_due_retries() {
1399 if let Some(action) = self.peer_action.get(&peer) {
1400 match (action, kind) {
1401 (Action::Discover, RetryKind::Discover) => {
1402 self.swarm.behaviour_mut().discover(&peer);
1403 },
1404 (Action::Dial, RetryKind::Dial) => {
1405 if let Some(metrics) = self.metric_handle() {
1406 metrics.inc_dial_attempt_runtime();
1407 }
1408 if let Err(error) = self.swarm.dial(
1409 DialOpts::peer_id(peer)
1410 .addresses(addrs)
1411 .extend_addresses_through_behaviour()
1412 .build()
1413 ) && let Some((retry, new_address)) =
1414 self.handle_dial_error(error, &peer, false) {
1415
1416 self.peer_action.remove(&peer);
1417 if retry {
1418 let addr = new_address
1419 .iter()
1420 .filter(|x| {
1421 !self
1422 .swarm
1423 .behaviour()
1424 .is_invalid_address(x)
1425 })
1426 .cloned()
1427 .collect::<Vec<Multiaddr>>();
1428
1429 if addr.is_empty() {
1430 self.schedule_retry(peer, ScheduleType::Discover);
1431 } else {
1432 self.schedule_retry(peer, ScheduleType::Dial(addr.clone()));
1433 }
1434 } else {
1435 self.schedule_retry(peer, ScheduleType::Discover);
1436 }
1437 };
1438 },
1439 _ => {}
1440 }
1441 };
1442 }
1443 },
1444 _ = self.graceful_token.cancelled() => {
1445 break;
1446 }
1447 _ = self.crash_token.cancelled() => {
1448 break;
1449 }
1450 }
1451 }
1452 }
1453
1454 async fn handle_command(&mut self, command: Command) {
1455 match command {
1456 Command::SendMessage { peer, message } => {
1457 if self.is_safe_mode() {
1458 debug!(
1459 target: TARGET,
1460 peer_id = %peer,
1461 size = message.len(),
1462 "safe mode active; ignoring send command"
1463 );
1464 return;
1465 }
1466 if let Err(error) = self.send_message(peer, message) {
1467 error!(target: TARGET, error = %error, "failed to deliver message");
1468 self.send_event(NetworkEvent::Error(error)).await;
1469 }
1470 }
1471 }
1472 }
1473
1474 #[allow(clippy::needless_pass_by_ref_mut)]
1475 async fn message_to_helper(
1476 &mut self,
1477 message: MessagesHelper,
1478 peer_id: &PeerId,
1479 ) {
1480 let sender = match peer_id_to_ed25519_pubkey_bytes(peer_id) {
1481 Ok(public_key) => public_key,
1482 Err(e) => {
1483 warn!(target: TARGET, error = %e, "cannot resolve public key from peer id");
1484 return;
1485 }
1486 };
1487
1488 'Send: {
1489 if let Some(helper_sender) = self.helper_sender.as_ref() {
1490 match message {
1491 MessagesHelper::Single(items) => {
1492 if helper_sender
1493 .send(CommandHelper::ReceivedMessage {
1494 sender,
1495 message: items,
1496 })
1497 .await
1498 .is_err()
1499 {
1500 break 'Send;
1501 }
1502 }
1503 MessagesHelper::Vec(items) => {
1504 for item in items {
1505 if helper_sender
1506 .send(CommandHelper::ReceivedMessage {
1507 sender,
1508 message: item,
1509 })
1510 .await
1511 .is_err()
1512 {
1513 break 'Send;
1514 }
1515 }
1516 }
1517 }
1518
1519 return;
1520 }
1521 }
1522
1523 error!(target: TARGET, "helper channel closed; shutting down");
1524 self.crash_token.cancel();
1525 }
1526
    /// Handles one swarm event during normal (post-bootstrap) operation.
    ///
    /// While safe mode is active the event is only used to close connections
    /// (inbound request-response messages, identified peers, identify errors
    /// and newly established connections); nothing is delivered or queued.
    ///
    /// Otherwise the event drives the per-peer state kept in `peer_action`,
    /// `peer_identify`, `retry_by_peer`, `response_channels` and the pending
    /// inbound/outbound queues. Runtime metrics are refreshed before every
    /// return path of this method.
    async fn handle_event(&mut self, event: SwarmEvent<BehaviourEvent>) {
        if self.is_safe_mode() {
            match event {
                SwarmEvent::Behaviour(BehaviourEvent::ReqresMessage {
                    peer_id,
                    ..
                }) => {
                    debug!(
                        target: TARGET,
                        peer_id = %peer_id,
                        "safe mode active; dropping inbound reqres message"
                    );
                    self.swarm
                        .behaviour_mut()
                        .close_connections(&peer_id, None);
                }
                SwarmEvent::Behaviour(BehaviourEvent::Identified {
                    peer_id,
                    connection_id,
                    ..
                }) => {
                    debug!(
                        target: TARGET,
                        peer_id = %peer_id,
                        "safe mode active; closing identified peer"
                    );
                    self.swarm
                        .behaviour_mut()
                        .close_connections(&peer_id, Some(connection_id));
                }
                SwarmEvent::Behaviour(BehaviourEvent::IdentifyError {
                    peer_id,
                    ..
                }) => {
                    self.swarm
                        .behaviour_mut()
                        .close_connections(&peer_id, None);
                }
                SwarmEvent::ConnectionEstablished {
                    peer_id,
                    connection_id,
                    ..
                } => {
                    debug!(
                        target: TARGET,
                        peer_id = %peer_id,
                        "safe mode active; closing established connection"
                    );
                    self.swarm
                        .behaviour_mut()
                        .close_connections(&peer_id, Some(connection_id));
                }
                // Explicitly listed events that require no action in safe mode.
                SwarmEvent::OutgoingConnectionError { .. }
                | SwarmEvent::ConnectionClosed { .. }
                | SwarmEvent::IncomingConnectionError { .. }
                | SwarmEvent::IncomingConnection { .. }
                | SwarmEvent::ListenerClosed { .. }
                | SwarmEvent::Dialing { .. }
                | SwarmEvent::NewExternalAddrCandidate { .. }
                | SwarmEvent::ExternalAddrConfirmed { .. }
                | SwarmEvent::ExternalAddrExpired { .. }
                | SwarmEvent::NewExternalAddrOfPeer { .. }
                | SwarmEvent::NewListenAddr { .. }
                | SwarmEvent::ExpiredListenAddr { .. }
                | SwarmEvent::ListenerError { .. }
                | SwarmEvent::Behaviour(BehaviourEvent::ReqresFailure {
                    ..
                })
                | SwarmEvent::Behaviour(BehaviourEvent::ClosestPeer {
                    ..
                })
                | SwarmEvent::Behaviour(BehaviourEvent::Dummy) => {}
                _ => {}
            }
            self.refresh_runtime_metrics();
            return;
        }

        match event {
            SwarmEvent::Behaviour(event) => {
                match event {
                    BehaviourEvent::Identified {
                        peer_id,
                        info,
                        connection_id,
                    } => {
                        if !self.check_protocols(
                            &info.protocol_version,
                            &info.protocols,
                        ) {
                            warn!(
                                target: TARGET,
                                peer_id = %peer_id,
                                protocol_version = %info.protocol_version,
                                protocols = ?info.protocols,
                                "peer uses incompatible protocols; closing connection"
                            );

                            // Incompatible peer: drop its queued messages and
                            // retry state before closing this connection.
                            self.clear_pending_messages(&peer_id);

                            self.swarm
                                .behaviour_mut()
                                .clean_hard_peer_to_remove(&peer_id);

                            self.swarm.behaviour_mut().close_connections(
                                &peer_id,
                                Some(connection_id),
                            );
                        } else {
                            // Remember which connection the peer was
                            // identified on; `ConnectionClosed` compares
                            // against this id.
                            self.peer_action.insert(
                                peer_id,
                                Action::Identified(connection_id),
                            );

                            self.swarm
                                .behaviour_mut()
                                .clean_peer_to_remove(&peer_id);
                            for addr in info.listen_addrs {
                                self.swarm
                                    .behaviour_mut()
                                    .add_self_reported_address(&peer_id, &addr);
                            }

                            self.peer_identify.insert(peer_id);

                            // Deliver inbound messages that were buffered
                            // before identification, in queue order.
                            if let Some(mut queue) =
                                self.pending_inbound_messages.remove(&peer_id)
                            {
                                let mut buffered = VecDeque::new();
                                for message in queue.drain() {
                                    self.observe_pending_message_age(
                                        message.enqueued_at,
                                    );
                                    buffered.push_back(message.payload);
                                }
                                self.message_to_helper(
                                    MessagesHelper::Vec(buffered),
                                    &peer_id,
                                )
                                .await;
                            };

                            self.send_pending_outbound_messages(peer_id);
                        }
                    }
                    BehaviourEvent::IdentifyError { peer_id, error } => {
                        self.observe_identify_error(&error);
                        debug!(target: TARGET, peer_id = %peer_id, error = %error, "identify error");

                        match error {
                            // A timeout leaves queues and retry state
                            // untouched; the connection is still closed below.
                            swarm::StreamUpgradeError::Timeout => {
                            }
                            // Any other failure discards everything tracked
                            // for the peer.
                            swarm::StreamUpgradeError::Apply(..)
                            | swarm::StreamUpgradeError::NegotiationFailed
                            | swarm::StreamUpgradeError::Io(..) => {
                                self.drop_pending_outbound_messages(&peer_id);
                                self.retry_by_peer.remove(&peer_id);
                                self.response_channels.remove(&peer_id);
                                self.drop_pending_inbound_messages(&peer_id);
                            }
                        }

                        self.swarm
                            .behaviour_mut()
                            .close_connections(&peer_id, None);
                    }
                    BehaviourEvent::ReqresMessage { peer_id, message } => {
                        // Normalise requests and responses into
                        // (payload, is_request, optional response channel).
                        let (message_data, is_request, response_channel) =
                            match message {
                                request_response::Message::Request {
                                    request,
                                    channel,
                                    ..
                                } => (request.0, true, Some(channel)),
                                request_response::Message::Response {
                                    response,
                                    ..
                                } => (response.0, false, None),
                            };

                        // Oversized payloads are dropped and the peer's
                        // connections are closed.
                        if message_data.len() > self.max_app_message_bytes {
                            warn!(
                                target: TARGET,
                                peer_id = %peer_id,
                                size = message_data.len(),
                                max = self.max_app_message_bytes,
                                "inbound payload dropped: message too large",
                            );
                            if let Some(metrics) = self.metric_handle() {
                                metrics.inc_oversized_inbound_drop();
                            }
                            self.swarm
                                .behaviour_mut()
                                .close_connections(&peer_id, None);
                            self.refresh_runtime_metrics();
                            return;
                        }

                        if is_request {
                            if let Some(metrics) = self.metric_handle() {
                                metrics.inc_reqres_request_received();
                            }
                            trace!(target: TARGET, peer_id = %peer_id, "request received");
                            if let Some(channel) = response_channel {
                                self.add_ephemeral_response(peer_id, channel);
                            }
                        } else {
                            if let Some(metrics) = self.metric_handle() {
                                metrics.inc_reqres_response_received();
                            }
                            trace!(target: TARGET, peer_id = %peer_id, "response received");
                        }

                        // Identified peers are delivered immediately; others
                        // are buffered until their `Identified` event.
                        if self.peer_identify.contains(&peer_id) {
                            self.message_to_helper(
                                MessagesHelper::Single(message_data),
                                &peer_id,
                            )
                            .await;
                        } else {
                            self.add_pending_inbound_message(
                                peer_id,
                                message_data,
                            );
                        }
                    }
                    BehaviourEvent::ReqresFailure {
                        peer_id,
                        direction,
                        kind,
                    } => {
                        if let Some(metrics) = self.metric_handle() {
                            metrics.observe_reqres_failure(
                                direction.as_metric_label(),
                                kind.as_metric_label(),
                            );
                        }
                        debug!(
                            target: TARGET,
                            peer_id = %peer_id,
                            direction = direction.as_metric_label(),
                            kind = kind.as_metric_label(),
                            "request-response failure"
                        );
                    }
                    BehaviourEvent::ClosestPeer { peer_id, info } => {
                        // Only acted upon while a discovery is pending for
                        // this peer.
                        if matches!(
                            self.peer_action.get(&peer_id),
                            Some(Action::Discover)
                        ) {
                            self.peer_action.remove(&peer_id);
                            if let Some(info) = info {
                                // Keep only addresses the behaviour does not
                                // consider invalid.
                                let addr = info
                                    .addrs
                                    .iter()
                                    .filter(|x| {
                                        !self
                                            .swarm
                                            .behaviour()
                                            .is_invalid_address(x)
                                    })
                                    .cloned()
                                    .collect::<Vec<Multiaddr>>();

                                if addr.is_empty() {
                                    self.schedule_retry(
                                        peer_id,
                                        ScheduleType::Discover,
                                    );
                                } else {
                                    self.schedule_retry(
                                        peer_id,
                                        ScheduleType::Dial(addr),
                                    );
                                }
                            } else {
                                self.schedule_retry(
                                    peer_id,
                                    ScheduleType::Discover,
                                );
                            };
                        }
                    }
                    BehaviourEvent::Dummy => {
                    }
                }
            }
            SwarmEvent::OutgoingConnectionError {
                error,
                peer_id: Some(peer_id),
                ..
            } => {
                // Only dials started by this worker (action `Dial`) are
                // rescheduled here.
                if matches!(self.peer_action.get(&peer_id), Some(Action::Dial))
                {
                    self.peer_action.remove(&peer_id);

                    self.swarm.behaviour_mut().add_peer_to_remove(&peer_id);

                    if let Some((retry, new_address)) =
                        self.handle_dial_error(error, &peer_id, false)
                    {
                        if retry {
                            let addr = new_address
                                .iter()
                                .filter(|x| {
                                    !self
                                        .swarm
                                        .behaviour()
                                        .is_invalid_address(x)
                                })
                                .cloned()
                                .collect::<Vec<Multiaddr>>();

                            if addr.is_empty() {
                                self.schedule_retry(
                                    peer_id,
                                    ScheduleType::Discover,
                                );
                            } else {
                                self.schedule_retry(
                                    peer_id,
                                    ScheduleType::Dial(addr),
                                );
                            }
                        } else {
                            self.schedule_retry(
                                peer_id,
                                ScheduleType::Discover,
                            );
                        }
                    };
                }
            }
            SwarmEvent::ConnectionClosed {
                peer_id,
                connection_id,
                num_established,
                ..
            } => {
                // Peer state is only reset once the last connection is gone.
                if num_established == 0 {
                    if let Some(Action::Identified(id)) =
                        self.peer_action.get(&peer_id)
                        && connection_id == *id
                    {
                        // The identified connection itself closed: forget
                        // the peer and redial if outbound messages remain.
                        self.peer_action.remove(&peer_id);

                        self.peer_identify.remove(&peer_id);
                        self.drop_pending_inbound_messages(&peer_id);
                        self.response_channels.remove(&peer_id);

                        self.retry_by_peer.remove(&peer_id);

                        if self
                            .pending_outbound_messages
                            .get(&peer_id)
                            .is_some_and(|q| !q.is_empty())
                        {
                            self.schedule_retry(
                                peer_id,
                                ScheduleType::Dial(vec![]),
                            );
                        }
                    } else if let Some(Action::Dial | Action::Discover) =
                        self.peer_action.get(&peer_id)
                    {
                        // A dial or discovery was in progress: reset the
                        // peer and restart with a discovery if outbound
                        // messages remain.
                        self.peer_action.remove(&peer_id);
                        self.retry_by_peer.remove(&peer_id);
                        self.drop_pending_inbound_messages(&peer_id);
                        self.response_channels.remove(&peer_id);
                        self.peer_identify.remove(&peer_id);

                        if self
                            .pending_outbound_messages
                            .get(&peer_id)
                            .is_some_and(|q| !q.is_empty())
                        {
                            self.schedule_retry(
                                peer_id,
                                ScheduleType::Discover,
                            );
                        }
                    }
                }
            }
            // Failed inbound connections need no handling here.
            SwarmEvent::IncomingConnectionError { .. } => {
            }
            SwarmEvent::ExpiredListenAddr { address, .. } => {
                warn!(target: TARGET, addr = %address, "listen address expired");
            }
            SwarmEvent::ListenerError { error, .. } => {
                error!(target: TARGET, error = %error, "listener error");
            }
            SwarmEvent::ConnectionEstablished {
                peer_id,
                connection_id,
                num_established,
                ..
            } => {
                // More than one connection to the same peer: close the one
                // that was just established.
                if num_established.get() > 1 {
                    debug!(target: TARGET, peer_id = %peer_id, "duplicate connection detected; closing excess");
                    self.swarm
                        .behaviour_mut()
                        .close_connections(&peer_id, Some(connection_id));
                }
            }
            // Purely informational events; nothing to do.
            SwarmEvent::IncomingConnection { .. }
            | SwarmEvent::ListenerClosed { .. }
            | SwarmEvent::Dialing { .. }
            | SwarmEvent::NewExternalAddrCandidate { .. }
            | SwarmEvent::ExternalAddrConfirmed { .. }
            | SwarmEvent::ExternalAddrExpired { .. }
            | SwarmEvent::NewExternalAddrOfPeer { .. }
            | SwarmEvent::NewListenAddr { .. } => {
            }
            _ => {}
        }
        self.refresh_runtime_metrics();
    }
1957}
1958
1959#[cfg(test)]
1960mod tests {
1961
1962 use crate::routing::RoutingNode;
1963
1964 use super::*;
1965 use libp2p::core::{ConnectedPoint, Endpoint, transport::PortUse};
1966 use libp2p::identity::Keypair as Libp2pKeypair;
1967 use libp2p::swarm::ConnectionId;
1968 use prometheus_client::{encoding::text::encode, registry::Registry};
1969 use serde::Deserialize;
1970 use test_log::test;
1971
1972 use ave_common::identity::{KeyPair, keys::Ed25519Signer};
1973
1974 use serial_test::serial;
1975
    /// Unit type used as the generic parameter of `NetworkWorker` in these tests.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Dummy;
1978
1979 fn metric_value(metrics: &str, name: &str) -> f64 {
1980 metrics
1981 .lines()
1982 .find_map(|line| {
1983 if line.starts_with(name) {
1984 line.split_whitespace().nth(1)?.parse::<f64>().ok()
1985 } else {
1986 None
1987 }
1988 })
1989 .unwrap_or(0.0)
1990 }
1991
1992 fn build_worker(
1994 boot_nodes: Vec<RoutingNode>,
1995 random_walk: bool,
1996 node_type: NodeType,
1997 graceful_token: CancellationToken,
1998 crash_token: CancellationToken,
1999 memory_addr: String,
2000 ) -> NetworkWorker<Dummy> {
2001 let config = create_config(
2002 boot_nodes,
2003 random_walk,
2004 node_type,
2005 vec![memory_addr],
2006 );
2007 let keys = KeyPair::Ed25519(Ed25519Signer::generate().unwrap());
2008
2009 NetworkWorker::new(
2010 &keys,
2011 config,
2012 None,
2013 graceful_token,
2014 crash_token,
2015 None,
2016 None,
2017 )
2018 .unwrap()
2019 }
2020
2021 fn create_config(
2023 boot_nodes: Vec<RoutingNode>,
2024 random_walk: bool,
2025 node_type: NodeType,
2026 listen_addresses: Vec<String>,
2027 ) -> Config {
2028 let config = crate::routing::Config::default()
2029 .with_discovery_limit(50)
2030 .with_dht_random_walk(random_walk);
2031
2032 Config {
2033 boot_nodes,
2034 node_type,
2035 routing: config,
2036 external_addresses: vec![],
2037 listen_addresses,
2038 ..Default::default()
2039 }
2040 }
2041
2042 fn build_identified_event(
2043 peer_id: PeerId,
2044 public_key: libp2p::identity::PublicKey,
2045 connection_id: ConnectionId,
2046 ) -> SwarmEvent<BehaviourEvent> {
2047 SwarmEvent::Behaviour(BehaviourEvent::Identified {
2048 peer_id,
2049 info: Box::new(identify::Info {
2050 public_key,
2051 protocol_version: IDENTIFY_PROTOCOL.to_owned(),
2052 agent_version: "test-agent".to_owned(),
2053 listen_addrs: vec!["/memory/9999".parse().expect("multiaddr")],
2054 protocols: vec![StreamProtocol::new(REQRES_PROTOCOL)],
2055 observed_addr: "/memory/9998".parse().expect("multiaddr"),
2056 signed_peer_record: None,
2057 }),
2058 connection_id,
2059 })
2060 }
2061
2062 fn test_endpoint() -> ConnectedPoint {
2063 ConnectedPoint::Dialer {
2064 address: "/memory/9997".parse().expect("multiaddr"),
2065 role_override: Endpoint::Dialer,
2066 port_use: PortUse::New,
2067 }
2068 }
2069
2070 #[test]
2071 fn outbound_queue_respects_bytes_limit_and_updates_metrics() {
2072 let mut config = create_config(
2073 vec![],
2074 false,
2075 NodeType::Addressable,
2076 vec!["/memory/3100".to_owned()],
2077 );
2078 config.max_pending_outbound_bytes_per_peer = 16;
2079
2080 let keys = KeyPair::Ed25519(Ed25519Signer::generate().unwrap());
2081 let mut registry = Registry::default();
2082 let metrics = crate::metrics::register(&mut registry);
2083 let mut worker: NetworkWorker<Dummy> = NetworkWorker::new(
2084 &keys,
2085 config,
2086 None,
2087 CancellationToken::new(),
2088 CancellationToken::new(),
2089 None,
2090 Some(metrics),
2091 )
2092 .expect("worker");
2093
2094 let peer = PeerId::random();
2095 worker.add_pending_outbound_message(
2096 peer,
2097 Bytes::from_static(b"aaaaaaaaaaaa"), );
2099 worker.add_pending_outbound_message(
2100 peer,
2101 Bytes::from_static(b"bbbbbbbbbbbb"), );
2103 worker.add_pending_outbound_message(
2104 peer,
2105 Bytes::from_static(b"cccc"), );
2107
2108 let queue = worker
2109 .pending_outbound_messages
2110 .get(&peer)
2111 .expect("queue exists");
2112 assert_eq!(queue.len(), 2);
2113 assert_eq!(queue.bytes_len(), 16);
2114
2115 let mut text = String::new();
2116 encode(&mut text, ®istry).expect("encode metrics");
2117
2118 assert_eq!(
2119 metric_value(
2120 &text,
2121 "network_messages_dropped_total{direction=\"outbound\",reason=\"queue_bytes_limit_per_peer\"}"
2122 ),
2123 1.0
2124 );
2125 assert_eq!(
2126 metric_value(
2127 &text,
2128 "network_messages_dropped_total{direction=\"outbound\",reason=\"queue_bytes_limit_global\"}"
2129 ),
2130 0.0
2131 );
2132 assert_eq!(metric_value(&text, "network_pending_outbound_bytes"), 16.0);
2133 assert!(
2134 metric_value(&text, "network_pending_message_age_seconds_count")
2135 >= 1.0
2136 );
2137 }
2138
2139 #[test]
2140 fn zero_pending_bytes_limits_disable_byte_drops() {
2141 let mut config = create_config(
2142 vec![],
2143 false,
2144 NodeType::Addressable,
2145 vec!["/memory/3101".to_owned()],
2146 );
2147 config.max_pending_outbound_bytes_per_peer = 0;
2148 config.max_pending_inbound_bytes_per_peer = 0;
2149 config.max_pending_outbound_bytes_total = 0;
2150 config.max_pending_inbound_bytes_total = 0;
2151
2152 let keys = KeyPair::Ed25519(Ed25519Signer::generate().unwrap());
2153 let mut registry = Registry::default();
2154 let metrics = crate::metrics::register(&mut registry);
2155 let mut worker: NetworkWorker<Dummy> = NetworkWorker::new(
2156 &keys,
2157 config,
2158 None,
2159 CancellationToken::new(),
2160 CancellationToken::new(),
2161 None,
2162 Some(metrics),
2163 )
2164 .expect("worker");
2165
2166 let peer = PeerId::random();
2167 for i in 0..3u8 {
2168 let payload = Bytes::from(vec![i + 1; 12]);
2169 worker.add_pending_outbound_message(peer, payload);
2170 }
2171 for i in 0..3u8 {
2172 let payload = Bytes::from(vec![i + 7; 12]);
2173 worker.add_pending_inbound_message(peer, payload);
2174 }
2175
2176 let outbound = worker
2177 .pending_outbound_messages
2178 .get(&peer)
2179 .expect("outbound queue exists");
2180 let inbound = worker
2181 .pending_inbound_messages
2182 .get(&peer)
2183 .expect("inbound queue exists");
2184 assert_eq!(outbound.len(), 3);
2185 assert_eq!(outbound.bytes_len(), 36);
2186 assert_eq!(inbound.len(), 3);
2187 assert_eq!(inbound.bytes_len(), 36);
2188
2189 let mut text = String::new();
2190 encode(&mut text, ®istry).expect("encode metrics");
2191 assert_eq!(
2192 metric_value(
2193 &text,
2194 "network_messages_dropped_total{direction=\"outbound\",reason=\"queue_bytes_limit_per_peer\"}"
2195 ),
2196 0.0
2197 );
2198 assert_eq!(
2199 metric_value(
2200 &text,
2201 "network_messages_dropped_total{direction=\"outbound\",reason=\"queue_bytes_limit_global\"}"
2202 ),
2203 0.0
2204 );
2205 assert_eq!(
2206 metric_value(
2207 &text,
2208 "network_messages_dropped_total{direction=\"inbound\",reason=\"queue_bytes_limit_per_peer\"}"
2209 ),
2210 0.0
2211 );
2212 assert_eq!(
2213 metric_value(
2214 &text,
2215 "network_messages_dropped_total{direction=\"inbound\",reason=\"queue_bytes_limit_global\"}"
2216 ),
2217 0.0
2218 );
2219 }
2220
2221 #[test]
2222 fn outbound_global_bytes_limit_applies_across_peers() {
2223 let mut config = create_config(
2224 vec![],
2225 false,
2226 NodeType::Addressable,
2227 vec!["/memory/3102".to_owned()],
2228 );
2229 config.max_pending_outbound_bytes_per_peer = 0;
2230 config.max_pending_outbound_bytes_total = 20;
2231
2232 let keys = KeyPair::Ed25519(Ed25519Signer::generate().unwrap());
2233 let mut registry = Registry::default();
2234 let metrics = crate::metrics::register(&mut registry);
2235 let mut worker: NetworkWorker<Dummy> = NetworkWorker::new(
2236 &keys,
2237 config,
2238 None,
2239 CancellationToken::new(),
2240 CancellationToken::new(),
2241 None,
2242 Some(metrics),
2243 )
2244 .expect("worker");
2245
2246 let peer_a = PeerId::random();
2247 let peer_b = PeerId::random();
2248
2249 worker.add_pending_outbound_message(
2250 peer_a,
2251 Bytes::from_static(b"aaaaaaaaaaaa"), );
2253 worker.add_pending_outbound_message(
2254 peer_b,
2255 Bytes::from_static(b"bbbbbbbbbbbb"), );
2257
2258 assert_eq!(worker.pending_outbound_bytes_len(), 12);
2259 assert_eq!(
2260 worker
2261 .pending_outbound_messages
2262 .get(&peer_a)
2263 .expect("peer_a queue")
2264 .len(),
2265 1
2266 );
2267 assert_eq!(
2268 worker
2269 .pending_outbound_messages
2270 .get(&peer_b)
2271 .map_or(0, PendingQueue::len),
2272 0
2273 );
2274
2275 let mut text = String::new();
2276 encode(&mut text, ®istry).expect("encode metrics");
2277 assert_eq!(
2278 metric_value(
2279 &text,
2280 "network_messages_dropped_total{direction=\"outbound\",reason=\"queue_bytes_limit_per_peer\"}"
2281 ),
2282 0.0
2283 );
2284 assert_eq!(
2285 metric_value(
2286 &text,
2287 "network_messages_dropped_total{direction=\"outbound\",reason=\"queue_bytes_limit_global\"}"
2288 ),
2289 1.0
2290 );
2291 }
2292
2293 #[test]
2294 fn dial_failures_include_phase_label() {
2295 let mut registry = Registry::default();
2296 let metrics = crate::metrics::register(&mut registry);
2297
2298 metrics.observe_dial_failure("bootstrap", "transport");
2299 metrics.observe_dial_failure("runtime", "denied");
2300
2301 let mut text = String::new();
2302 encode(&mut text, ®istry).expect("encode metrics");
2303
2304 assert_eq!(
2305 metric_value(
2306 &text,
2307 "network_dial_failures_total{phase=\"bootstrap\",kind=\"transport\"}"
2308 ),
2309 1.0
2310 );
2311 assert_eq!(
2312 metric_value(
2313 &text,
2314 "network_dial_failures_total{phase=\"runtime\",kind=\"denied\"}"
2315 ),
2316 1.0
2317 );
2318 }
2319
2320 #[test]
2321 fn bootstrap_duration_is_labeled_by_result() {
2322 let mut registry = Registry::default();
2323 let metrics = crate::metrics::register(&mut registry);
2324
2325 metrics.observe_bootstrap_duration_seconds("success", 0.5);
2326 metrics.observe_bootstrap_duration_seconds("failure", 1.25);
2327
2328 let mut text = String::new();
2329 encode(&mut text, ®istry).expect("encode metrics");
2330
2331 assert_eq!(
2332 metric_value(
2333 &text,
2334 "network_bootstrap_duration_seconds_count{result=\"success\"}"
2335 ),
2336 1.0
2337 );
2338 assert_eq!(
2339 metric_value(
2340 &text,
2341 "network_bootstrap_duration_seconds_count{result=\"failure\"}"
2342 ),
2343 1.0
2344 );
2345 }
2346
2347 #[test]
2348 fn network_state_is_one_hot() {
2349 let mut registry = Registry::default();
2350 let metrics = crate::metrics::register(&mut registry);
2351
2352 metrics.set_state_current(&NetworkState::Running);
2353
2354 let mut text = String::new();
2355 encode(&mut text, ®istry).expect("encode metrics");
2356
2357 assert_eq!(
2358 metric_value(&text, "network_state{state=\"running\"}"),
2359 1.0
2360 );
2361 assert_eq!(metric_value(&text, "network_state{state=\"start\"}"), 0.0);
2362 assert_eq!(metric_value(&text, "network_state{state=\"dial\"}"), 0.0);
2363 assert_eq!(
2364 metric_value(&text, "network_state{state=\"dialing\"}"),
2365 0.0
2366 );
2367 assert_eq!(
2368 metric_value(&text, "network_state{state=\"disconnected\"}"),
2369 0.0
2370 );
2371 }
2372
2373 #[test(tokio::test)]
2374 #[serial]
2375 async fn pending_inbound_messages_keep_arrival_order_after_identify() {
2376 let mut worker = build_worker(
2377 vec![],
2378 false,
2379 NodeType::Addressable,
2380 CancellationToken::new(),
2381 CancellationToken::new(),
2382 "/memory/3200".to_owned(),
2383 );
2384 let remote_keys = Libp2pKeypair::generate_ed25519();
2385 let remote_peer = remote_keys.public().to_peer_id();
2386
2387 worker.add_pending_inbound_message(
2388 remote_peer,
2389 Bytes::from_static(b"msg-2"),
2390 );
2391 worker.add_pending_inbound_message(
2392 remote_peer,
2393 Bytes::from_static(b"msg-1"),
2394 );
2395 worker.add_pending_inbound_message(
2396 remote_peer,
2397 Bytes::from_static(b"msg-3"),
2398 );
2399
2400 let (helper_sender, mut helper_rx) = mpsc::channel(8);
2401 worker.add_helper_sender(helper_sender);
2402
2403 worker
2404 .handle_event(build_identified_event(
2405 remote_peer,
2406 remote_keys.public(),
2407 ConnectionId::new_unchecked(11),
2408 ))
2409 .await;
2410
2411 let mut received = Vec::new();
2412 for _ in 0..3 {
2413 let command = tokio::time::timeout(
2414 Duration::from_millis(200),
2415 helper_rx.recv(),
2416 )
2417 .await
2418 .expect("helper receive timeout")
2419 .expect("helper channel closed");
2420 let CommandHelper::ReceivedMessage { message, .. } = command else {
2421 panic!("unexpected helper command")
2422 };
2423 received.push(message);
2424 }
2425
2426 assert_eq!(
2427 received,
2428 vec![
2429 Bytes::from_static(b"msg-2"),
2430 Bytes::from_static(b"msg-1"),
2431 Bytes::from_static(b"msg-3"),
2432 ]
2433 );
2434 assert!(!worker.pending_inbound_messages.contains_key(&remote_peer));
2435 }
2436
2437 #[test(tokio::test)]
2438 #[serial]
2439 async fn flapping_connection_retries_then_flushes_outbound_queue() {
2440 let mut worker = build_worker(
2441 vec![],
2442 false,
2443 NodeType::Addressable,
2444 CancellationToken::new(),
2445 CancellationToken::new(),
2446 "/memory/3201".to_owned(),
2447 );
2448 let remote_keys = Libp2pKeypair::generate_ed25519();
2449 let remote_peer = remote_keys.public().to_peer_id();
2450 let first_connection = ConnectionId::new_unchecked(21);
2451 let second_connection = ConnectionId::new_unchecked(22);
2452
2453 worker.add_pending_outbound_message(
2454 remote_peer,
2455 Bytes::from_static(b"needs-redelivery"),
2456 );
2457 worker
2458 .peer_action
2459 .insert(remote_peer, Action::Identified(first_connection));
2460 worker.peer_identify.insert(remote_peer);
2461
2462 worker
2463 .handle_event(SwarmEvent::ConnectionClosed {
2464 peer_id: remote_peer,
2465 connection_id: first_connection,
2466 endpoint: test_endpoint(),
2467 num_established: 0,
2468 cause: None,
2469 })
2470 .await;
2471
2472 assert!(worker.retry_by_peer.contains_key(&remote_peer));
2473 assert!(matches!(
2474 worker.retry_by_peer.get(&remote_peer).map(|s| s.kind),
2475 Some(RetryKind::Dial)
2476 ));
2477 assert!(
2478 worker
2479 .pending_outbound_messages
2480 .get(&remote_peer)
2481 .is_some_and(|q| !q.is_empty())
2482 );
2483
2484 worker
2485 .handle_event(build_identified_event(
2486 remote_peer,
2487 remote_keys.public(),
2488 second_connection,
2489 ))
2490 .await;
2491
2492 assert!(!worker.pending_outbound_messages.contains_key(&remote_peer));
2493 assert!(!worker.retry_by_peer.contains_key(&remote_peer));
2494 assert!(matches!(
2495 worker.peer_action.get(&remote_peer),
2496 Some(Action::Identified(id)) if *id == second_connection
2497 ));
2498 }
2499
2500 #[test(tokio::test)]
2501 #[serial]
2502 async fn bootstrap_identify_timeout_keeps_bootnode_until_close() {
2503 let remote_keys = Libp2pKeypair::generate_ed25519();
2504 let remote_peer = remote_keys.public().to_peer_id();
2505 let boot_node = RoutingNode {
2506 peer_id: remote_peer.to_string(),
2507 address: vec!["/memory/3300".to_owned()],
2508 };
2509
2510 let mut worker = build_worker(
2511 vec![boot_node],
2512 false,
2513 NodeType::Addressable,
2514 CancellationToken::new(),
2515 CancellationToken::new(),
2516 "/memory/3301".to_owned(),
2517 );
2518
2519 assert!(worker.boot_nodes.contains_key(&remote_peer));
2520 assert!(!worker.retry_boot_nodes.contains_key(&remote_peer));
2521
2522 worker
2523 .handle_connection_events(SwarmEvent::Behaviour(
2524 BehaviourEvent::IdentifyError {
2525 peer_id: remote_peer,
2526 error: swarm::StreamUpgradeError::Timeout,
2527 },
2528 ))
2529 .await;
2530
2531 assert!(worker.boot_nodes.contains_key(&remote_peer));
2532 assert!(!worker.retry_boot_nodes.contains_key(&remote_peer));
2533 }
2534
2535 #[test(tokio::test)]
2536 #[serial]
2537 async fn test_no_boot_nodes() {
2538 let boot_nodes = vec![];
2539
2540 let node_addr = "/memory/3000";
2542 let mut node = build_worker(
2543 boot_nodes.clone(),
2544 false,
2545 NodeType::Addressable,
2546 CancellationToken::new(),
2547 CancellationToken::new(),
2548 node_addr.to_owned(),
2549 );
2550 if let Err(e) = node.run_connection().await {
2551 assert_eq!(
2552 e.to_string(),
2553 "cannot connect to the ave network: no reachable bootstrap node"
2554 );
2555 };
2556
2557 assert_eq!(node.state, NetworkState::Disconnected);
2558 }
2559
2560 #[test(tokio::test)]
2561 #[serial]
2562 async fn test_fake_boot_node() {
2563 let mut boot_nodes = vec![];
2564
2565 let fake_boot_peer = PeerId::random();
2567 let fake_boot_addr = "/memory/3001";
2568 let fake_node = RoutingNode {
2569 peer_id: fake_boot_peer.to_string(),
2570 address: vec![fake_boot_addr.to_owned()],
2571 };
2572 boot_nodes.push(fake_node);
2573
2574 let node_addr = "/memory/3002";
2576 let mut node = build_worker(
2577 boot_nodes.clone(),
2578 false,
2579 NodeType::Addressable,
2580 CancellationToken::new(),
2581 CancellationToken::new(),
2582 node_addr.to_owned(),
2583 );
2584
2585 if let Err(e) = node.run_connection().await {
2586 assert_eq!(
2587 e.to_string(),
2588 "cannot connect to the ave network: no reachable bootstrap node"
2589 );
2590 };
2591
2592 assert_eq!(node.state, NetworkState::Disconnected);
2593 }
2594
2595 #[test(tokio::test)]
2596 #[serial]
2597 async fn test_connect() {
2598 let mut boot_nodes = vec![];
2599
2600 let boot_addr = "/memory/3003";
2602 let mut boot = build_worker(
2603 boot_nodes.clone(),
2604 false,
2605 NodeType::Bootstrap,
2606 CancellationToken::new(),
2607 CancellationToken::new(),
2608 boot_addr.to_owned(),
2609 );
2610
2611 let boot_node = RoutingNode {
2612 peer_id: boot.local_peer_id().to_string(),
2613 address: vec![boot_addr.to_owned()],
2614 };
2615
2616 boot_nodes.push(boot_node);
2617
2618 let node_addr = "/memory/3004";
2620 let mut node = build_worker(
2621 boot_nodes,
2622 false,
2623 NodeType::Ephemeral,
2624 CancellationToken::new(),
2625 CancellationToken::new(),
2626 node_addr.to_owned(),
2627 );
2628
2629 tokio::spawn(async move {
2631 boot.run_main().await;
2632 });
2633
2634 node.run_connection().await.unwrap();
2636 }
2637}