Skip to main content

ave_network/
control_list.rs

1use libp2p::{
2    Multiaddr, PeerId,
3    swarm::{
4        CloseConnection, ConnectionDenied, NetworkBehaviour, ToSwarm, dummy,
5    },
6};
7use serde::{Deserialize, Deserializer, Serialize};
8use std::{
9    collections::{HashSet, VecDeque},
10    fmt,
11    pin::Pin,
12    str::FromStr,
13    sync::Arc,
14    task::Poll,
15    time::Duration,
16};
17use tokio::{
18    sync::mpsc::{self, Receiver},
19    time::{Instant, MissedTickBehavior, interval},
20};
21use tokio_util::sync::CancellationToken;
22use tracing::{debug, warn};
23
24use crate::{
25    RoutingNode, metrics::NetworkMetrics, utils::request_update_lists,
26};
27
/// Tracing target attached to every log line emitted by this module.
const TARGET: &str = "ave::network::control";

/// Serde default for `Config::request_timeout`: five seconds.
const fn default_request_timeout() -> Duration {
    Duration::new(5, 0)
}

/// Serde default for `Config::max_concurrent_requests`.
const fn default_max_concurrent_requests() -> usize {
    8
}
35
/// Configuration for the control list behaviour.
///
/// Because of `#[serde(default)]`, any field missing from the input falls
/// back to the value produced by [`Config::default`] (or to the field-level
/// default function where one is given).
///
/// NOTE(review): the `Duration` fields are deserialized from integer seconds
/// but serialized with serde's default `Duration` representation — confirm
/// that round-tripping a serialized `Config` is not required.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(default)]
pub struct Config {
    /// Activate allow and block lists
    enable: bool,

    /// Nodes allowed to make and receive connections
    allow_list: Vec<String>,

    /// Nodes that are not allowed to make and receive connections
    block_list: Vec<String>,

    /// Services where the node will go to query the list of allowed nodes.
    service_allow_list: Vec<String>,

    /// Services where the node will go to query the list of blocked nodes.
    service_block_list: Vec<String>,

    /// Time interval to be used for queries updating the lists.
    /// Deserialized from an integer number of seconds.
    #[serde(deserialize_with = "deserialize_duration_secs")]
    interval_request: Duration,

    /// Timeout for each allow/block list HTTP request.
    /// Deserialized from an integer number of seconds.
    #[serde(
        default = "default_request_timeout",
        deserialize_with = "deserialize_duration_secs"
    )]
    request_timeout: Duration,

    /// Maximum number of concurrent HTTP requests when refreshing control lists.
    /// `0` is treated as `1` (sequential requests).
    #[serde(default = "default_max_concurrent_requests")]
    max_concurrent_requests: usize,
}
71
72fn deserialize_duration_secs<'de, D>(
73    deserializer: D,
74) -> Result<Duration, D::Error>
75where
76    D: Deserializer<'de>,
77{
78    let u: u64 = u64::deserialize(deserializer)?;
79    Ok(Duration::from_secs(u))
80}
81
82impl Default for Config {
83    fn default() -> Self {
84        Self {
85            enable: Default::default(),
86            allow_list: Default::default(),
87            block_list: Default::default(),
88            service_allow_list: Default::default(),
89            service_block_list: Default::default(),
90            interval_request: Duration::from_secs(60),
91            request_timeout: default_request_timeout(),
92            max_concurrent_requests: default_max_concurrent_requests(),
93        }
94    }
95}
96
97/// Control List Settings
98impl Config {
99    /// Set enable
100    pub const fn with_enable(mut self, enable: bool) -> Self {
101        self.enable = enable;
102        self
103    }
104
105    /// Set allow list
106    pub fn with_allow_list(mut self, allow_list: Vec<String>) -> Self {
107        self.allow_list = allow_list;
108        self
109    }
110
111    /// Set block list
112    pub fn with_block_list(mut self, block_list: Vec<String>) -> Self {
113        self.block_list = block_list;
114        self
115    }
116
117    /// Set Service list to consult allow list
118    pub fn with_service_allow_list(
119        mut self,
120        service_allow_list: Vec<String>,
121    ) -> Self {
122        self.service_allow_list = service_allow_list;
123        self
124    }
125
126    /// Set Service list to consult block list
127    pub fn with_service_block_list(
128        mut self,
129        service_block_list: Vec<String>,
130    ) -> Self {
131        self.service_block_list = service_block_list;
132        self
133    }
134
135    /// Set interval request
136    pub const fn with_interval_request(mut self, interval: Duration) -> Self {
137        self.interval_request = interval;
138        self
139    }
140
141    /// Set request timeout.
142    pub const fn with_request_timeout(mut self, timeout: Duration) -> Self {
143        self.request_timeout = timeout;
144        self
145    }
146
147    /// Set max concurrent requests. `0` is treated as `1` at runtime.
148    pub const fn with_max_concurrent_requests(mut self, value: usize) -> Self {
149        self.max_concurrent_requests = value;
150        self
151    }
152
153    /// Set interval request
154    pub const fn get_interval_request(&self) -> Duration {
155        self.interval_request
156    }
157
158    /// Get request timeout
159    pub const fn get_request_timeout(&self) -> Duration {
160        self.request_timeout
161    }
162
163    /// Get max concurrent requests configured value.
164    /// Runtime uses at least one concurrent request.
165    pub const fn get_max_concurrent_requests(&self) -> usize {
166        self.max_concurrent_requests
167    }
168
169    /// Get enable
170    pub const fn get_enable(&self) -> bool {
171        self.enable
172    }
173
174    /// Get allow list
175    pub fn get_allow_list(&self) -> Vec<String> {
176        self.allow_list.clone()
177    }
178
179    /// Get block list
180    pub fn get_block_list(&self) -> Vec<String> {
181        self.block_list.clone()
182    }
183
184    /// Get Service list to consult allow list
185    pub fn get_service_allow_list(&self) -> Vec<String> {
186        self.service_allow_list.clone()
187    }
188    /// Get Service list to consult block list
189    pub fn get_service_block_list(&self) -> Vec<String> {
190        self.service_block_list.clone()
191    }
192}
193
194pub fn build_control_lists_updaters(
195    config: &Config,
196    graceful_token: CancellationToken,
197    crash_token: CancellationToken,
198    metrics: Option<Arc<NetworkMetrics>>,
199) -> Option<Receiver<Event>> {
200    if config.enable {
201        debug!(target: TARGET, "control list enabled");
202
203        let (sender, receiver) = mpsc::channel(8);
204        let update_interval = config.interval_request;
205        let service_allow = config.service_allow_list.clone();
206        let service_block = config.service_block_list.clone();
207        let metrics_updater = metrics;
208        let request_timeout = config.request_timeout;
209        let max_concurrent_requests = config.max_concurrent_requests;
210
211        tokio::spawn(async move {
212            let client = match reqwest::Client::builder()
213                .connect_timeout(request_timeout)
214                .build()
215            {
216                Ok(client) => client,
217                Err(e) => {
218                    warn!(target: TARGET, error = %e, "failed to build control-list http client, falling back to default client");
219                    reqwest::Client::new()
220                }
221            };
222
223            let mut last_allow_success: Option<Instant> = None;
224            let mut last_block_success: Option<Instant> = None;
225            let mut ticker = interval(update_interval);
226            ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
227            // Keep previous semantics: first update happens after `interval`.
228            ticker.tick().await;
229            loop {
230                tokio::select! {
231                    _ = ticker.tick() => {
232                        let started_at = Instant::now();
233                        let (
234                    (vec_allow_peers, vec_block_peers),
235                    (successful_allow, successful_block),
236                ) = request_update_lists(
237                    client.clone(),
238                    service_allow.clone(),
239                    service_block.clone(),
240                    request_timeout,
241                    max_concurrent_requests,
242                    graceful_token.clone(),
243                    crash_token.clone()
244                )
245                .await;
246                        if let Some(metrics) = metrics_updater.as_deref() {
247                            metrics.observe_control_list_updater_duration_seconds(
248                                started_at.elapsed().as_secs_f64(),
249                            );
250                        }
251
252                        let now = Instant::now();
253
254                // If at least 1 update of the list was possible
255                if successful_allow != 0 {
256                    if let Some(metrics) = metrics_updater.as_deref() {
257                        metrics.observe_control_list_allow_update(true);
258                    }
259                    last_allow_success = Some(now);
260                    if let Err(e) = sender.send(Event::AllowListUpdated(vec_allow_peers)).await {
261                        debug!(target: TARGET, error = %e, "allow-list update dropped: channel closed");
262                    }
263                } else {
264                    if let Some(metrics) = metrics_updater.as_deref() {
265                        metrics.observe_control_list_allow_update(false);
266                    }
267                    warn!(target: TARGET, "allow-list not updated: no service responded successfully");
268                }
269
270                // If at least 1 update of the list was possible
271                if successful_block != 0 {
272                    if let Some(metrics) = metrics_updater.as_deref() {
273                        metrics.observe_control_list_block_update(true);
274                    }
275                    last_block_success = Some(now);
276                    if let Err(e) = sender.send(Event::BlockListUpdated(vec_block_peers)).await {
277                        debug!(target: TARGET, error = %e, "block-list update dropped: channel closed");
278                    }
279                } else {
280                    if let Some(metrics) = metrics_updater.as_deref() {
281                        metrics.observe_control_list_block_update(false);
282                    }
283                    warn!(target: TARGET, "block-list not updated: no service responded successfully");
284                }
285
286                if let Some(metrics) = metrics_updater.as_deref() {
287                    let allow_age = last_allow_success
288                        .map_or(-1, |t| now.duration_since(t).as_secs() as i64);
289                    metrics
290                        .set_control_list_allow_last_success_age_seconds(allow_age);
291
292                    let block_age = last_block_success
293                        .map_or(-1, |t| now.duration_since(t).as_secs() as i64);
294                    metrics
295                        .set_control_list_block_last_success_age_seconds(block_age);
296                }
297                    }
298                    _ = graceful_token.clone().cancelled_owned() => {
299                        debug!(target: TARGET, "control list updater stopped");
300                        break;
301                    }
302                    _ = crash_token.clone().cancelled_owned() => {
303                        debug!(target: TARGET, "control list updater stopped");
304                        break;
305                    }
306                };
307            }
308        });
309
310        Some(receiver)
311    } else {
312        None
313    }
314}
315
/// Network behaviour that enforces the allow and block lists on connections.
///
/// The derived `Default` yields a disabled behaviour with empty lists, no
/// update receiver and no metrics.
#[derive(Default, Debug)]
pub struct Behaviour {
    /// Peers in the allow list.
    allow_peers: HashSet<PeerId>,
    /// Peers in the block list.
    block_peers: HashSet<PeerId>,
    /// Queue of peers whose connections `poll` asks the swarm to close.
    close_connections: VecDeque<PeerId>,
    /// Whether the lists are checked at all; when `false` every peer passes.
    enable: bool,
    /// Channel delivering list updates, drained in `poll`.
    receiver: Option<Receiver<Event>>,
    /// Optional metrics sink for list sizes, applied updates and denials.
    metrics: Option<Arc<NetworkMetrics>>,
}
325
326impl Behaviour {
327    /// Creates a new control list `Behaviour`.
328    pub fn new(
329        config: Config,
330        boot_nodes: &[RoutingNode],
331        receiver: Option<Receiver<Event>>,
332        metrics: Option<Arc<NetworkMetrics>>,
333    ) -> Self {
334        if config.enable {
335            let mut full_allow_list = config.allow_list.clone();
336            for node in boot_nodes {
337                full_allow_list.push(node.peer_id.clone());
338            }
339
340            let behaviour = Self {
341                enable: true,
342                allow_peers: HashSet::from_iter(
343                    full_allow_list
344                        .iter()
345                        .filter_map(|e| PeerId::from_str(e).ok()),
346                ),
347                block_peers: HashSet::from_iter(
348                    config
349                        .block_list
350                        .iter()
351                        .filter_map(|e| PeerId::from_str(e).ok()),
352                ),
353                receiver,
354                metrics,
355                ..Default::default()
356            };
357
358            if let Some(metrics) = behaviour.metrics.as_deref() {
359                metrics.set_control_list_allow_peers(
360                    behaviour.allow_peers.len() as i64,
361                );
362                metrics.set_control_list_block_peers(
363                    behaviour.block_peers.len() as i64,
364                );
365                metrics.set_control_list_allow_last_success_age_seconds(-1);
366                metrics.set_control_list_block_last_success_age_seconds(-1);
367            }
368
369            behaviour
370        } else {
371            let behaviour = Self {
372                metrics,
373                ..Default::default()
374            };
375
376            if let Some(metrics) = behaviour.metrics.as_deref() {
377                metrics.set_control_list_allow_peers(0);
378                metrics.set_control_list_block_peers(0);
379                metrics.set_control_list_allow_last_success_age_seconds(-1);
380                metrics.set_control_list_block_last_success_age_seconds(-1);
381            }
382
383            behaviour
384        }
385    }
386
387    /// Method that update allow list
388    fn update_allow_peers(&mut self, new_list: &[String]) {
389        // New hashset of allow list.
390        let new_list: HashSet<PeerId> = HashSet::from_iter(
391            new_list
392                .to_vec()
393                .iter()
394                .filter_map(|e| PeerId::from_str(e).ok()),
395        );
396
397        let close_peers: Vec<PeerId> =
398            self.allow_peers.difference(&new_list).cloned().collect();
399        self.close_connections.extend(close_peers);
400        self.allow_peers.clone_from(&new_list);
401        if let Some(metrics) = self.metrics.as_deref() {
402            metrics.inc_control_list_allow_apply();
403            metrics.set_control_list_allow_peers(self.allow_peers.len() as i64);
404        }
405    }
406
407    /// Method that update block list
408    fn update_block_peers(&mut self, new_list: &[String]) {
409        // New hashset of block list.
410        let new_list: HashSet<PeerId> = HashSet::from_iter(
411            new_list
412                .to_vec()
413                .iter()
414                .filter_map(|e| PeerId::from_str(e).ok()),
415        );
416
417        self.close_connections.extend(new_list.clone());
418        self.block_peers.clone_from(&new_list);
419        if let Some(metrics) = self.metrics.as_deref() {
420            metrics.inc_control_list_block_apply();
421            metrics.set_control_list_block_peers(self.block_peers.len() as i64);
422        }
423    }
424
425    /// Method that check if a peer is in allow list
426    fn check_allow(&self, peer: &PeerId) -> Result<(), ConnectionDenied> {
427        if self.allow_peers.contains(peer) {
428            return Ok(());
429        }
430
431        if let Some(metrics) = &self.metrics {
432            metrics.observe_control_list_denied("not_allowed");
433        }
434        debug!(target: TARGET, peer_id = %peer, "connection denied: peer not in allow list");
435        Err(ConnectionDenied::new(NotAllowed { peer: *peer }))
436    }
437
438    /// Method that check if a peer is in block list
439    fn check_block(&self, peer: &PeerId) -> Result<(), ConnectionDenied> {
440        if !self.block_peers.contains(peer) {
441            return Ok(());
442        }
443
444        if let Some(metrics) = &self.metrics {
445            metrics.observe_control_list_denied("blocked");
446        }
447        debug!(target: TARGET, peer_id = %peer, "connection denied: peer is blocked");
448        Err(ConnectionDenied::new(Blocked { peer: *peer }))
449    }
450
451    /// Method that check all List
452    fn check_lists(&self, peer: &PeerId) -> Result<(), ConnectionDenied> {
453        if self.enable {
454            self.check_block(peer)?;
455            self.check_allow(peer)?;
456        }
457
458        Ok(())
459    }
460}
461
462/// A connection to this peer is not explicitly allowed and was thus [`denied`](ConnectionDenied).
463#[derive(Debug)]
464pub struct NotAllowed {
465    peer: PeerId,
466}
467
468impl fmt::Display for NotAllowed {
469    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
470        write!(f, "peer {} is not in the allow list", self.peer)
471    }
472}
473
474impl std::error::Error for NotAllowed {}
475
476/// A connection to this peer was explicitly blocked and was thus [`denied`](ConnectionDenied).
477#[derive(Debug)]
478pub struct Blocked {
479    peer: PeerId,
480}
481
482impl fmt::Display for Blocked {
483    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
484        write!(f, "peer {} is in the block list", self.peer)
485    }
486}
487
488impl std::error::Error for Blocked {}
489
/// Event type of the control list behaviour, also used as the message type
/// sent by the list updater task to the behaviour.
#[derive(Debug)]
pub enum Event {
    /// New allow list, as textual peer IDs.
    AllowListUpdated(Vec<String>),
    /// New block list, as textual peer IDs.
    BlockListUpdated(Vec<String>),
}
496
497impl NetworkBehaviour for Behaviour {
498    type ConnectionHandler = dummy::ConnectionHandler;
499    type ToSwarm = Event;
500
501    fn handle_established_inbound_connection(
502        &mut self,
503        _connection_id: libp2p::swarm::ConnectionId,
504        peer: PeerId,
505        _: &libp2p::Multiaddr,
506        _: &libp2p::Multiaddr,
507    ) -> Result<libp2p::swarm::THandler<Self>, ConnectionDenied> {
508        self.check_lists(&peer)?;
509
510        Ok(dummy::ConnectionHandler)
511    }
512
513    fn handle_pending_outbound_connection(
514        &mut self,
515        _: libp2p::swarm::ConnectionId,
516        peer: Option<PeerId>,
517        _: &[libp2p::Multiaddr],
518        _: libp2p::core::Endpoint,
519    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
520        if let Some(peer) = peer {
521            self.check_lists(&peer)?;
522        }
523
524        Ok(vec![])
525    }
526
527    fn handle_established_outbound_connection(
528        &mut self,
529        _: libp2p::swarm::ConnectionId,
530        peer: PeerId,
531        _: &libp2p::Multiaddr,
532        _: libp2p::core::Endpoint,
533        _: libp2p::core::transport::PortUse,
534    ) -> Result<libp2p::swarm::THandler<Self>, ConnectionDenied> {
535        self.check_lists(&peer)?;
536
537        Ok(dummy::ConnectionHandler)
538    }
539
540    fn on_swarm_event(&mut self, _: libp2p::swarm::FromSwarm) {}
541
542    fn on_connection_handler_event(
543        &mut self,
544        _: PeerId,
545        _: libp2p::swarm::ConnectionId,
546        _: libp2p::swarm::THandlerOutEvent<Self>,
547    ) {
548    }
549
550    fn poll(
551        &mut self,
552        cx: &mut std::task::Context<'_>,
553    ) -> std::task::Poll<
554        libp2p::swarm::ToSwarm<
555            Self::ToSwarm,
556            libp2p::swarm::THandlerInEvent<Self>,
557        >,
558    > {
559        let mut receiver_opt = self.receiver.take();
560        if let Some(mut rx) = receiver_opt.as_mut() {
561            let mut cx = std::task::Context::from_waker(cx.waker());
562            while let Poll::Ready(Some(event)) =
563                Pin::new(&mut rx).poll_recv(&mut cx)
564            {
565                match event {
566                    Event::AllowListUpdated(items) => {
567                        self.update_allow_peers(&items)
568                    }
569                    Event::BlockListUpdated(items) => {
570                        self.update_block_peers(&items)
571                    }
572                }
573            }
574        }
575
576        self.receiver = receiver_opt;
577
578        if let Some(peer) = self.close_connections.pop_front() {
579            return Poll::Ready(ToSwarm::CloseConnection {
580                peer_id: peer,
581                connection: CloseConnection::All,
582            });
583        }
584
585        Poll::Pending
586    }
587}
588
589#[cfg(test)]
590mod tests {
591    use futures::StreamExt;
592    use libp2p::{
593        Swarm,
594        swarm::{
595            ConnectionError, DialError, ListenError, SwarmEvent,
596            dial_opts::DialOpts,
597        },
598    };
599    use libp2p_swarm_test::SwarmExt;
600    use prometheus_client::{encoding::text::encode, registry::Registry};
601    use serial_test::serial;
602    use test_log::test;
603    use tokio::{io::AsyncWriteExt, net::TcpListener, time::timeout};
604
605    use super::*;
606
607    fn metric_value(metrics: &str, name: &str) -> f64 {
608        metrics
609            .lines()
610            .find_map(|line| {
611                if line.starts_with(name) {
612                    line.split_whitespace().nth(1)?.parse::<f64>().ok()
613                } else {
614                    None
615                }
616            })
617            .unwrap_or(0.0)
618    }
619
    /// Test-only helpers to manipulate the lists directly.
    impl Behaviour {
        /// Adds `peer` to the block list and queues its connections for
        /// closing.
        pub fn block_peer(&mut self, peer: PeerId) {
            self.block_peers.insert(peer);
            self.close_connections.push_back(peer);
        }

        /// Adds `peer` to the allow list.
        pub fn allow_peer(&mut self, peer: PeerId) {
            self.allow_peers.insert(peer);
        }
        /// Turns list checking on or off.
        pub fn set_enable(&mut self, enable: bool) {
            self.enable = enable;
        }
    }
633
634    fn dial(
635        dialer: &mut Swarm<Behaviour>,
636        listener: &Swarm<Behaviour>,
637    ) -> Result<(), DialError> {
638        dialer.dial(
639            DialOpts::peer_id(*listener.local_peer_id())
640                .addresses(listener.external_addresses().cloned().collect())
641                .build(),
642        )
643    }
644
645    fn build_behaviours() -> (Swarm<Behaviour>, Swarm<Behaviour>) {
646        let mut behaviour = Behaviour::default();
647        behaviour.set_enable(true);
648        let dialer = Swarm::new_ephemeral_tokio(|_| behaviour);
649
650        let mut behaviour = Behaviour::default();
651        behaviour.set_enable(true);
652        let listener = Swarm::new_ephemeral_tokio(|_| behaviour);
653
654        (dialer, listener)
655    }
656
    /// Starts a local TCP server that answers every connection with an HTTP
    /// 200 response carrying the JSON body `[]`, after waiting `delay`.
    ///
    /// Returns the URL of the service and a token that stops the accept loop
    /// when cancelled.
    async fn spawn_slow_json_service(
        delay: Duration,
    ) -> (String, CancellationToken) {
        // Port 0 lets the OS pick a free port.
        let listener = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("bind slow service");
        let addr = listener.local_addr().expect("local addr");
        let stop = CancellationToken::new();
        let stop_task = stop.clone();

        tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = stop_task.cancelled() => break,
                    incoming = listener.accept() => {
                        let Ok((mut socket, _)) = incoming else {
                            break;
                        };
                        // Each connection is served in its own task so a slow
                        // response does not block further accepts.
                        tokio::spawn(async move {
                            tokio::time::sleep(delay).await;
                            let response = b"HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: 2\r\nconnection: close\r\n\r\n[]";
                            // Write errors are ignored on purpose: the client
                            // may already have given up.
                            let _ = socket.write_all(response).await;
                            let _ = socket.shutdown().await;
                        });
                    }
                }
            }
        });

        (format!("http://{addr}/list"), stop)
    }
688
689    #[test(tokio::test)]
690    #[serial]
691    async fn cannot_dial_blocked_peer() {
692        let (mut dialer, mut listener) = build_behaviours();
693
694        listener.listen().with_memory_addr_external().await;
695
696        dialer.behaviour_mut().block_peer(*listener.local_peer_id());
697
698        let DialError::Denied { cause } =
699            dial(&mut dialer, &listener).unwrap_err()
700        else {
701            panic!("unexpected dial error")
702        };
703        assert!(cause.downcast::<Blocked>().is_ok());
704    }
705
706    #[test(tokio::test)]
707    #[serial]
708    async fn cannot_dial_not_allowed_peer() {
709        let (mut dialer, mut listener) = build_behaviours();
710
711        listener.listen().with_memory_addr_external().await;
712
713        let DialError::Denied { cause } =
714            dial(&mut dialer, &listener).unwrap_err()
715        else {
716            panic!("unexpected dial error")
717        };
718        assert!(cause.downcast::<NotAllowed>().is_ok());
719    }
720
721    #[test(tokio::test)]
722    #[serial]
723    async fn can_dial_allowed_not_blocked_peer() {
724        let (mut dialer, mut listener) = build_behaviours();
725
726        listener.listen().with_memory_addr_external().await;
727
728        dialer.behaviour_mut().allow_peer(*listener.local_peer_id());
729
730        dial(&mut dialer, &listener).unwrap();
731    }
732
733    #[test(tokio::test)]
734    #[serial]
735    async fn cannot_dial_allowed_blocked_peer() {
736        let (mut dialer, mut listener) = build_behaviours();
737        listener.listen().with_memory_addr_external().await;
738
739        dialer.behaviour_mut().block_peer(*listener.local_peer_id());
740        dialer.behaviour_mut().allow_peer(*listener.local_peer_id());
741
742        let DialError::Denied { cause } =
743            dial(&mut dialer, &listener).unwrap_err()
744        else {
745            panic!("unexpected dial error")
746        };
747        assert!(cause.downcast::<Blocked>().is_ok());
748    }
749
750    #[test(tokio::test)]
751    #[serial]
752    async fn blocked_peer_cannot_dial_us() {
753        let (mut dialer, mut listener) = build_behaviours();
754        listener.listen().with_memory_addr_external().await;
755
756        dialer.behaviour_mut().allow_peer(*listener.local_peer_id());
757        listener.behaviour_mut().block_peer(*dialer.local_peer_id());
758
759        dial(&mut dialer, &listener).unwrap();
760        tokio::spawn(dialer.loop_on_next());
761
762        let cause = listener
763            .wait(|e| match e {
764                SwarmEvent::IncomingConnectionError {
765                    error: ListenError::Denied { cause },
766                    ..
767                } => Some(cause),
768                _ => None,
769            })
770            .await;
771        assert!(cause.downcast::<Blocked>().is_ok());
772    }
773
    /// An incoming connection from a peer that is absent from the listener's
    /// allow list is denied with `NotAllowed` on the listener side.
    #[test(tokio::test)]
    #[serial]
    async fn not_allowed_peer_cannot_dial_us() {
        let (mut dialer, mut listener) = build_behaviours();
        listener.listen().with_memory_addr_external().await;

        // Only the dialer allows the listener; the listener's allow list
        // stays empty.
        dialer.behaviour_mut().allow_peer(*listener.local_peer_id());

        dial(&mut dialer, &listener).unwrap();

        // Runs the listener until the incoming connection is rejected.
        let listener_loop = async move {
            loop {
                match listener.select_next_some().await {
                    SwarmEvent::IncomingConnectionError { error, .. } => {
                        let ListenError::Denied { cause } = error else {
                            panic!("Invalid Error")
                        };
                        assert!(cause.downcast::<NotAllowed>().is_ok());
                        break;
                    }
                    _ => {}
                }
            }
        };

        // Runs the dialer until its connection is closed with an I/O error.
        let dialer_loop = async move {
            loop {
                match dialer.select_next_some().await {
                    SwarmEvent::ConnectionClosed { cause, .. } => {
                        if let Some(error) = cause {
                            match error {
                                ConnectionError::IO(e) => {
                                    assert_eq!(
                                        e.to_string(),
                                        "Right(Io(Kind(BrokenPipe)))"
                                    );
                                    break;
                                }
                                _ => {
                                    panic!("Invalid error");
                                }
                            }
                        } else {
                            panic!("Missing error");
                        };
                    }
                    _ => {}
                }
            }
        };
        // NOTE(review): the dialer task is spawned and its handle discarded,
        // so its assertions likely do not affect the test result — confirm.
        tokio::task::spawn(Box::pin(dialer_loop));
        listener_loop.await;
    }
827
    /// An established connection is closed once the listener blocks the
    /// dialer.
    #[test(tokio::test)]
    #[serial]
    async fn connections_get_closed_upon_disallow() {
        let (mut dialer, mut listener) = build_behaviours();
        listener.listen().with_memory_addr_external().await;

        // Both sides allow each other so the connection can be established.
        dialer.behaviour_mut().allow_peer(*listener.local_peer_id());
        listener.behaviour_mut().allow_peer(*dialer.local_peer_id());
        let dialer_peer = *dialer.local_peer_id();

        dial(&mut dialer, &listener).unwrap();

        // Blocks the dialer as soon as the connection is up, then waits for
        // the connection to be closed.
        let listener_loop = async move {
            loop {
                match listener.select_next_some().await {
                    SwarmEvent::ConnectionEstablished { .. } => {
                        listener.behaviour_mut().block_peer(dialer_peer);
                    }
                    SwarmEvent::ConnectionClosed { .. } => {
                        break;
                    }
                    _ => {}
                }
            }
        };

        // Runs the dialer until its connection is closed with an I/O error.
        let dialer_loop = async move {
            loop {
                match dialer.select_next_some().await {
                    SwarmEvent::ConnectionEstablished { .. } => {}
                    SwarmEvent::ConnectionClosed { cause, .. } => {
                        if let Some(error) = cause {
                            match error {
                                ConnectionError::IO(e) => {
                                    assert_eq!(e.to_string(), "Right(Closed)");
                                    break;
                                }
                                _ => {
                                    panic!("Invalid error");
                                }
                            }
                        } else {
                            panic!("Missing error");
                        };
                    }
                    _ => {}
                }
            }
        };

        // NOTE(review): the dialer task is spawned and its handle discarded,
        // so its assertions likely do not affect the test result — confirm.
        tokio::task::spawn(Box::pin(dialer_loop));
        listener_loop.await;
    }
881
882    #[test]
883    fn control_list_denied_metrics_by_reason() {
884        let mut registry = Registry::default();
885        let metrics = crate::metrics::register(&mut registry);
886
887        let config = Config::default().with_enable(true);
888        let behaviour = Behaviour::new(config, &[], None, Some(metrics));
889
890        let blocked_peer = PeerId::random();
891        let not_allowed_peer = PeerId::random();
892
893        let mut behaviour = behaviour;
894        behaviour.block_peers.insert(blocked_peer);
895
896        let _ = behaviour.check_block(&blocked_peer);
897        let _ = behaviour.check_allow(&not_allowed_peer);
898
899        let mut text = String::new();
900        encode(&mut text, &registry).expect("encode metrics");
901
902        assert_eq!(
903            metric_value(
904                &text,
905                "network_control_list_denied_total{reason=\"blocked\"}"
906            ),
907            1.0
908        );
909        assert_eq!(
910            metric_value(
911                &text,
912                "network_control_list_denied_total{reason=\"not_allowed\"}"
913            ),
914            1.0
915        );
916    }
917
918    #[test(tokio::test)]
919    #[serial]
920    async fn slow_services_timeout_without_emitting_updates() {
921        let (url, slow_server_stop) =
922            spawn_slow_json_service(Duration::from_millis(250)).await;
923        let mut registry = Registry::default();
924        let metrics = crate::metrics::register(&mut registry);
925        let cancel = CancellationToken::new();
926
927        let config = Config::default()
928            .with_enable(true)
929            .with_interval_request(Duration::from_millis(20))
930            .with_request_timeout(Duration::from_millis(30))
931            .with_max_concurrent_requests(1)
932            .with_service_allow_list(vec![url.clone()])
933            .with_service_block_list(vec![url]);
934
935        let mut receiver = build_control_lists_updaters(
936            &config,
937            cancel.clone(),
938            CancellationToken::new(),
939            Some(metrics),
940        )
941        .expect("control-list updater receiver");
942
943        tokio::time::sleep(Duration::from_millis(170)).await;
944
945        let next_event =
946            timeout(Duration::from_millis(50), receiver.recv()).await;
947        assert!(
948            next_event.is_err(),
949            "slow timed-out services should not emit list updates"
950        );
951
952        let mut text = String::new();
953        encode(&mut text, &registry).expect("encode metrics");
954
955        assert!(
956            metric_value(
957                &text,
958                "network_control_list_updates_total{list=\"allow\",result=\"failure\"}"
959            ) >= 1.0
960        );
961        assert!(
962            metric_value(
963                &text,
964                "network_control_list_updates_total{list=\"block\",result=\"failure\"}"
965            ) >= 1.0
966        );
967        assert_eq!(
968            metric_value(
969                &text,
970                "network_control_list_updates_total{list=\"allow\",result=\"success\"}"
971            ),
972            0.0
973        );
974        assert_eq!(
975            metric_value(
976                &text,
977                "network_control_list_updates_total{list=\"block\",result=\"success\"}"
978            ),
979            0.0
980        );
981
982        cancel.cancel();
983        slow_server_stop.cancel();
984    }
985
986    #[test(tokio::test)]
987    #[serial]
988    async fn zero_max_concurrent_requests_is_treated_as_one() {
989        let (url, server_stop) =
990            spawn_slow_json_service(Duration::from_millis(1)).await;
991        let mut registry = Registry::default();
992        let metrics = crate::metrics::register(&mut registry);
993        let cancel = CancellationToken::new();
994
995        let config = Config::default()
996            .with_enable(true)
997            .with_interval_request(Duration::from_millis(20))
998            .with_request_timeout(Duration::from_millis(200))
999            .with_max_concurrent_requests(0)
1000            .with_service_allow_list(vec![url.clone()])
1001            .with_service_block_list(vec![url]);
1002
1003        let mut receiver = build_control_lists_updaters(
1004            &config,
1005            cancel.clone(),
1006            CancellationToken::new(),
1007            Some(metrics),
1008        )
1009        .expect("control-list updater receiver");
1010
1011        let mut got_allow = false;
1012        let mut got_block = false;
1013        for _ in 0..20 {
1014            let event =
1015                timeout(Duration::from_millis(60), receiver.recv()).await;
1016            if let Ok(Some(event)) = event {
1017                match event {
1018                    Event::AllowListUpdated(_) => got_allow = true,
1019                    Event::BlockListUpdated(_) => got_block = true,
1020                }
1021            }
1022            if got_allow && got_block {
1023                break;
1024            }
1025        }
1026
1027        assert!(got_allow, "allow-list update should be emitted");
1028        assert!(got_block, "block-list update should be emitted");
1029
1030        let mut text = String::new();
1031        encode(&mut text, &registry).expect("encode metrics");
1032        assert!(
1033            metric_value(
1034                &text,
1035                "network_control_list_updates_total{list=\"allow\",result=\"success\"}"
1036            ) >= 1.0
1037        );
1038        assert!(
1039            metric_value(
1040                &text,
1041                "network_control_list_updates_total{list=\"block\",result=\"success\"}"
1042            ) >= 1.0
1043        );
1044
1045        cancel.cancel();
1046        server_stop.cancel();
1047    }
1048
1049    #[test(tokio::test)]
1050    #[serial]
1051    async fn cancellation_stops_updater_during_slow_requests() {
1052        let (url, server_stop) =
1053            spawn_slow_json_service(Duration::from_secs(2)).await;
1054        let cancel = CancellationToken::new();
1055
1056        let config = Config::default()
1057            .with_enable(true)
1058            .with_interval_request(Duration::from_millis(10))
1059            .with_request_timeout(Duration::from_secs(5))
1060            .with_max_concurrent_requests(1)
1061            .with_service_allow_list(vec![url.clone()])
1062            .with_service_block_list(vec![url]);
1063
1064        let mut receiver = build_control_lists_updaters(
1065            &config,
1066            cancel.clone(),
1067            CancellationToken::new(),
1068            None,
1069        )
1070        .expect("control-list updater receiver");
1071
1072        tokio::time::sleep(Duration::from_millis(40)).await;
1073        cancel.cancel();
1074
1075        let closed = timeout(Duration::from_secs(1), async {
1076            loop {
1077                if receiver.recv().await.is_none() {
1078                    break;
1079                }
1080            }
1081        })
1082        .await;
1083
1084        assert!(
1085            closed.is_ok(),
1086            "updater should stop and close channel after cancellation"
1087        );
1088
1089        server_stop.cancel();
1090    }
1091}