Skip to main content

ave_network/
worker.rs

1//! # Network worker.
2//!
3
4use crate::{
5    Command, CommandHelper, Config, Error, Event as NetworkEvent, MachineSpec,
6    Monitor, MonitorMessage, NodeType, ResolvedSpec,
7    behaviour::{Behaviour, Event as BehaviourEvent, ReqResMessage},
8    metrics::NetworkMetrics,
9    resolve_spec,
10    service::NetworkService,
11    transport::build_transport,
12    utils::{
13        Action, Due, IDENTIFY_PROTOCOL, LimitsConfig, MessagesHelper,
14        NetworkState, REQRES_PROTOCOL, RetryKind, RetryState, ScheduleType,
15        convert_addresses, convert_boot_nodes, peer_id_to_ed25519_pubkey_bytes,
16    },
17};
18
19use std::{
20    collections::{BinaryHeap, HashSet},
21    fmt::Debug,
22    num::{NonZeroU8, NonZeroUsize},
23    pin::Pin,
24    sync::Arc,
25    time::Duration,
26};
27
28use ave_actors::ActorRef;
29use ave_common::identity::KeyPair;
30
31use libp2p::{
32    Multiaddr, PeerId, StreamProtocol, Swarm, identify,
33    identity::{Keypair, ed25519},
34    request_response::{self, ResponseChannel},
35    swarm::{self, DialError, SwarmEvent, dial_opts::DialOpts},
36};
37
38use futures::StreamExt;
39use serde::Serialize;
40use tokio::{
41    sync::mpsc,
42    time::{Instant, Sleep, sleep, sleep_until},
43};
44use tokio_util::sync::CancellationToken;
45use tracing::{debug, error, info, trace, warn};
46
47use bytes::Bytes;
48use std::collections::{HashMap, VecDeque};
49
/// `tracing` target used by every log statement emitted by this worker.
const TARGET: &str = "ave::network::worker";

/// Maximum number of messages buffered per peer in each pending queue
/// (outbound and inbound alike). Once a peer's queue holds this many
/// messages, the oldest one is evicted to make room for a new one.
const MAX_PENDING_MESSAGES_PER_PEER: usize = 100;
55
56/// Bounded queue of outbound messages for a single peer.
57///
58/// Keeps the 100 most recent messages; when full the oldest is evicted.
59#[derive(Default)]
60struct PendingQueue {
61    messages: VecDeque<PendingMessage>,
62    pending_bytes: usize,
63}
64
65struct PendingMessage {
66    payload: Bytes,
67    enqueued_at: Instant,
68}
69
70impl PendingQueue {
71    fn contains(&self, message: &Bytes) -> bool {
72        self.messages.iter().any(|x| x.payload == *message)
73    }
74
75    fn pop_front(&mut self) -> Option<PendingMessage> {
76        let popped = self.messages.pop_front()?;
77        self.pending_bytes =
78            self.pending_bytes.saturating_sub(popped.payload.len());
79        Some(popped)
80    }
81
82    fn push_back(&mut self, message: Bytes) {
83        self.pending_bytes += message.len();
84        self.messages.push_back(PendingMessage {
85            payload: message,
86            enqueued_at: Instant::now(),
87        });
88    }
89
90    fn drain(&mut self) -> impl Iterator<Item = PendingMessage> + '_ {
91        self.pending_bytes = 0;
92        self.messages.drain(..)
93    }
94
95    fn is_empty(&self) -> bool {
96        self.messages.is_empty()
97    }
98
99    fn len(&self) -> usize {
100        self.messages.len()
101    }
102
103    const fn bytes_len(&self) -> usize {
104        self.pending_bytes
105    }
106}
107
108/// Main network worker. Must be polled in order for the network to advance.
109///
110/// The worker is responsible for handling the network events and commands.
111///
112pub struct NetworkWorker<T>
113where
114    T: Debug + Serialize,
115{
116    /// Local Peer ID.
117    local_peer_id: PeerId,
118
119    /// Network service.
120    service: NetworkService,
121
122    /// The libp2p swarm.
123    swarm: Swarm<Behaviour>,
124
125    /// The network state.
126    state: NetworkState,
127
128    /// The command receiver.
129    command_receiver: mpsc::Receiver<Command>,
130
131    /// The command sender to Helper Intermediary.
132    helper_sender: Option<mpsc::Sender<CommandHelper<T>>>,
133
134    /// Monitor actor.
135    monitor: Option<ActorRef<Monitor>>,
136
137    /// The cancellation token.
138    graceful_token: CancellationToken,
139    crash_token: CancellationToken,
140
141    /// Node type.
142    node_type: NodeType,
143
144    /// Safe mode keeps the worker alive but isolated from peers.
145    safe_mode: bool,
146
147    /// List of boot noodes.
148    boot_nodes: HashMap<PeerId, Vec<Multiaddr>>,
149
150    /// nodes with which it has not been possible to establish a connection by keepAliveTimeout in pre-routing.
151    retry_boot_nodes: HashMap<PeerId, Vec<Multiaddr>>,
152
153    /// Pending outbound messages to the peer (bounded by count and bytes).
154    pending_outbound_messages: HashMap<PeerId, PendingQueue>,
155
156    pending_inbound_messages: HashMap<PeerId, PendingQueue>,
157
158    /// Ephemeral responses.
159    response_channels:
160        HashMap<PeerId, VecDeque<ResponseChannel<ReqResMessage>>>,
161
162    /// Successful dials
163    successful_dials: u64,
164
165    peer_identify: HashSet<PeerId>,
166
167    retry_by_peer: HashMap<PeerId, RetryState>,
168
169    retry_queue: BinaryHeap<Due>,
170
171    retry_timer: Option<Pin<Box<Sleep>>>,
172
173    peer_action: HashMap<PeerId, Action>,
174
175    max_app_message_bytes: usize,
176    max_pending_outbound_bytes_per_peer: usize,
177    max_pending_inbound_bytes_per_peer: usize,
178    max_pending_outbound_bytes_total: usize,
179    max_pending_inbound_bytes_total: usize,
180
181    metrics: Option<Arc<NetworkMetrics>>,
182}
183
184impl<T: Debug + Serialize> NetworkWorker<T> {
185    /// Create a new `NetworkWorker`.
186    pub fn new(
187        keys: &KeyPair,
188        config: Config,
189        monitor: Option<ActorRef<Monitor>>,
190        graceful_token: CancellationToken,
191        crash_token: CancellationToken,
192        machine_spec: Option<MachineSpec>,
193        metrics: Option<Arc<NetworkMetrics>>,
194    ) -> Result<Self, Error> {
195        // Create channels to communicate commands
196        info!(target: TARGET, "network initialising");
197        let (command_sender, command_receiver) = mpsc::channel(512);
198
199        let key = match keys {
200            KeyPair::Ed25519(ed25519_signer) => {
201                let sk_bytes = ed25519_signer
202                    .secret_key_bytes()
203                    .map_err(|e| Error::KeyExtraction(e.to_string()))?;
204
205                let sk = ed25519::SecretKey::try_from_bytes(sk_bytes)
206                    .map_err(|e| Error::KeyExtraction(e.to_string()))?;
207
208                let kp = ed25519::Keypair::from(sk);
209                Keypair::from(kp)
210            }
211        };
212
213        // Generate the `PeerId` from the public key.
214        let local_peer_id = key.public().to_peer_id();
215
216        let boot_nodes = convert_boot_nodes(&config.boot_nodes);
217
218        // Create the listen addressess.
219        let addresses = convert_addresses(&config.listen_addresses)?;
220
221        // Create the listen addressess.
222        let external_addresses = convert_addresses(&config.external_addresses)?;
223
224        let node_type = config.node_type.clone();
225        let safe_mode = config.safe_mode;
226
227        // Resolve machine sizing from the declared spec, or auto-detect from host.
228        let ResolvedSpec { ram_mb, cpu_cores } = resolve_spec(machine_spec);
229
230        let limits = LimitsConfig::build(ram_mb, cpu_cores);
231        let max_app_message_bytes = config.max_app_message_bytes;
232        let max_pending_outbound_bytes_per_peer =
233            config.max_pending_outbound_bytes_per_peer;
234        let max_pending_inbound_bytes_per_peer =
235            config.max_pending_inbound_bytes_per_peer;
236        let max_pending_outbound_bytes_total =
237            config.max_pending_outbound_bytes_total;
238        let max_pending_inbound_bytes_total =
239            config.max_pending_inbound_bytes_total;
240
241        // Build transport.
242        let transport = build_transport(&key, limits.clone())?;
243
244        let behaviour = Behaviour::new(
245            &key.public(),
246            config,
247            graceful_token.clone(),
248            crash_token.clone(),
249            limits,
250            metrics.clone(),
251        );
252
253        // Create the swarm.
254        let mut swarm = Swarm::new(
255            transport,
256            behaviour,
257            local_peer_id,
258            swarm::Config::with_tokio_executor()
259                .with_idle_connection_timeout(Duration::from_secs(90))
260                .with_max_negotiating_inbound_streams(32)
261                .with_notify_handler_buffer_size(
262                    NonZeroUsize::new(32).expect("32 > 0"),
263                )
264                .with_per_connection_event_buffer_size(16)
265                .with_dial_concurrency_factor(
266                    NonZeroU8::new(2).expect("2 > 0"),
267                ),
268        );
269
270        let service = NetworkService::new(command_sender);
271
272        if addresses.is_empty() {
273            // Listen on all tcp addresses.
274            swarm
275                .listen_on(
276                    "/ip4/0.0.0.0/tcp/0"
277                        .parse::<Multiaddr>()
278                        .map_err(|e| Error::InvalidAddress(e.to_string()))?,
279                )
280                .map_err(|e| Error::Listen(format!("0.0.0.0:0: {e}")))?;
281            info!(target: TARGET, "listening on all interfaces");
282        } else {
283            // Listen on the external addresses.
284            for addr in addresses.iter() {
285                info!(target: TARGET, addr = %addr, "listening on address");
286                swarm
287                    .listen_on(addr.clone())
288                    .map_err(|e| Error::Listen(format!("{addr}: {e}")))?;
289            }
290        }
291
292        if !external_addresses.is_empty() {
293            for addr in external_addresses.iter() {
294                debug!(target: TARGET, addr = %addr, "external address registered");
295                swarm.add_external_address(addr.clone());
296            }
297        }
298
299        info!(target: TARGET, peer_id = %local_peer_id, "local peer id");
300
301        let worker = Self {
302            local_peer_id,
303            service,
304            swarm,
305            state: NetworkState::Start,
306            command_receiver,
307            helper_sender: None,
308            monitor,
309            graceful_token,
310            crash_token,
311            node_type,
312            safe_mode,
313            boot_nodes,
314            retry_boot_nodes: HashMap::new(),
315            pending_outbound_messages: HashMap::default(),
316            pending_inbound_messages: HashMap::default(),
317            response_channels: HashMap::default(),
318            successful_dials: 0,
319            peer_identify: HashSet::new(),
320            retry_by_peer: HashMap::new(),
321            retry_queue: BinaryHeap::new(),
322            retry_timer: None,
323            peer_action: HashMap::new(),
324            max_app_message_bytes,
325            max_pending_outbound_bytes_per_peer,
326            max_pending_inbound_bytes_per_peer,
327            max_pending_outbound_bytes_total,
328            max_pending_inbound_bytes_total,
329            metrics,
330        };
331
332        if let Some(metrics) = worker.metric_handle() {
333            metrics.set_state_current(&worker.state);
334        }
335        worker.refresh_runtime_metrics();
336
337        Ok(worker)
338    }
339
340    fn metric_handle(&self) -> Option<&NetworkMetrics> {
341        self.metrics.as_deref()
342    }
343
344    const fn is_safe_mode(&self) -> bool {
345        self.safe_mode
346    }
347
348    fn pending_outbound_messages_len(&self) -> usize {
349        self.pending_outbound_messages
350            .values()
351            .map(PendingQueue::len)
352            .sum()
353    }
354
355    fn pending_outbound_bytes_len(&self) -> usize {
356        self.pending_outbound_messages
357            .values()
358            .map(PendingQueue::bytes_len)
359            .sum()
360    }
361
362    fn pending_inbound_messages_len(&self) -> usize {
363        self.pending_inbound_messages
364            .values()
365            .map(PendingQueue::len)
366            .sum()
367    }
368
369    fn pending_inbound_bytes_len(&self) -> usize {
370        self.pending_inbound_messages
371            .values()
372            .map(PendingQueue::bytes_len)
373            .sum()
374    }
375
376    fn pending_response_channels_len(&self) -> usize {
377        self.response_channels.values().map(VecDeque::len).sum()
378    }
379
380    fn refresh_runtime_metrics(&self) {
381        let Some(metrics) = self.metric_handle() else {
382            return;
383        };
384
385        metrics.set_retry_queue_len(self.retry_queue.len() as i64);
386        metrics.set_pending_outbound_peers(
387            self.pending_outbound_messages.len() as i64,
388        );
389        metrics.set_pending_outbound_messages(
390            self.pending_outbound_messages_len() as i64,
391        );
392        metrics.set_pending_outbound_bytes(
393            self.pending_outbound_bytes_len() as i64
394        );
395        metrics.set_pending_inbound_peers(
396            self.pending_inbound_messages.len() as i64
397        );
398        metrics.set_pending_inbound_messages(
399            self.pending_inbound_messages_len() as i64,
400        );
401        metrics
402            .set_pending_inbound_bytes(self.pending_inbound_bytes_len() as i64);
403        metrics.set_identified_peers(self.peer_identify.len() as i64);
404        metrics.set_response_channels_pending(
405            self.pending_response_channels_len() as i64,
406        );
407    }
408
409    fn observe_pending_message_age(&self, enqueued_at: Instant) {
410        if let Some(metrics) = self.metric_handle() {
411            metrics.observe_pending_message_age_seconds(
412                enqueued_at.elapsed().as_secs_f64(),
413            );
414        }
415    }
416
417    fn drop_pending_outbound_messages(&mut self, peer_id: &PeerId) -> usize {
418        let Some(mut queue) = self.pending_outbound_messages.remove(peer_id)
419        else {
420            return 0;
421        };
422
423        let mut dropped = 0usize;
424        for message in queue.drain() {
425            dropped += 1;
426            self.observe_pending_message_age(message.enqueued_at);
427        }
428        dropped
429    }
430
431    fn drop_pending_inbound_messages(&mut self, peer_id: &PeerId) {
432        if let Some(mut queue) = self.pending_inbound_messages.remove(peer_id) {
433            for message in queue.drain() {
434                self.observe_pending_message_age(message.enqueued_at);
435            }
436        }
437    }
438
439    fn observe_identify_error(
440        &self,
441        error: &swarm::StreamUpgradeError<identify::UpgradeError>,
442    ) {
443        let kind = match error {
444            swarm::StreamUpgradeError::Timeout => "timeout",
445            swarm::StreamUpgradeError::Io(_) => "io",
446            swarm::StreamUpgradeError::NegotiationFailed => "negotiation",
447            swarm::StreamUpgradeError::Apply(_) => "other",
448        };
449
450        if let Some(metrics) = self.metric_handle() {
451            metrics.observe_identify_error(kind);
452        }
453    }
454
455    fn schedule_retry(&mut self, peer: PeerId, schedule_type: ScheduleType) {
456        if self.is_safe_mode() {
457            self.peer_action.remove(&peer);
458            self.retry_by_peer.remove(&peer);
459            self.refresh_runtime_metrics();
460            return;
461        }
462
463        if self.peer_action.contains_key(&peer) {
464            return;
465        }
466
467        let (kind, addrs) = match schedule_type {
468            ScheduleType::Discover => (RetryKind::Discover, vec![]),
469            ScheduleType::Dial(multiaddrs) => (RetryKind::Dial, multiaddrs),
470        };
471
472        let now = Instant::now();
473        let base = Duration::from_millis(250);
474        let cap = Duration::from_secs(30);
475
476        let entry = self.retry_by_peer.entry(peer).or_insert(RetryState {
477            attempts: 0,
478            when: now,
479            kind,
480            addrs: vec![],
481        });
482
483        let when = if matches!(
484            (entry.kind, kind),
485            (RetryKind::Discover, RetryKind::Dial)
486        ) {
487            now
488        } else {
489            if entry.attempts >= 8 {
490                self.clear_pending_messages(&peer);
491                return;
492            }
493
494            // Exponential backoff: 250ms * 2^attempt, capped at 30s
495            // attempts 0-7 → ~250ms, 500ms, 1s, 2s, 4s, 8s, 16s, 30s
496            let exp = 1u32 << entry.attempts.min(7);
497            let mut delay = base * exp;
498            if delay > cap {
499                delay = cap;
500            }
501
502            // jitter 80–120% determinista por peer (sin RNG externo)
503            // Fold all bytes to avoid the fixed multihash prefix dominating.
504            let hash = peer
505                .to_bytes()
506                .iter()
507                .fold(0u32, |acc, &b| acc.wrapping_add(b as u32));
508            let j = 80 + (hash % 41);
509            delay = delay * j / 100;
510
511            now + delay
512        };
513
514        entry.when = when;
515        entry.kind = kind;
516        entry.addrs = addrs;
517
518        self.peer_action.insert(peer, Action::from(kind));
519
520        self.retry_queue.push(Due(peer, entry.when));
521        self.arm_retry_timer();
522        self.refresh_runtime_metrics();
523    }
524
525    fn arm_retry_timer(&mut self) {
526        if let Some(next) = self.retry_queue.peek() {
527            match &mut self.retry_timer {
528                Some(timer) => timer.as_mut().reset(next.1),
529                None => self.retry_timer = Some(Box::pin(sleep_until(next.1))),
530            }
531        }
532    }
533
534    fn drain_due_retries(
535        &mut self,
536    ) -> Vec<(PeerId, RetryKind, Vec<Multiaddr>)> {
537        let now = Instant::now();
538        let mut out = Vec::new();
539        while let Some(Due(peer, when)) = self.retry_queue.peek().cloned() {
540            if when > now {
541                break;
542            }
543
544            self.retry_queue.pop();
545            // Match the exact instant to reject stale Due entries. Multiple Due
546            // entries for the same peer can accumulate (e.g. Discover → Dial
547            // transition pushes a second entry without removing the first).
548            // Comparing `when` (from the popped Due) against `retry_by_peer[peer].when`
549            // ensures only the current scheduling cycle fires.
550            if let Some(retry) = self.retry_by_peer.get(&peer).cloned()
551                && retry.when == when
552            {
553                self.retry_by_peer
554                    .entry(peer)
555                    .and_modify(|x| x.attempts += 1);
556                out.push((peer, retry.kind, retry.addrs));
557            }
558        }
559
560        if self.retry_queue.is_empty() {
561            self.retry_timer = None;
562        } else {
563            self.arm_retry_timer();
564        }
565        self.refresh_runtime_metrics();
566        out
567    }
568
569    /// Add sender helper
570    pub fn add_helper_sender(
571        &mut self,
572        helper_sender: mpsc::Sender<CommandHelper<T>>,
573    ) {
574        self.helper_sender = Some(helper_sender);
575    }
576
577    /// Get the local peer ID.
578    pub const fn local_peer_id(&self) -> PeerId {
579        self.local_peer_id
580    }
581
582    /// Send message to a peer.
583    fn send_message(
584        &mut self,
585        peer: PeerId,
586        message: Bytes,
587    ) -> Result<(), Error> {
588        if self.is_safe_mode() {
589            debug!(
590                target: TARGET,
591                peer_id = %peer,
592                size = message.len(),
593                "safe mode active; dropping outbound message"
594            );
595            return Ok(());
596        }
597
598        if message.len() > self.max_app_message_bytes {
599            warn!(
600                target: TARGET,
601                peer_id = %peer,
602                size = message.len(),
603                max = self.max_app_message_bytes,
604                "outbound payload rejected: message too large",
605            );
606            if let Some(metrics) = self.metric_handle() {
607                metrics.inc_oversized_outbound_drop();
608            }
609            self.refresh_runtime_metrics();
610            return Err(Error::MessageTooLarge {
611                size: message.len(),
612                max: self.max_app_message_bytes,
613            });
614        }
615
616        if let Some(mut responses) = self.response_channels.remove(&peer) {
617            while let Some(response_channel) = responses.pop_front() {
618                match self
619                    .swarm
620                    .behaviour_mut()
621                    .send_response(response_channel, message.clone())
622                {
623                    Ok(()) => {
624                        if !responses.is_empty() {
625                            self.response_channels.insert(peer, responses);
626                        }
627                        self.refresh_runtime_metrics();
628                        return Ok(());
629                    }
630                    Err(e) => {
631                        debug!(target: TARGET, peer_id = %peer, error = %e, "failed to send response: channel may already be consumed");
632                    }
633                }
634            }
635        }
636
637        self.add_pending_outbound_message(peer, message);
638
639        if self.swarm.behaviour_mut().is_known_peer(&peer) {
640            if let Some(Action::Identified(..)) = self.peer_action.get(&peer) {
641                self.send_pending_outbound_messages(peer);
642            } else {
643                self.schedule_retry(peer, ScheduleType::Dial(vec![]));
644            }
645        } else {
646            self.schedule_retry(peer, ScheduleType::Discover);
647        }
648
649        Ok(())
650    }
651
652    /// Add pending message to peer.
653    ///
654    /// If count/bytes limits are reached, oldest messages are evicted first.
655    fn add_pending_outbound_message(&mut self, peer: PeerId, message: Bytes) {
656        let message_len = message.len();
657        let mut dropped_count = 0u64;
658        let mut dropped_bytes_limit_peer = 0u64;
659        let mut dropped_bytes_limit_global = 0u64;
660        let mut dropped_messages = Vec::new();
661        let mut duplicate = false;
662        let mut total_pending_bytes = self.pending_outbound_bytes_len();
663        let per_peer_limit = self.max_pending_outbound_bytes_per_peer;
664        let global_limit = self.max_pending_outbound_bytes_total;
665
666        {
667            let queue = self.pending_outbound_messages.entry(peer).or_default();
668            if queue.contains(&message) {
669                duplicate = true;
670            } else {
671                while queue.len() >= MAX_PENDING_MESSAGES_PER_PEER {
672                    if let Some(evicted) = queue.pop_front() {
673                        dropped_count += 1;
674                        total_pending_bytes = total_pending_bytes
675                            .saturating_sub(evicted.payload.len());
676                        dropped_messages.push(evicted);
677                    } else {
678                        break;
679                    }
680                }
681
682                if per_peer_limit > 0 {
683                    while queue.bytes_len() + message_len > per_peer_limit {
684                        if let Some(evicted) = queue.pop_front() {
685                            dropped_bytes_limit_peer += 1;
686                            total_pending_bytes = total_pending_bytes
687                                .saturating_sub(evicted.payload.len());
688                            dropped_messages.push(evicted);
689                        } else {
690                            break;
691                        }
692                    }
693                }
694
695                if per_peer_limit > 0
696                    && queue.bytes_len() + message_len > per_peer_limit
697                {
698                    dropped_bytes_limit_peer += 1;
699                } else if global_limit > 0
700                    && total_pending_bytes.saturating_add(message_len)
701                        > global_limit
702                {
703                    dropped_bytes_limit_global += 1;
704                } else {
705                    queue.push_back(message);
706                }
707            }
708        }
709
710        if duplicate {
711            self.refresh_runtime_metrics();
712            return;
713        }
714
715        for evicted in dropped_messages {
716            self.observe_pending_message_age(evicted.enqueued_at);
717        }
718
719        if dropped_count > 0 {
720            warn!(
721                target: TARGET,
722                peer_id = %peer,
723                dropped = dropped_count,
724                max_messages = MAX_PENDING_MESSAGES_PER_PEER,
725                "outbound queue count limit reached; oldest messages evicted",
726            );
727        }
728
729        if dropped_bytes_limit_peer > 0 {
730            warn!(
731                target: TARGET,
732                peer_id = %peer,
733                dropped = dropped_bytes_limit_peer,
734                message_bytes = message_len,
735                max_queue_bytes = per_peer_limit,
736                "outbound queue bytes limit reached; messages evicted/dropped",
737            );
738        }
739
740        if dropped_bytes_limit_global > 0 {
741            warn!(
742                target: TARGET,
743                peer_id = %peer,
744                dropped = dropped_bytes_limit_global,
745                message_bytes = message_len,
746                max_queue_bytes_total = global_limit,
747                "outbound global queue bytes limit reached; messages dropped",
748            );
749        }
750
751        if let Some(metrics) = self.metric_handle() {
752            metrics.inc_outbound_queue_drop_by(dropped_count);
753            metrics.inc_outbound_queue_bytes_drop_per_peer_by(
754                dropped_bytes_limit_peer,
755            );
756            metrics.inc_outbound_queue_bytes_drop_global_by(
757                dropped_bytes_limit_global,
758            );
759        }
760
761        self.refresh_runtime_metrics();
762    }
763
764    fn add_pending_inbound_message(&mut self, peer: PeerId, message: Bytes) {
765        let message_len = message.len();
766        let mut dropped_count = 0u64;
767        let mut dropped_bytes_limit_peer = 0u64;
768        let mut dropped_bytes_limit_global = 0u64;
769        let mut dropped_messages = Vec::new();
770        let mut duplicate = false;
771        let mut total_pending_bytes = self.pending_inbound_bytes_len();
772        let per_peer_limit = self.max_pending_inbound_bytes_per_peer;
773        let global_limit = self.max_pending_inbound_bytes_total;
774
775        {
776            let queue = self.pending_inbound_messages.entry(peer).or_default();
777            if queue.contains(&message) {
778                duplicate = true;
779            } else {
780                while queue.len() >= MAX_PENDING_MESSAGES_PER_PEER {
781                    if let Some(evicted) = queue.pop_front() {
782                        dropped_count += 1;
783                        total_pending_bytes = total_pending_bytes
784                            .saturating_sub(evicted.payload.len());
785                        dropped_messages.push(evicted);
786                    } else {
787                        break;
788                    }
789                }
790
791                if per_peer_limit > 0 {
792                    while queue.bytes_len() + message_len > per_peer_limit {
793                        if let Some(evicted) = queue.pop_front() {
794                            dropped_bytes_limit_peer += 1;
795                            total_pending_bytes = total_pending_bytes
796                                .saturating_sub(evicted.payload.len());
797                            dropped_messages.push(evicted);
798                        } else {
799                            break;
800                        }
801                    }
802                }
803
804                if per_peer_limit > 0
805                    && queue.bytes_len() + message_len > per_peer_limit
806                {
807                    dropped_bytes_limit_peer += 1;
808                } else if global_limit > 0
809                    && total_pending_bytes.saturating_add(message_len)
810                        > global_limit
811                {
812                    dropped_bytes_limit_global += 1;
813                } else {
814                    queue.push_back(message);
815                }
816            }
817        }
818
819        if duplicate {
820            self.refresh_runtime_metrics();
821            return;
822        }
823
824        for evicted in dropped_messages {
825            self.observe_pending_message_age(evicted.enqueued_at);
826        }
827
828        if dropped_count > 0 {
829            warn!(
830                target: TARGET,
831                peer_id = %peer,
832                dropped = dropped_count,
833                max_messages = MAX_PENDING_MESSAGES_PER_PEER,
834                "inbound queue count limit reached; oldest messages evicted",
835            );
836        }
837
838        if dropped_bytes_limit_peer > 0 {
839            warn!(
840                target: TARGET,
841                peer_id = %peer,
842                dropped = dropped_bytes_limit_peer,
843                message_bytes = message_len,
844                max_queue_bytes = per_peer_limit,
845                "inbound queue bytes limit reached; messages evicted/dropped",
846            );
847        }
848
849        if dropped_bytes_limit_global > 0 {
850            warn!(
851                target: TARGET,
852                peer_id = %peer,
853                dropped = dropped_bytes_limit_global,
854                message_bytes = message_len,
855                max_queue_bytes_total = global_limit,
856                "inbound global queue bytes limit reached; messages dropped",
857            );
858        }
859
860        if let Some(metrics) = self.metric_handle() {
861            metrics.inc_inbound_queue_drop_by(dropped_count);
862            metrics.inc_inbound_queue_bytes_drop_per_peer_by(
863                dropped_bytes_limit_peer,
864            );
865            metrics.inc_inbound_queue_bytes_drop_global_by(
866                dropped_bytes_limit_global,
867            );
868        }
869
870        self.refresh_runtime_metrics();
871    }
872
873    /// Add ephemeral response.
874    fn add_ephemeral_response(
875        &mut self,
876        peer: PeerId,
877        response_channel: ResponseChannel<ReqResMessage>,
878    ) {
879        self.response_channels
880            .entry(peer)
881            .or_default()
882            .push_back(response_channel);
883        self.refresh_runtime_metrics();
884    }
885
886    /// Send pending messages to peer.
887    fn send_pending_outbound_messages(&mut self, peer: PeerId) {
888        if let Some(mut queue) = self.pending_outbound_messages.remove(&peer) {
889            for message in queue.drain() {
890                self.observe_pending_message_age(message.enqueued_at);
891                self.swarm
892                    .behaviour_mut()
893                    .send_message(&peer, message.payload);
894            }
895        }
896
897        self.retry_by_peer.remove(&peer);
898        self.refresh_runtime_metrics();
899    }
900
901    /// Get the network service.
902    pub fn service(&self) -> NetworkService {
903        self.service.clone()
904    }
905
906    /// Change the network state.
907    async fn change_state(&mut self, state: NetworkState) {
908        trace!(target: TARGET, state = ?state, "state changed");
909        self.state = state.clone();
910        if let Some(metrics) = self.metric_handle() {
911            metrics.observe_state_transition(&state);
912        }
913        self.send_event(NetworkEvent::StateChanged(state)).await;
914    }
915
916    /// Send event
917    #[allow(clippy::needless_pass_by_ref_mut)]
918    async fn send_event(&mut self, event: NetworkEvent) {
919        if let Some(monitor) = self.monitor.clone()
920            && let Err(e) = monitor.tell(MonitorMessage::Network(event)).await
921        {
922            error!(target: TARGET, error = %e, "failed to forward event to monitor");
923            self.crash_token.cancel();
924        }
925    }
926
927    /// Run the network worker.
928    pub async fn run(&mut self) {
929        let bootstrap_start = Instant::now();
930
931        // Run connection to bootstrap node.
932        if let Err(error) = self.run_connection().await {
933            if let Some(metrics) = self.metric_handle() {
934                metrics.observe_bootstrap_duration_seconds(
935                    "failure",
936                    bootstrap_start.elapsed().as_secs_f64(),
937                );
938            }
939            error!(target: TARGET, error = %error, "bootstrap connection failed");
940            self.send_event(NetworkEvent::Error(error)).await;
941            // Irrecoverable error. Cancel the node.
942            self.crash_token.cancel();
943            return;
944        }
945        if let Some(metrics) = self.metric_handle() {
946            metrics.observe_bootstrap_duration_seconds(
947                "success",
948                bootstrap_start.elapsed().as_secs_f64(),
949            );
950        }
951
952        if self.state != NetworkState::Running {
953            self.change_state(NetworkState::Running).await;
954        }
955
956        // Finish pre routing state, activating random walk (if node is a bootstrap).
957        if !self.is_safe_mode() {
958            self.swarm.behaviour_mut().finish_prerouting_state();
959        }
960        // Run main loop.
961        self.run_main().await;
962    }
963
964    /// Run connection to bootstrap node.
965    pub async fn run_connection(&mut self) -> Result<(), Error> {
966        if self.is_safe_mode() {
967            self.change_state(NetworkState::Running).await;
968            return Ok(());
969        }
970
971        // If is the first node of ave network.
972        if self.node_type == NodeType::Bootstrap && self.boot_nodes.is_empty() {
973            self.change_state(NetworkState::Running).await;
974            Ok(())
975        } else {
976            self.change_state(NetworkState::Dial).await;
977
978            let mut retrys: u8 = 0;
979
980            // Per-round deadline: if any boot node's TCP connects but Identify never
981            // completes (NAT half-open, overloaded peer, protocol stall…), this timer
982            // moves the remaining nodes to retry_boot_nodes so the retry logic handles
983            // them instead of waiting up to idle_connection_timeout (90 s) per round.
984            let dialing_round_timeout = sleep(Duration::from_secs(0));
985            tokio::pin!(dialing_round_timeout);
986            let mut dialing_timeout_active = false;
987
988            loop {
989                match self.state {
990                    NetworkState::Dial => {
991                        // Dial to boot node.
992                        if self.boot_nodes.is_empty() {
993                            error!(target: TARGET, "no bootstrap nodes available");
994                            self.send_event(NetworkEvent::Error(
995                                Error::NoBootstrapNode,
996                            ))
997                            .await;
998                            self.change_state(NetworkState::Disconnected).await;
999                        } else {
1000                            let copy_boot_nodes = self.boot_nodes.clone();
1001
1002                            for (peer, addresses) in copy_boot_nodes {
1003                                if let Some(metrics) = self.metric_handle() {
1004                                    metrics.inc_dial_attempt_bootstrap();
1005                                }
1006                                if let Err(e) = self.swarm.dial(
1007                                    DialOpts::peer_id(peer)
1008                                        .addresses(addresses.clone())
1009                                        .build(),
1010                                ) {
1011                                    let (add_to_retry, new_addresses) = self
1012                                        .handle_dial_error(e, &peer, true)
1013                                        .unwrap_or((false, vec![]));
1014                                    self.boot_nodes.remove(&peer);
1015                                    if add_to_retry {
1016                                        if new_addresses.is_empty() {
1017                                            self.retry_boot_nodes
1018                                                .insert(peer, addresses);
1019                                        } else {
1020                                            self.retry_boot_nodes
1021                                                .insert(peer, new_addresses);
1022                                        }
1023                                    }
1024                                }
1025                            }
1026                            if !self.boot_nodes.is_empty() {
1027                                self.change_state(NetworkState::Dialing).await;
1028                                dialing_round_timeout.as_mut().reset(
1029                                    Instant::now() + Duration::from_secs(15),
1030                                );
1031                                dialing_timeout_active = true;
1032                            } else {
1033                                warn!(target: TARGET, "all bootstrap dials failed");
1034                                self.change_state(NetworkState::Disconnected)
1035                                    .await;
1036                            }
1037                        }
1038                    }
1039                    NetworkState::Dialing => {
1040                        // No more bootnodes to send dial, none was successful nut one or more Dial fail by keepalivetimeout
1041                        if self.boot_nodes.is_empty()
1042                            && self.successful_dials == 0
1043                            && !self.retry_boot_nodes.is_empty()
1044                            && retrys < 3
1045                        {
1046                            retrys += 1;
1047                            let wait = Duration::from_secs(1u64 << retrys); // retrys=1→2s, retrys=2→4s
1048                            debug!(target: TARGET, attempt = retrys, wait_secs = wait.as_secs(), "retrying bootstrap dials");
1049
1050                            let backoff = sleep(wait);
1051                            tokio::pin!(backoff);
1052                            loop {
1053                                tokio::select! {
1054                                    _ = &mut backoff => break,
1055                                    event = self.swarm.select_next_some() => {
1056                                        self.handle_connection_events(event).await;
1057                                    }
1058                                    _ = self.graceful_token.cancelled() => {
1059                                        return Err(Error::Cancelled);
1060                                    }
1061                                    _ = self.crash_token.cancelled() => {
1062                                        return Err(Error::Cancelled);
1063                                    }
1064                                }
1065                            }
1066
1067                            dialing_timeout_active = false;
1068                            self.boot_nodes.clone_from(&self.retry_boot_nodes);
1069                            self.retry_boot_nodes.clear();
1070                            self.change_state(NetworkState::Dial).await;
1071                        }
1072                        // No more bootnodes to send dial and none was successful
1073                        else if self.boot_nodes.is_empty()
1074                            && self.successful_dials == 0
1075                        {
1076                            self.change_state(NetworkState::Disconnected).await;
1077                        // No more bootnodes to send dial and one or more was successful
1078                        } else if self.boot_nodes.is_empty() {
1079                            return Ok(());
1080                        }
1081                    }
1082                    NetworkState::Running => {
1083                        return Ok(());
1084                    }
1085                    NetworkState::Disconnected => {
1086                        return Err(Error::NoBootstrapNode);
1087                    }
1088                    _ => {}
1089                }
1090                if self.state != NetworkState::Disconnected {
1091                    tokio::select! {
1092                        event = self.swarm.select_next_some() => {
1093                            self.handle_connection_events(event).await;
1094                        }
1095                                    _ = self.graceful_token.cancelled() => {
1096                                        return Err(Error::Cancelled);
1097                                    }
1098                                    _ = self.crash_token.cancelled() => {
1099                                        return Err(Error::Cancelled);
1100                                    }
1101                        _ = &mut dialing_round_timeout, if dialing_timeout_active => {
1102                            warn!(
1103                                target: TARGET,
1104                                remaining = self.boot_nodes.len(),
1105                                "bootstrap round timed out waiting for Identify; \
1106                                 moving remaining peers to retry queue"
1107                            );
1108                            for (peer, addrs) in self.boot_nodes.drain() {
1109                                self.retry_boot_nodes.insert(peer, addrs);
1110                            }
1111                            dialing_timeout_active = false;
1112                        }
1113                    }
1114                }
1115            }
1116        }
1117    }
1118
1119    fn collect_retryable_transport_addresses(
1120        &self,
1121        items: Vec<(Multiaddr, libp2p::TransportError<std::io::Error>)>,
1122        trace_unreachable: bool,
1123    ) -> Vec<Multiaddr> {
1124        let mut new_addresses = vec![];
1125        for (address, error) in items {
1126            if trace_unreachable {
1127                trace!(target: TARGET, addr = %address, err = ?error, "address unreachable");
1128            }
1129
1130            if let libp2p::TransportError::Other(e) = error {
1131                match e.kind() {
1132                    std::io::ErrorKind::ConnectionRefused
1133                    | std::io::ErrorKind::TimedOut
1134                    | std::io::ErrorKind::ConnectionAborted
1135                    | std::io::ErrorKind::NotConnected
1136                    | std::io::ErrorKind::BrokenPipe
1137                    | std::io::ErrorKind::Interrupted
1138                    | std::io::ErrorKind::HostUnreachable
1139                    | std::io::ErrorKind::NetworkUnreachable => {
1140                        new_addresses.push(address);
1141                    }
1142                    _ => {}
1143                }
1144            };
1145        }
1146
1147        new_addresses
1148    }
1149
1150    /// Classify a dial error and return retry decision.
1151    ///
1152    /// - `bootstrap_flow = true` keeps bootstrap-specific behaviour/logs.
1153    /// - `bootstrap_flow = false` keeps runtime-specific behaviour/logs/cleanup.
1154    fn handle_dial_error(
1155        &mut self,
1156        e: DialError,
1157        peer_id: &PeerId,
1158        bootstrap_flow: bool,
1159    ) -> Option<(bool, Vec<Multiaddr>)> {
1160        let phase = if bootstrap_flow {
1161            "bootstrap"
1162        } else {
1163            "runtime"
1164        };
1165        match e {
1166            DialError::LocalPeerId { .. } => {
1167                if let Some(metrics) = self.metric_handle() {
1168                    metrics.observe_dial_failure(phase, "local_peer_id");
1169                }
1170                if bootstrap_flow {
1171                    warn!(target: TARGET, peer_id = %peer_id, "dial rejected: connected peer-id matches local peer");
1172                    return Some((false, vec![]));
1173                }
1174
1175                self.retry_by_peer.remove(peer_id);
1176                self.clear_pending_messages(peer_id);
1177                self.swarm
1178                    .behaviour_mut()
1179                    .clean_hard_peer_to_remove(peer_id);
1180                None
1181            }
1182            DialError::NoAddresses => {
1183                if let Some(metrics) = self.metric_handle() {
1184                    metrics.observe_dial_failure(phase, "no_addresses");
1185                }
1186                if bootstrap_flow {
1187                    debug!(target: TARGET, peer_id = %peer_id, "dial skipped: no addresses");
1188                }
1189                Some((false, vec![]))
1190            }
1191            DialError::DialPeerConditionFalse(_) => {
1192                if let Some(metrics) = self.metric_handle() {
1193                    metrics.observe_dial_failure(phase, "peer_condition");
1194                }
1195                if bootstrap_flow {
1196                    debug!(target: TARGET, peer_id = %peer_id, "dial skipped: peer condition not met");
1197                    return Some((false, vec![]));
1198                }
1199
1200                None
1201            }
1202            DialError::Denied { cause } => {
1203                if let Some(metrics) = self.metric_handle() {
1204                    metrics.observe_dial_failure(phase, "denied");
1205                }
1206                if bootstrap_flow {
1207                    debug!(target: TARGET, peer_id = %peer_id, cause = %cause, "dial denied by behaviour");
1208                }
1209                Some((false, vec![]))
1210            }
1211            DialError::Aborted => {
1212                if let Some(metrics) = self.metric_handle() {
1213                    metrics.observe_dial_failure(phase, "aborted");
1214                }
1215                if bootstrap_flow {
1216                    debug!(target: TARGET, peer_id = %peer_id, "dial aborted, will retry");
1217                }
1218                Some((true, vec![]))
1219            }
1220            DialError::WrongPeerId { obtained, .. } => {
1221                if let Some(metrics) = self.metric_handle() {
1222                    metrics.observe_dial_failure(phase, "wrong_peer_id");
1223                }
1224                if bootstrap_flow {
1225                    warn!(target: TARGET, expected = %peer_id, obtained = %obtained, "dial failed: peer identity mismatch");
1226                    return Some((false, vec![]));
1227                }
1228
1229                self.retry_by_peer.remove(peer_id);
1230                self.clear_pending_messages(peer_id);
1231                self.swarm
1232                    .behaviour_mut()
1233                    .clean_hard_peer_to_remove(peer_id);
1234                None
1235            }
1236            DialError::Transport(items) => {
1237                if let Some(metrics) = self.metric_handle() {
1238                    metrics.observe_dial_failure(phase, "transport");
1239                }
1240                if bootstrap_flow {
1241                    debug!(target: TARGET, peer_id = %peer_id, "transport dial failed");
1242                }
1243
1244                let new_addresses = self.collect_retryable_transport_addresses(
1245                    items,
1246                    bootstrap_flow,
1247                );
1248                if !new_addresses.is_empty() {
1249                    Some((true, new_addresses))
1250                } else {
1251                    Some((false, vec![]))
1252                }
1253            }
1254        }
1255    }
1256
    /// Handle connection events.
    ///
    /// Swarm-event handler used while bootstrapping (called from
    /// `run_connection`). It updates the bootstrap bookkeeping
    /// (`boot_nodes`, `retry_boot_nodes`, `successful_dials`) and refreshes
    /// runtime metrics after every event; events not listed below are
    /// ignored.
    async fn handle_connection_events(
        &mut self,
        event: SwarmEvent<BehaviourEvent>,
    ) {
        match event {
            // A closed connection drops the peer from the current round. It is
            // NOT queued for retry.
            SwarmEvent::ConnectionClosed { peer_id, .. } => {
                self.boot_nodes.remove(&peer_id);
            }
            SwarmEvent::OutgoingConnectionError {
                peer_id: Some(peer_id),
                error,
                ..
            } => {
                // Always classify the error (records the dial-failure metric),
                // even when the peer is no longer a pending boot node.
                let (add_to_retry, new_addresses) = self
                    .handle_dial_error(error, &peer_id, true)
                    .unwrap_or((false, vec![]));

                // Only a still-pending boot node can move to the retry queue.
                // It keeps the transiently-failed addresses if any, otherwise
                // its original ones.
                if let Some(addresses) = self.boot_nodes.remove(&peer_id)
                    && add_to_retry
                {
                    if new_addresses.is_empty() {
                        self.retry_boot_nodes.insert(peer_id, addresses);
                    } else {
                        self.retry_boot_nodes.insert(peer_id, new_addresses);
                    }
                }
            }
            SwarmEvent::Behaviour(BehaviourEvent::Identified {
                peer_id,
                info,
                connection_id,
            }) => {
                if !self
                    .check_protocols(&info.protocol_version, &info.protocols)
                {
                    warn!(target: TARGET, peer_id = %peer_id, protocol_version = %info.protocol_version, "peer uses incompatible protocols; closing connection");

                    // boot_nodes is not touched here; presumably the peer is
                    // removed once the resulting ConnectionClosed arrives.
                    self.swarm
                        .behaviour_mut()
                        .close_connections(&peer_id, Some(connection_id));
                } else {
                    self.peer_action
                        .insert(peer_id, Action::Identified(connection_id));

                    // Register every self-reported listen address. The peer is
                    // usable if the behaviour accepted at least one of them.
                    let mut any_address_is_valid = false;
                    for addr in info.listen_addrs {
                        if self
                            .swarm
                            .behaviour_mut()
                            .add_self_reported_address(&peer_id, &addr)
                        {
                            any_address_is_valid = true;
                        }
                    }

                    if any_address_is_valid {
                        // Only pending boot nodes count as successful dials.
                        if self.boot_nodes.remove(&peer_id).is_some() {
                            self.successful_dials += 1;
                        }
                        self.peer_identify.insert(peer_id);
                    } else {
                        warn!(target: TARGET, peer_id = %peer_id, "bootstrap peer has no valid addresses");

                        self.swarm
                            .behaviour_mut()
                            .close_connections(&peer_id, Some(connection_id));
                    }
                }
            }
            SwarmEvent::Behaviour(BehaviourEvent::IdentifyError {
                peer_id,
                error,
            }) => {
                self.observe_identify_error(&error);
                match error {
                    swarm::StreamUpgradeError::Timeout => {
                        // Recoverable: wait for ConnectionClosed to remove from boot_nodes.
                    }
                    _ => {
                        // Hard failure: Identify will never complete; move to retry immediately
                        // instead of waiting for the per-round timeout.
                        debug!(target: TARGET, peer_id = %peer_id, error = %error, "identify hard failure during bootstrap; queuing for retry");
                        if let Some(addrs) = self.boot_nodes.remove(&peer_id) {
                            self.retry_boot_nodes.insert(peer_id, addrs);
                        }
                    }
                }
            }
            _ => {}
        }
        self.refresh_runtime_metrics();
    }
1350
1351    fn clear_pending_messages(&mut self, peer_id: &PeerId) {
1352        warn!(target: TARGET, peer_id = %peer_id, "max dial attempts reached; dropping pending messages");
1353
1354        let dropped = self.drop_pending_outbound_messages(peer_id);
1355        if let Some(metrics) = self.metric_handle() {
1356            metrics.inc_max_retries_drop_by(dropped as u64);
1357        }
1358        self.peer_action.remove(peer_id);
1359        self.retry_by_peer.remove(peer_id);
1360        self.refresh_runtime_metrics();
1361    }
1362
1363    fn check_protocols(
1364        &self,
1365        protocol_version: &str,
1366        protocols: &[StreamProtocol],
1367    ) -> bool {
1368        let supp_protocols: HashSet<StreamProtocol> = protocols
1369            .iter()
1370            .cloned()
1371            .collect::<HashSet<StreamProtocol>>();
1372
1373        protocol_version == IDENTIFY_PROTOCOL
1374            && supp_protocols.contains(&StreamProtocol::new(REQRES_PROTOCOL))
1375    }
1376
    /// Run network worker.
    ///
    /// Main event loop, entered after bootstrap. It multiplexes:
    /// - commands from `command_receiver` (the loop exits when that channel
    ///   closes);
    /// - swarm events, dispatched to `handle_event`;
    /// - the retry timer, polled only while `retry_timer` is `Some`;
    /// - cancellation of the graceful or crash token (the loop exits).
    pub async fn run_main(&mut self) {
        info!(target: TARGET, "network worker started");

        loop {
            tokio::select! {
                command = self.command_receiver.recv() => {
                    match command {
                        Some(command) => self.handle_command(command).await,
                        None => break,
                    }
                }
                event = self.swarm.select_next_some() => {
                    // Handle events.
                    self.handle_event(event).await;
                }
                // The precondition keeps this branch disabled while no retry
                // is scheduled, so the empty async block is never polled idle.
                _ = async {
                    if let Some(t) = &mut self.retry_timer {
                        t.as_mut().await;
                    }
                }, if self.retry_timer.is_some() => {
                    // A due retry is acted on only if the peer's current
                    // `peer_action` still matches the retry kind; stale or
                    // mismatched retries are skipped silently.
                    for (peer, kind, addrs) in self.drain_due_retries() {
                        if let Some(action) = self.peer_action.get(&peer) {
                            match (action, kind) {
                                (Action::Discover, RetryKind::Discover) => {
                                    self.swarm.behaviour_mut().discover(&peer);
                                },
                                (Action::Dial, RetryKind::Dial) => {
                                    if let Some(metrics) = self.metric_handle() {
                                        metrics.inc_dial_attempt_runtime();
                                    }
                                    // Only synchronous dial errors are handled
                                    // here. A `None` decision from
                                    // handle_dial_error leaves `peer_action`
                                    // untouched.
                                    if let Err(error) = self.swarm.dial(
                                        DialOpts::peer_id(peer)
                                            .addresses(addrs)
                                            .extend_addresses_through_behaviour()
                                            .build()
                                    ) && let Some((retry, new_address)) =
                                        self.handle_dial_error(error, &peer, false) {

                                        self.peer_action.remove(&peer);
                                        if retry {
                                            // Retry the dial with the transiently
                                            // failed addresses the behaviour does
                                            // not flag as invalid; fall back to
                                            // discovery when none remain.
                                            let addr = new_address
                                                    .iter()
                                                    .filter(|x| {
                                                        !self
                                                        .swarm
                                                        .behaviour()
                                                        .is_invalid_address(x)
                                                    })
                                                    .cloned()
                                                    .collect::<Vec<Multiaddr>>();

                                                if addr.is_empty() {
                                                    self.schedule_retry(peer, ScheduleType::Discover);
                                                } else {
                                                    self.schedule_retry(peer, ScheduleType::Dial(addr.clone()));
                                                }
                                        } else {
                                            self.schedule_retry(peer, ScheduleType::Discover);
                                        }
                                };
                                },
                                _ => {}
                            }
                        };
                    }
                },
                                    _ = self.graceful_token.cancelled() => {
                                        break;
                                    }
                                    _ = self.crash_token.cancelled() => {
                                        break;
                                    }
            }
        }
    }
1453
1454    async fn handle_command(&mut self, command: Command) {
1455        match command {
1456            Command::SendMessage { peer, message } => {
1457                if self.is_safe_mode() {
1458                    debug!(
1459                        target: TARGET,
1460                        peer_id = %peer,
1461                        size = message.len(),
1462                        "safe mode active; ignoring send command"
1463                    );
1464                    return;
1465                }
1466                if let Err(error) = self.send_message(peer, message) {
1467                    error!(target: TARGET, error = %error, "failed to deliver message");
1468                    self.send_event(NetworkEvent::Error(error)).await;
1469                }
1470            }
1471        }
1472    }
1473
1474    #[allow(clippy::needless_pass_by_ref_mut)]
1475    async fn message_to_helper(
1476        &mut self,
1477        message: MessagesHelper,
1478        peer_id: &PeerId,
1479    ) {
1480        let sender = match peer_id_to_ed25519_pubkey_bytes(peer_id) {
1481            Ok(public_key) => public_key,
1482            Err(e) => {
1483                warn!(target: TARGET, error = %e, "cannot resolve public key from peer id");
1484                return;
1485            }
1486        };
1487
1488        'Send: {
1489            if let Some(helper_sender) = self.helper_sender.as_ref() {
1490                match message {
1491                    MessagesHelper::Single(items) => {
1492                        if helper_sender
1493                            .send(CommandHelper::ReceivedMessage {
1494                                sender,
1495                                message: items,
1496                            })
1497                            .await
1498                            .is_err()
1499                        {
1500                            break 'Send;
1501                        }
1502                    }
1503                    MessagesHelper::Vec(items) => {
1504                        for item in items {
1505                            if helper_sender
1506                                .send(CommandHelper::ReceivedMessage {
1507                                    sender,
1508                                    message: item,
1509                                })
1510                                .await
1511                                .is_err()
1512                            {
1513                                break 'Send;
1514                            }
1515                        }
1516                    }
1517                }
1518
1519                return;
1520            }
1521        }
1522
1523        error!(target: TARGET, "helper channel closed; shutting down");
1524        self.crash_token.cancel();
1525    }
1526
1527    async fn handle_event(&mut self, event: SwarmEvent<BehaviourEvent>) {
1528        if self.is_safe_mode() {
1529            match event {
1530                SwarmEvent::Behaviour(BehaviourEvent::ReqresMessage {
1531                    peer_id,
1532                    ..
1533                }) => {
1534                    debug!(
1535                        target: TARGET,
1536                        peer_id = %peer_id,
1537                        "safe mode active; dropping inbound reqres message"
1538                    );
1539                    self.swarm
1540                        .behaviour_mut()
1541                        .close_connections(&peer_id, None);
1542                }
1543                SwarmEvent::Behaviour(BehaviourEvent::Identified {
1544                    peer_id,
1545                    connection_id,
1546                    ..
1547                }) => {
1548                    debug!(
1549                        target: TARGET,
1550                        peer_id = %peer_id,
1551                        "safe mode active; closing identified peer"
1552                    );
1553                    self.swarm
1554                        .behaviour_mut()
1555                        .close_connections(&peer_id, Some(connection_id));
1556                }
1557                SwarmEvent::Behaviour(BehaviourEvent::IdentifyError {
1558                    peer_id,
1559                    ..
1560                }) => {
1561                    self.swarm
1562                        .behaviour_mut()
1563                        .close_connections(&peer_id, None);
1564                }
1565                SwarmEvent::ConnectionEstablished {
1566                    peer_id,
1567                    connection_id,
1568                    ..
1569                } => {
1570                    debug!(
1571                        target: TARGET,
1572                        peer_id = %peer_id,
1573                        "safe mode active; closing established connection"
1574                    );
1575                    self.swarm
1576                        .behaviour_mut()
1577                        .close_connections(&peer_id, Some(connection_id));
1578                }
1579                SwarmEvent::OutgoingConnectionError { .. }
1580                | SwarmEvent::ConnectionClosed { .. }
1581                | SwarmEvent::IncomingConnectionError { .. }
1582                | SwarmEvent::IncomingConnection { .. }
1583                | SwarmEvent::ListenerClosed { .. }
1584                | SwarmEvent::Dialing { .. }
1585                | SwarmEvent::NewExternalAddrCandidate { .. }
1586                | SwarmEvent::ExternalAddrConfirmed { .. }
1587                | SwarmEvent::ExternalAddrExpired { .. }
1588                | SwarmEvent::NewExternalAddrOfPeer { .. }
1589                | SwarmEvent::NewListenAddr { .. }
1590                | SwarmEvent::ExpiredListenAddr { .. }
1591                | SwarmEvent::ListenerError { .. }
1592                | SwarmEvent::Behaviour(BehaviourEvent::ReqresFailure {
1593                    ..
1594                })
1595                | SwarmEvent::Behaviour(BehaviourEvent::ClosestPeer {
1596                    ..
1597                })
1598                | SwarmEvent::Behaviour(BehaviourEvent::Dummy) => {}
1599                _ => {}
1600            }
1601            self.refresh_runtime_metrics();
1602            return;
1603        }
1604
1605        match event {
1606            SwarmEvent::Behaviour(event) => {
1607                match event {
1608                    BehaviourEvent::Identified {
1609                        peer_id,
1610                        info,
1611                        connection_id,
1612                    } => {
1613                        if !self.check_protocols(
1614                            &info.protocol_version,
1615                            &info.protocols,
1616                        ) {
1617                            warn!(
1618                                target: TARGET,
1619                                peer_id = %peer_id,
1620                                protocol_version = %info.protocol_version,
1621                                protocols = ?info.protocols,
1622                                "peer uses incompatible protocols; closing connection"
1623                            );
1624
1625                            self.clear_pending_messages(&peer_id);
1626
1627                            self.swarm
1628                                .behaviour_mut()
1629                                .clean_hard_peer_to_remove(&peer_id);
1630
1631                            self.swarm.behaviour_mut().close_connections(
1632                                &peer_id,
1633                                Some(connection_id),
1634                            );
1635                        } else {
1636                            self.peer_action.insert(
1637                                peer_id,
1638                                Action::Identified(connection_id),
1639                            );
1640
1641                            self.swarm
1642                                .behaviour_mut()
1643                                .clean_peer_to_remove(&peer_id);
1644                            for addr in info.listen_addrs {
1645                                self.swarm
1646                                    .behaviour_mut()
1647                                    .add_self_reported_address(&peer_id, &addr);
1648                            }
1649
1650                            self.peer_identify.insert(peer_id);
1651
1652                            if let Some(mut queue) =
1653                                self.pending_inbound_messages.remove(&peer_id)
1654                            {
1655                                let mut buffered = VecDeque::new();
1656                                for message in queue.drain() {
1657                                    self.observe_pending_message_age(
1658                                        message.enqueued_at,
1659                                    );
1660                                    buffered.push_back(message.payload);
1661                                }
1662                                self.message_to_helper(
1663                                    MessagesHelper::Vec(buffered),
1664                                    &peer_id,
1665                                )
1666                                .await;
1667                            };
1668
1669                            self.send_pending_outbound_messages(peer_id);
1670                        }
1671                    }
1672                    BehaviourEvent::IdentifyError { peer_id, error } => {
1673                        self.observe_identify_error(&error);
1674                        debug!(target: TARGET, peer_id = %peer_id, error = %error, "identify error");
1675
1676                        match error {
1677                            swarm::StreamUpgradeError::Timeout => {
1678                                // We do not clean since we will try to open the connection when it is
1679                                // confirmed that it has been closed in SwarmEvent::ConnectionClosed
1680                            }
1681                            swarm::StreamUpgradeError::Apply(..)
1682                            | swarm::StreamUpgradeError::NegotiationFailed
1683                            | swarm::StreamUpgradeError::Io(..) => {
1684                                // Do not call clear_pending_messages here — it removes
1685                                // peer_action, which ConnectionClosed needs to trigger
1686                                // its cleanup path. Clean only the state that
1687                                // ConnectionClosed won't reach.
1688                                self.drop_pending_outbound_messages(&peer_id);
1689                                self.retry_by_peer.remove(&peer_id);
1690                                self.response_channels.remove(&peer_id);
1691                                self.drop_pending_inbound_messages(&peer_id);
1692                            }
1693                        }
1694
1695                        self.swarm
1696                            .behaviour_mut()
1697                            .close_connections(&peer_id, None);
1698                    }
1699                    BehaviourEvent::ReqresMessage { peer_id, message } => {
1700                        let (message_data, is_request, response_channel) =
1701                            match message {
1702                                request_response::Message::Request {
1703                                    request,
1704                                    channel,
1705                                    ..
1706                                } => (request.0, true, Some(channel)),
1707                                request_response::Message::Response {
1708                                    response,
1709                                    ..
1710                                } => (response.0, false, None),
1711                            };
1712
1713                        if message_data.len() > self.max_app_message_bytes {
1714                            warn!(
1715                                target: TARGET,
1716                                peer_id = %peer_id,
1717                                size = message_data.len(),
1718                                max = self.max_app_message_bytes,
1719                                "inbound payload dropped: message too large",
1720                            );
1721                            if let Some(metrics) = self.metric_handle() {
1722                                metrics.inc_oversized_inbound_drop();
1723                            }
1724                            self.swarm
1725                                .behaviour_mut()
1726                                .close_connections(&peer_id, None);
1727                            self.refresh_runtime_metrics();
1728                            return;
1729                        }
1730
1731                        if is_request {
1732                            if let Some(metrics) = self.metric_handle() {
1733                                metrics.inc_reqres_request_received();
1734                            }
1735                            trace!(target: TARGET, peer_id = %peer_id, "request received");
1736                            if let Some(channel) = response_channel {
1737                                self.add_ephemeral_response(peer_id, channel);
1738                            }
1739                        } else {
1740                            if let Some(metrics) = self.metric_handle() {
1741                                metrics.inc_reqres_response_received();
1742                            }
1743                            trace!(target: TARGET, peer_id = %peer_id, "response received");
1744                        }
1745
1746                        if self.peer_identify.contains(&peer_id) {
1747                            self.message_to_helper(
1748                                MessagesHelper::Single(message_data),
1749                                &peer_id,
1750                            )
1751                            .await;
1752                        } else {
1753                            self.add_pending_inbound_message(
1754                                peer_id,
1755                                message_data,
1756                            );
1757                        }
1758                    }
1759                    BehaviourEvent::ReqresFailure {
1760                        peer_id,
1761                        direction,
1762                        kind,
1763                    } => {
1764                        if let Some(metrics) = self.metric_handle() {
1765                            metrics.observe_reqres_failure(
1766                                direction.as_metric_label(),
1767                                kind.as_metric_label(),
1768                            );
1769                        }
1770                        debug!(
1771                            target: TARGET,
1772                            peer_id = %peer_id,
1773                            direction = direction.as_metric_label(),
1774                            kind = kind.as_metric_label(),
1775                            "request-response failure"
1776                        );
1777                    }
1778                    BehaviourEvent::ClosestPeer { peer_id, info } => {
1779                        if matches!(
1780                            self.peer_action.get(&peer_id),
1781                            Some(Action::Discover)
1782                        ) {
1783                            self.peer_action.remove(&peer_id);
1784                            if let Some(info) = info {
1785                                let addr = info
1786                                    .addrs
1787                                    .iter()
1788                                    .filter(|x| {
1789                                        !self
1790                                            .swarm
1791                                            .behaviour()
1792                                            .is_invalid_address(x)
1793                                    })
1794                                    .cloned()
1795                                    .collect::<Vec<Multiaddr>>();
1796
1797                                if addr.is_empty() {
1798                                    self.schedule_retry(
1799                                        peer_id,
1800                                        ScheduleType::Discover,
1801                                    );
1802                                } else {
1803                                    self.schedule_retry(
1804                                        peer_id,
1805                                        ScheduleType::Dial(addr),
1806                                    );
1807                                }
1808                            } else {
1809                                self.schedule_retry(
1810                                    peer_id,
1811                                    ScheduleType::Discover,
1812                                );
1813                            };
1814                        }
1815                    }
1816                    BehaviourEvent::Dummy => {
1817                        // For contron_list, ReqRes events
1818                    }
1819                }
1820            }
1821            SwarmEvent::OutgoingConnectionError {
1822                error,
1823                peer_id: Some(peer_id),
1824                ..
1825            } => {
1826                if matches!(self.peer_action.get(&peer_id), Some(Action::Dial))
1827                {
1828                    self.peer_action.remove(&peer_id);
1829
1830                    self.swarm.behaviour_mut().add_peer_to_remove(&peer_id);
1831
1832                    if let Some((retry, new_address)) =
1833                        self.handle_dial_error(error, &peer_id, false)
1834                    {
1835                        if retry {
1836                            let addr = new_address
1837                                .iter()
1838                                .filter(|x| {
1839                                    !self
1840                                        .swarm
1841                                        .behaviour()
1842                                        .is_invalid_address(x)
1843                                })
1844                                .cloned()
1845                                .collect::<Vec<Multiaddr>>();
1846
1847                            if addr.is_empty() {
1848                                self.schedule_retry(
1849                                    peer_id,
1850                                    ScheduleType::Discover,
1851                                );
1852                            } else {
1853                                self.schedule_retry(
1854                                    peer_id,
1855                                    ScheduleType::Dial(addr),
1856                                );
1857                            }
1858                        } else {
1859                            self.schedule_retry(
1860                                peer_id,
1861                                ScheduleType::Discover,
1862                            );
1863                        }
1864                    };
1865                }
1866            }
1867            SwarmEvent::ConnectionClosed {
1868                peer_id,
1869                connection_id,
1870                num_established,
1871                ..
1872            } => {
1873                if num_established == 0 {
1874                    if let Some(Action::Identified(id)) =
1875                        self.peer_action.get(&peer_id)
1876                        && connection_id == *id
1877                    {
1878                        self.peer_action.remove(&peer_id);
1879
1880                        self.peer_identify.remove(&peer_id);
1881                        self.drop_pending_inbound_messages(&peer_id);
1882                        self.response_channels.remove(&peer_id);
1883
1884                        self.retry_by_peer.remove(&peer_id);
1885
1886                        if self
1887                            .pending_outbound_messages
1888                            .get(&peer_id)
1889                            .is_some_and(|q| !q.is_empty())
1890                        {
1891                            self.schedule_retry(
1892                                peer_id,
1893                                ScheduleType::Dial(vec![]),
1894                            );
1895                        }
1896                    } else if let Some(Action::Dial | Action::Discover) =
1897                        self.peer_action.get(&peer_id)
1898                    {
1899                        self.peer_action.remove(&peer_id);
1900                        self.retry_by_peer.remove(&peer_id);
1901                        self.drop_pending_inbound_messages(&peer_id);
1902                        self.response_channels.remove(&peer_id);
1903                        self.peer_identify.remove(&peer_id);
1904
1905                        if self
1906                            .pending_outbound_messages
1907                            .get(&peer_id)
1908                            .is_some_and(|q| !q.is_empty())
1909                        {
1910                            self.schedule_retry(
1911                                peer_id,
1912                                ScheduleType::Discover,
1913                            );
1914                        }
1915                    }
1916                }
1917            }
1918            SwarmEvent::IncomingConnectionError { .. } => {
1919                // We are not interested in this event at the moment.
1920                // The logs generate many false positives and cannot be associated with a
1921                // node since I do not have the peer-id. The best solution to avoid
1922                // confusion for the user is not to filter these errors.
1923            }
1924            SwarmEvent::ExpiredListenAddr { address, .. } => {
1925                warn!(target: TARGET, addr = %address, "listen address expired");
1926            }
1927            SwarmEvent::ListenerError { error, .. } => {
1928                error!(target: TARGET, error = %error, "listener error");
1929            }
1930            SwarmEvent::ConnectionEstablished {
1931                peer_id,
1932                connection_id,
1933                num_established,
1934                ..
1935            } => {
1936                if num_established.get() > 1 {
1937                    debug!(target: TARGET, peer_id = %peer_id, "duplicate connection detected; closing excess");
1938                    self.swarm
1939                        .behaviour_mut()
1940                        .close_connections(&peer_id, Some(connection_id));
1941                }
1942            }
1943            SwarmEvent::IncomingConnection { .. }
1944            | SwarmEvent::ListenerClosed { .. }
1945            | SwarmEvent::Dialing { .. }
1946            | SwarmEvent::NewExternalAddrCandidate { .. }
1947            | SwarmEvent::ExternalAddrConfirmed { .. }
1948            | SwarmEvent::ExternalAddrExpired { .. }
1949            | SwarmEvent::NewExternalAddrOfPeer { .. }
1950            | SwarmEvent::NewListenAddr { .. } => {
1951                // We are not interested in this event at the moment.
1952            }
1953            _ => {}
1954        }
1955        self.refresh_runtime_metrics();
1956    }
1957}
1958
1959#[cfg(test)]
1960mod tests {
1961
1962    use crate::routing::RoutingNode;
1963
1964    use super::*;
1965    use libp2p::core::{ConnectedPoint, Endpoint, transport::PortUse};
1966    use libp2p::identity::Keypair as Libp2pKeypair;
1967    use libp2p::swarm::ConnectionId;
1968    use prometheus_client::{encoding::text::encode, registry::Registry};
1969    use serde::Deserialize;
1970    use test_log::test;
1971
1972    use ave_common::identity::{KeyPair, keys::Ed25519Signer};
1973
1974    use serial_test::serial;
1975
1976    #[derive(Debug, Serialize, Deserialize)]
1977    pub struct Dummy;
1978
1979    fn metric_value(metrics: &str, name: &str) -> f64 {
1980        metrics
1981            .lines()
1982            .find_map(|line| {
1983                if line.starts_with(name) {
1984                    line.split_whitespace().nth(1)?.parse::<f64>().ok()
1985                } else {
1986                    None
1987                }
1988            })
1989            .unwrap_or(0.0)
1990    }
1991
1992    // Build a relay server.
1993    fn build_worker(
1994        boot_nodes: Vec<RoutingNode>,
1995        random_walk: bool,
1996        node_type: NodeType,
1997        graceful_token: CancellationToken,
1998        crash_token: CancellationToken,
1999        memory_addr: String,
2000    ) -> NetworkWorker<Dummy> {
2001        let config = create_config(
2002            boot_nodes,
2003            random_walk,
2004            node_type,
2005            vec![memory_addr],
2006        );
2007        let keys = KeyPair::Ed25519(Ed25519Signer::generate().unwrap());
2008
2009        NetworkWorker::new(
2010            &keys,
2011            config,
2012            None,
2013            graceful_token,
2014            crash_token,
2015            None,
2016            None,
2017        )
2018        .unwrap()
2019    }
2020
2021    // Create a config
2022    fn create_config(
2023        boot_nodes: Vec<RoutingNode>,
2024        random_walk: bool,
2025        node_type: NodeType,
2026        listen_addresses: Vec<String>,
2027    ) -> Config {
2028        let config = crate::routing::Config::default()
2029            .with_discovery_limit(50)
2030            .with_dht_random_walk(random_walk);
2031
2032        Config {
2033            boot_nodes,
2034            node_type,
2035            routing: config,
2036            external_addresses: vec![],
2037            listen_addresses,
2038            ..Default::default()
2039        }
2040    }
2041
2042    fn build_identified_event(
2043        peer_id: PeerId,
2044        public_key: libp2p::identity::PublicKey,
2045        connection_id: ConnectionId,
2046    ) -> SwarmEvent<BehaviourEvent> {
2047        SwarmEvent::Behaviour(BehaviourEvent::Identified {
2048            peer_id,
2049            info: Box::new(identify::Info {
2050                public_key,
2051                protocol_version: IDENTIFY_PROTOCOL.to_owned(),
2052                agent_version: "test-agent".to_owned(),
2053                listen_addrs: vec!["/memory/9999".parse().expect("multiaddr")],
2054                protocols: vec![StreamProtocol::new(REQRES_PROTOCOL)],
2055                observed_addr: "/memory/9998".parse().expect("multiaddr"),
2056                signed_peer_record: None,
2057            }),
2058            connection_id,
2059        })
2060    }
2061
2062    fn test_endpoint() -> ConnectedPoint {
2063        ConnectedPoint::Dialer {
2064            address: "/memory/9997".parse().expect("multiaddr"),
2065            role_override: Endpoint::Dialer,
2066            port_use: PortUse::New,
2067        }
2068    }
2069
2070    #[test]
2071    fn outbound_queue_respects_bytes_limit_and_updates_metrics() {
2072        let mut config = create_config(
2073            vec![],
2074            false,
2075            NodeType::Addressable,
2076            vec!["/memory/3100".to_owned()],
2077        );
2078        config.max_pending_outbound_bytes_per_peer = 16;
2079
2080        let keys = KeyPair::Ed25519(Ed25519Signer::generate().unwrap());
2081        let mut registry = Registry::default();
2082        let metrics = crate::metrics::register(&mut registry);
2083        let mut worker: NetworkWorker<Dummy> = NetworkWorker::new(
2084            &keys,
2085            config,
2086            None,
2087            CancellationToken::new(),
2088            CancellationToken::new(),
2089            None,
2090            Some(metrics),
2091        )
2092        .expect("worker");
2093
2094        let peer = PeerId::random();
2095        worker.add_pending_outbound_message(
2096            peer,
2097            Bytes::from_static(b"aaaaaaaaaaaa"), // 12 bytes
2098        );
2099        worker.add_pending_outbound_message(
2100            peer,
2101            Bytes::from_static(b"bbbbbbbbbbbb"), // 12 bytes -> evicts previous by bytes
2102        );
2103        worker.add_pending_outbound_message(
2104            peer,
2105            Bytes::from_static(b"cccc"), // 4 bytes
2106        );
2107
2108        let queue = worker
2109            .pending_outbound_messages
2110            .get(&peer)
2111            .expect("queue exists");
2112        assert_eq!(queue.len(), 2);
2113        assert_eq!(queue.bytes_len(), 16);
2114
2115        let mut text = String::new();
2116        encode(&mut text, &registry).expect("encode metrics");
2117
2118        assert_eq!(
2119            metric_value(
2120                &text,
2121                "network_messages_dropped_total{direction=\"outbound\",reason=\"queue_bytes_limit_per_peer\"}"
2122            ),
2123            1.0
2124        );
2125        assert_eq!(
2126            metric_value(
2127                &text,
2128                "network_messages_dropped_total{direction=\"outbound\",reason=\"queue_bytes_limit_global\"}"
2129            ),
2130            0.0
2131        );
2132        assert_eq!(metric_value(&text, "network_pending_outbound_bytes"), 16.0);
2133        assert!(
2134            metric_value(&text, "network_pending_message_age_seconds_count")
2135                >= 1.0
2136        );
2137    }
2138
2139    #[test]
2140    fn zero_pending_bytes_limits_disable_byte_drops() {
2141        let mut config = create_config(
2142            vec![],
2143            false,
2144            NodeType::Addressable,
2145            vec!["/memory/3101".to_owned()],
2146        );
2147        config.max_pending_outbound_bytes_per_peer = 0;
2148        config.max_pending_inbound_bytes_per_peer = 0;
2149        config.max_pending_outbound_bytes_total = 0;
2150        config.max_pending_inbound_bytes_total = 0;
2151
2152        let keys = KeyPair::Ed25519(Ed25519Signer::generate().unwrap());
2153        let mut registry = Registry::default();
2154        let metrics = crate::metrics::register(&mut registry);
2155        let mut worker: NetworkWorker<Dummy> = NetworkWorker::new(
2156            &keys,
2157            config,
2158            None,
2159            CancellationToken::new(),
2160            CancellationToken::new(),
2161            None,
2162            Some(metrics),
2163        )
2164        .expect("worker");
2165
2166        let peer = PeerId::random();
2167        for i in 0..3u8 {
2168            let payload = Bytes::from(vec![i + 1; 12]);
2169            worker.add_pending_outbound_message(peer, payload);
2170        }
2171        for i in 0..3u8 {
2172            let payload = Bytes::from(vec![i + 7; 12]);
2173            worker.add_pending_inbound_message(peer, payload);
2174        }
2175
2176        let outbound = worker
2177            .pending_outbound_messages
2178            .get(&peer)
2179            .expect("outbound queue exists");
2180        let inbound = worker
2181            .pending_inbound_messages
2182            .get(&peer)
2183            .expect("inbound queue exists");
2184        assert_eq!(outbound.len(), 3);
2185        assert_eq!(outbound.bytes_len(), 36);
2186        assert_eq!(inbound.len(), 3);
2187        assert_eq!(inbound.bytes_len(), 36);
2188
2189        let mut text = String::new();
2190        encode(&mut text, &registry).expect("encode metrics");
2191        assert_eq!(
2192            metric_value(
2193                &text,
2194                "network_messages_dropped_total{direction=\"outbound\",reason=\"queue_bytes_limit_per_peer\"}"
2195            ),
2196            0.0
2197        );
2198        assert_eq!(
2199            metric_value(
2200                &text,
2201                "network_messages_dropped_total{direction=\"outbound\",reason=\"queue_bytes_limit_global\"}"
2202            ),
2203            0.0
2204        );
2205        assert_eq!(
2206            metric_value(
2207                &text,
2208                "network_messages_dropped_total{direction=\"inbound\",reason=\"queue_bytes_limit_per_peer\"}"
2209            ),
2210            0.0
2211        );
2212        assert_eq!(
2213            metric_value(
2214                &text,
2215                "network_messages_dropped_total{direction=\"inbound\",reason=\"queue_bytes_limit_global\"}"
2216            ),
2217            0.0
2218        );
2219    }
2220
2221    #[test]
2222    fn outbound_global_bytes_limit_applies_across_peers() {
2223        let mut config = create_config(
2224            vec![],
2225            false,
2226            NodeType::Addressable,
2227            vec!["/memory/3102".to_owned()],
2228        );
2229        config.max_pending_outbound_bytes_per_peer = 0;
2230        config.max_pending_outbound_bytes_total = 20;
2231
2232        let keys = KeyPair::Ed25519(Ed25519Signer::generate().unwrap());
2233        let mut registry = Registry::default();
2234        let metrics = crate::metrics::register(&mut registry);
2235        let mut worker: NetworkWorker<Dummy> = NetworkWorker::new(
2236            &keys,
2237            config,
2238            None,
2239            CancellationToken::new(),
2240            CancellationToken::new(),
2241            None,
2242            Some(metrics),
2243        )
2244        .expect("worker");
2245
2246        let peer_a = PeerId::random();
2247        let peer_b = PeerId::random();
2248
2249        worker.add_pending_outbound_message(
2250            peer_a,
2251            Bytes::from_static(b"aaaaaaaaaaaa"), // 12 bytes
2252        );
2253        worker.add_pending_outbound_message(
2254            peer_b,
2255            Bytes::from_static(b"bbbbbbbbbbbb"), // rejected by global limit
2256        );
2257
2258        assert_eq!(worker.pending_outbound_bytes_len(), 12);
2259        assert_eq!(
2260            worker
2261                .pending_outbound_messages
2262                .get(&peer_a)
2263                .expect("peer_a queue")
2264                .len(),
2265            1
2266        );
2267        assert_eq!(
2268            worker
2269                .pending_outbound_messages
2270                .get(&peer_b)
2271                .map_or(0, PendingQueue::len),
2272            0
2273        );
2274
2275        let mut text = String::new();
2276        encode(&mut text, &registry).expect("encode metrics");
2277        assert_eq!(
2278            metric_value(
2279                &text,
2280                "network_messages_dropped_total{direction=\"outbound\",reason=\"queue_bytes_limit_per_peer\"}"
2281            ),
2282            0.0
2283        );
2284        assert_eq!(
2285            metric_value(
2286                &text,
2287                "network_messages_dropped_total{direction=\"outbound\",reason=\"queue_bytes_limit_global\"}"
2288            ),
2289            1.0
2290        );
2291    }
2292
2293    #[test]
2294    fn dial_failures_include_phase_label() {
2295        let mut registry = Registry::default();
2296        let metrics = crate::metrics::register(&mut registry);
2297
2298        metrics.observe_dial_failure("bootstrap", "transport");
2299        metrics.observe_dial_failure("runtime", "denied");
2300
2301        let mut text = String::new();
2302        encode(&mut text, &registry).expect("encode metrics");
2303
2304        assert_eq!(
2305            metric_value(
2306                &text,
2307                "network_dial_failures_total{phase=\"bootstrap\",kind=\"transport\"}"
2308            ),
2309            1.0
2310        );
2311        assert_eq!(
2312            metric_value(
2313                &text,
2314                "network_dial_failures_total{phase=\"runtime\",kind=\"denied\"}"
2315            ),
2316            1.0
2317        );
2318    }
2319
2320    #[test]
2321    fn bootstrap_duration_is_labeled_by_result() {
2322        let mut registry = Registry::default();
2323        let metrics = crate::metrics::register(&mut registry);
2324
2325        metrics.observe_bootstrap_duration_seconds("success", 0.5);
2326        metrics.observe_bootstrap_duration_seconds("failure", 1.25);
2327
2328        let mut text = String::new();
2329        encode(&mut text, &registry).expect("encode metrics");
2330
2331        assert_eq!(
2332            metric_value(
2333                &text,
2334                "network_bootstrap_duration_seconds_count{result=\"success\"}"
2335            ),
2336            1.0
2337        );
2338        assert_eq!(
2339            metric_value(
2340                &text,
2341                "network_bootstrap_duration_seconds_count{result=\"failure\"}"
2342            ),
2343            1.0
2344        );
2345    }
2346
/// After `set_state_current(Running)` the `network_state` gauge family must be
/// one-hot: the `running` series reads 1 and every other listed state reads 0.
#[test]
fn network_state_is_one_hot() {
    let mut registry = Registry::default();
    let metrics = crate::metrics::register(&mut registry);

    metrics.set_state_current(&NetworkState::Running);

    let mut exposition = String::new();
    encode(&mut exposition, &registry).expect("encode metrics");

    // (state label, expected gauge value), checked in this order.
    let expectations = [
        ("running", 1.0),
        ("start", 0.0),
        ("dial", 0.0),
        ("dialing", 0.0),
        ("disconnected", 0.0),
    ];
    for (state, expected) in expectations {
        let series = format!("network_state{{state=\"{state}\"}}");
        assert_eq!(metric_value(&exposition, &series), expected);
    }
}
2372
2373    #[test(tokio::test)]
2374    #[serial]
2375    async fn pending_inbound_messages_keep_arrival_order_after_identify() {
2376        let mut worker = build_worker(
2377            vec![],
2378            false,
2379            NodeType::Addressable,
2380            CancellationToken::new(),
2381            CancellationToken::new(),
2382            "/memory/3200".to_owned(),
2383        );
2384        let remote_keys = Libp2pKeypair::generate_ed25519();
2385        let remote_peer = remote_keys.public().to_peer_id();
2386
2387        worker.add_pending_inbound_message(
2388            remote_peer,
2389            Bytes::from_static(b"msg-2"),
2390        );
2391        worker.add_pending_inbound_message(
2392            remote_peer,
2393            Bytes::from_static(b"msg-1"),
2394        );
2395        worker.add_pending_inbound_message(
2396            remote_peer,
2397            Bytes::from_static(b"msg-3"),
2398        );
2399
2400        let (helper_sender, mut helper_rx) = mpsc::channel(8);
2401        worker.add_helper_sender(helper_sender);
2402
2403        worker
2404            .handle_event(build_identified_event(
2405                remote_peer,
2406                remote_keys.public(),
2407                ConnectionId::new_unchecked(11),
2408            ))
2409            .await;
2410
2411        let mut received = Vec::new();
2412        for _ in 0..3 {
2413            let command = tokio::time::timeout(
2414                Duration::from_millis(200),
2415                helper_rx.recv(),
2416            )
2417            .await
2418            .expect("helper receive timeout")
2419            .expect("helper channel closed");
2420            let CommandHelper::ReceivedMessage { message, .. } = command else {
2421                panic!("unexpected helper command")
2422            };
2423            received.push(message);
2424        }
2425
2426        assert_eq!(
2427            received,
2428            vec![
2429                Bytes::from_static(b"msg-2"),
2430                Bytes::from_static(b"msg-1"),
2431                Bytes::from_static(b"msg-3"),
2432            ]
2433        );
2434        assert!(!worker.pending_inbound_messages.contains_key(&remote_peer));
2435    }
2436
2437    #[test(tokio::test)]
2438    #[serial]
2439    async fn flapping_connection_retries_then_flushes_outbound_queue() {
2440        let mut worker = build_worker(
2441            vec![],
2442            false,
2443            NodeType::Addressable,
2444            CancellationToken::new(),
2445            CancellationToken::new(),
2446            "/memory/3201".to_owned(),
2447        );
2448        let remote_keys = Libp2pKeypair::generate_ed25519();
2449        let remote_peer = remote_keys.public().to_peer_id();
2450        let first_connection = ConnectionId::new_unchecked(21);
2451        let second_connection = ConnectionId::new_unchecked(22);
2452
2453        worker.add_pending_outbound_message(
2454            remote_peer,
2455            Bytes::from_static(b"needs-redelivery"),
2456        );
2457        worker
2458            .peer_action
2459            .insert(remote_peer, Action::Identified(first_connection));
2460        worker.peer_identify.insert(remote_peer);
2461
2462        worker
2463            .handle_event(SwarmEvent::ConnectionClosed {
2464                peer_id: remote_peer,
2465                connection_id: first_connection,
2466                endpoint: test_endpoint(),
2467                num_established: 0,
2468                cause: None,
2469            })
2470            .await;
2471
2472        assert!(worker.retry_by_peer.contains_key(&remote_peer));
2473        assert!(matches!(
2474            worker.retry_by_peer.get(&remote_peer).map(|s| s.kind),
2475            Some(RetryKind::Dial)
2476        ));
2477        assert!(
2478            worker
2479                .pending_outbound_messages
2480                .get(&remote_peer)
2481                .is_some_and(|q| !q.is_empty())
2482        );
2483
2484        worker
2485            .handle_event(build_identified_event(
2486                remote_peer,
2487                remote_keys.public(),
2488                second_connection,
2489            ))
2490            .await;
2491
2492        assert!(!worker.pending_outbound_messages.contains_key(&remote_peer));
2493        assert!(!worker.retry_by_peer.contains_key(&remote_peer));
2494        assert!(matches!(
2495            worker.peer_action.get(&remote_peer),
2496            Some(Action::Identified(id)) if *id == second_connection
2497        ));
2498    }
2499
2500    #[test(tokio::test)]
2501    #[serial]
2502    async fn bootstrap_identify_timeout_keeps_bootnode_until_close() {
2503        let remote_keys = Libp2pKeypair::generate_ed25519();
2504        let remote_peer = remote_keys.public().to_peer_id();
2505        let boot_node = RoutingNode {
2506            peer_id: remote_peer.to_string(),
2507            address: vec!["/memory/3300".to_owned()],
2508        };
2509
2510        let mut worker = build_worker(
2511            vec![boot_node],
2512            false,
2513            NodeType::Addressable,
2514            CancellationToken::new(),
2515            CancellationToken::new(),
2516            "/memory/3301".to_owned(),
2517        );
2518
2519        assert!(worker.boot_nodes.contains_key(&remote_peer));
2520        assert!(!worker.retry_boot_nodes.contains_key(&remote_peer));
2521
2522        worker
2523            .handle_connection_events(SwarmEvent::Behaviour(
2524                BehaviourEvent::IdentifyError {
2525                    peer_id: remote_peer,
2526                    error: swarm::StreamUpgradeError::Timeout,
2527                },
2528            ))
2529            .await;
2530
2531        assert!(worker.boot_nodes.contains_key(&remote_peer));
2532        assert!(!worker.retry_boot_nodes.contains_key(&remote_peer));
2533    }
2534
2535    #[test(tokio::test)]
2536    #[serial]
2537    async fn test_no_boot_nodes() {
2538        let boot_nodes = vec![];
2539
2540        // Build a node.
2541        let node_addr = "/memory/3000";
2542        let mut node = build_worker(
2543            boot_nodes.clone(),
2544            false,
2545            NodeType::Addressable,
2546            CancellationToken::new(),
2547            CancellationToken::new(),
2548            node_addr.to_owned(),
2549        );
2550        if let Err(e) = node.run_connection().await {
2551            assert_eq!(
2552                e.to_string(),
2553                "cannot connect to the ave network: no reachable bootstrap node"
2554            );
2555        };
2556
2557        assert_eq!(node.state, NetworkState::Disconnected);
2558    }
2559
2560    #[test(tokio::test)]
2561    #[serial]
2562    async fn test_fake_boot_node() {
2563        let mut boot_nodes = vec![];
2564
2565        // Build a fake bootstrap node.
2566        let fake_boot_peer = PeerId::random();
2567        let fake_boot_addr = "/memory/3001";
2568        let fake_node = RoutingNode {
2569            peer_id: fake_boot_peer.to_string(),
2570            address: vec![fake_boot_addr.to_owned()],
2571        };
2572        boot_nodes.push(fake_node);
2573
2574        // Build a node.
2575        let node_addr = "/memory/3002";
2576        let mut node = build_worker(
2577            boot_nodes.clone(),
2578            false,
2579            NodeType::Addressable,
2580            CancellationToken::new(),
2581            CancellationToken::new(),
2582            node_addr.to_owned(),
2583        );
2584
2585        if let Err(e) = node.run_connection().await {
2586            assert_eq!(
2587                e.to_string(),
2588                "cannot connect to the ave network: no reachable bootstrap node"
2589            );
2590        };
2591
2592        assert_eq!(node.state, NetworkState::Disconnected);
2593    }
2594
2595    #[test(tokio::test)]
2596    #[serial]
2597    async fn test_connect() {
2598        let mut boot_nodes = vec![];
2599
2600        // Build a bootstrap node.
2601        let boot_addr = "/memory/3003";
2602        let mut boot = build_worker(
2603            boot_nodes.clone(),
2604            false,
2605            NodeType::Bootstrap,
2606            CancellationToken::new(),
2607            CancellationToken::new(),
2608            boot_addr.to_owned(),
2609        );
2610
2611        let boot_node = RoutingNode {
2612            peer_id: boot.local_peer_id().to_string(),
2613            address: vec![boot_addr.to_owned()],
2614        };
2615
2616        boot_nodes.push(boot_node);
2617
2618        // Build a node.
2619        let node_addr = "/memory/3004";
2620        let mut node = build_worker(
2621            boot_nodes,
2622            false,
2623            NodeType::Ephemeral,
2624            CancellationToken::new(),
2625            CancellationToken::new(),
2626            node_addr.to_owned(),
2627        );
2628
2629        // Spawn the boot node
2630        tokio::spawn(async move {
2631            boot.run_main().await;
2632        });
2633
2634        // Wait for connection.
2635        node.run_connection().await.unwrap();
2636    }
2637}