Skip to main content

ave_network/
worker.rs

1//! # Network worker.
2//!
3
4use crate::{
5    Command, CommandHelper, Config, Error, Event as NetworkEvent, MachineSpec,
6    Monitor, MonitorMessage, NodeType, ResolvedSpec,
7    behaviour::{Behaviour, Event as BehaviourEvent, ReqResMessage},
8    metrics::NetworkMetrics,
9    resolve_spec,
10    service::NetworkService,
11    transport::build_transport,
12    utils::{
13        Action, Due, IDENTIFY_PROTOCOL, LimitsConfig, MessagesHelper,
14        NetworkState, REQRES_PROTOCOL, RetryKind, RetryState, ScheduleType,
15        convert_addresses, convert_boot_nodes, peer_id_to_ed25519_pubkey_bytes,
16    },
17};
18
19use std::{
20    collections::{BinaryHeap, HashSet},
21    fmt::Debug,
22    num::{NonZeroU8, NonZeroUsize},
23    pin::Pin,
24    sync::Arc,
25    time::Duration,
26};
27
28use ave_actors::ActorRef;
29use ave_common::identity::KeyPair;
30
31use libp2p::{
32    Multiaddr, PeerId, StreamProtocol, Swarm, identify,
33    identity::{Keypair, ed25519},
34    request_response::{self, ResponseChannel},
35    swarm::{self, DialError, SwarmEvent, dial_opts::DialOpts},
36};
37
38use futures::StreamExt;
39use serde::Serialize;
40use tokio::{
41    sync::mpsc,
42    time::{Instant, Sleep, sleep, sleep_until},
43};
44use tokio_util::sync::CancellationToken;
45use tracing::{debug, error, info, trace, warn};
46
47use bytes::Bytes;
48use std::collections::{HashMap, VecDeque};
49
/// `tracing` target attached to every log event emitted by this module.
const TARGET: &str = "ave::network::worker";

/// Maximum number of messages held in a single per-peer pending queue.
///
/// The same limit is applied to the outbound and the inbound queues. When it
/// is reached the oldest message is evicted to make room for the new one.
const MAX_PENDING_MESSAGES_PER_PEER: usize = 100;
55
/// Queue of pending messages for a single peer.
///
/// The queue only stores the messages and keeps a running total of their
/// payload sizes. The count and byte limits are enforced by the code that
/// pushes into it (at most `MAX_PENDING_MESSAGES_PER_PEER` messages, oldest
/// evicted first).
#[derive(Default)]
struct PendingQueue {
    /// Queued messages, oldest at the front.
    messages: VecDeque<PendingMessage>,
    /// Sum of the payload lengths of everything currently in `messages`.
    pending_bytes: usize,
}
64
/// A single queued message together with the moment it was queued.
struct PendingMessage {
    /// Raw message bytes.
    payload: Bytes,
    /// Instant at which the message entered the queue; used to report how
    /// long a message waited before being sent or dropped.
    enqueued_at: Instant,
}
69
70impl PendingQueue {
71    fn contains(&self, message: &Bytes) -> bool {
72        self.messages.iter().any(|x| x.payload == *message)
73    }
74
75    fn pop_front(&mut self) -> Option<PendingMessage> {
76        let popped = self.messages.pop_front()?;
77        self.pending_bytes =
78            self.pending_bytes.saturating_sub(popped.payload.len());
79        Some(popped)
80    }
81
82    fn push_back(&mut self, message: Bytes) {
83        self.pending_bytes += message.len();
84        self.messages.push_back(PendingMessage {
85            payload: message,
86            enqueued_at: Instant::now(),
87        });
88    }
89
90    fn drain(&mut self) -> impl Iterator<Item = PendingMessage> + '_ {
91        self.pending_bytes = 0;
92        self.messages.drain(..)
93    }
94
95    fn is_empty(&self) -> bool {
96        self.messages.is_empty()
97    }
98
99    fn len(&self) -> usize {
100        self.messages.len()
101    }
102
103    const fn bytes_len(&self) -> usize {
104        self.pending_bytes
105    }
106}
107
/// Main network worker. Must be polled in order for the network to advance.
///
/// The worker is responsible for handling the network events and commands.
///
pub struct NetworkWorker<T>
where
    T: Debug + Serialize,
{
    /// Local Peer ID, derived from the node key pair in `new`.
    local_peer_id: PeerId,

    /// Network service: cloneable handle built around the command sender.
    service: NetworkService,

    /// The libp2p swarm.
    swarm: Swarm<Behaviour>,

    /// The network state.
    state: NetworkState,

    /// The command receiver (receiving half of the channel whose sender is
    /// held by `service`).
    command_receiver: mpsc::Receiver<Command>,

    /// The command sender to Helper Intermediary. `None` until
    /// `add_helper_sender` is called.
    helper_sender: Option<mpsc::Sender<CommandHelper<T>>>,

    /// Monitor actor that network events are forwarded to.
    monitor: Option<ActorRef<Monitor>>,

    /// Graceful shutdown token.
    graceful_token: CancellationToken,
    /// Crash token; cancelled when an event cannot be forwarded to the
    /// monitor.
    crash_token: CancellationToken,

    /// Node type.
    node_type: NodeType,

    /// Safe mode keeps the worker alive but isolated from peers.
    safe_mode: bool,

    /// List of boot nodes.
    boot_nodes: HashMap<PeerId, Vec<Multiaddr>>,

    /// Boot nodes with which it has not been possible to establish a
    /// connection (keep-alive timeout) during pre-routing.
    retry_boot_nodes: HashMap<PeerId, Vec<Multiaddr>>,

    /// Pending outbound messages to the peer (bounded by count and bytes).
    pending_outbound_messages: HashMap<PeerId, PendingQueue>,

    /// Pending inbound messages per peer (bounded by count and bytes in the
    /// same way as the outbound queues).
    pending_inbound_messages: HashMap<PeerId, PendingQueue>,

    /// Ephemeral responses: response channels kept per peer, oldest first,
    /// until a message for that peer is sent through one of them.
    response_channels:
        HashMap<PeerId, VecDeque<ResponseChannel<ReqResMessage>>>,

    /// Successful dials
    successful_dials: u64,

    /// Peers reported by the identified-peers gauge.
    // NOTE(review): presumably the peers that completed the identify
    // exchange; confirm against the event handling code.
    peer_identify: HashSet<PeerId>,

    /// Retry bookkeeping per peer (attempt count, next due instant, kind and
    /// addresses).
    retry_by_peer: HashMap<PeerId, RetryState>,

    /// Heap of scheduled retries (`Due(peer, when)` entries).
    retry_queue: BinaryHeap<Due>,

    /// Timer armed for the entry at the top of `retry_queue`; `None` when
    /// nothing is scheduled.
    retry_timer: Option<Pin<Box<Sleep>>>,

    /// Action currently recorded for each peer. While a peer has an entry,
    /// `schedule_retry` does not schedule another retry for it.
    peer_action: HashMap<PeerId, Action>,

    /// Largest outbound application payload accepted by `send_message`.
    max_app_message_bytes: usize,
    /// Byte budget of a single outbound queue; `0` disables the check.
    max_pending_outbound_bytes_per_peer: usize,
    /// Byte budget of a single inbound queue; `0` disables the check.
    max_pending_inbound_bytes_per_peer: usize,
    /// Byte budget across all outbound queues; `0` disables the check.
    max_pending_outbound_bytes_total: usize,
    /// Byte budget across all inbound queues; `0` disables the check.
    max_pending_inbound_bytes_total: usize,

    /// Optional metrics handle; when `None` no instrumentation is recorded.
    metrics: Option<Arc<NetworkMetrics>>,
}
183
/// Runtime dependencies and optional integrations for `NetworkWorker`.
///
/// Passed by value to `NetworkWorker::new`, which takes ownership of every
/// field.
pub struct NetworkWorkerRuntime {
    /// Monitor actor that receives network events.
    pub monitor: Option<ActorRef<Monitor>>,
    /// Graceful shutdown token shared with the rest of the node.
    pub graceful_token: CancellationToken,
    /// Crash token used to force-stop the node on unrecoverable failures.
    pub crash_token: CancellationToken,
    /// Optional machine spec used to derive sizing limits. It is handed to
    /// `resolve_spec`, which yields the RAM and CPU figures used for sizing.
    pub machine_spec: Option<MachineSpec>,
    /// Optional metrics handle for network instrumentation.
    pub metrics: Option<Arc<NetworkMetrics>>,
}
197
198impl<T: Debug + Serialize> NetworkWorker<T> {
    /// Create a new `NetworkWorker`.
    ///
    /// Builds the libp2p key pair, transport, behaviour and swarm from
    /// `keys`, `config` and `runtime`, starts listening, registers the
    /// external addresses and returns the worker in `NetworkState::Start`.
    ///
    /// # Errors
    ///
    /// * `Error::KeyExtraction` when the secret key cannot be read or
    ///   converted.
    /// * Whatever `convert_addresses` or `build_transport` report.
    /// * `Error::InvalidAddress` / `Error::Listen` when the swarm cannot
    ///   listen on an address.
    pub fn new(
        keys: &KeyPair,
        config: Config,
        safe_mode: bool,
        runtime: NetworkWorkerRuntime,
    ) -> Result<Self, Error> {
        // Create channels to communicate commands
        info!(target: TARGET, "network initialising");
        let (command_sender, command_receiver) = mpsc::channel(512);

        // Convert the node key pair into the libp2p representation.
        let key = match keys {
            KeyPair::Ed25519(ed25519_signer) => {
                let sk_bytes = ed25519_signer
                    .secret_key_bytes()
                    .map_err(|e| Error::KeyExtraction(e.to_string()))?;

                let sk = ed25519::SecretKey::try_from_bytes(sk_bytes)
                    .map_err(|e| Error::KeyExtraction(e.to_string()))?;

                let kp = ed25519::Keypair::from(sk);
                Keypair::from(kp)
            }
        };

        // Generate the `PeerId` from the public key.
        let local_peer_id = key.public().to_peer_id();

        let boot_nodes = convert_boot_nodes(&config.boot_nodes);

        // Parse the configured listen addresses.
        let addresses = convert_addresses(&config.listen_addresses)?;

        // Parse the configured external addresses.
        let external_addresses = convert_addresses(&config.external_addresses)?;

        let node_type = config.node_type.clone();
        // Resolve machine sizing from the declared spec, or auto-detect from host.
        let ResolvedSpec { ram_mb, cpu_cores } =
            resolve_spec(runtime.machine_spec);

        // Everything needed from `config` is copied out here because `config`
        // itself is moved into `Behaviour::new` below.
        let limits = LimitsConfig::build(ram_mb, cpu_cores);
        let max_app_message_bytes = config.max_app_message_bytes;
        let max_pending_outbound_bytes_per_peer =
            config.max_pending_outbound_bytes_per_peer;
        let max_pending_inbound_bytes_per_peer =
            config.max_pending_inbound_bytes_per_peer;
        let max_pending_outbound_bytes_total =
            config.max_pending_outbound_bytes_total;
        let max_pending_inbound_bytes_total =
            config.max_pending_inbound_bytes_total;

        // Build transport.
        let transport = build_transport(&key, limits.clone())?;

        let behaviour = Behaviour::new(
            &key.public(),
            config,
            runtime.graceful_token.clone(),
            runtime.crash_token.clone(),
            limits,
            runtime.metrics.clone(),
        );

        // Create the swarm.
        let mut swarm = Swarm::new(
            transport,
            behaviour,
            local_peer_id,
            swarm::Config::with_tokio_executor()
                .with_idle_connection_timeout(Duration::from_secs(90))
                .with_max_negotiating_inbound_streams(32)
                .with_notify_handler_buffer_size(
                    NonZeroUsize::new(32).expect("32 > 0"),
                )
                .with_per_connection_event_buffer_size(16)
                .with_dial_concurrency_factor(
                    NonZeroU8::new(2).expect("2 > 0"),
                ),
        );

        let service = NetworkService::new(command_sender);

        if addresses.is_empty() {
            // No listen address configured: listen on every IPv4 interface
            // over TCP with port 0.
            swarm
                .listen_on(
                    "/ip4/0.0.0.0/tcp/0"
                        .parse::<Multiaddr>()
                        .map_err(|e| Error::InvalidAddress(e.to_string()))?,
                )
                .map_err(|e| Error::Listen(format!("0.0.0.0:0: {e}")))?;
            info!(target: TARGET, "listening on all interfaces");
        } else {
            // Listen on every configured listen address.
            for addr in addresses.iter() {
                info!(target: TARGET, addr = %addr, "listening on address");
                swarm
                    .listen_on(addr.clone())
                    .map_err(|e| Error::Listen(format!("{addr}: {e}")))?;
            }
        }

        // Register the configured external addresses with the swarm.
        if !external_addresses.is_empty() {
            for addr in external_addresses.iter() {
                debug!(target: TARGET, addr = %addr, "external address registered");
                swarm.add_external_address(addr.clone());
            }
        }

        info!(target: TARGET, peer_id = %local_peer_id, "local peer id");

        let worker = Self {
            local_peer_id,
            service,
            swarm,
            state: NetworkState::Start,
            command_receiver,
            helper_sender: None,
            monitor: runtime.monitor,
            graceful_token: runtime.graceful_token,
            crash_token: runtime.crash_token,
            node_type,
            safe_mode,
            boot_nodes,
            retry_boot_nodes: HashMap::new(),
            pending_outbound_messages: HashMap::default(),
            pending_inbound_messages: HashMap::default(),
            response_channels: HashMap::default(),
            successful_dials: 0,
            peer_identify: HashSet::new(),
            retry_by_peer: HashMap::new(),
            retry_queue: BinaryHeap::new(),
            retry_timer: None,
            peer_action: HashMap::new(),
            max_app_message_bytes,
            max_pending_outbound_bytes_per_peer,
            max_pending_inbound_bytes_per_peer,
            max_pending_outbound_bytes_total,
            max_pending_inbound_bytes_total,
            metrics: runtime.metrics,
        };

        // Publish the initial state and gauges before handing the worker out.
        if let Some(metrics) = worker.metric_handle() {
            metrics.set_state_current(&worker.state);
        }
        worker.refresh_runtime_metrics();

        Ok(worker)
    }
349
350    fn metric_handle(&self) -> Option<&NetworkMetrics> {
351        self.metrics.as_deref()
352    }
353
    /// Whether the worker was started in safe mode.
    ///
    /// In safe mode `send_message` drops outbound payloads and
    /// `schedule_retry` discards retry state instead of scheduling work.
    const fn is_safe_mode(&self) -> bool {
        self.safe_mode
    }
357
358    fn pending_outbound_messages_len(&self) -> usize {
359        self.pending_outbound_messages
360            .values()
361            .map(PendingQueue::len)
362            .sum()
363    }
364
365    fn pending_outbound_bytes_len(&self) -> usize {
366        self.pending_outbound_messages
367            .values()
368            .map(PendingQueue::bytes_len)
369            .sum()
370    }
371
372    fn pending_inbound_messages_len(&self) -> usize {
373        self.pending_inbound_messages
374            .values()
375            .map(PendingQueue::len)
376            .sum()
377    }
378
379    fn pending_inbound_bytes_len(&self) -> usize {
380        self.pending_inbound_messages
381            .values()
382            .map(PendingQueue::bytes_len)
383            .sum()
384    }
385
386    fn pending_response_channels_len(&self) -> usize {
387        self.response_channels.values().map(VecDeque::len).sum()
388    }
389
390    fn refresh_runtime_metrics(&self) {
391        let Some(metrics) = self.metric_handle() else {
392            return;
393        };
394
395        metrics.set_retry_queue_len(self.retry_queue.len() as i64);
396        metrics.set_pending_outbound_peers(
397            self.pending_outbound_messages.len() as i64,
398        );
399        metrics.set_pending_outbound_messages(
400            self.pending_outbound_messages_len() as i64,
401        );
402        metrics.set_pending_outbound_bytes(
403            self.pending_outbound_bytes_len() as i64
404        );
405        metrics.set_pending_inbound_peers(
406            self.pending_inbound_messages.len() as i64
407        );
408        metrics.set_pending_inbound_messages(
409            self.pending_inbound_messages_len() as i64,
410        );
411        metrics
412            .set_pending_inbound_bytes(self.pending_inbound_bytes_len() as i64);
413        metrics.set_identified_peers(self.peer_identify.len() as i64);
414        metrics.set_response_channels_pending(
415            self.pending_response_channels_len() as i64,
416        );
417    }
418
419    fn observe_pending_message_age(&self, enqueued_at: Instant) {
420        if let Some(metrics) = self.metric_handle() {
421            metrics.observe_pending_message_age_seconds(
422                enqueued_at.elapsed().as_secs_f64(),
423            );
424        }
425    }
426
427    fn drop_pending_outbound_messages(&mut self, peer_id: &PeerId) -> usize {
428        let Some(mut queue) = self.pending_outbound_messages.remove(peer_id)
429        else {
430            return 0;
431        };
432
433        let mut dropped = 0usize;
434        for message in queue.drain() {
435            dropped += 1;
436            self.observe_pending_message_age(message.enqueued_at);
437        }
438        dropped
439    }
440
441    fn drop_pending_inbound_messages(&mut self, peer_id: &PeerId) {
442        if let Some(mut queue) = self.pending_inbound_messages.remove(peer_id) {
443            for message in queue.drain() {
444                self.observe_pending_message_age(message.enqueued_at);
445            }
446        }
447    }
448
449    fn observe_identify_error(
450        &self,
451        error: &swarm::StreamUpgradeError<identify::UpgradeError>,
452    ) {
453        let kind = match error {
454            swarm::StreamUpgradeError::Timeout => "timeout",
455            swarm::StreamUpgradeError::Io(_) => "io",
456            swarm::StreamUpgradeError::NegotiationFailed => "negotiation",
457            swarm::StreamUpgradeError::Apply(_) => "other",
458        };
459
460        if let Some(metrics) = self.metric_handle() {
461            metrics.observe_identify_error(kind);
462        }
463    }
464
    /// Schedule a discovery or dial retry for `peer`.
    ///
    /// * In safe mode nothing is scheduled: the peer's recorded action and
    ///   retry state are removed instead.
    /// * If an action is already recorded for the peer the call is a no-op.
    /// * A `Discover` entry that is upgraded to `Dial` fires immediately.
    /// * Otherwise the retry is delayed with exponential back-off; once the
    ///   attempt counter has reached 8 the peer's pending messages are
    ///   cleared and no further retry is scheduled.
    ///
    /// The attempt counter itself is advanced by `drain_due_retries` when the
    /// retry fires, not here.
    fn schedule_retry(&mut self, peer: PeerId, schedule_type: ScheduleType) {
        if self.is_safe_mode() {
            self.peer_action.remove(&peer);
            self.retry_by_peer.remove(&peer);
            self.refresh_runtime_metrics();
            return;
        }

        // An action is already in flight for this peer; do not stack another.
        if self.peer_action.contains_key(&peer) {
            return;
        }

        let (kind, addrs) = match schedule_type {
            ScheduleType::Discover => (RetryKind::Discover, vec![]),
            ScheduleType::Dial(multiaddrs) => (RetryKind::Dial, multiaddrs),
        };

        let now = Instant::now();
        let base = Duration::from_millis(250);
        let cap = Duration::from_secs(30);

        // Reuse the existing retry state so the attempt counter carries over
        // between calls; a fresh state starts at zero attempts.
        let entry = self.retry_by_peer.entry(peer).or_insert(RetryState {
            attempts: 0,
            when: now,
            kind,
            addrs: vec![],
        });

        let when = if matches!(
            (entry.kind, kind),
            (RetryKind::Discover, RetryKind::Dial)
        ) {
            // Discover -> Dial transition: fire right away, without back-off.
            now
        } else {
            if entry.attempts >= 8 {
                // Retry budget exhausted: give up on the queued messages.
                self.clear_pending_messages(&peer);
                return;
            }

            // Exponential backoff: 250ms * 2^attempt, capped at 30s
            // attempts 0-7 → ~250ms, 500ms, 1s, 2s, 4s, 8s, 16s, 30s
            let exp = 1u32 << entry.attempts.min(7);
            let mut delay = base * exp;
            if delay > cap {
                delay = cap;
            }

            // Deterministic per-peer jitter of 80–120% (no external RNG).
            // Fold all bytes to avoid the fixed multihash prefix dominating.
            let hash = peer
                .to_bytes()
                .iter()
                .fold(0u32, |acc, &b| acc.wrapping_add(b as u32));
            let j = 80 + (hash % 41);
            delay = delay * j / 100;

            now + delay
        };

        entry.when = when;
        entry.kind = kind;
        entry.addrs = addrs;

        self.peer_action.insert(peer, Action::from(kind));

        // The heap entry carries the same instant as the retry state so that
        // `drain_due_retries` can recognise it as the current one.
        self.retry_queue.push(Due(peer, entry.when));
        self.arm_retry_timer();
        self.refresh_runtime_metrics();
    }
534
535    fn arm_retry_timer(&mut self) {
536        if let Some(next) = self.retry_queue.peek() {
537            match &mut self.retry_timer {
538                Some(timer) => timer.as_mut().reset(next.1),
539                None => self.retry_timer = Some(Box::pin(sleep_until(next.1))),
540            }
541        }
542    }
543
544    fn drain_due_retries(
545        &mut self,
546    ) -> Vec<(PeerId, RetryKind, Vec<Multiaddr>)> {
547        let now = Instant::now();
548        let mut out = Vec::new();
549        while let Some(Due(peer, when)) = self.retry_queue.peek().cloned() {
550            if when > now {
551                break;
552            }
553
554            self.retry_queue.pop();
555            // Match the exact instant to reject stale Due entries. Multiple Due
556            // entries for the same peer can accumulate (e.g. Discover → Dial
557            // transition pushes a second entry without removing the first).
558            // Comparing `when` (from the popped Due) against `retry_by_peer[peer].when`
559            // ensures only the current scheduling cycle fires.
560            if let Some(retry) = self.retry_by_peer.get(&peer).cloned()
561                && retry.when == when
562            {
563                self.retry_by_peer
564                    .entry(peer)
565                    .and_modify(|x| x.attempts += 1);
566                out.push((peer, retry.kind, retry.addrs));
567            }
568        }
569
570        if self.retry_queue.is_empty() {
571            self.retry_timer = None;
572        } else {
573            self.arm_retry_timer();
574        }
575        self.refresh_runtime_metrics();
576        out
577    }
578
    /// Add sender helper.
    ///
    /// Stores the channel used to send `CommandHelper` values, replacing any
    /// sender registered earlier.
    pub fn add_helper_sender(
        &mut self,
        helper_sender: mpsc::Sender<CommandHelper<T>>,
    ) {
        self.helper_sender = Some(helper_sender);
    }
586
    /// Get the local peer ID (derived from the key pair passed to `new`).
    pub const fn local_peer_id(&self) -> PeerId {
        self.local_peer_id
    }
591
    /// Send message to a peer.
    ///
    /// Steps, in order:
    ///
    /// 1. In safe mode the message is dropped and `Ok(())` is returned.
    /// 2. A payload larger than `max_app_message_bytes` is rejected.
    /// 3. If response channels are stored for the peer, the message is sent
    ///    as a response on the oldest channel that accepts it. Channels that
    ///    fail are discarded and any remaining channels are stored again.
    /// 4. Otherwise the message is queued as pending outbound and either
    ///    flushed (known and identified peer), or a dial / discovery retry is
    ///    scheduled.
    ///
    /// # Errors
    ///
    /// Returns `Error::MessageTooLarge` when the payload exceeds
    /// `max_app_message_bytes`.
    fn send_message(
        &mut self,
        peer: PeerId,
        message: Bytes,
    ) -> Result<(), Error> {
        if self.is_safe_mode() {
            debug!(
                target: TARGET,
                peer_id = %peer,
                size = message.len(),
                "safe mode active; dropping outbound message"
            );
            return Ok(());
        }

        if message.len() > self.max_app_message_bytes {
            warn!(
                target: TARGET,
                peer_id = %peer,
                size = message.len(),
                max = self.max_app_message_bytes,
                "outbound payload rejected: message too large",
            );
            if let Some(metrics) = self.metric_handle() {
                metrics.inc_oversized_outbound_drop();
            }
            self.refresh_runtime_metrics();
            return Err(Error::MessageTooLarge {
                size: message.len(),
                max: self.max_app_message_bytes,
            });
        }

        // The channel list is taken out of the map while it is being worked
        // on; it is only put back when a send succeeded and channels remain.
        if let Some(mut responses) = self.response_channels.remove(&peer) {
            while let Some(response_channel) = responses.pop_front() {
                match self
                    .swarm
                    .behaviour_mut()
                    .send_response(response_channel, message.clone())
                {
                    Ok(()) => {
                        if !responses.is_empty() {
                            self.response_channels.insert(peer, responses);
                        }
                        self.refresh_runtime_metrics();
                        return Ok(());
                    }
                    Err(e) => {
                        // This channel is unusable; try the next one.
                        debug!(target: TARGET, peer_id = %peer, error = %e, "failed to send response: channel may already be consumed");
                    }
                }
            }
        }

        // No usable response channel: fall back to the outbound queue.
        self.add_pending_outbound_message(peer, message);

        if self.swarm.behaviour_mut().is_known_peer(&peer) {
            if let Some(Action::Identified(..)) = self.peer_action.get(&peer) {
                self.send_pending_outbound_messages(peer);
            } else {
                // Known but not identified: schedule a dial with no explicit
                // addresses.
                self.schedule_retry(peer, ScheduleType::Dial(vec![]));
            }
        } else {
            self.schedule_retry(peer, ScheduleType::Discover);
        }

        Ok(())
    }
661
662    /// Add pending message to peer.
663    ///
664    /// If count/bytes limits are reached, oldest messages are evicted first.
665    fn add_pending_outbound_message(&mut self, peer: PeerId, message: Bytes) {
666        let message_len = message.len();
667        let mut dropped_count = 0u64;
668        let mut dropped_bytes_limit_peer = 0u64;
669        let mut dropped_bytes_limit_global = 0u64;
670        let mut dropped_messages = Vec::new();
671        let mut duplicate = false;
672        let mut total_pending_bytes = self.pending_outbound_bytes_len();
673        let per_peer_limit = self.max_pending_outbound_bytes_per_peer;
674        let global_limit = self.max_pending_outbound_bytes_total;
675
676        {
677            let queue = self.pending_outbound_messages.entry(peer).or_default();
678            if queue.contains(&message) {
679                duplicate = true;
680            } else {
681                while queue.len() >= MAX_PENDING_MESSAGES_PER_PEER {
682                    if let Some(evicted) = queue.pop_front() {
683                        dropped_count += 1;
684                        total_pending_bytes = total_pending_bytes
685                            .saturating_sub(evicted.payload.len());
686                        dropped_messages.push(evicted);
687                    } else {
688                        break;
689                    }
690                }
691
692                if per_peer_limit > 0 {
693                    while queue.bytes_len() + message_len > per_peer_limit {
694                        if let Some(evicted) = queue.pop_front() {
695                            dropped_bytes_limit_peer += 1;
696                            total_pending_bytes = total_pending_bytes
697                                .saturating_sub(evicted.payload.len());
698                            dropped_messages.push(evicted);
699                        } else {
700                            break;
701                        }
702                    }
703                }
704
705                if per_peer_limit > 0
706                    && queue.bytes_len() + message_len > per_peer_limit
707                {
708                    dropped_bytes_limit_peer += 1;
709                } else if global_limit > 0
710                    && total_pending_bytes.saturating_add(message_len)
711                        > global_limit
712                {
713                    dropped_bytes_limit_global += 1;
714                } else {
715                    queue.push_back(message);
716                }
717            }
718        }
719
720        if duplicate {
721            self.refresh_runtime_metrics();
722            return;
723        }
724
725        for evicted in dropped_messages {
726            self.observe_pending_message_age(evicted.enqueued_at);
727        }
728
729        if dropped_count > 0 {
730            warn!(
731                target: TARGET,
732                peer_id = %peer,
733                dropped = dropped_count,
734                max_messages = MAX_PENDING_MESSAGES_PER_PEER,
735                "outbound queue count limit reached; oldest messages evicted",
736            );
737        }
738
739        if dropped_bytes_limit_peer > 0 {
740            warn!(
741                target: TARGET,
742                peer_id = %peer,
743                dropped = dropped_bytes_limit_peer,
744                message_bytes = message_len,
745                max_queue_bytes = per_peer_limit,
746                "outbound queue bytes limit reached; messages evicted/dropped",
747            );
748        }
749
750        if dropped_bytes_limit_global > 0 {
751            warn!(
752                target: TARGET,
753                peer_id = %peer,
754                dropped = dropped_bytes_limit_global,
755                message_bytes = message_len,
756                max_queue_bytes_total = global_limit,
757                "outbound global queue bytes limit reached; messages dropped",
758            );
759        }
760
761        if let Some(metrics) = self.metric_handle() {
762            metrics.inc_outbound_queue_drop_by(dropped_count);
763            metrics.inc_outbound_queue_bytes_drop_per_peer_by(
764                dropped_bytes_limit_peer,
765            );
766            metrics.inc_outbound_queue_bytes_drop_global_by(
767                dropped_bytes_limit_global,
768            );
769        }
770
771        self.refresh_runtime_metrics();
772    }
773
774    fn add_pending_inbound_message(&mut self, peer: PeerId, message: Bytes) {
775        let message_len = message.len();
776        let mut dropped_count = 0u64;
777        let mut dropped_bytes_limit_peer = 0u64;
778        let mut dropped_bytes_limit_global = 0u64;
779        let mut dropped_messages = Vec::new();
780        let mut duplicate = false;
781        let mut total_pending_bytes = self.pending_inbound_bytes_len();
782        let per_peer_limit = self.max_pending_inbound_bytes_per_peer;
783        let global_limit = self.max_pending_inbound_bytes_total;
784
785        {
786            let queue = self.pending_inbound_messages.entry(peer).or_default();
787            if queue.contains(&message) {
788                duplicate = true;
789            } else {
790                while queue.len() >= MAX_PENDING_MESSAGES_PER_PEER {
791                    if let Some(evicted) = queue.pop_front() {
792                        dropped_count += 1;
793                        total_pending_bytes = total_pending_bytes
794                            .saturating_sub(evicted.payload.len());
795                        dropped_messages.push(evicted);
796                    } else {
797                        break;
798                    }
799                }
800
801                if per_peer_limit > 0 {
802                    while queue.bytes_len() + message_len > per_peer_limit {
803                        if let Some(evicted) = queue.pop_front() {
804                            dropped_bytes_limit_peer += 1;
805                            total_pending_bytes = total_pending_bytes
806                                .saturating_sub(evicted.payload.len());
807                            dropped_messages.push(evicted);
808                        } else {
809                            break;
810                        }
811                    }
812                }
813
814                if per_peer_limit > 0
815                    && queue.bytes_len() + message_len > per_peer_limit
816                {
817                    dropped_bytes_limit_peer += 1;
818                } else if global_limit > 0
819                    && total_pending_bytes.saturating_add(message_len)
820                        > global_limit
821                {
822                    dropped_bytes_limit_global += 1;
823                } else {
824                    queue.push_back(message);
825                }
826            }
827        }
828
829        if duplicate {
830            self.refresh_runtime_metrics();
831            return;
832        }
833
834        for evicted in dropped_messages {
835            self.observe_pending_message_age(evicted.enqueued_at);
836        }
837
838        if dropped_count > 0 {
839            warn!(
840                target: TARGET,
841                peer_id = %peer,
842                dropped = dropped_count,
843                max_messages = MAX_PENDING_MESSAGES_PER_PEER,
844                "inbound queue count limit reached; oldest messages evicted",
845            );
846        }
847
848        if dropped_bytes_limit_peer > 0 {
849            warn!(
850                target: TARGET,
851                peer_id = %peer,
852                dropped = dropped_bytes_limit_peer,
853                message_bytes = message_len,
854                max_queue_bytes = per_peer_limit,
855                "inbound queue bytes limit reached; messages evicted/dropped",
856            );
857        }
858
859        if dropped_bytes_limit_global > 0 {
860            warn!(
861                target: TARGET,
862                peer_id = %peer,
863                dropped = dropped_bytes_limit_global,
864                message_bytes = message_len,
865                max_queue_bytes_total = global_limit,
866                "inbound global queue bytes limit reached; messages dropped",
867            );
868        }
869
870        if let Some(metrics) = self.metric_handle() {
871            metrics.inc_inbound_queue_drop_by(dropped_count);
872            metrics.inc_inbound_queue_bytes_drop_per_peer_by(
873                dropped_bytes_limit_peer,
874            );
875            metrics.inc_inbound_queue_bytes_drop_global_by(
876                dropped_bytes_limit_global,
877            );
878        }
879
880        self.refresh_runtime_metrics();
881    }
882
883    /// Add ephemeral response.
884    fn add_ephemeral_response(
885        &mut self,
886        peer: PeerId,
887        response_channel: ResponseChannel<ReqResMessage>,
888    ) {
889        self.response_channels
890            .entry(peer)
891            .or_default()
892            .push_back(response_channel);
893        self.refresh_runtime_metrics();
894    }
895
896    /// Send pending messages to peer.
897    fn send_pending_outbound_messages(&mut self, peer: PeerId) {
898        if let Some(mut queue) = self.pending_outbound_messages.remove(&peer) {
899            for message in queue.drain() {
900                self.observe_pending_message_age(message.enqueued_at);
901                self.swarm
902                    .behaviour_mut()
903                    .send_message(&peer, message.payload);
904            }
905        }
906
907        self.retry_by_peer.remove(&peer);
908        self.refresh_runtime_metrics();
909    }
910
911    /// Get the network service.
912    pub fn service(&self) -> NetworkService {
913        self.service.clone()
914    }
915
916    /// Change the network state.
917    async fn change_state(&mut self, state: NetworkState) {
918        trace!(target: TARGET, state = ?state, "state changed");
919        self.state = state.clone();
920        if let Some(metrics) = self.metric_handle() {
921            metrics.observe_state_transition(&state);
922        }
923        self.send_event(NetworkEvent::StateChanged(state)).await;
924    }
925
926    /// Send event
927    async fn send_event(&mut self, event: NetworkEvent) {
928        if let Some(monitor) = self.monitor.as_mut()
929            && let Err(e) = monitor.tell(MonitorMessage::Network(event)).await
930        {
931            error!(target: TARGET, error = %e, "failed to forward event to monitor");
932            self.crash_token.cancel();
933        }
934    }
935
936    /// Run the network worker.
937    pub async fn run(&mut self) {
938        let bootstrap_start = Instant::now();
939
940        // Run connection to bootstrap node.
941        if let Err(error) = self.run_connection().await {
942            if let Some(metrics) = self.metric_handle() {
943                metrics.observe_bootstrap_duration_seconds(
944                    "failure",
945                    bootstrap_start.elapsed().as_secs_f64(),
946                );
947            }
948            error!(target: TARGET, error = %error, "bootstrap connection failed");
949            self.send_event(NetworkEvent::Error(error)).await;
950            // Irrecoverable error. Cancel the node.
951            self.crash_token.cancel();
952            return;
953        }
954        if let Some(metrics) = self.metric_handle() {
955            metrics.observe_bootstrap_duration_seconds(
956                "success",
957                bootstrap_start.elapsed().as_secs_f64(),
958            );
959        }
960
961        if self.state != NetworkState::Running {
962            self.change_state(NetworkState::Running).await;
963        }
964
965        // Finish pre routing state, activating random walk (if node is a bootstrap).
966        if !self.is_safe_mode() {
967            self.swarm.behaviour_mut().finish_prerouting_state();
968        }
969        // Run main loop.
970        self.run_main().await;
971    }
972
973    /// Run connection to bootstrap node.
974    pub async fn run_connection(&mut self) -> Result<(), Error> {
975        if self.is_safe_mode() {
976            self.change_state(NetworkState::Running).await;
977            return Ok(());
978        }
979
980        // If is the first node of ave network.
981        if self.node_type == NodeType::Bootstrap && self.boot_nodes.is_empty() {
982            self.change_state(NetworkState::Running).await;
983            Ok(())
984        } else {
985            self.change_state(NetworkState::Dial).await;
986
987            let mut retrys: u8 = 0;
988
989            // Per-round deadline: if any boot node's TCP connects but Identify never
990            // completes (NAT half-open, overloaded peer, protocol stall…), this timer
991            // moves the remaining nodes to retry_boot_nodes so the retry logic handles
992            // them instead of waiting up to idle_connection_timeout (90 s) per round.
993            let dialing_round_timeout = sleep(Duration::from_secs(0));
994            tokio::pin!(dialing_round_timeout);
995            let mut dialing_timeout_active = false;
996
997            loop {
998                match self.state {
999                    NetworkState::Dial => {
1000                        // Dial to boot node.
1001                        if self.boot_nodes.is_empty() {
1002                            error!(target: TARGET, "no bootstrap nodes available");
1003                            self.send_event(NetworkEvent::Error(
1004                                Error::NoBootstrapNode,
1005                            ))
1006                            .await;
1007                            self.change_state(NetworkState::Disconnected).await;
1008                        } else {
1009                            let copy_boot_nodes = self.boot_nodes.clone();
1010
1011                            for (peer, addresses) in copy_boot_nodes {
1012                                if let Some(metrics) = self.metric_handle() {
1013                                    metrics.inc_dial_attempt_bootstrap();
1014                                }
1015                                if let Err(e) = self.swarm.dial(
1016                                    DialOpts::peer_id(peer)
1017                                        .addresses(addresses.clone())
1018                                        .build(),
1019                                ) {
1020                                    let (add_to_retry, new_addresses) = self
1021                                        .handle_dial_error(e, &peer, true)
1022                                        .unwrap_or((false, vec![]));
1023                                    self.boot_nodes.remove(&peer);
1024                                    if add_to_retry {
1025                                        if new_addresses.is_empty() {
1026                                            self.retry_boot_nodes
1027                                                .insert(peer, addresses);
1028                                        } else {
1029                                            self.retry_boot_nodes
1030                                                .insert(peer, new_addresses);
1031                                        }
1032                                    }
1033                                }
1034                            }
1035                            if !self.boot_nodes.is_empty() {
1036                                self.change_state(NetworkState::Dialing).await;
1037                                dialing_round_timeout.as_mut().reset(
1038                                    Instant::now() + Duration::from_secs(15),
1039                                );
1040                                dialing_timeout_active = true;
1041                            } else {
1042                                warn!(target: TARGET, "all bootstrap dials failed");
1043                                self.change_state(NetworkState::Disconnected)
1044                                    .await;
1045                            }
1046                        }
1047                    }
1048                    NetworkState::Dialing => {
1049                        // No more bootnodes to send dial, none was successful nut one or more Dial fail by keepalivetimeout
1050                        if self.boot_nodes.is_empty()
1051                            && self.successful_dials == 0
1052                            && !self.retry_boot_nodes.is_empty()
1053                            && retrys < 3
1054                        {
1055                            retrys += 1;
1056                            let wait = Duration::from_secs(1u64 << retrys); // retrys=1→2s, retrys=2→4s
1057                            debug!(target: TARGET, attempt = retrys, wait_secs = wait.as_secs(), "retrying bootstrap dials");
1058
1059                            let backoff = sleep(wait);
1060                            tokio::pin!(backoff);
1061                            loop {
1062                                tokio::select! {
1063                                    _ = &mut backoff => break,
1064                                    event = self.swarm.select_next_some() => {
1065                                        self.handle_connection_events(event).await;
1066                                    }
1067                                    _ = self.graceful_token.cancelled() => {
1068                                        return Err(Error::Cancelled);
1069                                    }
1070                                    _ = self.crash_token.cancelled() => {
1071                                        return Err(Error::Cancelled);
1072                                    }
1073                                }
1074                            }
1075
1076                            dialing_timeout_active = false;
1077                            self.boot_nodes.clone_from(&self.retry_boot_nodes);
1078                            self.retry_boot_nodes.clear();
1079                            self.change_state(NetworkState::Dial).await;
1080                        }
1081                        // No more bootnodes to send dial and none was successful
1082                        else if self.boot_nodes.is_empty()
1083                            && self.successful_dials == 0
1084                        {
1085                            self.change_state(NetworkState::Disconnected).await;
1086                        // No more bootnodes to send dial and one or more was successful
1087                        } else if self.boot_nodes.is_empty() {
1088                            return Ok(());
1089                        }
1090                    }
1091                    NetworkState::Running => {
1092                        return Ok(());
1093                    }
1094                    NetworkState::Disconnected => {
1095                        return Err(Error::NoBootstrapNode);
1096                    }
1097                    _ => {}
1098                }
1099                if self.state != NetworkState::Disconnected {
1100                    tokio::select! {
1101                        event = self.swarm.select_next_some() => {
1102                            self.handle_connection_events(event).await;
1103                        }
1104                                    _ = self.graceful_token.cancelled() => {
1105                                        return Err(Error::Cancelled);
1106                                    }
1107                                    _ = self.crash_token.cancelled() => {
1108                                        return Err(Error::Cancelled);
1109                                    }
1110                        _ = &mut dialing_round_timeout, if dialing_timeout_active => {
1111                            warn!(
1112                                target: TARGET,
1113                                remaining = self.boot_nodes.len(),
1114                                "bootstrap round timed out waiting for Identify; \
1115                                 moving remaining peers to retry queue"
1116                            );
1117                            for (peer, addrs) in self.boot_nodes.drain() {
1118                                self.retry_boot_nodes.insert(peer, addrs);
1119                            }
1120                            dialing_timeout_active = false;
1121                        }
1122                    }
1123                }
1124            }
1125        }
1126    }
1127
1128    fn collect_retryable_transport_addresses(
1129        &self,
1130        items: Vec<(Multiaddr, libp2p::TransportError<std::io::Error>)>,
1131        trace_unreachable: bool,
1132    ) -> Vec<Multiaddr> {
1133        let mut new_addresses = vec![];
1134        for (address, error) in items {
1135            if trace_unreachable {
1136                trace!(target: TARGET, addr = %address, err = ?error, "address unreachable");
1137            }
1138
1139            if let libp2p::TransportError::Other(e) = error {
1140                match e.kind() {
1141                    std::io::ErrorKind::ConnectionRefused
1142                    | std::io::ErrorKind::TimedOut
1143                    | std::io::ErrorKind::ConnectionAborted
1144                    | std::io::ErrorKind::NotConnected
1145                    | std::io::ErrorKind::BrokenPipe
1146                    | std::io::ErrorKind::Interrupted
1147                    | std::io::ErrorKind::HostUnreachable
1148                    | std::io::ErrorKind::NetworkUnreachable => {
1149                        new_addresses.push(address);
1150                    }
1151                    _ => {}
1152                }
1153            };
1154        }
1155
1156        new_addresses
1157    }
1158
1159    /// Classify a dial error and return retry decision.
1160    ///
1161    /// - `bootstrap_flow = true` keeps bootstrap-specific behaviour/logs.
1162    /// - `bootstrap_flow = false` keeps runtime-specific behaviour/logs/cleanup.
1163    fn handle_dial_error(
1164        &mut self,
1165        e: DialError,
1166        peer_id: &PeerId,
1167        bootstrap_flow: bool,
1168    ) -> Option<(bool, Vec<Multiaddr>)> {
1169        let phase = if bootstrap_flow {
1170            "bootstrap"
1171        } else {
1172            "runtime"
1173        };
1174        match e {
1175            DialError::LocalPeerId { .. } => {
1176                if let Some(metrics) = self.metric_handle() {
1177                    metrics.observe_dial_failure(phase, "local_peer_id");
1178                }
1179                if bootstrap_flow {
1180                    warn!(target: TARGET, peer_id = %peer_id, "dial rejected: connected peer-id matches local peer");
1181                    return Some((false, vec![]));
1182                }
1183
1184                self.retry_by_peer.remove(peer_id);
1185                self.clear_pending_messages(peer_id);
1186                self.swarm
1187                    .behaviour_mut()
1188                    .clean_hard_peer_to_remove(peer_id);
1189                None
1190            }
1191            DialError::NoAddresses => {
1192                if let Some(metrics) = self.metric_handle() {
1193                    metrics.observe_dial_failure(phase, "no_addresses");
1194                }
1195                if bootstrap_flow {
1196                    debug!(target: TARGET, peer_id = %peer_id, "dial skipped: no addresses");
1197                }
1198                Some((false, vec![]))
1199            }
1200            DialError::DialPeerConditionFalse(_) => {
1201                if let Some(metrics) = self.metric_handle() {
1202                    metrics.observe_dial_failure(phase, "peer_condition");
1203                }
1204                if bootstrap_flow {
1205                    debug!(target: TARGET, peer_id = %peer_id, "dial skipped: peer condition not met");
1206                    return Some((false, vec![]));
1207                }
1208
1209                None
1210            }
1211            DialError::Denied { cause } => {
1212                if let Some(metrics) = self.metric_handle() {
1213                    metrics.observe_dial_failure(phase, "denied");
1214                }
1215                if bootstrap_flow {
1216                    debug!(target: TARGET, peer_id = %peer_id, cause = %cause, "dial denied by behaviour");
1217                }
1218                Some((false, vec![]))
1219            }
1220            DialError::Aborted => {
1221                if let Some(metrics) = self.metric_handle() {
1222                    metrics.observe_dial_failure(phase, "aborted");
1223                }
1224                if bootstrap_flow {
1225                    debug!(target: TARGET, peer_id = %peer_id, "dial aborted, will retry");
1226                }
1227                Some((true, vec![]))
1228            }
1229            DialError::WrongPeerId { obtained, .. } => {
1230                if let Some(metrics) = self.metric_handle() {
1231                    metrics.observe_dial_failure(phase, "wrong_peer_id");
1232                }
1233                if bootstrap_flow {
1234                    warn!(target: TARGET, expected = %peer_id, obtained = %obtained, "dial failed: peer identity mismatch");
1235                    return Some((false, vec![]));
1236                }
1237
1238                self.retry_by_peer.remove(peer_id);
1239                self.clear_pending_messages(peer_id);
1240                self.swarm
1241                    .behaviour_mut()
1242                    .clean_hard_peer_to_remove(peer_id);
1243                None
1244            }
1245            DialError::Transport(items) => {
1246                if let Some(metrics) = self.metric_handle() {
1247                    metrics.observe_dial_failure(phase, "transport");
1248                }
1249                if bootstrap_flow {
1250                    debug!(target: TARGET, peer_id = %peer_id, "transport dial failed");
1251                }
1252
1253                let new_addresses = self.collect_retryable_transport_addresses(
1254                    items,
1255                    bootstrap_flow,
1256                );
1257                if !new_addresses.is_empty() {
1258                    Some((true, new_addresses))
1259                } else {
1260                    Some((false, vec![]))
1261                }
1262            }
1263        }
1264    }
1265
1266    /// Handle connection events.
1267    async fn handle_connection_events(
1268        &mut self,
1269        event: SwarmEvent<BehaviourEvent>,
1270    ) {
1271        match event {
1272            SwarmEvent::ConnectionClosed { peer_id, .. } => {
1273                self.boot_nodes.remove(&peer_id);
1274            }
1275            SwarmEvent::OutgoingConnectionError {
1276                peer_id: Some(peer_id),
1277                error,
1278                ..
1279            } => {
1280                let (add_to_retry, new_addresses) = self
1281                    .handle_dial_error(error, &peer_id, true)
1282                    .unwrap_or((false, vec![]));
1283
1284                if let Some(addresses) = self.boot_nodes.remove(&peer_id)
1285                    && add_to_retry
1286                {
1287                    if new_addresses.is_empty() {
1288                        self.retry_boot_nodes.insert(peer_id, addresses);
1289                    } else {
1290                        self.retry_boot_nodes.insert(peer_id, new_addresses);
1291                    }
1292                }
1293            }
1294            SwarmEvent::Behaviour(BehaviourEvent::Identified {
1295                peer_id,
1296                info,
1297                connection_id,
1298            }) => {
1299                if !self
1300                    .check_protocols(&info.protocol_version, &info.protocols)
1301                {
1302                    warn!(target: TARGET, peer_id = %peer_id, protocol_version = %info.protocol_version, "peer uses incompatible protocols; closing connection");
1303
1304                    self.swarm
1305                        .behaviour_mut()
1306                        .close_connections(&peer_id, Some(connection_id));
1307                } else {
1308                    self.peer_action
1309                        .insert(peer_id, Action::Identified(connection_id));
1310
1311                    let mut any_address_is_valid = false;
1312                    for addr in info.listen_addrs {
1313                        if self
1314                            .swarm
1315                            .behaviour_mut()
1316                            .add_self_reported_address(&peer_id, &addr)
1317                        {
1318                            any_address_is_valid = true;
1319                        }
1320                    }
1321
1322                    if any_address_is_valid {
1323                        if self.boot_nodes.remove(&peer_id).is_some() {
1324                            self.successful_dials += 1;
1325                        }
1326                        self.peer_identify.insert(peer_id);
1327                    } else {
1328                        warn!(target: TARGET, peer_id = %peer_id, "bootstrap peer has no valid addresses");
1329
1330                        self.swarm
1331                            .behaviour_mut()
1332                            .close_connections(&peer_id, Some(connection_id));
1333                    }
1334                }
1335            }
1336            SwarmEvent::Behaviour(BehaviourEvent::IdentifyError {
1337                peer_id,
1338                error,
1339            }) => {
1340                self.observe_identify_error(&error);
1341                match error {
1342                    swarm::StreamUpgradeError::Timeout => {
1343                        // Recoverable: wait for ConnectionClosed to remove from boot_nodes.
1344                    }
1345                    _ => {
1346                        // Hard failure: Identify will never complete; move to retry immediately
1347                        // instead of waiting for the per-round timeout.
1348                        debug!(target: TARGET, peer_id = %peer_id, error = %error, "identify hard failure during bootstrap; queuing for retry");
1349                        if let Some(addrs) = self.boot_nodes.remove(&peer_id) {
1350                            self.retry_boot_nodes.insert(peer_id, addrs);
1351                        }
1352                    }
1353                }
1354            }
1355            _ => {}
1356        }
1357        self.refresh_runtime_metrics();
1358    }
1359
1360    fn clear_pending_messages(&mut self, peer_id: &PeerId) {
1361        warn!(target: TARGET, peer_id = %peer_id, "max dial attempts reached; dropping pending messages");
1362
1363        let dropped = self.drop_pending_outbound_messages(peer_id);
1364        if let Some(metrics) = self.metric_handle() {
1365            metrics.inc_max_retries_drop_by(dropped as u64);
1366        }
1367        self.peer_action.remove(peer_id);
1368        self.retry_by_peer.remove(peer_id);
1369        self.refresh_runtime_metrics();
1370    }
1371
1372    fn check_protocols(
1373        &self,
1374        protocol_version: &str,
1375        protocols: &[StreamProtocol],
1376    ) -> bool {
1377        let supp_protocols: HashSet<StreamProtocol> = protocols
1378            .iter()
1379            .cloned()
1380            .collect::<HashSet<StreamProtocol>>();
1381
1382        protocol_version == IDENTIFY_PROTOCOL
1383            && supp_protocols.contains(&StreamProtocol::new(REQRES_PROTOCOL))
1384    }
1385
1386    /// Run network worker.
1387    pub async fn run_main(&mut self) {
1388        info!(target: TARGET, "network worker started");
1389
1390        loop {
1391            tokio::select! {
1392                command = self.command_receiver.recv() => {
1393                    match command {
1394                        Some(command) => self.handle_command(command).await,
1395                        None => break,
1396                    }
1397                }
1398                event = self.swarm.select_next_some() => {
1399                    // Handle events.
1400                    self.handle_event(event).await;
1401                }
1402                _ = async {
1403                    if let Some(t) = &mut self.retry_timer {
1404                        t.as_mut().await;
1405                    }
1406                }, if self.retry_timer.is_some() => {
1407                    for (peer, kind, addrs) in self.drain_due_retries() {
1408                        if let Some(action) = self.peer_action.get(&peer) {
1409                            match (action, kind) {
1410                                (Action::Discover, RetryKind::Discover) => {
1411                                    self.swarm.behaviour_mut().discover(&peer);
1412                                },
1413                                (Action::Dial, RetryKind::Dial) => {
1414                                    if let Some(metrics) = self.metric_handle() {
1415                                        metrics.inc_dial_attempt_runtime();
1416                                    }
1417                                    if let Err(error) = self.swarm.dial(
1418                                        DialOpts::peer_id(peer)
1419                                            .addresses(addrs)
1420                                            .extend_addresses_through_behaviour()
1421                                            .build()
1422                                    ) && let Some((retry, new_address)) =
1423                                        self.handle_dial_error(error, &peer, false) {
1424
1425                                        self.peer_action.remove(&peer);
1426                                        if retry {
1427                                            let addr = new_address
1428                                                    .iter()
1429                                                    .filter(|x| {
1430                                                        !self
1431                                                        .swarm
1432                                                        .behaviour()
1433                                                        .is_invalid_address(x)
1434                                                    })
1435                                                    .cloned()
1436                                                    .collect::<Vec<Multiaddr>>();
1437
1438                                                if addr.is_empty() {
1439                                                    self.schedule_retry(peer, ScheduleType::Discover);
1440                                                } else {
1441                                                    self.schedule_retry(peer, ScheduleType::Dial(addr.clone()));
1442                                                }
1443                                        } else {
1444                                            self.schedule_retry(peer, ScheduleType::Discover);
1445                                        }
1446                                };
1447                                },
1448                                _ => {}
1449                            }
1450                        };
1451                    }
1452                },
1453                                    _ = self.graceful_token.cancelled() => {
1454                                        break;
1455                                    }
1456                                    _ = self.crash_token.cancelled() => {
1457                                        break;
1458                                    }
1459            }
1460        }
1461    }
1462
1463    async fn handle_command(&mut self, command: Command) {
1464        match command {
1465            Command::SendMessage { peer, message } => {
1466                if self.is_safe_mode() {
1467                    debug!(
1468                        target: TARGET,
1469                        peer_id = %peer,
1470                        size = message.len(),
1471                        "safe mode active; ignoring send command"
1472                    );
1473                    return;
1474                }
1475                if let Err(error) = self.send_message(peer, message) {
1476                    error!(target: TARGET, error = %error, "failed to deliver message");
1477                    self.send_event(NetworkEvent::Error(error)).await;
1478                }
1479            }
1480        }
1481    }
1482
1483    async fn message_to_helper(
1484        &mut self,
1485        message: MessagesHelper,
1486        peer_id: &PeerId,
1487    ) {
1488        let sender = match peer_id_to_ed25519_pubkey_bytes(peer_id) {
1489            Ok(public_key) => public_key,
1490            Err(e) => {
1491                warn!(target: TARGET, error = %e, "cannot resolve public key from peer id");
1492                return;
1493            }
1494        };
1495
1496        'Send: {
1497            if let Some(helper_sender) = self.helper_sender.as_mut() {
1498                match message {
1499                    MessagesHelper::Single(items) => {
1500                        if helper_sender
1501                            .send(CommandHelper::ReceivedMessage {
1502                                sender,
1503                                message: items,
1504                            })
1505                            .await
1506                            .is_err()
1507                        {
1508                            break 'Send;
1509                        }
1510                    }
1511                    MessagesHelper::Vec(items) => {
1512                        for item in items {
1513                            if helper_sender
1514                                .send(CommandHelper::ReceivedMessage {
1515                                    sender,
1516                                    message: item,
1517                                })
1518                                .await
1519                                .is_err()
1520                            {
1521                                break 'Send;
1522                            }
1523                        }
1524                    }
1525                }
1526
1527                return;
1528            }
1529        }
1530
1531        error!(target: TARGET, "helper channel closed; shutting down");
1532        self.crash_token.cancel();
1533    }
1534
1535    async fn handle_event(&mut self, event: SwarmEvent<BehaviourEvent>) {
1536        if self.is_safe_mode() {
1537            match event {
1538                SwarmEvent::Behaviour(BehaviourEvent::ReqresMessage {
1539                    peer_id,
1540                    ..
1541                }) => {
1542                    debug!(
1543                        target: TARGET,
1544                        peer_id = %peer_id,
1545                        "safe mode active; dropping inbound reqres message"
1546                    );
1547                    self.swarm
1548                        .behaviour_mut()
1549                        .close_connections(&peer_id, None);
1550                }
1551                SwarmEvent::Behaviour(BehaviourEvent::Identified {
1552                    peer_id,
1553                    connection_id,
1554                    ..
1555                }) => {
1556                    debug!(
1557                        target: TARGET,
1558                        peer_id = %peer_id,
1559                        "safe mode active; closing identified peer"
1560                    );
1561                    self.swarm
1562                        .behaviour_mut()
1563                        .close_connections(&peer_id, Some(connection_id));
1564                }
1565                SwarmEvent::Behaviour(BehaviourEvent::IdentifyError {
1566                    peer_id,
1567                    ..
1568                }) => {
1569                    self.swarm
1570                        .behaviour_mut()
1571                        .close_connections(&peer_id, None);
1572                }
1573                SwarmEvent::ConnectionEstablished {
1574                    peer_id,
1575                    connection_id,
1576                    ..
1577                } => {
1578                    debug!(
1579                        target: TARGET,
1580                        peer_id = %peer_id,
1581                        "safe mode active; closing established connection"
1582                    );
1583                    self.swarm
1584                        .behaviour_mut()
1585                        .close_connections(&peer_id, Some(connection_id));
1586                }
1587                SwarmEvent::OutgoingConnectionError { .. }
1588                | SwarmEvent::ConnectionClosed { .. }
1589                | SwarmEvent::IncomingConnectionError { .. }
1590                | SwarmEvent::IncomingConnection { .. }
1591                | SwarmEvent::ListenerClosed { .. }
1592                | SwarmEvent::Dialing { .. }
1593                | SwarmEvent::NewExternalAddrCandidate { .. }
1594                | SwarmEvent::ExternalAddrConfirmed { .. }
1595                | SwarmEvent::ExternalAddrExpired { .. }
1596                | SwarmEvent::NewExternalAddrOfPeer { .. }
1597                | SwarmEvent::NewListenAddr { .. }
1598                | SwarmEvent::ExpiredListenAddr { .. }
1599                | SwarmEvent::ListenerError { .. }
1600                | SwarmEvent::Behaviour(BehaviourEvent::ReqresFailure {
1601                    ..
1602                })
1603                | SwarmEvent::Behaviour(BehaviourEvent::ClosestPeer {
1604                    ..
1605                })
1606                | SwarmEvent::Behaviour(BehaviourEvent::Dummy) => {}
1607                _ => {}
1608            }
1609            self.refresh_runtime_metrics();
1610            return;
1611        }
1612
1613        match event {
1614            SwarmEvent::Behaviour(event) => {
1615                match event {
1616                    BehaviourEvent::Identified {
1617                        peer_id,
1618                        info,
1619                        connection_id,
1620                    } => {
1621                        if !self.check_protocols(
1622                            &info.protocol_version,
1623                            &info.protocols,
1624                        ) {
1625                            warn!(
1626                                target: TARGET,
1627                                peer_id = %peer_id,
1628                                protocol_version = %info.protocol_version,
1629                                protocols = ?info.protocols,
1630                                "peer uses incompatible protocols; closing connection"
1631                            );
1632
1633                            self.clear_pending_messages(&peer_id);
1634
1635                            self.swarm
1636                                .behaviour_mut()
1637                                .clean_hard_peer_to_remove(&peer_id);
1638
1639                            self.swarm.behaviour_mut().close_connections(
1640                                &peer_id,
1641                                Some(connection_id),
1642                            );
1643                        } else {
1644                            self.peer_action.insert(
1645                                peer_id,
1646                                Action::Identified(connection_id),
1647                            );
1648
1649                            self.swarm
1650                                .behaviour_mut()
1651                                .clean_peer_to_remove(&peer_id);
1652                            for addr in info.listen_addrs {
1653                                self.swarm
1654                                    .behaviour_mut()
1655                                    .add_self_reported_address(&peer_id, &addr);
1656                            }
1657
1658                            self.peer_identify.insert(peer_id);
1659
1660                            if let Some(mut queue) =
1661                                self.pending_inbound_messages.remove(&peer_id)
1662                            {
1663                                let mut buffered = VecDeque::new();
1664                                for message in queue.drain() {
1665                                    self.observe_pending_message_age(
1666                                        message.enqueued_at,
1667                                    );
1668                                    buffered.push_back(message.payload);
1669                                }
1670                                self.message_to_helper(
1671                                    MessagesHelper::Vec(buffered),
1672                                    &peer_id,
1673                                )
1674                                .await;
1675                            };
1676
1677                            self.send_pending_outbound_messages(peer_id);
1678                        }
1679                    }
1680                    BehaviourEvent::IdentifyError { peer_id, error } => {
1681                        self.observe_identify_error(&error);
1682                        debug!(target: TARGET, peer_id = %peer_id, error = %error, "identify error");
1683
1684                        match error {
1685                            swarm::StreamUpgradeError::Timeout => {
1686                                // We do not clean since we will try to open the connection when it is
1687                                // confirmed that it has been closed in SwarmEvent::ConnectionClosed
1688                            }
1689                            swarm::StreamUpgradeError::Apply(..)
1690                            | swarm::StreamUpgradeError::NegotiationFailed
1691                            | swarm::StreamUpgradeError::Io(..) => {
1692                                // Do not call clear_pending_messages here — it removes
1693                                // peer_action, which ConnectionClosed needs to trigger
1694                                // its cleanup path. Clean only the state that
1695                                // ConnectionClosed won't reach.
1696                                self.drop_pending_outbound_messages(&peer_id);
1697                                self.retry_by_peer.remove(&peer_id);
1698                                self.response_channels.remove(&peer_id);
1699                                self.drop_pending_inbound_messages(&peer_id);
1700                            }
1701                        }
1702
1703                        self.swarm
1704                            .behaviour_mut()
1705                            .close_connections(&peer_id, None);
1706                    }
1707                    BehaviourEvent::ReqresMessage { peer_id, message } => {
1708                        let (message_data, is_request, response_channel) =
1709                            match message {
1710                                request_response::Message::Request {
1711                                    request,
1712                                    channel,
1713                                    ..
1714                                } => (request.0, true, Some(channel)),
1715                                request_response::Message::Response {
1716                                    response,
1717                                    ..
1718                                } => (response.0, false, None),
1719                            };
1720
1721                        if message_data.len() > self.max_app_message_bytes {
1722                            warn!(
1723                                target: TARGET,
1724                                peer_id = %peer_id,
1725                                size = message_data.len(),
1726                                max = self.max_app_message_bytes,
1727                                "inbound payload dropped: message too large",
1728                            );
1729                            if let Some(metrics) = self.metric_handle() {
1730                                metrics.inc_oversized_inbound_drop();
1731                            }
1732                            self.swarm
1733                                .behaviour_mut()
1734                                .close_connections(&peer_id, None);
1735                            self.refresh_runtime_metrics();
1736                            return;
1737                        }
1738
1739                        if is_request {
1740                            if let Some(metrics) = self.metric_handle() {
1741                                metrics.inc_reqres_request_received();
1742                            }
1743                            trace!(target: TARGET, peer_id = %peer_id, "request received");
1744                            if let Some(channel) = response_channel {
1745                                self.add_ephemeral_response(peer_id, channel);
1746                            }
1747                        } else {
1748                            if let Some(metrics) = self.metric_handle() {
1749                                metrics.inc_reqres_response_received();
1750                            }
1751                            trace!(target: TARGET, peer_id = %peer_id, "response received");
1752                        }
1753
1754                        if self.peer_identify.contains(&peer_id) {
1755                            self.message_to_helper(
1756                                MessagesHelper::Single(message_data),
1757                                &peer_id,
1758                            )
1759                            .await;
1760                        } else {
1761                            self.add_pending_inbound_message(
1762                                peer_id,
1763                                message_data,
1764                            );
1765                        }
1766                    }
1767                    BehaviourEvent::ReqresFailure {
1768                        peer_id,
1769                        direction,
1770                        kind,
1771                    } => {
1772                        if let Some(metrics) = self.metric_handle() {
1773                            metrics.observe_reqres_failure(
1774                                direction.as_metric_label(),
1775                                kind.as_metric_label(),
1776                            );
1777                        }
1778                        debug!(
1779                            target: TARGET,
1780                            peer_id = %peer_id,
1781                            direction = direction.as_metric_label(),
1782                            kind = kind.as_metric_label(),
1783                            "request-response failure"
1784                        );
1785                    }
1786                    BehaviourEvent::ClosestPeer { peer_id, info } => {
1787                        if matches!(
1788                            self.peer_action.get(&peer_id),
1789                            Some(Action::Discover)
1790                        ) {
1791                            self.peer_action.remove(&peer_id);
1792                            if let Some(info) = info {
1793                                let addr = info
1794                                    .addrs
1795                                    .iter()
1796                                    .filter(|x| {
1797                                        !self
1798                                            .swarm
1799                                            .behaviour()
1800                                            .is_invalid_address(x)
1801                                    })
1802                                    .cloned()
1803                                    .collect::<Vec<Multiaddr>>();
1804
1805                                if addr.is_empty() {
1806                                    self.schedule_retry(
1807                                        peer_id,
1808                                        ScheduleType::Discover,
1809                                    );
1810                                } else {
1811                                    self.schedule_retry(
1812                                        peer_id,
1813                                        ScheduleType::Dial(addr),
1814                                    );
1815                                }
1816                            } else {
1817                                self.schedule_retry(
1818                                    peer_id,
1819                                    ScheduleType::Discover,
1820                                );
1821                            };
1822                        }
1823                    }
1824                    BehaviourEvent::Dummy => {
1825                        // For control_list, ReqRes events
1826                    }
1827                }
1828            }
1829            SwarmEvent::OutgoingConnectionError {
1830                error,
1831                peer_id: Some(peer_id),
1832                ..
1833            } => {
1834                if matches!(self.peer_action.get(&peer_id), Some(Action::Dial))
1835                {
1836                    self.peer_action.remove(&peer_id);
1837
1838                    self.swarm.behaviour_mut().add_peer_to_remove(&peer_id);
1839
1840                    if let Some((retry, new_address)) =
1841                        self.handle_dial_error(error, &peer_id, false)
1842                    {
1843                        if retry {
1844                            let addr = new_address
1845                                .iter()
1846                                .filter(|x| {
1847                                    !self
1848                                        .swarm
1849                                        .behaviour()
1850                                        .is_invalid_address(x)
1851                                })
1852                                .cloned()
1853                                .collect::<Vec<Multiaddr>>();
1854
1855                            if addr.is_empty() {
1856                                self.schedule_retry(
1857                                    peer_id,
1858                                    ScheduleType::Discover,
1859                                );
1860                            } else {
1861                                self.schedule_retry(
1862                                    peer_id,
1863                                    ScheduleType::Dial(addr),
1864                                );
1865                            }
1866                        } else {
1867                            self.schedule_retry(
1868                                peer_id,
1869                                ScheduleType::Discover,
1870                            );
1871                        }
1872                    };
1873                }
1874            }
1875            SwarmEvent::ConnectionClosed {
1876                peer_id,
1877                connection_id,
1878                num_established: 0,
1879                ..
1880            } => {
1881                if let Some(Action::Identified(id)) =
1882                    self.peer_action.get(&peer_id)
1883                    && connection_id == *id
1884                {
1885                    self.peer_action.remove(&peer_id);
1886
1887                    self.peer_identify.remove(&peer_id);
1888                    self.drop_pending_inbound_messages(&peer_id);
1889                    self.response_channels.remove(&peer_id);
1890
1891                    self.retry_by_peer.remove(&peer_id);
1892
1893                    if self
1894                        .pending_outbound_messages
1895                        .get(&peer_id)
1896                        .is_some_and(|q| !q.is_empty())
1897                    {
1898                        self.schedule_retry(
1899                            peer_id,
1900                            ScheduleType::Dial(vec![]),
1901                        );
1902                    }
1903                } else if let Some(Action::Dial | Action::Discover) =
1904                    self.peer_action.get(&peer_id)
1905                {
1906                    self.peer_action.remove(&peer_id);
1907                    self.retry_by_peer.remove(&peer_id);
1908                    self.drop_pending_inbound_messages(&peer_id);
1909                    self.response_channels.remove(&peer_id);
1910                    self.peer_identify.remove(&peer_id);
1911
1912                    if self
1913                        .pending_outbound_messages
1914                        .get(&peer_id)
1915                        .is_some_and(|q| !q.is_empty())
1916                    {
1917                        self.schedule_retry(peer_id, ScheduleType::Discover);
1918                    }
1919                }
1920            }
1921            SwarmEvent::IncomingConnectionError { .. } => {
1922                // We are not interested in this event at the moment.
1923                // The logs generate many false positives and cannot be associated with a
1924                // node since I do not have the peer-id. The best solution to avoid
1925                // confusion for the user is not to filter these errors.
1926            }
1927            SwarmEvent::ExpiredListenAddr { address, .. } => {
1928                warn!(target: TARGET, addr = %address, "listen address expired");
1929            }
1930            SwarmEvent::ListenerError { error, .. } => {
1931                error!(target: TARGET, error = %error, "listener error");
1932            }
1933            SwarmEvent::ConnectionEstablished {
1934                peer_id,
1935                connection_id,
1936                num_established,
1937                ..
1938            } if num_established.get() > 1 => {
1939                debug!(target: TARGET, peer_id = %peer_id, "duplicate connection detected; closing excess");
1940                self.swarm
1941                    .behaviour_mut()
1942                    .close_connections(&peer_id, Some(connection_id));
1943            }
1944            SwarmEvent::IncomingConnection { .. }
1945            | SwarmEvent::ListenerClosed { .. }
1946            | SwarmEvent::Dialing { .. }
1947            | SwarmEvent::NewExternalAddrCandidate { .. }
1948            | SwarmEvent::ExternalAddrConfirmed { .. }
1949            | SwarmEvent::ExternalAddrExpired { .. }
1950            | SwarmEvent::NewExternalAddrOfPeer { .. }
1951            | SwarmEvent::NewListenAddr { .. } => {
1952                // We are not interested in this event at the moment.
1953            }
1954            _ => {}
1955        }
1956        self.refresh_runtime_metrics();
1957    }
1958}
1959
1960#[cfg(test)]
1961mod tests {
1962
1963    use crate::routing::RoutingNode;
1964
1965    use super::*;
1966    use libp2p::core::{ConnectedPoint, Endpoint, transport::PortUse};
1967    use libp2p::identity::Keypair as Libp2pKeypair;
1968    use libp2p::swarm::ConnectionId;
1969    use prometheus_client::{encoding::text::encode, registry::Registry};
1970    use serde::Deserialize;
1971    use test_log::test;
1972
1973    use ave_common::identity::{KeyPair, keys::Ed25519Signer};
1974
1975    use serial_test::serial;
1976
1977    #[derive(Debug, Serialize, Deserialize)]
1978    pub struct Dummy;
1979
1980    fn metric_value(metrics: &str, name: &str) -> f64 {
1981        metrics
1982            .lines()
1983            .find_map(|line| {
1984                if line.starts_with(name) {
1985                    line.split_whitespace().nth(1)?.parse::<f64>().ok()
1986                } else {
1987                    None
1988                }
1989            })
1990            .unwrap_or(0.0)
1991    }
1992
1993    // Build a relay server.
1994    fn build_worker(
1995        boot_nodes: Vec<RoutingNode>,
1996        random_walk: bool,
1997        node_type: NodeType,
1998        graceful_token: CancellationToken,
1999        crash_token: CancellationToken,
2000        memory_addr: String,
2001    ) -> NetworkWorker<Dummy> {
2002        let config = create_config(
2003            boot_nodes,
2004            random_walk,
2005            node_type,
2006            vec![memory_addr],
2007        );
2008        let keys = KeyPair::Ed25519(Ed25519Signer::generate().unwrap());
2009
2010        NetworkWorker::new(
2011            &keys,
2012            config,
2013            false,
2014            NetworkWorkerRuntime {
2015                monitor: None,
2016                graceful_token,
2017                crash_token,
2018                machine_spec: None,
2019                metrics: None,
2020            },
2021        )
2022        .unwrap()
2023    }
2024
2025    // Create a config
2026    fn create_config(
2027        boot_nodes: Vec<RoutingNode>,
2028        random_walk: bool,
2029        node_type: NodeType,
2030        listen_addresses: Vec<String>,
2031    ) -> Config {
2032        let config = crate::routing::Config::default()
2033            .with_discovery_limit(50)
2034            .with_dht_random_walk(random_walk);
2035
2036        Config {
2037            boot_nodes,
2038            node_type,
2039            routing: config,
2040            external_addresses: vec![],
2041            listen_addresses,
2042            ..Default::default()
2043        }
2044    }
2045
2046    fn build_identified_event(
2047        peer_id: PeerId,
2048        public_key: libp2p::identity::PublicKey,
2049        connection_id: ConnectionId,
2050    ) -> SwarmEvent<BehaviourEvent> {
2051        SwarmEvent::Behaviour(BehaviourEvent::Identified {
2052            peer_id,
2053            info: Box::new(identify::Info {
2054                public_key,
2055                protocol_version: IDENTIFY_PROTOCOL.to_owned(),
2056                agent_version: "test-agent".to_owned(),
2057                listen_addrs: vec!["/memory/9999".parse().expect("multiaddr")],
2058                protocols: vec![StreamProtocol::new(REQRES_PROTOCOL)],
2059                observed_addr: "/memory/9998".parse().expect("multiaddr"),
2060                signed_peer_record: None,
2061            }),
2062            connection_id,
2063        })
2064    }
2065
2066    fn test_endpoint() -> ConnectedPoint {
2067        ConnectedPoint::Dialer {
2068            address: "/memory/9997".parse().expect("multiaddr"),
2069            role_override: Endpoint::Dialer,
2070            port_use: PortUse::New,
2071        }
2072    }
2073
2074    #[test]
2075    fn outbound_queue_respects_bytes_limit_and_updates_metrics() {
2076        let mut config = create_config(
2077            vec![],
2078            false,
2079            NodeType::Addressable,
2080            vec!["/memory/3100".to_owned()],
2081        );
2082        config.max_pending_outbound_bytes_per_peer = 16;
2083
2084        let keys = KeyPair::Ed25519(Ed25519Signer::generate().unwrap());
2085        let mut registry = Registry::default();
2086        let metrics = crate::metrics::register(&mut registry);
2087        let mut worker: NetworkWorker<Dummy> = NetworkWorker::new(
2088            &keys,
2089            config,
2090            false,
2091            NetworkWorkerRuntime {
2092                monitor: None,
2093                graceful_token: CancellationToken::new(),
2094                crash_token: CancellationToken::new(),
2095                machine_spec: None,
2096                metrics: Some(metrics),
2097            },
2098        )
2099        .expect("worker");
2100
2101        let peer = PeerId::random();
2102        worker.add_pending_outbound_message(
2103            peer,
2104            Bytes::from_static(b"aaaaaaaaaaaa"), // 12 bytes
2105        );
2106        worker.add_pending_outbound_message(
2107            peer,
2108            Bytes::from_static(b"bbbbbbbbbbbb"), // 12 bytes -> evicts previous by bytes
2109        );
2110        worker.add_pending_outbound_message(
2111            peer,
2112            Bytes::from_static(b"cccc"), // 4 bytes
2113        );
2114
2115        let queue = worker
2116            .pending_outbound_messages
2117            .get(&peer)
2118            .expect("queue exists");
2119        assert_eq!(queue.len(), 2);
2120        assert_eq!(queue.bytes_len(), 16);
2121
2122        let mut text = String::new();
2123        encode(&mut text, &registry).expect("encode metrics");
2124
2125        assert_eq!(
2126            metric_value(
2127                &text,
2128                "network_messages_dropped_total{direction=\"outbound\",reason=\"queue_bytes_limit_per_peer\"}"
2129            ),
2130            1.0
2131        );
2132        assert_eq!(
2133            metric_value(
2134                &text,
2135                "network_messages_dropped_total{direction=\"outbound\",reason=\"queue_bytes_limit_global\"}"
2136            ),
2137            0.0
2138        );
2139        assert_eq!(metric_value(&text, "network_pending_outbound_bytes"), 16.0);
2140        assert!(
2141            metric_value(&text, "network_pending_message_age_seconds_count")
2142                >= 1.0
2143        );
2144    }
2145
2146    #[test]
2147    fn zero_pending_bytes_limits_disable_byte_drops() {
2148        let mut config = create_config(
2149            vec![],
2150            false,
2151            NodeType::Addressable,
2152            vec!["/memory/3101".to_owned()],
2153        );
2154        config.max_pending_outbound_bytes_per_peer = 0;
2155        config.max_pending_inbound_bytes_per_peer = 0;
2156        config.max_pending_outbound_bytes_total = 0;
2157        config.max_pending_inbound_bytes_total = 0;
2158
2159        let keys = KeyPair::Ed25519(Ed25519Signer::generate().unwrap());
2160        let mut registry = Registry::default();
2161        let metrics = crate::metrics::register(&mut registry);
2162        let mut worker: NetworkWorker<Dummy> = NetworkWorker::new(
2163            &keys,
2164            config,
2165            false,
2166            NetworkWorkerRuntime {
2167                monitor: None,
2168                graceful_token: CancellationToken::new(),
2169                crash_token: CancellationToken::new(),
2170                machine_spec: None,
2171                metrics: Some(metrics),
2172            },
2173        )
2174        .expect("worker");
2175
2176        let peer = PeerId::random();
2177        for i in 0..3u8 {
2178            let payload = Bytes::from(vec![i + 1; 12]);
2179            worker.add_pending_outbound_message(peer, payload);
2180        }
2181        for i in 0..3u8 {
2182            let payload = Bytes::from(vec![i + 7; 12]);
2183            worker.add_pending_inbound_message(peer, payload);
2184        }
2185
2186        let outbound = worker
2187            .pending_outbound_messages
2188            .get(&peer)
2189            .expect("outbound queue exists");
2190        let inbound = worker
2191            .pending_inbound_messages
2192            .get(&peer)
2193            .expect("inbound queue exists");
2194        assert_eq!(outbound.len(), 3);
2195        assert_eq!(outbound.bytes_len(), 36);
2196        assert_eq!(inbound.len(), 3);
2197        assert_eq!(inbound.bytes_len(), 36);
2198
2199        let mut text = String::new();
2200        encode(&mut text, &registry).expect("encode metrics");
2201        assert_eq!(
2202            metric_value(
2203                &text,
2204                "network_messages_dropped_total{direction=\"outbound\",reason=\"queue_bytes_limit_per_peer\"}"
2205            ),
2206            0.0
2207        );
2208        assert_eq!(
2209            metric_value(
2210                &text,
2211                "network_messages_dropped_total{direction=\"outbound\",reason=\"queue_bytes_limit_global\"}"
2212            ),
2213            0.0
2214        );
2215        assert_eq!(
2216            metric_value(
2217                &text,
2218                "network_messages_dropped_total{direction=\"inbound\",reason=\"queue_bytes_limit_per_peer\"}"
2219            ),
2220            0.0
2221        );
2222        assert_eq!(
2223            metric_value(
2224                &text,
2225                "network_messages_dropped_total{direction=\"inbound\",reason=\"queue_bytes_limit_global\"}"
2226            ),
2227            0.0
2228        );
2229    }
2230
2231    #[test]
2232    fn outbound_global_bytes_limit_applies_across_peers() {
2233        let mut config = create_config(
2234            vec![],
2235            false,
2236            NodeType::Addressable,
2237            vec!["/memory/3102".to_owned()],
2238        );
2239        config.max_pending_outbound_bytes_per_peer = 0;
2240        config.max_pending_outbound_bytes_total = 20;
2241
2242        let keys = KeyPair::Ed25519(Ed25519Signer::generate().unwrap());
2243        let mut registry = Registry::default();
2244        let metrics = crate::metrics::register(&mut registry);
2245        let mut worker: NetworkWorker<Dummy> = NetworkWorker::new(
2246            &keys,
2247            config,
2248            false,
2249            NetworkWorkerRuntime {
2250                monitor: None,
2251                graceful_token: CancellationToken::new(),
2252                crash_token: CancellationToken::new(),
2253                machine_spec: None,
2254                metrics: Some(metrics),
2255            },
2256        )
2257        .expect("worker");
2258
2259        let peer_a = PeerId::random();
2260        let peer_b = PeerId::random();
2261
2262        worker.add_pending_outbound_message(
2263            peer_a,
2264            Bytes::from_static(b"aaaaaaaaaaaa"), // 12 bytes
2265        );
2266        worker.add_pending_outbound_message(
2267            peer_b,
2268            Bytes::from_static(b"bbbbbbbbbbbb"), // rejected by global limit
2269        );
2270
2271        assert_eq!(worker.pending_outbound_bytes_len(), 12);
2272        assert_eq!(
2273            worker
2274                .pending_outbound_messages
2275                .get(&peer_a)
2276                .expect("peer_a queue")
2277                .len(),
2278            1
2279        );
2280        assert_eq!(
2281            worker
2282                .pending_outbound_messages
2283                .get(&peer_b)
2284                .map_or(0, PendingQueue::len),
2285            0
2286        );
2287
2288        let mut text = String::new();
2289        encode(&mut text, &registry).expect("encode metrics");
2290        assert_eq!(
2291            metric_value(
2292                &text,
2293                "network_messages_dropped_total{direction=\"outbound\",reason=\"queue_bytes_limit_per_peer\"}"
2294            ),
2295            0.0
2296        );
2297        assert_eq!(
2298            metric_value(
2299                &text,
2300                "network_messages_dropped_total{direction=\"outbound\",reason=\"queue_bytes_limit_global\"}"
2301            ),
2302            1.0
2303        );
2304    }
2305
2306    #[test]
2307    fn dial_failures_include_phase_label() {
2308        let mut registry = Registry::default();
2309        let metrics = crate::metrics::register(&mut registry);
2310
2311        metrics.observe_dial_failure("bootstrap", "transport");
2312        metrics.observe_dial_failure("runtime", "denied");
2313
2314        let mut text = String::new();
2315        encode(&mut text, &registry).expect("encode metrics");
2316
2317        assert_eq!(
2318            metric_value(
2319                &text,
2320                "network_dial_failures_total{phase=\"bootstrap\",kind=\"transport\"}"
2321            ),
2322            1.0
2323        );
2324        assert_eq!(
2325            metric_value(
2326                &text,
2327                "network_dial_failures_total{phase=\"runtime\",kind=\"denied\"}"
2328            ),
2329            1.0
2330        );
2331    }
2332
2333    #[test]
2334    fn bootstrap_duration_is_labeled_by_result() {
2335        let mut registry = Registry::default();
2336        let metrics = crate::metrics::register(&mut registry);
2337
2338        metrics.observe_bootstrap_duration_seconds("success", 0.5);
2339        metrics.observe_bootstrap_duration_seconds("failure", 1.25);
2340
2341        let mut text = String::new();
2342        encode(&mut text, &registry).expect("encode metrics");
2343
2344        assert_eq!(
2345            metric_value(
2346                &text,
2347                "network_bootstrap_duration_seconds_count{result=\"success\"}"
2348            ),
2349            1.0
2350        );
2351        assert_eq!(
2352            metric_value(
2353                &text,
2354                "network_bootstrap_duration_seconds_count{result=\"failure\"}"
2355            ),
2356            1.0
2357        );
2358    }
2359
2360    #[test]
2361    fn network_state_is_one_hot() {
2362        let mut registry = Registry::default();
2363        let metrics = crate::metrics::register(&mut registry);
2364
2365        metrics.set_state_current(&NetworkState::Running);
2366
2367        let mut text = String::new();
2368        encode(&mut text, &registry).expect("encode metrics");
2369
2370        assert_eq!(
2371            metric_value(&text, "network_state{state=\"running\"}"),
2372            1.0
2373        );
2374        assert_eq!(metric_value(&text, "network_state{state=\"start\"}"), 0.0);
2375        assert_eq!(metric_value(&text, "network_state{state=\"dial\"}"), 0.0);
2376        assert_eq!(
2377            metric_value(&text, "network_state{state=\"dialing\"}"),
2378            0.0
2379        );
2380        assert_eq!(
2381            metric_value(&text, "network_state{state=\"disconnected\"}"),
2382            0.0
2383        );
2384    }
2385
2386    #[test(tokio::test)]
2387    #[serial]
2388    async fn pending_inbound_messages_keep_arrival_order_after_identify() {
2389        let mut worker = build_worker(
2390            vec![],
2391            false,
2392            NodeType::Addressable,
2393            CancellationToken::new(),
2394            CancellationToken::new(),
2395            "/memory/3200".to_owned(),
2396        );
2397        let remote_keys = Libp2pKeypair::generate_ed25519();
2398        let remote_peer = remote_keys.public().to_peer_id();
2399
2400        worker.add_pending_inbound_message(
2401            remote_peer,
2402            Bytes::from_static(b"msg-2"),
2403        );
2404        worker.add_pending_inbound_message(
2405            remote_peer,
2406            Bytes::from_static(b"msg-1"),
2407        );
2408        worker.add_pending_inbound_message(
2409            remote_peer,
2410            Bytes::from_static(b"msg-3"),
2411        );
2412
2413        let (helper_sender, mut helper_rx) = mpsc::channel(8);
2414        worker.add_helper_sender(helper_sender);
2415
2416        worker
2417            .handle_event(build_identified_event(
2418                remote_peer,
2419                remote_keys.public(),
2420                ConnectionId::new_unchecked(11),
2421            ))
2422            .await;
2423
2424        let mut received = Vec::new();
2425        for _ in 0..3 {
2426            let command = tokio::time::timeout(
2427                Duration::from_millis(200),
2428                helper_rx.recv(),
2429            )
2430            .await
2431            .expect("helper receive timeout")
2432            .expect("helper channel closed");
2433            let CommandHelper::ReceivedMessage { message, .. } = command else {
2434                panic!("unexpected helper command")
2435            };
2436            received.push(message);
2437        }
2438
2439        assert_eq!(
2440            received,
2441            vec![
2442                Bytes::from_static(b"msg-2"),
2443                Bytes::from_static(b"msg-1"),
2444                Bytes::from_static(b"msg-3"),
2445            ]
2446        );
2447        assert!(!worker.pending_inbound_messages.contains_key(&remote_peer));
2448    }
2449
2450    #[test(tokio::test)]
2451    #[serial]
2452    async fn flapping_connection_retries_then_flushes_outbound_queue() {
2453        let mut worker = build_worker(
2454            vec![],
2455            false,
2456            NodeType::Addressable,
2457            CancellationToken::new(),
2458            CancellationToken::new(),
2459            "/memory/3201".to_owned(),
2460        );
2461        let remote_keys = Libp2pKeypair::generate_ed25519();
2462        let remote_peer = remote_keys.public().to_peer_id();
2463        let first_connection = ConnectionId::new_unchecked(21);
2464        let second_connection = ConnectionId::new_unchecked(22);
2465
2466        worker.add_pending_outbound_message(
2467            remote_peer,
2468            Bytes::from_static(b"needs-redelivery"),
2469        );
2470        worker
2471            .peer_action
2472            .insert(remote_peer, Action::Identified(first_connection));
2473        worker.peer_identify.insert(remote_peer);
2474
2475        worker
2476            .handle_event(SwarmEvent::ConnectionClosed {
2477                peer_id: remote_peer,
2478                connection_id: first_connection,
2479                endpoint: test_endpoint(),
2480                num_established: 0,
2481                cause: None,
2482            })
2483            .await;
2484
2485        assert!(worker.retry_by_peer.contains_key(&remote_peer));
2486        assert!(matches!(
2487            worker.retry_by_peer.get(&remote_peer).map(|s| s.kind),
2488            Some(RetryKind::Dial)
2489        ));
2490        assert!(
2491            worker
2492                .pending_outbound_messages
2493                .get(&remote_peer)
2494                .is_some_and(|q| !q.is_empty())
2495        );
2496
2497        worker
2498            .handle_event(build_identified_event(
2499                remote_peer,
2500                remote_keys.public(),
2501                second_connection,
2502            ))
2503            .await;
2504
2505        assert!(!worker.pending_outbound_messages.contains_key(&remote_peer));
2506        assert!(!worker.retry_by_peer.contains_key(&remote_peer));
2507        assert!(matches!(
2508            worker.peer_action.get(&remote_peer),
2509            Some(Action::Identified(id)) if *id == second_connection
2510        ));
2511    }
2512
2513    #[test(tokio::test)]
2514    #[serial]
2515    async fn bootstrap_identify_timeout_keeps_bootnode_until_close() {
2516        let remote_keys = Libp2pKeypair::generate_ed25519();
2517        let remote_peer = remote_keys.public().to_peer_id();
2518        let boot_node = RoutingNode {
2519            peer_id: remote_peer.to_string(),
2520            address: vec!["/memory/3300".to_owned()],
2521        };
2522
2523        let mut worker = build_worker(
2524            vec![boot_node],
2525            false,
2526            NodeType::Addressable,
2527            CancellationToken::new(),
2528            CancellationToken::new(),
2529            "/memory/3301".to_owned(),
2530        );
2531
2532        assert!(worker.boot_nodes.contains_key(&remote_peer));
2533        assert!(!worker.retry_boot_nodes.contains_key(&remote_peer));
2534
2535        worker
2536            .handle_connection_events(SwarmEvent::Behaviour(
2537                BehaviourEvent::IdentifyError {
2538                    peer_id: remote_peer,
2539                    error: swarm::StreamUpgradeError::Timeout,
2540                },
2541            ))
2542            .await;
2543
2544        assert!(worker.boot_nodes.contains_key(&remote_peer));
2545        assert!(!worker.retry_boot_nodes.contains_key(&remote_peer));
2546    }
2547
2548    #[test(tokio::test)]
2549    #[serial]
2550    async fn test_no_boot_nodes() {
2551        let boot_nodes = vec![];
2552
2553        // Build a node.
2554        let node_addr = "/memory/3000";
2555        let mut node = build_worker(
2556            boot_nodes.clone(),
2557            false,
2558            NodeType::Addressable,
2559            CancellationToken::new(),
2560            CancellationToken::new(),
2561            node_addr.to_owned(),
2562        );
2563        if let Err(e) = node.run_connection().await {
2564            assert_eq!(
2565                e.to_string(),
2566                "cannot connect to the ave network: no reachable bootstrap node"
2567            );
2568        };
2569
2570        assert_eq!(node.state, NetworkState::Disconnected);
2571    }
2572
2573    #[test(tokio::test)]
2574    #[serial]
2575    async fn test_fake_boot_node() {
2576        let mut boot_nodes = vec![];
2577
2578        // Build a fake bootstrap node.
2579        let fake_boot_peer = PeerId::random();
2580        let fake_boot_addr = "/memory/3001";
2581        let fake_node = RoutingNode {
2582            peer_id: fake_boot_peer.to_string(),
2583            address: vec![fake_boot_addr.to_owned()],
2584        };
2585        boot_nodes.push(fake_node);
2586
2587        // Build a node.
2588        let node_addr = "/memory/3002";
2589        let mut node = build_worker(
2590            boot_nodes.clone(),
2591            false,
2592            NodeType::Addressable,
2593            CancellationToken::new(),
2594            CancellationToken::new(),
2595            node_addr.to_owned(),
2596        );
2597
2598        if let Err(e) = node.run_connection().await {
2599            assert_eq!(
2600                e.to_string(),
2601                "cannot connect to the ave network: no reachable bootstrap node"
2602            );
2603        };
2604
2605        assert_eq!(node.state, NetworkState::Disconnected);
2606    }
2607
2608    #[test(tokio::test)]
2609    #[serial]
2610    async fn test_connect() {
2611        let mut boot_nodes = vec![];
2612
2613        // Build a bootstrap node.
2614        let boot_addr = "/memory/3003";
2615        let mut boot = build_worker(
2616            boot_nodes.clone(),
2617            false,
2618            NodeType::Bootstrap,
2619            CancellationToken::new(),
2620            CancellationToken::new(),
2621            boot_addr.to_owned(),
2622        );
2623
2624        let boot_node = RoutingNode {
2625            peer_id: boot.local_peer_id().to_string(),
2626            address: vec![boot_addr.to_owned()],
2627        };
2628
2629        boot_nodes.push(boot_node);
2630
2631        // Build a node.
2632        let node_addr = "/memory/3004";
2633        let mut node = build_worker(
2634            boot_nodes,
2635            false,
2636            NodeType::Ephemeral,
2637            CancellationToken::new(),
2638            CancellationToken::new(),
2639            node_addr.to_owned(),
2640        );
2641
2642        // Spawn the boot node
2643        tokio::spawn(async move {
2644            boot.run_main().await;
2645        });
2646
2647        // Wait for connection.
2648        node.run_connection().await.unwrap();
2649    }
2650}