Skip to main content

ave_network/
worker.rs

1//! # Network worker.
2//!
3
4use crate::{
5    Command, CommandHelper, Config, Error, Event as NetworkEvent, MachineSpec,
6    Monitor, MonitorMessage, NodeType, ResolvedSpec,
7    behaviour::{Behaviour, Event as BehaviourEvent, ReqResMessage},
8    metrics::NetworkMetrics,
9    resolve_spec,
10    service::NetworkService,
11    transport::build_transport,
12    utils::{
13        Action, Due, IDENTIFY_PROTOCOL, LimitsConfig, MessagesHelper,
14        NetworkState, REQRES_PROTOCOL, RetryKind, RetryState, ScheduleType,
15        convert_addresses, convert_boot_nodes, peer_id_to_ed25519_pubkey_bytes,
16    },
17};
18
19use std::{
20    collections::{BinaryHeap, HashSet},
21    fmt::Debug,
22    num::{NonZeroU8, NonZeroUsize},
23    pin::Pin,
24    sync::Arc,
25    time::Duration,
26};
27
28use ave_actors::ActorRef;
29use ave_common::identity::KeyPair;
30
31use libp2p::{
32    Multiaddr, PeerId, StreamProtocol, Swarm, identify,
33    identity::{Keypair, ed25519},
34    request_response::{self, ResponseChannel},
35    swarm::{self, DialError, SwarmEvent, dial_opts::DialOpts},
36};
37
38use futures::StreamExt;
39use serde::Serialize;
40use tokio::{
41    sync::mpsc,
42    time::{Instant, Sleep, sleep, sleep_until},
43};
44use tokio_util::sync::CancellationToken;
45use tracing::{debug, error, info, trace, warn};
46
47use bytes::Bytes;
48use std::collections::{HashMap, VecDeque};
49
/// `tracing` target passed as `target:` to the log events emitted in this file.
const TARGET: &str = "ave::network::worker";
51
/// Maximum number of messages held in a single peer's pending queue.
///
/// The worker applies this cap to both the outbound and the inbound pending
/// queues. When a queue is already at the limit, the oldest message is
/// evicted to make room for the new one.
const MAX_PENDING_MESSAGES_PER_PEER: usize = 100;
55
/// FIFO queue of pending messages for a single peer, with a running byte total.
///
/// The queue does not enforce any limit by itself: `push_back` always appends.
/// The bounds (at most `MAX_PENDING_MESSAGES_PER_PEER` messages plus the
/// configured byte limits, evicting the oldest entries first) are applied by
/// the worker before it pushes a new message.
#[derive(Default)]
struct PendingQueue {
    /// Queued messages in arrival order; the front is the oldest.
    messages: VecDeque<PendingMessage>,
    /// Sum of the payload lengths of everything currently in `messages`.
    pending_bytes: usize,
}
64
/// A queued payload together with the instant at which it was queued.
struct PendingMessage {
    /// Raw message bytes.
    payload: Bytes,
    /// When the message entered the queue; used to report how long it waited.
    enqueued_at: Instant,
}
69
70impl PendingQueue {
71    fn contains(&self, message: &Bytes) -> bool {
72        self.messages.iter().any(|x| x.payload == *message)
73    }
74
75    fn pop_front(&mut self) -> Option<PendingMessage> {
76        let popped = self.messages.pop_front()?;
77        self.pending_bytes =
78            self.pending_bytes.saturating_sub(popped.payload.len());
79        Some(popped)
80    }
81
82    fn push_back(&mut self, message: Bytes) {
83        self.pending_bytes += message.len();
84        self.messages.push_back(PendingMessage {
85            payload: message,
86            enqueued_at: Instant::now(),
87        });
88    }
89
90    fn drain(&mut self) -> impl Iterator<Item = PendingMessage> + '_ {
91        self.pending_bytes = 0;
92        self.messages.drain(..)
93    }
94
95    fn is_empty(&self) -> bool {
96        self.messages.is_empty()
97    }
98
99    fn len(&self) -> usize {
100        self.messages.len()
101    }
102
103    const fn bytes_len(&self) -> usize {
104        self.pending_bytes
105    }
106}
107
/// Main network worker. Must be polled in order for the network to advance.
///
/// The worker is responsible for handling the network events and commands.
///
pub struct NetworkWorker<T>
where
    T: Debug + Serialize,
{
    /// Local Peer ID.
    local_peer_id: PeerId,

    /// Network service.
    service: NetworkService,

    /// The libp2p swarm.
    swarm: Swarm<Behaviour>,

    /// The network state.
    state: NetworkState,

    /// The command receiver.
    command_receiver: mpsc::Receiver<Command>,

    /// The command sender to Helper Intermediary.
    /// `None` until `add_helper_sender` is called.
    helper_sender: Option<mpsc::Sender<CommandHelper<T>>>,

    /// Monitor actor. When present, network events are forwarded to it.
    monitor: Option<ActorRef<Monitor>>,

    /// Cancellation token also handed to the behaviour at construction.
    /// NOTE(review): presumably signals a graceful shutdown; confirm against
    /// the main loop.
    graceful_token: CancellationToken,
    /// Cancellation token the worker cancels on irrecoverable errors (failed
    /// bootstrap connection, monitor unreachable).
    crash_token: CancellationToken,

    /// Node type.
    node_type: NodeType,

    /// Boot nodes, keyed by peer id, with their addresses.
    boot_nodes: HashMap<PeerId, Vec<Multiaddr>>,

    /// Nodes with which it has not been possible to establish a connection by keepAliveTimeout in pre-routing.
    retry_boot_nodes: HashMap<PeerId, Vec<Multiaddr>>,

    /// Pending outbound messages to the peer (bounded by count and bytes).
    pending_outbound_messages: HashMap<PeerId, PendingQueue>,

    /// Pending inbound messages per peer (bounded by count and bytes).
    pending_inbound_messages: HashMap<PeerId, PendingQueue>,

    /// Ephemeral responses: open response channels per peer, oldest first.
    /// An outgoing message to a peer is sent through one of these when
    /// available.
    response_channels:
        HashMap<PeerId, VecDeque<ResponseChannel<ReqResMessage>>>,

    /// Successful dials
    successful_dials: u64,

    /// Peers tracked as identified; its size is reported as the
    /// identified-peers gauge.
    peer_identify: HashSet<PeerId>,

    /// Retry state (attempt count, due instant, kind and addresses) per peer.
    retry_by_peer: HashMap<PeerId, RetryState>,

    /// Heap of scheduled retries. May contain stale entries for a peer; they
    /// are filtered out by comparing their instant with `retry_by_peer`.
    retry_queue: BinaryHeap<Due>,

    /// Timer armed for the entry at the top of `retry_queue`; `None` when no
    /// retry is scheduled.
    retry_timer: Option<Pin<Box<Sleep>>>,

    /// Action currently associated with each peer. A peer that has an entry
    /// here is not scheduled for another retry.
    peer_action: HashMap<PeerId, Action>,

    /// Largest application message accepted for sending, in bytes.
    max_app_message_bytes: usize,
    /// Per-peer byte limit of the outbound pending queue (`0` disables it).
    max_pending_outbound_bytes_per_peer: usize,
    /// Per-peer byte limit of the inbound pending queue (`0` disables it).
    max_pending_inbound_bytes_per_peer: usize,
    /// Byte limit over all outbound pending queues (`0` disables it).
    max_pending_outbound_bytes_total: usize,
    /// Byte limit over all inbound pending queues (`0` disables it).
    max_pending_inbound_bytes_total: usize,

    /// Optional metrics sink; when `None` no metric is recorded.
    metrics: Option<Arc<NetworkMetrics>>,
}
180
181impl<T: Debug + Serialize> NetworkWorker<T> {
182    /// Create a new `NetworkWorker`.
183    pub fn new(
184        keys: &KeyPair,
185        config: Config,
186        monitor: Option<ActorRef<Monitor>>,
187        graceful_token: CancellationToken,
188        crash_token: CancellationToken,
189        machine_spec: Option<MachineSpec>,
190        metrics: Option<Arc<NetworkMetrics>>,
191    ) -> Result<Self, Error> {
192        // Create channels to communicate commands
193        info!(target: TARGET, "network initialising");
194        let (command_sender, command_receiver) = mpsc::channel(512);
195
196        let key = match keys {
197            KeyPair::Ed25519(ed25519_signer) => {
198                let sk_bytes = ed25519_signer
199                    .secret_key_bytes()
200                    .map_err(|e| Error::KeyExtraction(e.to_string()))?;
201
202                let sk = ed25519::SecretKey::try_from_bytes(sk_bytes)
203                    .map_err(|e| Error::KeyExtraction(e.to_string()))?;
204
205                let kp = ed25519::Keypair::from(sk);
206                Keypair::from(kp)
207            }
208        };
209
210        // Generate the `PeerId` from the public key.
211        let local_peer_id = key.public().to_peer_id();
212
213        let boot_nodes = convert_boot_nodes(&config.boot_nodes);
214
215        // Create the listen addressess.
216        let addresses = convert_addresses(&config.listen_addresses)?;
217
218        // Create the listen addressess.
219        let external_addresses = convert_addresses(&config.external_addresses)?;
220
221        let node_type = config.node_type.clone();
222
223        // Resolve machine sizing from the declared spec, or auto-detect from host.
224        let ResolvedSpec { ram_mb, cpu_cores } = resolve_spec(machine_spec);
225
226        let limits = LimitsConfig::build(ram_mb, cpu_cores);
227        let max_app_message_bytes = config.max_app_message_bytes;
228        let max_pending_outbound_bytes_per_peer =
229            config.max_pending_outbound_bytes_per_peer;
230        let max_pending_inbound_bytes_per_peer =
231            config.max_pending_inbound_bytes_per_peer;
232        let max_pending_outbound_bytes_total =
233            config.max_pending_outbound_bytes_total;
234        let max_pending_inbound_bytes_total =
235            config.max_pending_inbound_bytes_total;
236
237        // Build transport.
238        let transport = build_transport(&key, limits.clone())?;
239
240        let behaviour = Behaviour::new(
241            &key.public(),
242            config,
243            graceful_token.clone(),
244            crash_token.clone(),
245            limits,
246            metrics.clone(),
247        );
248
249        // Create the swarm.
250        let mut swarm = Swarm::new(
251            transport,
252            behaviour,
253            local_peer_id,
254            swarm::Config::with_tokio_executor()
255                .with_idle_connection_timeout(Duration::from_secs(90))
256                .with_max_negotiating_inbound_streams(32)
257                .with_notify_handler_buffer_size(
258                    NonZeroUsize::new(32).expect("32 > 0"),
259                )
260                .with_per_connection_event_buffer_size(16)
261                .with_dial_concurrency_factor(
262                    NonZeroU8::new(2).expect("2 > 0"),
263                ),
264        );
265
266        let service = NetworkService::new(command_sender);
267
268        if addresses.is_empty() {
269            // Listen on all tcp addresses.
270            swarm
271                .listen_on(
272                    "/ip4/0.0.0.0/tcp/0"
273                        .parse::<Multiaddr>()
274                        .map_err(|e| Error::InvalidAddress(e.to_string()))?,
275                )
276                .map_err(|e| Error::Listen(format!("0.0.0.0:0: {e}")))?;
277            info!(target: TARGET, "listening on all interfaces");
278        } else {
279            // Listen on the external addresses.
280            for addr in addresses.iter() {
281                info!(target: TARGET, addr = %addr, "listening on address");
282                swarm
283                    .listen_on(addr.clone())
284                    .map_err(|e| Error::Listen(format!("{addr}: {e}")))?;
285            }
286        }
287
288        if !external_addresses.is_empty() {
289            for addr in external_addresses.iter() {
290                debug!(target: TARGET, addr = %addr, "external address registered");
291                swarm.add_external_address(addr.clone());
292            }
293        }
294
295        info!(target: TARGET, peer_id = %local_peer_id, "local peer id");
296
297        let worker = Self {
298            local_peer_id,
299            service,
300            swarm,
301            state: NetworkState::Start,
302            command_receiver,
303            helper_sender: None,
304            monitor,
305            graceful_token,
306            crash_token,
307            node_type,
308            boot_nodes,
309            retry_boot_nodes: HashMap::new(),
310            pending_outbound_messages: HashMap::default(),
311            pending_inbound_messages: HashMap::default(),
312            response_channels: HashMap::default(),
313            successful_dials: 0,
314            peer_identify: HashSet::new(),
315            retry_by_peer: HashMap::new(),
316            retry_queue: BinaryHeap::new(),
317            retry_timer: None,
318            peer_action: HashMap::new(),
319            max_app_message_bytes,
320            max_pending_outbound_bytes_per_peer,
321            max_pending_inbound_bytes_per_peer,
322            max_pending_outbound_bytes_total,
323            max_pending_inbound_bytes_total,
324            metrics,
325        };
326
327        if let Some(metrics) = worker.metric_handle() {
328            metrics.set_state_current(&worker.state);
329        }
330        worker.refresh_runtime_metrics();
331
332        Ok(worker)
333    }
334
    /// Borrows the metrics sink, or returns `None` when metrics are disabled.
    fn metric_handle(&self) -> Option<&NetworkMetrics> {
        self.metrics.as_deref()
    }
338
339    fn pending_outbound_messages_len(&self) -> usize {
340        self.pending_outbound_messages
341            .values()
342            .map(PendingQueue::len)
343            .sum()
344    }
345
346    fn pending_outbound_bytes_len(&self) -> usize {
347        self.pending_outbound_messages
348            .values()
349            .map(PendingQueue::bytes_len)
350            .sum()
351    }
352
353    fn pending_inbound_messages_len(&self) -> usize {
354        self.pending_inbound_messages
355            .values()
356            .map(PendingQueue::len)
357            .sum()
358    }
359
360    fn pending_inbound_bytes_len(&self) -> usize {
361        self.pending_inbound_messages
362            .values()
363            .map(PendingQueue::bytes_len)
364            .sum()
365    }
366
367    fn pending_response_channels_len(&self) -> usize {
368        self.response_channels.values().map(VecDeque::len).sum()
369    }
370
371    fn refresh_runtime_metrics(&self) {
372        let Some(metrics) = self.metric_handle() else {
373            return;
374        };
375
376        metrics.set_retry_queue_len(self.retry_queue.len() as i64);
377        metrics.set_pending_outbound_peers(
378            self.pending_outbound_messages.len() as i64,
379        );
380        metrics.set_pending_outbound_messages(
381            self.pending_outbound_messages_len() as i64,
382        );
383        metrics.set_pending_outbound_bytes(
384            self.pending_outbound_bytes_len() as i64
385        );
386        metrics.set_pending_inbound_peers(
387            self.pending_inbound_messages.len() as i64
388        );
389        metrics.set_pending_inbound_messages(
390            self.pending_inbound_messages_len() as i64,
391        );
392        metrics
393            .set_pending_inbound_bytes(self.pending_inbound_bytes_len() as i64);
394        metrics.set_identified_peers(self.peer_identify.len() as i64);
395        metrics.set_response_channels_pending(
396            self.pending_response_channels_len() as i64,
397        );
398    }
399
400    fn observe_pending_message_age(&self, enqueued_at: Instant) {
401        if let Some(metrics) = self.metric_handle() {
402            metrics.observe_pending_message_age_seconds(
403                enqueued_at.elapsed().as_secs_f64(),
404            );
405        }
406    }
407
408    fn drop_pending_outbound_messages(&mut self, peer_id: &PeerId) -> usize {
409        let Some(mut queue) = self.pending_outbound_messages.remove(peer_id)
410        else {
411            return 0;
412        };
413
414        let mut dropped = 0usize;
415        for message in queue.drain() {
416            dropped += 1;
417            self.observe_pending_message_age(message.enqueued_at);
418        }
419        dropped
420    }
421
422    fn drop_pending_inbound_messages(&mut self, peer_id: &PeerId) {
423        if let Some(mut queue) = self.pending_inbound_messages.remove(peer_id) {
424            for message in queue.drain() {
425                self.observe_pending_message_age(message.enqueued_at);
426            }
427        }
428    }
429
430    fn observe_identify_error(
431        &self,
432        error: &swarm::StreamUpgradeError<identify::UpgradeError>,
433    ) {
434        let kind = match error {
435            swarm::StreamUpgradeError::Timeout => "timeout",
436            swarm::StreamUpgradeError::Io(_) => "io",
437            swarm::StreamUpgradeError::NegotiationFailed => "negotiation",
438            swarm::StreamUpgradeError::Apply(_) => "other",
439        };
440
441        if let Some(metrics) = self.metric_handle() {
442            metrics.observe_identify_error(kind);
443        }
444    }
445
    /// Schedules a retry (discovery or dial) for `peer`.
    ///
    /// Nothing happens when the peer already has an entry in `peer_action`.
    /// Otherwise the peer's retry state is created or reused and the retry is
    /// given a due instant:
    ///
    /// * when the stored kind is `Discover` and the requested kind is `Dial`,
    ///   the retry is due immediately;
    /// * otherwise, once 8 attempts have been made the peer's pending
    ///   messages are cleared and nothing is scheduled; before that the retry
    ///   is delayed with an exponential backoff plus a per-peer jitter.
    ///
    /// The retry is then recorded in `retry_by_peer`, `peer_action` and
    /// `retry_queue`, the timer is re-armed and the metrics are refreshed.
    fn schedule_retry(&mut self, peer: PeerId, schedule_type: ScheduleType) {
        // An action is already in flight for this peer: do not schedule twice.
        if self.peer_action.contains_key(&peer) {
            return;
        }

        let (kind, addrs) = match schedule_type {
            ScheduleType::Discover => (RetryKind::Discover, vec![]),
            ScheduleType::Dial(multiaddrs) => (RetryKind::Dial, multiaddrs),
        };

        let now = Instant::now();
        let base = Duration::from_millis(250);
        let cap = Duration::from_secs(30);

        // Reuse the peer's existing retry state (keeping its attempt count),
        // or start a fresh one.
        let entry = self.retry_by_peer.entry(peer).or_insert(RetryState {
            attempts: 0,
            when: now,
            kind,
            addrs: vec![],
        });

        let when = if matches!(
            (entry.kind, kind),
            (RetryKind::Discover, RetryKind::Dial)
        ) {
            // Discover -> Dial transition: fire right away, without backoff.
            now
        } else {
            // Retry budget exhausted: give up on this peer. `entry` is not
            // used again on this path.
            if entry.attempts >= 8 {
                self.clear_pending_messages(&peer);
                return;
            }

            // Exponential backoff: 250ms * 2^attempt, capped at 30s
            // attempts 0-7 → ~250ms, 500ms, 1s, 2s, 4s, 8s, 16s, 30s
            let exp = 1u32 << entry.attempts.min(7);
            let mut delay = base * exp;
            if delay > cap {
                delay = cap;
            }

            // Deterministic per-peer jitter of 80–120% (no external RNG).
            // It is applied after the cap, so the final delay can reach 36s.
            // Fold all bytes to avoid the fixed multihash prefix dominating.
            let hash = peer
                .to_bytes()
                .iter()
                .fold(0u32, |acc, &b| acc.wrapping_add(b as u32));
            let j = 80 + (hash % 41);
            delay = delay * j / 100;

            now + delay
        };

        entry.when = when;
        entry.kind = kind;
        entry.addrs = addrs;

        self.peer_action.insert(peer, Action::from(kind));

        // An older `Due` for this peer may still be in the heap; it is told
        // apart later by comparing its instant with `entry.when`.
        self.retry_queue.push(Due(peer, entry.when));
        self.arm_retry_timer();
        self.refresh_runtime_metrics();
    }
508
509    fn arm_retry_timer(&mut self) {
510        if let Some(next) = self.retry_queue.peek() {
511            match &mut self.retry_timer {
512                Some(timer) => timer.as_mut().reset(next.1),
513                None => self.retry_timer = Some(Box::pin(sleep_until(next.1))),
514            }
515        }
516    }
517
518    fn drain_due_retries(
519        &mut self,
520    ) -> Vec<(PeerId, RetryKind, Vec<Multiaddr>)> {
521        let now = Instant::now();
522        let mut out = Vec::new();
523        while let Some(Due(peer, when)) = self.retry_queue.peek().cloned() {
524            if when > now {
525                break;
526            }
527
528            self.retry_queue.pop();
529            // Match the exact instant to reject stale Due entries. Multiple Due
530            // entries for the same peer can accumulate (e.g. Discover → Dial
531            // transition pushes a second entry without removing the first).
532            // Comparing `when` (from the popped Due) against `retry_by_peer[peer].when`
533            // ensures only the current scheduling cycle fires.
534            if let Some(retry) = self.retry_by_peer.get(&peer).cloned()
535                && retry.when == when
536            {
537                self.retry_by_peer
538                    .entry(peer)
539                    .and_modify(|x| x.attempts += 1);
540                out.push((peer, retry.kind, retry.addrs));
541            }
542        }
543
544        if self.retry_queue.is_empty() {
545            self.retry_timer = None;
546        } else {
547            self.arm_retry_timer();
548        }
549        self.refresh_runtime_metrics();
550        out
551    }
552
    /// Sets the sender used to reach the helper intermediary, replacing any
    /// sender set previously.
    pub fn add_helper_sender(
        &mut self,
        helper_sender: mpsc::Sender<CommandHelper<T>>,
    ) {
        self.helper_sender = Some(helper_sender);
    }
560
    /// Get the local peer ID (derived from the node's public key at
    /// construction).
    pub const fn local_peer_id(&self) -> PeerId {
        self.local_peer_id
    }
565
566    /// Send message to a peer.
567    fn send_message(
568        &mut self,
569        peer: PeerId,
570        message: Bytes,
571    ) -> Result<(), Error> {
572        if message.len() > self.max_app_message_bytes {
573            warn!(
574                target: TARGET,
575                peer_id = %peer,
576                size = message.len(),
577                max = self.max_app_message_bytes,
578                "outbound payload rejected: message too large",
579            );
580            if let Some(metrics) = self.metric_handle() {
581                metrics.inc_oversized_outbound_drop();
582            }
583            self.refresh_runtime_metrics();
584            return Err(Error::MessageTooLarge {
585                size: message.len(),
586                max: self.max_app_message_bytes,
587            });
588        }
589
590        if let Some(mut responses) = self.response_channels.remove(&peer) {
591            while let Some(response_channel) = responses.pop_front() {
592                match self
593                    .swarm
594                    .behaviour_mut()
595                    .send_response(response_channel, message.clone())
596                {
597                    Ok(()) => {
598                        if !responses.is_empty() {
599                            self.response_channels.insert(peer, responses);
600                        }
601                        self.refresh_runtime_metrics();
602                        return Ok(());
603                    }
604                    Err(e) => {
605                        debug!(target: TARGET, peer_id = %peer, error = %e, "failed to send response: channel may already be consumed");
606                    }
607                }
608            }
609        }
610
611        self.add_pending_outbound_message(peer, message);
612
613        if self.swarm.behaviour_mut().is_known_peer(&peer) {
614            if let Some(Action::Identified(..)) = self.peer_action.get(&peer) {
615                self.send_pending_outbound_messages(peer);
616            } else {
617                self.schedule_retry(peer, ScheduleType::Dial(vec![]));
618            }
619        } else {
620            self.schedule_retry(peer, ScheduleType::Discover);
621        }
622
623        Ok(())
624    }
625
626    /// Add pending message to peer.
627    ///
628    /// If count/bytes limits are reached, oldest messages are evicted first.
629    fn add_pending_outbound_message(&mut self, peer: PeerId, message: Bytes) {
630        let message_len = message.len();
631        let mut dropped_count = 0u64;
632        let mut dropped_bytes_limit_peer = 0u64;
633        let mut dropped_bytes_limit_global = 0u64;
634        let mut dropped_messages = Vec::new();
635        let mut duplicate = false;
636        let mut total_pending_bytes = self.pending_outbound_bytes_len();
637        let per_peer_limit = self.max_pending_outbound_bytes_per_peer;
638        let global_limit = self.max_pending_outbound_bytes_total;
639
640        {
641            let queue = self.pending_outbound_messages.entry(peer).or_default();
642            if queue.contains(&message) {
643                duplicate = true;
644            } else {
645                while queue.len() >= MAX_PENDING_MESSAGES_PER_PEER {
646                    if let Some(evicted) = queue.pop_front() {
647                        dropped_count += 1;
648                        total_pending_bytes = total_pending_bytes
649                            .saturating_sub(evicted.payload.len());
650                        dropped_messages.push(evicted);
651                    } else {
652                        break;
653                    }
654                }
655
656                if per_peer_limit > 0 {
657                    while queue.bytes_len() + message_len > per_peer_limit {
658                        if let Some(evicted) = queue.pop_front() {
659                            dropped_bytes_limit_peer += 1;
660                            total_pending_bytes = total_pending_bytes
661                                .saturating_sub(evicted.payload.len());
662                            dropped_messages.push(evicted);
663                        } else {
664                            break;
665                        }
666                    }
667                }
668
669                if per_peer_limit > 0
670                    && queue.bytes_len() + message_len > per_peer_limit
671                {
672                    dropped_bytes_limit_peer += 1;
673                } else if global_limit > 0
674                    && total_pending_bytes.saturating_add(message_len)
675                        > global_limit
676                {
677                    dropped_bytes_limit_global += 1;
678                } else {
679                    queue.push_back(message);
680                }
681            }
682        }
683
684        if duplicate {
685            self.refresh_runtime_metrics();
686            return;
687        }
688
689        for evicted in dropped_messages {
690            self.observe_pending_message_age(evicted.enqueued_at);
691        }
692
693        if dropped_count > 0 {
694            warn!(
695                target: TARGET,
696                peer_id = %peer,
697                dropped = dropped_count,
698                max_messages = MAX_PENDING_MESSAGES_PER_PEER,
699                "outbound queue count limit reached; oldest messages evicted",
700            );
701        }
702
703        if dropped_bytes_limit_peer > 0 {
704            warn!(
705                target: TARGET,
706                peer_id = %peer,
707                dropped = dropped_bytes_limit_peer,
708                message_bytes = message_len,
709                max_queue_bytes = per_peer_limit,
710                "outbound queue bytes limit reached; messages evicted/dropped",
711            );
712        }
713
714        if dropped_bytes_limit_global > 0 {
715            warn!(
716                target: TARGET,
717                peer_id = %peer,
718                dropped = dropped_bytes_limit_global,
719                message_bytes = message_len,
720                max_queue_bytes_total = global_limit,
721                "outbound global queue bytes limit reached; messages dropped",
722            );
723        }
724
725        if let Some(metrics) = self.metric_handle() {
726            metrics.inc_outbound_queue_drop_by(dropped_count);
727            metrics.inc_outbound_queue_bytes_drop_per_peer_by(
728                dropped_bytes_limit_peer,
729            );
730            metrics.inc_outbound_queue_bytes_drop_global_by(
731                dropped_bytes_limit_global,
732            );
733        }
734
735        self.refresh_runtime_metrics();
736    }
737
738    fn add_pending_inbound_message(&mut self, peer: PeerId, message: Bytes) {
739        let message_len = message.len();
740        let mut dropped_count = 0u64;
741        let mut dropped_bytes_limit_peer = 0u64;
742        let mut dropped_bytes_limit_global = 0u64;
743        let mut dropped_messages = Vec::new();
744        let mut duplicate = false;
745        let mut total_pending_bytes = self.pending_inbound_bytes_len();
746        let per_peer_limit = self.max_pending_inbound_bytes_per_peer;
747        let global_limit = self.max_pending_inbound_bytes_total;
748
749        {
750            let queue = self.pending_inbound_messages.entry(peer).or_default();
751            if queue.contains(&message) {
752                duplicate = true;
753            } else {
754                while queue.len() >= MAX_PENDING_MESSAGES_PER_PEER {
755                    if let Some(evicted) = queue.pop_front() {
756                        dropped_count += 1;
757                        total_pending_bytes = total_pending_bytes
758                            .saturating_sub(evicted.payload.len());
759                        dropped_messages.push(evicted);
760                    } else {
761                        break;
762                    }
763                }
764
765                if per_peer_limit > 0 {
766                    while queue.bytes_len() + message_len > per_peer_limit {
767                        if let Some(evicted) = queue.pop_front() {
768                            dropped_bytes_limit_peer += 1;
769                            total_pending_bytes = total_pending_bytes
770                                .saturating_sub(evicted.payload.len());
771                            dropped_messages.push(evicted);
772                        } else {
773                            break;
774                        }
775                    }
776                }
777
778                if per_peer_limit > 0
779                    && queue.bytes_len() + message_len > per_peer_limit
780                {
781                    dropped_bytes_limit_peer += 1;
782                } else if global_limit > 0
783                    && total_pending_bytes.saturating_add(message_len)
784                        > global_limit
785                {
786                    dropped_bytes_limit_global += 1;
787                } else {
788                    queue.push_back(message);
789                }
790            }
791        }
792
793        if duplicate {
794            self.refresh_runtime_metrics();
795            return;
796        }
797
798        for evicted in dropped_messages {
799            self.observe_pending_message_age(evicted.enqueued_at);
800        }
801
802        if dropped_count > 0 {
803            warn!(
804                target: TARGET,
805                peer_id = %peer,
806                dropped = dropped_count,
807                max_messages = MAX_PENDING_MESSAGES_PER_PEER,
808                "inbound queue count limit reached; oldest messages evicted",
809            );
810        }
811
812        if dropped_bytes_limit_peer > 0 {
813            warn!(
814                target: TARGET,
815                peer_id = %peer,
816                dropped = dropped_bytes_limit_peer,
817                message_bytes = message_len,
818                max_queue_bytes = per_peer_limit,
819                "inbound queue bytes limit reached; messages evicted/dropped",
820            );
821        }
822
823        if dropped_bytes_limit_global > 0 {
824            warn!(
825                target: TARGET,
826                peer_id = %peer,
827                dropped = dropped_bytes_limit_global,
828                message_bytes = message_len,
829                max_queue_bytes_total = global_limit,
830                "inbound global queue bytes limit reached; messages dropped",
831            );
832        }
833
834        if let Some(metrics) = self.metric_handle() {
835            metrics.inc_inbound_queue_drop_by(dropped_count);
836            metrics.inc_inbound_queue_bytes_drop_per_peer_by(
837                dropped_bytes_limit_peer,
838            );
839            metrics.inc_inbound_queue_bytes_drop_global_by(
840                dropped_bytes_limit_global,
841            );
842        }
843
844        self.refresh_runtime_metrics();
845    }
846
847    /// Add ephemeral response.
848    fn add_ephemeral_response(
849        &mut self,
850        peer: PeerId,
851        response_channel: ResponseChannel<ReqResMessage>,
852    ) {
853        self.response_channels
854            .entry(peer)
855            .or_default()
856            .push_back(response_channel);
857        self.refresh_runtime_metrics();
858    }
859
860    /// Send pending messages to peer.
861    fn send_pending_outbound_messages(&mut self, peer: PeerId) {
862        if let Some(mut queue) = self.pending_outbound_messages.remove(&peer) {
863            for message in queue.drain() {
864                self.observe_pending_message_age(message.enqueued_at);
865                self.swarm
866                    .behaviour_mut()
867                    .send_message(&peer, message.payload);
868            }
869        }
870
871        self.retry_by_peer.remove(&peer);
872        self.refresh_runtime_metrics();
873    }
874
    /// Get the network service: a clone of the handle that holds the sending
    /// half of the worker's command channel.
    pub fn service(&self) -> NetworkService {
        self.service.clone()
    }
879
880    /// Change the network state.
881    async fn change_state(&mut self, state: NetworkState) {
882        trace!(target: TARGET, state = ?state, "state changed");
883        self.state = state.clone();
884        if let Some(metrics) = self.metric_handle() {
885            metrics.observe_state_transition(&state);
886        }
887        self.send_event(NetworkEvent::StateChanged(state)).await;
888    }
889
890    /// Send event
891    #[allow(clippy::needless_pass_by_ref_mut)]
892    async fn send_event(&mut self, event: NetworkEvent) {
893        if let Some(monitor) = self.monitor.clone()
894            && let Err(e) = monitor.tell(MonitorMessage::Network(event)).await
895        {
896            error!(target: TARGET, error = %e, "failed to forward event to monitor");
897            self.crash_token.cancel();
898        }
899    }
900
901    /// Run the network worker.
902    pub async fn run(&mut self) {
903        let bootstrap_start = Instant::now();
904
905        // Run connection to bootstrap node.
906        if let Err(error) = self.run_connection().await {
907            if let Some(metrics) = self.metric_handle() {
908                metrics.observe_bootstrap_duration_seconds(
909                    bootstrap_start.elapsed().as_secs_f64(),
910                );
911            }
912            error!(target: TARGET, error = %error, "bootstrap connection failed");
913            self.send_event(NetworkEvent::Error(error)).await;
914            // Irrecoverable error. Cancel the node.
915            self.crash_token.cancel();
916            return;
917        }
918        if let Some(metrics) = self.metric_handle() {
919            metrics.observe_bootstrap_duration_seconds(
920                bootstrap_start.elapsed().as_secs_f64(),
921            );
922        }
923
924        if self.state != NetworkState::Running {
925            self.change_state(NetworkState::Running).await;
926        }
927
928        // Finish pre routing state, activating random walk (if node is a bootstrap).
929        self.swarm.behaviour_mut().finish_prerouting_state();
930        // Run main loop.
931        self.run_main().await;
932    }
933
934    /// Run connection to bootstrap node.
935    pub async fn run_connection(&mut self) -> Result<(), Error> {
936        // If is the first node of ave network.
937        if self.node_type == NodeType::Bootstrap && self.boot_nodes.is_empty() {
938            self.change_state(NetworkState::Running).await;
939            Ok(())
940        } else {
941            self.change_state(NetworkState::Dial).await;
942
943            let mut retrys: u8 = 0;
944
945            // Per-round deadline: if any boot node's TCP connects but Identify never
946            // completes (NAT half-open, overloaded peer, protocol stall…), this timer
947            // moves the remaining nodes to retry_boot_nodes so the retry logic handles
948            // them instead of waiting up to idle_connection_timeout (90 s) per round.
949            let dialing_round_timeout = sleep(Duration::from_secs(0));
950            tokio::pin!(dialing_round_timeout);
951            let mut dialing_timeout_active = false;
952
953            loop {
954                match self.state {
955                    NetworkState::Dial => {
956                        // Dial to boot node.
957                        if self.boot_nodes.is_empty() {
958                            error!(target: TARGET, "no bootstrap nodes available");
959                            self.send_event(NetworkEvent::Error(
960                                Error::NoBootstrapNode,
961                            ))
962                            .await;
963                            self.change_state(NetworkState::Disconnected).await;
964                        } else {
965                            let copy_boot_nodes = self.boot_nodes.clone();
966
967                            for (peer, addresses) in copy_boot_nodes {
968                                if let Some(metrics) = self.metric_handle() {
969                                    metrics.inc_dial_attempt_bootstrap();
970                                }
971                                if let Err(e) = self.swarm.dial(
972                                    DialOpts::peer_id(peer)
973                                        .addresses(addresses.clone())
974                                        .build(),
975                                ) {
976                                    let (add_to_retry, new_addresses) = self
977                                        .handle_dial_error(e, &peer, true)
978                                        .unwrap_or((false, vec![]));
979                                    self.boot_nodes.remove(&peer);
980                                    if add_to_retry {
981                                        if new_addresses.is_empty() {
982                                            self.retry_boot_nodes
983                                                .insert(peer, addresses);
984                                        } else {
985                                            self.retry_boot_nodes
986                                                .insert(peer, new_addresses);
987                                        }
988                                    }
989                                }
990                            }
991                            if !self.boot_nodes.is_empty() {
992                                self.change_state(NetworkState::Dialing).await;
993                                dialing_round_timeout.as_mut().reset(
994                                    Instant::now() + Duration::from_secs(15),
995                                );
996                                dialing_timeout_active = true;
997                            } else {
998                                warn!(target: TARGET, "all bootstrap dials failed");
999                                self.change_state(NetworkState::Disconnected)
1000                                    .await;
1001                            }
1002                        }
1003                    }
1004                    NetworkState::Dialing => {
1005                        // No more bootnodes to send dial, none was successful nut one or more Dial fail by keepalivetimeout
1006                        if self.boot_nodes.is_empty()
1007                            && self.successful_dials == 0
1008                            && !self.retry_boot_nodes.is_empty()
1009                            && retrys < 3
1010                        {
1011                            retrys += 1;
1012                            let wait = Duration::from_secs(1u64 << retrys); // retrys=1→2s, retrys=2→4s
1013                            debug!(target: TARGET, attempt = retrys, wait_secs = wait.as_secs(), "retrying bootstrap dials");
1014
1015                            let backoff = sleep(wait);
1016                            tokio::pin!(backoff);
1017                            loop {
1018                                tokio::select! {
1019                                    _ = &mut backoff => break,
1020                                    event = self.swarm.select_next_some() => {
1021                                        self.handle_connection_events(event).await;
1022                                    }
1023                                    _ = self.graceful_token.cancelled() => {
1024                                        return Err(Error::Cancelled);
1025                                    }
1026                                    _ = self.crash_token.cancelled() => {
1027                                        return Err(Error::Cancelled);
1028                                    }
1029                                }
1030                            }
1031
1032                            dialing_timeout_active = false;
1033                            self.boot_nodes.clone_from(&self.retry_boot_nodes);
1034                            self.retry_boot_nodes.clear();
1035                            self.change_state(NetworkState::Dial).await;
1036                        }
1037                        // No more bootnodes to send dial and none was successful
1038                        else if self.boot_nodes.is_empty()
1039                            && self.successful_dials == 0
1040                        {
1041                            self.change_state(NetworkState::Disconnected).await;
1042                        // No more bootnodes to send dial and one or more was successful
1043                        } else if self.boot_nodes.is_empty() {
1044                            return Ok(());
1045                        }
1046                    }
1047                    NetworkState::Running => {
1048                        return Ok(());
1049                    }
1050                    NetworkState::Disconnected => {
1051                        return Err(Error::NoBootstrapNode);
1052                    }
1053                    _ => {}
1054                }
1055                if self.state != NetworkState::Disconnected {
1056                    tokio::select! {
1057                        event = self.swarm.select_next_some() => {
1058                            self.handle_connection_events(event).await;
1059                        }
1060                                    _ = self.graceful_token.cancelled() => {
1061                                        return Err(Error::Cancelled);
1062                                    }
1063                                    _ = self.crash_token.cancelled() => {
1064                                        return Err(Error::Cancelled);
1065                                    }
1066                        _ = &mut dialing_round_timeout, if dialing_timeout_active => {
1067                            warn!(
1068                                target: TARGET,
1069                                remaining = self.boot_nodes.len(),
1070                                "bootstrap round timed out waiting for Identify; \
1071                                 moving remaining peers to retry queue"
1072                            );
1073                            for (peer, addrs) in self.boot_nodes.drain() {
1074                                self.retry_boot_nodes.insert(peer, addrs);
1075                            }
1076                            dialing_timeout_active = false;
1077                        }
1078                    }
1079                }
1080            }
1081        }
1082    }
1083
1084    fn collect_retryable_transport_addresses(
1085        &self,
1086        items: Vec<(Multiaddr, libp2p::TransportError<std::io::Error>)>,
1087        trace_unreachable: bool,
1088    ) -> Vec<Multiaddr> {
1089        let mut new_addresses = vec![];
1090        for (address, error) in items {
1091            if trace_unreachable {
1092                trace!(target: TARGET, addr = %address, err = ?error, "address unreachable");
1093            }
1094
1095            if let libp2p::TransportError::Other(e) = error {
1096                match e.kind() {
1097                    std::io::ErrorKind::ConnectionRefused
1098                    | std::io::ErrorKind::TimedOut
1099                    | std::io::ErrorKind::ConnectionAborted
1100                    | std::io::ErrorKind::NotConnected
1101                    | std::io::ErrorKind::BrokenPipe
1102                    | std::io::ErrorKind::Interrupted
1103                    | std::io::ErrorKind::HostUnreachable
1104                    | std::io::ErrorKind::NetworkUnreachable => {
1105                        new_addresses.push(address);
1106                    }
1107                    _ => {}
1108                }
1109            };
1110        }
1111
1112        new_addresses
1113    }
1114
1115    /// Classify a dial error and return retry decision.
1116    ///
1117    /// - `bootstrap_flow = true` keeps bootstrap-specific behaviour/logs.
1118    /// - `bootstrap_flow = false` keeps runtime-specific behaviour/logs/cleanup.
1119    fn handle_dial_error(
1120        &mut self,
1121        e: DialError,
1122        peer_id: &PeerId,
1123        bootstrap_flow: bool,
1124    ) -> Option<(bool, Vec<Multiaddr>)> {
1125        match e {
1126            DialError::LocalPeerId { .. } => {
1127                if let Some(metrics) = self.metric_handle() {
1128                    metrics.observe_dial_failure("local_peer_id");
1129                }
1130                if bootstrap_flow {
1131                    warn!(target: TARGET, peer_id = %peer_id, "dial rejected: connected peer-id matches local peer");
1132                    return Some((false, vec![]));
1133                }
1134
1135                self.retry_by_peer.remove(peer_id);
1136                self.clear_pending_messages(peer_id);
1137                self.swarm
1138                    .behaviour_mut()
1139                    .clean_hard_peer_to_remove(peer_id);
1140                None
1141            }
1142            DialError::NoAddresses => {
1143                if let Some(metrics) = self.metric_handle() {
1144                    metrics.observe_dial_failure("no_addresses");
1145                }
1146                if bootstrap_flow {
1147                    debug!(target: TARGET, peer_id = %peer_id, "dial skipped: no addresses");
1148                }
1149                Some((false, vec![]))
1150            }
1151            DialError::DialPeerConditionFalse(_) => {
1152                if let Some(metrics) = self.metric_handle() {
1153                    metrics.observe_dial_failure("peer_condition");
1154                }
1155                if bootstrap_flow {
1156                    debug!(target: TARGET, peer_id = %peer_id, "dial skipped: peer condition not met");
1157                    return Some((false, vec![]));
1158                }
1159
1160                None
1161            }
1162            DialError::Denied { cause } => {
1163                if let Some(metrics) = self.metric_handle() {
1164                    metrics.observe_dial_failure("denied");
1165                }
1166                if bootstrap_flow {
1167                    debug!(target: TARGET, peer_id = %peer_id, cause = %cause, "dial denied by behaviour");
1168                }
1169                Some((false, vec![]))
1170            }
1171            DialError::Aborted => {
1172                if let Some(metrics) = self.metric_handle() {
1173                    metrics.observe_dial_failure("aborted");
1174                }
1175                if bootstrap_flow {
1176                    debug!(target: TARGET, peer_id = %peer_id, "dial aborted, will retry");
1177                }
1178                Some((true, vec![]))
1179            }
1180            DialError::WrongPeerId { obtained, .. } => {
1181                if let Some(metrics) = self.metric_handle() {
1182                    metrics.observe_dial_failure("wrong_peer_id");
1183                }
1184                if bootstrap_flow {
1185                    warn!(target: TARGET, expected = %peer_id, obtained = %obtained, "dial failed: peer identity mismatch");
1186                    return Some((false, vec![]));
1187                }
1188
1189                self.retry_by_peer.remove(peer_id);
1190                self.clear_pending_messages(peer_id);
1191                self.swarm
1192                    .behaviour_mut()
1193                    .clean_hard_peer_to_remove(peer_id);
1194                None
1195            }
1196            DialError::Transport(items) => {
1197                if let Some(metrics) = self.metric_handle() {
1198                    metrics.observe_dial_failure("transport");
1199                }
1200                if bootstrap_flow {
1201                    debug!(target: TARGET, peer_id = %peer_id, "transport dial failed");
1202                }
1203
1204                let new_addresses = self.collect_retryable_transport_addresses(
1205                    items,
1206                    bootstrap_flow,
1207                );
1208                if !new_addresses.is_empty() {
1209                    Some((true, new_addresses))
1210                } else {
1211                    Some((false, vec![]))
1212                }
1213            }
1214        }
1215    }
1216
1217    /// Handle connection events.
1218    async fn handle_connection_events(
1219        &mut self,
1220        event: SwarmEvent<BehaviourEvent>,
1221    ) {
1222        match event {
1223            SwarmEvent::ConnectionClosed { peer_id, .. } => {
1224                self.boot_nodes.remove(&peer_id);
1225            }
1226            SwarmEvent::OutgoingConnectionError {
1227                peer_id: Some(peer_id),
1228                error,
1229                ..
1230            } => {
1231                let (add_to_retry, new_addresses) = self
1232                    .handle_dial_error(error, &peer_id, true)
1233                    .unwrap_or((false, vec![]));
1234
1235                if let Some(addresses) = self.boot_nodes.remove(&peer_id)
1236                    && add_to_retry
1237                {
1238                    if new_addresses.is_empty() {
1239                        self.retry_boot_nodes.insert(peer_id, addresses);
1240                    } else {
1241                        self.retry_boot_nodes.insert(peer_id, new_addresses);
1242                    }
1243                }
1244            }
1245            SwarmEvent::Behaviour(BehaviourEvent::Identified {
1246                peer_id,
1247                info,
1248                connection_id,
1249            }) => {
1250                if !self
1251                    .check_protocols(&info.protocol_version, &info.protocols)
1252                {
1253                    warn!(target: TARGET, peer_id = %peer_id, protocol_version = %info.protocol_version, "peer uses incompatible protocols; closing connection");
1254
1255                    self.swarm
1256                        .behaviour_mut()
1257                        .close_connections(&peer_id, Some(connection_id));
1258                } else {
1259                    self.peer_action
1260                        .insert(peer_id, Action::Identified(connection_id));
1261
1262                    let mut any_address_is_valid = false;
1263                    for addr in info.listen_addrs {
1264                        if self
1265                            .swarm
1266                            .behaviour_mut()
1267                            .add_self_reported_address(&peer_id, &addr)
1268                        {
1269                            any_address_is_valid = true;
1270                        }
1271                    }
1272
1273                    if any_address_is_valid {
1274                        if self.boot_nodes.remove(&peer_id).is_some() {
1275                            self.successful_dials += 1;
1276                        }
1277                        self.peer_identify.insert(peer_id);
1278                    } else {
1279                        warn!(target: TARGET, peer_id = %peer_id, "bootstrap peer has no valid addresses");
1280
1281                        self.swarm
1282                            .behaviour_mut()
1283                            .close_connections(&peer_id, Some(connection_id));
1284                    }
1285                }
1286            }
1287            SwarmEvent::Behaviour(BehaviourEvent::IdentifyError {
1288                peer_id,
1289                error,
1290            }) => {
1291                self.observe_identify_error(&error);
1292                match error {
1293                    swarm::StreamUpgradeError::Timeout => {
1294                        // Recoverable: wait for ConnectionClosed to remove from boot_nodes.
1295                    }
1296                    _ => {
1297                        // Hard failure: Identify will never complete; move to retry immediately
1298                        // instead of waiting for the per-round timeout.
1299                        debug!(target: TARGET, peer_id = %peer_id, error = %error, "identify hard failure during bootstrap; queuing for retry");
1300                        if let Some(addrs) = self.boot_nodes.remove(&peer_id) {
1301                            self.retry_boot_nodes.insert(peer_id, addrs);
1302                        }
1303                    }
1304                }
1305            }
1306            _ => {}
1307        }
1308        self.refresh_runtime_metrics();
1309    }
1310
1311    fn clear_pending_messages(&mut self, peer_id: &PeerId) {
1312        warn!(target: TARGET, peer_id = %peer_id, "max dial attempts reached; dropping pending messages");
1313
1314        let dropped = self.drop_pending_outbound_messages(peer_id);
1315        if let Some(metrics) = self.metric_handle() {
1316            metrics.inc_max_retries_drop_by(dropped as u64);
1317        }
1318        self.peer_action.remove(peer_id);
1319        self.retry_by_peer.remove(peer_id);
1320        self.refresh_runtime_metrics();
1321    }
1322
1323    fn check_protocols(
1324        &self,
1325        protocol_version: &str,
1326        protocols: &[StreamProtocol],
1327    ) -> bool {
1328        let supp_protocols: HashSet<StreamProtocol> = protocols
1329            .iter()
1330            .cloned()
1331            .collect::<HashSet<StreamProtocol>>();
1332
1333        protocol_version == IDENTIFY_PROTOCOL
1334            && supp_protocols.contains(&StreamProtocol::new(REQRES_PROTOCOL))
1335    }
1336
1337    /// Run network worker.
1338    pub async fn run_main(&mut self) {
1339        info!(target: TARGET, "network worker started");
1340
1341        loop {
1342            tokio::select! {
1343                command = self.command_receiver.recv() => {
1344                    match command {
1345                        Some(command) => self.handle_command(command).await,
1346                        None => break,
1347                    }
1348                }
1349                event = self.swarm.select_next_some() => {
1350                    // Handle events.
1351                    self.handle_event(event).await;
1352                }
1353                _ = async {
1354                    if let Some(t) = &mut self.retry_timer {
1355                        t.as_mut().await;
1356                    }
1357                }, if self.retry_timer.is_some() => {
1358                    for (peer, kind, addrs) in self.drain_due_retries() {
1359                        if let Some(action) = self.peer_action.get(&peer) {
1360                            match (action, kind) {
1361                                (Action::Discover, RetryKind::Discover) => {
1362                                    self.swarm.behaviour_mut().discover(&peer);
1363                                },
1364                                (Action::Dial, RetryKind::Dial) => {
1365                                    if let Some(metrics) = self.metric_handle() {
1366                                        metrics.inc_dial_attempt_runtime();
1367                                    }
1368                                    if let Err(error) = self.swarm.dial(
1369                                        DialOpts::peer_id(peer)
1370                                            .addresses(addrs)
1371                                            .extend_addresses_through_behaviour()
1372                                            .build()
1373                                    ) && let Some((retry, new_address)) =
1374                                        self.handle_dial_error(error, &peer, false) {
1375
1376                                        self.peer_action.remove(&peer);
1377                                        if retry {
1378                                            let addr = new_address
1379                                                    .iter()
1380                                                    .filter(|x| {
1381                                                        !self
1382                                                        .swarm
1383                                                        .behaviour()
1384                                                        .is_invalid_address(x)
1385                                                    })
1386                                                    .cloned()
1387                                                    .collect::<Vec<Multiaddr>>();
1388
1389                                                if addr.is_empty() {
1390                                                    self.schedule_retry(peer, ScheduleType::Discover);
1391                                                } else {
1392                                                    self.schedule_retry(peer, ScheduleType::Dial(addr.clone()));
1393                                                }
1394                                        } else {
1395                                            self.schedule_retry(peer, ScheduleType::Discover);
1396                                        }
1397                                };
1398                                },
1399                                _ => {}
1400                            }
1401                        };
1402                    }
1403                },
1404                                    _ = self.graceful_token.cancelled() => {
1405                                        break;
1406                                    }
1407                                    _ = self.crash_token.cancelled() => {
1408                                        break;
1409                                    }
1410            }
1411        }
1412    }
1413
1414    async fn handle_command(&mut self, command: Command) {
1415        match command {
1416            Command::SendMessage { peer, message } => {
1417                if let Err(error) = self.send_message(peer, message) {
1418                    error!(target: TARGET, error = %error, "failed to deliver message");
1419                    self.send_event(NetworkEvent::Error(error)).await;
1420                }
1421            }
1422        }
1423    }
1424
1425    #[allow(clippy::needless_pass_by_ref_mut)]
1426    async fn message_to_helper(
1427        &mut self,
1428        message: MessagesHelper,
1429        peer_id: &PeerId,
1430    ) {
1431        let sender = match peer_id_to_ed25519_pubkey_bytes(peer_id) {
1432            Ok(public_key) => public_key,
1433            Err(e) => {
1434                warn!(target: TARGET, error = %e, "cannot resolve public key from peer id");
1435                return;
1436            }
1437        };
1438
1439        'Send: {
1440            if let Some(helper_sender) = self.helper_sender.as_ref() {
1441                match message {
1442                    MessagesHelper::Single(items) => {
1443                        if helper_sender
1444                            .send(CommandHelper::ReceivedMessage {
1445                                sender,
1446                                message: items,
1447                            })
1448                            .await
1449                            .is_err()
1450                        {
1451                            break 'Send;
1452                        }
1453                    }
1454                    MessagesHelper::Vec(items) => {
1455                        for item in items {
1456                            if helper_sender
1457                                .send(CommandHelper::ReceivedMessage {
1458                                    sender,
1459                                    message: item,
1460                                })
1461                                .await
1462                                .is_err()
1463                            {
1464                                break 'Send;
1465                            }
1466                        }
1467                    }
1468                }
1469
1470                return;
1471            }
1472        }
1473
1474        error!(target: TARGET, "helper channel closed; shutting down");
1475        self.crash_token.cancel();
1476    }
1477
1478    async fn handle_event(&mut self, event: SwarmEvent<BehaviourEvent>) {
1479        match event {
1480            SwarmEvent::Behaviour(event) => {
1481                match event {
1482                    BehaviourEvent::Identified {
1483                        peer_id,
1484                        info,
1485                        connection_id,
1486                    } => {
1487                        if !self.check_protocols(
1488                            &info.protocol_version,
1489                            &info.protocols,
1490                        ) {
1491                            warn!(
1492                                target: TARGET,
1493                                peer_id = %peer_id,
1494                                protocol_version = %info.protocol_version,
1495                                protocols = ?info.protocols,
1496                                "peer uses incompatible protocols; closing connection"
1497                            );
1498
1499                            self.clear_pending_messages(&peer_id);
1500
1501                            self.swarm
1502                                .behaviour_mut()
1503                                .clean_hard_peer_to_remove(&peer_id);
1504
1505                            self.swarm.behaviour_mut().close_connections(
1506                                &peer_id,
1507                                Some(connection_id),
1508                            );
1509                        } else {
1510                            self.peer_action.insert(
1511                                peer_id,
1512                                Action::Identified(connection_id),
1513                            );
1514
1515                            self.swarm
1516                                .behaviour_mut()
1517                                .clean_peer_to_remove(&peer_id);
1518                            for addr in info.listen_addrs {
1519                                self.swarm
1520                                    .behaviour_mut()
1521                                    .add_self_reported_address(&peer_id, &addr);
1522                            }
1523
1524                            self.peer_identify.insert(peer_id);
1525
1526                            if let Some(mut queue) =
1527                                self.pending_inbound_messages.remove(&peer_id)
1528                            {
1529                                let mut buffered = VecDeque::new();
1530                                for message in queue.drain() {
1531                                    self.observe_pending_message_age(
1532                                        message.enqueued_at,
1533                                    );
1534                                    buffered.push_back(message.payload);
1535                                }
1536                                self.message_to_helper(
1537                                    MessagesHelper::Vec(buffered),
1538                                    &peer_id,
1539                                )
1540                                .await;
1541                            };
1542
1543                            self.send_pending_outbound_messages(peer_id);
1544                        }
1545                    }
1546                    BehaviourEvent::IdentifyError { peer_id, error } => {
1547                        self.observe_identify_error(&error);
1548                        debug!(target: TARGET, peer_id = %peer_id, error = %error, "identify error");
1549
1550                        match error {
1551                            swarm::StreamUpgradeError::Timeout => {
1552                                // We do not clean since we will try to open the connection when it is
1553                                // confirmed that it has been closed in SwarmEvent::ConnectionClosed
1554                            }
1555                            swarm::StreamUpgradeError::Apply(..)
1556                            | swarm::StreamUpgradeError::NegotiationFailed
1557                            | swarm::StreamUpgradeError::Io(..) => {
1558                                // Do not call clear_pending_messages here — it removes
1559                                // peer_action, which ConnectionClosed needs to trigger
1560                                // its cleanup path. Clean only the state that
1561                                // ConnectionClosed won't reach.
1562                                self.drop_pending_outbound_messages(&peer_id);
1563                                self.retry_by_peer.remove(&peer_id);
1564                                self.response_channels.remove(&peer_id);
1565                                self.drop_pending_inbound_messages(&peer_id);
1566                            }
1567                        }
1568
1569                        self.swarm
1570                            .behaviour_mut()
1571                            .close_connections(&peer_id, None);
1572                    }
1573                    BehaviourEvent::ReqresMessage { peer_id, message } => {
1574                        let (message_data, is_request, response_channel) =
1575                            match message {
1576                                request_response::Message::Request {
1577                                    request,
1578                                    channel,
1579                                    ..
1580                                } => (request.0, true, Some(channel)),
1581                                request_response::Message::Response {
1582                                    response,
1583                                    ..
1584                                } => (response.0, false, None),
1585                            };
1586
1587                        if message_data.len() > self.max_app_message_bytes {
1588                            warn!(
1589                                target: TARGET,
1590                                peer_id = %peer_id,
1591                                size = message_data.len(),
1592                                max = self.max_app_message_bytes,
1593                                "inbound payload dropped: message too large",
1594                            );
1595                            if let Some(metrics) = self.metric_handle() {
1596                                metrics.inc_oversized_inbound_drop();
1597                            }
1598                            self.swarm
1599                                .behaviour_mut()
1600                                .close_connections(&peer_id, None);
1601                            self.refresh_runtime_metrics();
1602                            return;
1603                        }
1604
1605                        if is_request {
1606                            if let Some(metrics) = self.metric_handle() {
1607                                metrics.inc_reqres_request_received();
1608                            }
1609                            trace!(target: TARGET, peer_id = %peer_id, "request received");
1610                            if let Some(channel) = response_channel {
1611                                self.add_ephemeral_response(peer_id, channel);
1612                            }
1613                        } else {
1614                            if let Some(metrics) = self.metric_handle() {
1615                                metrics.inc_reqres_response_received();
1616                            }
1617                            trace!(target: TARGET, peer_id = %peer_id, "response received");
1618                        }
1619
1620                        if self.peer_identify.contains(&peer_id) {
1621                            self.message_to_helper(
1622                                MessagesHelper::Single(message_data),
1623                                &peer_id,
1624                            )
1625                            .await;
1626                        } else {
1627                            self.add_pending_inbound_message(
1628                                peer_id,
1629                                message_data,
1630                            );
1631                        }
1632                    }
1633                    BehaviourEvent::ReqresFailure {
1634                        peer_id,
1635                        direction,
1636                        kind,
1637                    } => {
1638                        if let Some(metrics) = self.metric_handle() {
1639                            metrics.observe_reqres_failure(
1640                                direction.as_metric_label(),
1641                                kind.as_metric_label(),
1642                            );
1643                        }
1644                        debug!(
1645                            target: TARGET,
1646                            peer_id = %peer_id,
1647                            direction = direction.as_metric_label(),
1648                            kind = kind.as_metric_label(),
1649                            "request-response failure"
1650                        );
1651                    }
1652                    BehaviourEvent::ClosestPeer { peer_id, info } => {
1653                        if matches!(
1654                            self.peer_action.get(&peer_id),
1655                            Some(Action::Discover)
1656                        ) {
1657                            self.peer_action.remove(&peer_id);
1658                            if let Some(info) = info {
1659                                let addr = info
1660                                    .addrs
1661                                    .iter()
1662                                    .filter(|x| {
1663                                        !self
1664                                            .swarm
1665                                            .behaviour()
1666                                            .is_invalid_address(x)
1667                                    })
1668                                    .cloned()
1669                                    .collect::<Vec<Multiaddr>>();
1670
1671                                if addr.is_empty() {
1672                                    self.schedule_retry(
1673                                        peer_id,
1674                                        ScheduleType::Discover,
1675                                    );
1676                                } else {
1677                                    self.schedule_retry(
1678                                        peer_id,
1679                                        ScheduleType::Dial(addr),
1680                                    );
1681                                }
1682                            } else {
1683                                self.schedule_retry(
1684                                    peer_id,
1685                                    ScheduleType::Discover,
1686                                );
1687                            };
1688                        }
1689                    }
1690                    BehaviourEvent::Dummy => {
                        // For control_list, ReqRes events
1692                    }
1693                }
1694            }
1695            SwarmEvent::OutgoingConnectionError {
1696                error,
1697                peer_id: Some(peer_id),
1698                ..
1699            } => {
1700                if matches!(self.peer_action.get(&peer_id), Some(Action::Dial))
1701                {
1702                    self.peer_action.remove(&peer_id);
1703
1704                    self.swarm.behaviour_mut().add_peer_to_remove(&peer_id);
1705
1706                    if let Some((retry, new_address)) =
1707                        self.handle_dial_error(error, &peer_id, false)
1708                    {
1709                        if retry {
1710                            let addr = new_address
1711                                .iter()
1712                                .filter(|x| {
1713                                    !self
1714                                        .swarm
1715                                        .behaviour()
1716                                        .is_invalid_address(x)
1717                                })
1718                                .cloned()
1719                                .collect::<Vec<Multiaddr>>();
1720
1721                            if addr.is_empty() {
1722                                self.schedule_retry(
1723                                    peer_id,
1724                                    ScheduleType::Discover,
1725                                );
1726                            } else {
1727                                self.schedule_retry(
1728                                    peer_id,
1729                                    ScheduleType::Dial(addr),
1730                                );
1731                            }
1732                        } else {
1733                            self.schedule_retry(
1734                                peer_id,
1735                                ScheduleType::Discover,
1736                            );
1737                        }
1738                    };
1739                }
1740            }
1741            SwarmEvent::ConnectionClosed {
1742                peer_id,
1743                connection_id,
1744                num_established,
1745                ..
1746            } => {
1747                if num_established == 0 {
1748                    if let Some(Action::Identified(id)) =
1749                        self.peer_action.get(&peer_id)
1750                        && connection_id == *id
1751                    {
1752                        self.peer_action.remove(&peer_id);
1753
1754                        self.peer_identify.remove(&peer_id);
1755                        self.drop_pending_inbound_messages(&peer_id);
1756                        self.response_channels.remove(&peer_id);
1757
1758                        self.retry_by_peer.remove(&peer_id);
1759
1760                        if self
1761                            .pending_outbound_messages
1762                            .get(&peer_id)
1763                            .is_some_and(|q| !q.is_empty())
1764                        {
1765                            self.schedule_retry(
1766                                peer_id,
1767                                ScheduleType::Dial(vec![]),
1768                            );
1769                        }
1770                    } else if let Some(Action::Dial | Action::Discover) =
1771                        self.peer_action.get(&peer_id)
1772                    {
1773                        self.peer_action.remove(&peer_id);
1774                        self.retry_by_peer.remove(&peer_id);
1775                        self.drop_pending_inbound_messages(&peer_id);
1776                        self.response_channels.remove(&peer_id);
1777                        self.peer_identify.remove(&peer_id);
1778
1779                        if self
1780                            .pending_outbound_messages
1781                            .get(&peer_id)
1782                            .is_some_and(|q| !q.is_empty())
1783                        {
1784                            self.schedule_retry(
1785                                peer_id,
1786                                ScheduleType::Discover,
1787                            );
1788                        }
1789                    }
1790                }
1791            }
1792            SwarmEvent::IncomingConnectionError { .. } => {
1793                // We are not interested in this event at the moment.
1794                // The logs generate many false positives and cannot be associated with a
1795                // node since I do not have the peer-id. The best solution to avoid
1796                // confusion for the user is not to filter these errors.
1797            }
1798            SwarmEvent::ExpiredListenAddr { address, .. } => {
1799                warn!(target: TARGET, addr = %address, "listen address expired");
1800            }
1801            SwarmEvent::ListenerError { error, .. } => {
1802                error!(target: TARGET, error = %error, "listener error");
1803            }
1804            SwarmEvent::ConnectionEstablished {
1805                peer_id,
1806                connection_id,
1807                num_established,
1808                ..
1809            } => {
1810                if num_established.get() > 1 {
1811                    debug!(target: TARGET, peer_id = %peer_id, "duplicate connection detected; closing excess");
1812                    self.swarm
1813                        .behaviour_mut()
1814                        .close_connections(&peer_id, Some(connection_id));
1815                }
1816            }
1817            SwarmEvent::IncomingConnection { .. }
1818            | SwarmEvent::ListenerClosed { .. }
1819            | SwarmEvent::Dialing { .. }
1820            | SwarmEvent::NewExternalAddrCandidate { .. }
1821            | SwarmEvent::ExternalAddrConfirmed { .. }
1822            | SwarmEvent::ExternalAddrExpired { .. }
1823            | SwarmEvent::NewExternalAddrOfPeer { .. }
1824            | SwarmEvent::NewListenAddr { .. } => {
1825                // We are not interested in this event at the moment.
1826            }
1827            _ => {}
1828        }
1829        self.refresh_runtime_metrics();
1830    }
1831}
1832
1833#[cfg(test)]
1834mod tests {
1835
1836    use crate::routing::RoutingNode;
1837
1838    use super::*;
1839    use libp2p::core::{ConnectedPoint, Endpoint, transport::PortUse};
1840    use libp2p::identity::Keypair as Libp2pKeypair;
1841    use libp2p::swarm::ConnectionId;
1842    use prometheus_client::{encoding::text::encode, registry::Registry};
1843    use serde::Deserialize;
1844    use test_log::test;
1845
1846    use ave_common::identity::{KeyPair, keys::Ed25519Signer};
1847
1848    use serial_test::serial;
1849
    /// Unit type used as the generic parameter of `NetworkWorker` in these
    /// tests.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Dummy;
1852
1853    fn metric_value(metrics: &str, name: &str) -> f64 {
1854        metrics
1855            .lines()
1856            .find_map(|line| {
1857                if line.starts_with(name) {
1858                    line.split_whitespace().nth(1)?.parse::<f64>().ok()
1859                } else {
1860                    None
1861                }
1862            })
1863            .unwrap_or(0.0)
1864    }
1865
1866    // Build a relay server.
1867    fn build_worker(
1868        boot_nodes: Vec<RoutingNode>,
1869        random_walk: bool,
1870        node_type: NodeType,
1871        graceful_token: CancellationToken,
1872        crash_token: CancellationToken,
1873        memory_addr: String,
1874    ) -> NetworkWorker<Dummy> {
1875        let config = create_config(
1876            boot_nodes,
1877            random_walk,
1878            node_type,
1879            vec![memory_addr],
1880        );
1881        let keys = KeyPair::Ed25519(Ed25519Signer::generate().unwrap());
1882
1883        NetworkWorker::new(
1884            &keys,
1885            config,
1886            None,
1887            graceful_token,
1888            crash_token,
1889            None,
1890            None,
1891        )
1892        .unwrap()
1893    }
1894
1895    // Create a config
1896    fn create_config(
1897        boot_nodes: Vec<RoutingNode>,
1898        random_walk: bool,
1899        node_type: NodeType,
1900        listen_addresses: Vec<String>,
1901    ) -> Config {
1902        let config = crate::routing::Config::default()
1903            .with_discovery_limit(50)
1904            .with_dht_random_walk(random_walk);
1905
1906        Config {
1907            boot_nodes,
1908            node_type,
1909            routing: config,
1910            external_addresses: vec![],
1911            listen_addresses,
1912            ..Default::default()
1913        }
1914    }
1915
1916    fn build_identified_event(
1917        peer_id: PeerId,
1918        public_key: libp2p::identity::PublicKey,
1919        connection_id: ConnectionId,
1920    ) -> SwarmEvent<BehaviourEvent> {
1921        SwarmEvent::Behaviour(BehaviourEvent::Identified {
1922            peer_id,
1923            info: Box::new(identify::Info {
1924                public_key,
1925                protocol_version: IDENTIFY_PROTOCOL.to_owned(),
1926                agent_version: "test-agent".to_owned(),
1927                listen_addrs: vec!["/memory/9999".parse().expect("multiaddr")],
1928                protocols: vec![StreamProtocol::new(REQRES_PROTOCOL)],
1929                observed_addr: "/memory/9998".parse().expect("multiaddr"),
1930                signed_peer_record: None,
1931            }),
1932            connection_id,
1933        })
1934    }
1935
1936    fn test_endpoint() -> ConnectedPoint {
1937        ConnectedPoint::Dialer {
1938            address: "/memory/9997".parse().expect("multiaddr"),
1939            role_override: Endpoint::Dialer,
1940            port_use: PortUse::New,
1941        }
1942    }
1943
1944    #[test]
1945    fn outbound_queue_respects_bytes_limit_and_updates_metrics() {
1946        let mut config = create_config(
1947            vec![],
1948            false,
1949            NodeType::Addressable,
1950            vec!["/memory/3100".to_owned()],
1951        );
1952        config.max_pending_outbound_bytes_per_peer = 16;
1953
1954        let keys = KeyPair::Ed25519(Ed25519Signer::generate().unwrap());
1955        let mut registry = Registry::default();
1956        let metrics = crate::metrics::register(&mut registry);
1957        let mut worker: NetworkWorker<Dummy> = NetworkWorker::new(
1958            &keys,
1959            config,
1960            None,
1961            CancellationToken::new(),
1962            CancellationToken::new(),
1963            None,
1964            Some(metrics),
1965        )
1966        .expect("worker");
1967
1968        let peer = PeerId::random();
1969        worker.add_pending_outbound_message(
1970            peer,
1971            Bytes::from_static(b"aaaaaaaaaaaa"), // 12 bytes
1972        );
1973        worker.add_pending_outbound_message(
1974            peer,
1975            Bytes::from_static(b"bbbbbbbbbbbb"), // 12 bytes -> evicts previous by bytes
1976        );
1977        worker.add_pending_outbound_message(
1978            peer,
1979            Bytes::from_static(b"cccc"), // 4 bytes
1980        );
1981
1982        let queue = worker
1983            .pending_outbound_messages
1984            .get(&peer)
1985            .expect("queue exists");
1986        assert_eq!(queue.len(), 2);
1987        assert_eq!(queue.bytes_len(), 16);
1988
1989        let mut text = String::new();
1990        encode(&mut text, &registry).expect("encode metrics");
1991
1992        assert_eq!(
1993            metric_value(
1994                &text,
1995                "network_messages_dropped_outbound_queue_bytes_limit_total"
1996            ),
1997            1.0
1998        );
1999        assert_eq!(
2000            metric_value(
2001                &text,
2002                "network_messages_dropped_outbound_queue_bytes_limit_per_peer_total"
2003            ),
2004            1.0
2005        );
2006        assert_eq!(
2007            metric_value(
2008                &text,
2009                "network_messages_dropped_outbound_queue_bytes_limit_global_total"
2010            ),
2011            0.0
2012        );
2013        assert_eq!(metric_value(&text, "network_pending_outbound_bytes"), 16.0);
2014        assert!(
2015            metric_value(&text, "network_pending_message_age_seconds_count")
2016                >= 1.0
2017        );
2018    }
2019
2020    #[test]
2021    fn zero_pending_bytes_limits_disable_byte_drops() {
2022        let mut config = create_config(
2023            vec![],
2024            false,
2025            NodeType::Addressable,
2026            vec!["/memory/3101".to_owned()],
2027        );
2028        config.max_pending_outbound_bytes_per_peer = 0;
2029        config.max_pending_inbound_bytes_per_peer = 0;
2030        config.max_pending_outbound_bytes_total = 0;
2031        config.max_pending_inbound_bytes_total = 0;
2032
2033        let keys = KeyPair::Ed25519(Ed25519Signer::generate().unwrap());
2034        let mut registry = Registry::default();
2035        let metrics = crate::metrics::register(&mut registry);
2036        let mut worker: NetworkWorker<Dummy> = NetworkWorker::new(
2037            &keys,
2038            config,
2039            None,
2040            CancellationToken::new(),
2041            CancellationToken::new(),
2042            None,
2043            Some(metrics),
2044        )
2045        .expect("worker");
2046
2047        let peer = PeerId::random();
2048        for i in 0..3u8 {
2049            let payload = Bytes::from(vec![i + 1; 12]);
2050            worker.add_pending_outbound_message(peer, payload);
2051        }
2052        for i in 0..3u8 {
2053            let payload = Bytes::from(vec![i + 7; 12]);
2054            worker.add_pending_inbound_message(peer, payload);
2055        }
2056
2057        let outbound = worker
2058            .pending_outbound_messages
2059            .get(&peer)
2060            .expect("outbound queue exists");
2061        let inbound = worker
2062            .pending_inbound_messages
2063            .get(&peer)
2064            .expect("inbound queue exists");
2065        assert_eq!(outbound.len(), 3);
2066        assert_eq!(outbound.bytes_len(), 36);
2067        assert_eq!(inbound.len(), 3);
2068        assert_eq!(inbound.bytes_len(), 36);
2069
2070        let mut text = String::new();
2071        encode(&mut text, &registry).expect("encode metrics");
2072        assert_eq!(
2073            metric_value(
2074                &text,
2075                "network_messages_dropped_outbound_queue_bytes_limit_total"
2076            ),
2077            0.0
2078        );
2079        assert_eq!(
2080            metric_value(
2081                &text,
2082                "network_messages_dropped_outbound_queue_bytes_limit_per_peer_total"
2083            ),
2084            0.0
2085        );
2086        assert_eq!(
2087            metric_value(
2088                &text,
2089                "network_messages_dropped_outbound_queue_bytes_limit_global_total"
2090            ),
2091            0.0
2092        );
2093        assert_eq!(
2094            metric_value(
2095                &text,
2096                "network_messages_dropped_inbound_queue_bytes_limit_total"
2097            ),
2098            0.0
2099        );
2100        assert_eq!(
2101            metric_value(
2102                &text,
2103                "network_messages_dropped_inbound_queue_bytes_limit_per_peer_total"
2104            ),
2105            0.0
2106        );
2107        assert_eq!(
2108            metric_value(
2109                &text,
2110                "network_messages_dropped_inbound_queue_bytes_limit_global_total"
2111            ),
2112            0.0
2113        );
2114    }
2115
2116    #[test]
2117    fn outbound_global_bytes_limit_applies_across_peers() {
2118        let mut config = create_config(
2119            vec![],
2120            false,
2121            NodeType::Addressable,
2122            vec!["/memory/3102".to_owned()],
2123        );
2124        config.max_pending_outbound_bytes_per_peer = 0;
2125        config.max_pending_outbound_bytes_total = 20;
2126
2127        let keys = KeyPair::Ed25519(Ed25519Signer::generate().unwrap());
2128        let mut registry = Registry::default();
2129        let metrics = crate::metrics::register(&mut registry);
2130        let mut worker: NetworkWorker<Dummy> = NetworkWorker::new(
2131            &keys,
2132            config,
2133            None,
2134            CancellationToken::new(),
2135            CancellationToken::new(),
2136            None,
2137            Some(metrics),
2138        )
2139        .expect("worker");
2140
2141        let peer_a = PeerId::random();
2142        let peer_b = PeerId::random();
2143
2144        worker.add_pending_outbound_message(
2145            peer_a,
2146            Bytes::from_static(b"aaaaaaaaaaaa"), // 12 bytes
2147        );
2148        worker.add_pending_outbound_message(
2149            peer_b,
2150            Bytes::from_static(b"bbbbbbbbbbbb"), // rejected by global limit
2151        );
2152
2153        assert_eq!(worker.pending_outbound_bytes_len(), 12);
2154        assert_eq!(
2155            worker
2156                .pending_outbound_messages
2157                .get(&peer_a)
2158                .expect("peer_a queue")
2159                .len(),
2160            1
2161        );
2162        assert_eq!(
2163            worker
2164                .pending_outbound_messages
2165                .get(&peer_b)
2166                .map_or(0, PendingQueue::len),
2167            0
2168        );
2169
2170        let mut text = String::new();
2171        encode(&mut text, &registry).expect("encode metrics");
2172        assert_eq!(
2173            metric_value(
2174                &text,
2175                "network_messages_dropped_outbound_queue_bytes_limit_total"
2176            ),
2177            1.0
2178        );
2179        assert_eq!(
2180            metric_value(
2181                &text,
2182                "network_messages_dropped_outbound_queue_bytes_limit_per_peer_total"
2183            ),
2184            0.0
2185        );
2186        assert_eq!(
2187            metric_value(
2188                &text,
2189                "network_messages_dropped_outbound_queue_bytes_limit_global_total"
2190            ),
2191            1.0
2192        );
2193    }
2194
2195    #[test(tokio::test)]
2196    #[serial]
2197    async fn pending_inbound_messages_keep_arrival_order_after_identify() {
2198        let mut worker = build_worker(
2199            vec![],
2200            false,
2201            NodeType::Addressable,
2202            CancellationToken::new(),
2203            CancellationToken::new(),
2204            "/memory/3200".to_owned(),
2205        );
2206        let remote_keys = Libp2pKeypair::generate_ed25519();
2207        let remote_peer = remote_keys.public().to_peer_id();
2208
2209        worker.add_pending_inbound_message(
2210            remote_peer,
2211            Bytes::from_static(b"msg-2"),
2212        );
2213        worker.add_pending_inbound_message(
2214            remote_peer,
2215            Bytes::from_static(b"msg-1"),
2216        );
2217        worker.add_pending_inbound_message(
2218            remote_peer,
2219            Bytes::from_static(b"msg-3"),
2220        );
2221
2222        let (helper_sender, mut helper_rx) = mpsc::channel(8);
2223        worker.add_helper_sender(helper_sender);
2224
2225        worker
2226            .handle_event(build_identified_event(
2227                remote_peer,
2228                remote_keys.public(),
2229                ConnectionId::new_unchecked(11),
2230            ))
2231            .await;
2232
2233        let mut received = Vec::new();
2234        for _ in 0..3 {
2235            let command = tokio::time::timeout(
2236                Duration::from_millis(200),
2237                helper_rx.recv(),
2238            )
2239            .await
2240            .expect("helper receive timeout")
2241            .expect("helper channel closed");
2242            let CommandHelper::ReceivedMessage { message, .. } = command else {
2243                panic!("unexpected helper command")
2244            };
2245            received.push(message);
2246        }
2247
2248        assert_eq!(
2249            received,
2250            vec![
2251                Bytes::from_static(b"msg-2"),
2252                Bytes::from_static(b"msg-1"),
2253                Bytes::from_static(b"msg-3"),
2254            ]
2255        );
2256        assert!(!worker.pending_inbound_messages.contains_key(&remote_peer));
2257    }
2258
2259    #[test(tokio::test)]
2260    #[serial]
2261    async fn flapping_connection_retries_then_flushes_outbound_queue() {
2262        let mut worker = build_worker(
2263            vec![],
2264            false,
2265            NodeType::Addressable,
2266            CancellationToken::new(),
2267            CancellationToken::new(),
2268            "/memory/3201".to_owned(),
2269        );
2270        let remote_keys = Libp2pKeypair::generate_ed25519();
2271        let remote_peer = remote_keys.public().to_peer_id();
2272        let first_connection = ConnectionId::new_unchecked(21);
2273        let second_connection = ConnectionId::new_unchecked(22);
2274
2275        worker.add_pending_outbound_message(
2276            remote_peer,
2277            Bytes::from_static(b"needs-redelivery"),
2278        );
2279        worker
2280            .peer_action
2281            .insert(remote_peer, Action::Identified(first_connection));
2282        worker.peer_identify.insert(remote_peer);
2283
2284        worker
2285            .handle_event(SwarmEvent::ConnectionClosed {
2286                peer_id: remote_peer,
2287                connection_id: first_connection,
2288                endpoint: test_endpoint(),
2289                num_established: 0,
2290                cause: None,
2291            })
2292            .await;
2293
2294        assert!(worker.retry_by_peer.contains_key(&remote_peer));
2295        assert!(matches!(
2296            worker.retry_by_peer.get(&remote_peer).map(|s| s.kind),
2297            Some(RetryKind::Dial)
2298        ));
2299        assert!(
2300            worker
2301                .pending_outbound_messages
2302                .get(&remote_peer)
2303                .is_some_and(|q| !q.is_empty())
2304        );
2305
2306        worker
2307            .handle_event(build_identified_event(
2308                remote_peer,
2309                remote_keys.public(),
2310                second_connection,
2311            ))
2312            .await;
2313
2314        assert!(!worker.pending_outbound_messages.contains_key(&remote_peer));
2315        assert!(!worker.retry_by_peer.contains_key(&remote_peer));
2316        assert!(matches!(
2317            worker.peer_action.get(&remote_peer),
2318            Some(Action::Identified(id)) if *id == second_connection
2319        ));
2320    }
2321
2322    #[test(tokio::test)]
2323    #[serial]
2324    async fn bootstrap_identify_timeout_keeps_bootnode_until_close() {
2325        let remote_keys = Libp2pKeypair::generate_ed25519();
2326        let remote_peer = remote_keys.public().to_peer_id();
2327        let boot_node = RoutingNode {
2328            peer_id: remote_peer.to_string(),
2329            address: vec!["/memory/3300".to_owned()],
2330        };
2331
2332        let mut worker = build_worker(
2333            vec![boot_node],
2334            false,
2335            NodeType::Addressable,
2336            CancellationToken::new(),
2337            CancellationToken::new(),
2338            "/memory/3301".to_owned(),
2339        );
2340
2341        assert!(worker.boot_nodes.contains_key(&remote_peer));
2342        assert!(!worker.retry_boot_nodes.contains_key(&remote_peer));
2343
2344        worker
2345            .handle_connection_events(SwarmEvent::Behaviour(
2346                BehaviourEvent::IdentifyError {
2347                    peer_id: remote_peer,
2348                    error: swarm::StreamUpgradeError::Timeout,
2349                },
2350            ))
2351            .await;
2352
2353        assert!(worker.boot_nodes.contains_key(&remote_peer));
2354        assert!(!worker.retry_boot_nodes.contains_key(&remote_peer));
2355    }
2356
2357    #[test(tokio::test)]
2358    #[serial]
2359    async fn test_no_boot_nodes() {
2360        let boot_nodes = vec![];
2361
2362        // Build a node.
2363        let node_addr = "/memory/3000";
2364        let mut node = build_worker(
2365            boot_nodes.clone(),
2366            false,
2367            NodeType::Addressable,
2368            CancellationToken::new(),
2369            CancellationToken::new(),
2370            node_addr.to_owned(),
2371        );
2372        if let Err(e) = node.run_connection().await {
2373            assert_eq!(
2374                e.to_string(),
2375                "cannot connect to the ave network: no reachable bootstrap node"
2376            );
2377        };
2378
2379        assert_eq!(node.state, NetworkState::Disconnected);
2380    }
2381
2382    #[test(tokio::test)]
2383    #[serial]
2384    async fn test_fake_boot_node() {
2385        let mut boot_nodes = vec![];
2386
2387        // Build a fake bootstrap node.
2388        let fake_boot_peer = PeerId::random();
2389        let fake_boot_addr = "/memory/3001";
2390        let fake_node = RoutingNode {
2391            peer_id: fake_boot_peer.to_string(),
2392            address: vec![fake_boot_addr.to_owned()],
2393        };
2394        boot_nodes.push(fake_node);
2395
2396        // Build a node.
2397        let node_addr = "/memory/3002";
2398        let mut node = build_worker(
2399            boot_nodes.clone(),
2400            false,
2401            NodeType::Addressable,
2402            CancellationToken::new(),
2403            CancellationToken::new(),
2404            node_addr.to_owned(),
2405        );
2406
2407        if let Err(e) = node.run_connection().await {
2408            assert_eq!(
2409                e.to_string(),
2410                "cannot connect to the ave network: no reachable bootstrap node"
2411            );
2412        };
2413
2414        assert_eq!(node.state, NetworkState::Disconnected);
2415    }
2416
2417    #[test(tokio::test)]
2418    #[serial]
2419    async fn test_connect() {
2420        let mut boot_nodes = vec![];
2421
2422        // Build a bootstrap node.
2423        let boot_addr = "/memory/3003";
2424        let mut boot = build_worker(
2425            boot_nodes.clone(),
2426            false,
2427            NodeType::Bootstrap,
2428            CancellationToken::new(),
2429            CancellationToken::new(),
2430            boot_addr.to_owned(),
2431        );
2432
2433        let boot_node = RoutingNode {
2434            peer_id: boot.local_peer_id().to_string(),
2435            address: vec![boot_addr.to_owned()],
2436        };
2437
2438        boot_nodes.push(boot_node);
2439
2440        // Build a node.
2441        let node_addr = "/memory/3004";
2442        let mut node = build_worker(
2443            boot_nodes,
2444            false,
2445            NodeType::Ephemeral,
2446            CancellationToken::new(),
2447            CancellationToken::new(),
2448            node_addr.to_owned(),
2449        );
2450
2451        // Spawn the boot node
2452        tokio::spawn(async move {
2453            boot.run_main().await;
2454        });
2455
2456        // Wait for connection.
2457        node.run_connection().await.unwrap();
2458    }
2459}