Skip to main content

ave_network/
control_list.rs

1use libp2p::{
2    Multiaddr, PeerId,
3    swarm::{
4        CloseConnection, ConnectionDenied, NetworkBehaviour, ToSwarm, dummy,
5    },
6};
7use serde::{Deserialize, Deserializer, Serialize};
8use std::{
9    collections::{HashSet, VecDeque},
10    fmt,
11    pin::Pin,
12    str::FromStr,
13    sync::Arc,
14    task::Poll,
15    time::Duration,
16};
17use tokio::{
18    sync::mpsc::{self, Receiver},
19    time::{Instant, MissedTickBehavior, interval},
20};
21use tokio_util::sync::CancellationToken;
22use tracing::{debug, warn};
23
24use crate::{
25    RoutingNode, metrics::NetworkMetrics, utils::request_update_lists,
26};
27
/// Tracing target attached to every log event emitted from this module.
const TARGET: &str = "ave::network::control";
/// Serde fallback for `Config::request_timeout`: five seconds.
const fn default_request_timeout() -> Duration {
    const FALLBACK_SECS: u64 = 5;
    Duration::from_secs(FALLBACK_SECS)
}
/// Serde fallback for `Config::max_concurrent_requests`.
const fn default_max_concurrent_requests() -> usize {
    const FALLBACK: usize = 8;
    FALLBACK
}
35
/// Configuration for the control list behaviour.
///
/// Every field is optional when deserializing: the container-level
/// `#[serde(default)]` fills missing fields from [`Config::default`], except
/// where a field names its own default function.
///
/// NOTE(review): the `Duration` fields are deserialized from an integer number
/// of seconds but have no matching `serialize_with`, so the derived
/// `Serialize` presumably emits serde's default `Duration` representation.
/// Confirm whether serialize -> deserialize round-tripping is expected.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(default)]
pub struct Config {
    /// Activate allow and block lists
    enable: bool,

    /// Nodes allowed to make and receive connections
    allow_list: Vec<String>,

    /// Nodes that are not allowed to make and receive connections
    block_list: Vec<String>,

    /// Services where the node will go to query the list of allowed nodes.
    service_allow_list: Vec<String>,

    /// Services where the node will go to query the list of blocked nodes.
    service_block_list: Vec<String>,

    /// Time interval to be used for queries updating the lists.
    /// Deserialized from a whole number of seconds.
    #[serde(deserialize_with = "deserialize_duration_secs")]
    interval_request: Duration,

    /// Timeout for each allow/block list HTTP request.
    /// Deserialized from a whole number of seconds.
    #[serde(
        default = "default_request_timeout",
        deserialize_with = "deserialize_duration_secs"
    )]
    request_timeout: Duration,

    /// Maximum number of concurrent HTTP requests when refreshing control lists.
    /// `0` is treated as `1` (sequential requests).
    #[serde(default = "default_max_concurrent_requests")]
    max_concurrent_requests: usize,
}
71
72fn deserialize_duration_secs<'de, D>(
73    deserializer: D,
74) -> Result<Duration, D::Error>
75where
76    D: Deserializer<'de>,
77{
78    let u: u64 = u64::deserialize(deserializer)?;
79    Ok(Duration::from_secs(u))
80}
81
82impl Default for Config {
83    fn default() -> Self {
84        Self {
85            enable: Default::default(),
86            allow_list: Default::default(),
87            block_list: Default::default(),
88            service_allow_list: Default::default(),
89            service_block_list: Default::default(),
90            interval_request: Duration::from_secs(60),
91            request_timeout: default_request_timeout(),
92            max_concurrent_requests: default_max_concurrent_requests(),
93        }
94    }
95}
96
/// Control List Settings
///
/// Builder-style setters (`with_*`, consuming and returning `self`) and
/// read accessors (`get_*`) for every [`Config`] field.
impl Config {
    /// Set enable
    pub const fn with_enable(mut self, enable: bool) -> Self {
        self.enable = enable;
        self
    }

    /// Set allow list
    pub fn with_allow_list(mut self, allow_list: Vec<String>) -> Self {
        self.allow_list = allow_list;
        self
    }

    /// Set block list
    pub fn with_block_list(mut self, block_list: Vec<String>) -> Self {
        self.block_list = block_list;
        self
    }

    /// Set Service list to consult allow list
    pub fn with_service_allow_list(
        mut self,
        service_allow_list: Vec<String>,
    ) -> Self {
        self.service_allow_list = service_allow_list;
        self
    }

    /// Set Service list to consult block list
    pub fn with_service_block_list(
        mut self,
        service_block_list: Vec<String>,
    ) -> Self {
        self.service_block_list = service_block_list;
        self
    }

    /// Set interval request
    pub const fn with_interval_request(mut self, interval: Duration) -> Self {
        self.interval_request = interval;
        self
    }

    /// Set request timeout.
    pub const fn with_request_timeout(mut self, timeout: Duration) -> Self {
        self.request_timeout = timeout;
        self
    }

    /// Set max concurrent requests. `0` is treated as `1` at runtime.
    pub const fn with_max_concurrent_requests(mut self, value: usize) -> Self {
        self.max_concurrent_requests = value;
        self
    }

    /// Get interval request
    pub const fn get_interval_request(&self) -> Duration {
        self.interval_request
    }

    /// Get request timeout
    pub const fn get_request_timeout(&self) -> Duration {
        self.request_timeout
    }

    /// Get max concurrent requests configured value.
    /// Runtime uses at least one concurrent request.
    pub const fn get_max_concurrent_requests(&self) -> usize {
        self.max_concurrent_requests
    }

    /// Get enable
    pub const fn get_enable(&self) -> bool {
        self.enable
    }

    /// Get allow list (returns a clone of the stored list)
    pub fn get_allow_list(&self) -> Vec<String> {
        self.allow_list.clone()
    }

    /// Get block list (returns a clone of the stored list)
    pub fn get_block_list(&self) -> Vec<String> {
        self.block_list.clone()
    }

    /// Get Service list to consult allow list (returns a clone)
    pub fn get_service_allow_list(&self) -> Vec<String> {
        self.service_allow_list.clone()
    }
    /// Get Service list to consult block list (returns a clone)
    pub fn get_service_block_list(&self) -> Vec<String> {
        self.service_block_list.clone()
    }
}
193
194pub fn build_control_lists_updaters(
195    config: &Config,
196    graceful_token: CancellationToken,
197    crash_token: CancellationToken,
198    metrics: Option<Arc<NetworkMetrics>>,
199) -> Option<Receiver<Event>> {
200    if config.enable {
201        debug!(target: TARGET, "control list enabled");
202
203        let (sender, receiver) = mpsc::channel(8);
204        let update_interval = config.interval_request;
205        let service_allow = config.service_allow_list.clone();
206        let service_block = config.service_block_list.clone();
207        let metrics_updater = metrics;
208        let request_timeout = config.request_timeout;
209        let max_concurrent_requests = config.max_concurrent_requests;
210
211        tokio::spawn(async move {
212            let client = match reqwest::Client::builder()
213                .connect_timeout(request_timeout)
214                .build()
215            {
216                Ok(client) => client,
217                Err(e) => {
218                    warn!(target: TARGET, error = %e, "failed to build control-list http client, falling back to default client");
219                    reqwest::Client::new()
220                }
221            };
222
223            let mut last_allow_success: Option<Instant> = None;
224            let mut last_block_success: Option<Instant> = None;
225            let mut ticker = interval(update_interval);
226            ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
227            // Keep previous semantics: first update happens after `interval`.
228            ticker.tick().await;
229            loop {
230                tokio::select! {
231                    _ = ticker.tick() => {
232                        if let Some(metrics) = metrics_updater.as_deref() {
233                            metrics.inc_control_list_updater_run();
234                        }
235                        let started_at = Instant::now();
236                        let (
237                    (vec_allow_peers, vec_block_peers),
238                    (successful_allow, successful_block),
239                ) = request_update_lists(
240                    client.clone(),
241                    service_allow.clone(),
242                    service_block.clone(),
243                    request_timeout,
244                    max_concurrent_requests,
245                    graceful_token.clone(),
246                    crash_token.clone()
247                )
248                .await;
249                        if let Some(metrics) = metrics_updater.as_deref() {
250                            metrics.observe_control_list_updater_duration_seconds(
251                                started_at.elapsed().as_secs_f64(),
252                            );
253                        }
254
255                        let now = Instant::now();
256
257                // If at least 1 update of the list was possible
258                if successful_allow != 0 {
259                    if let Some(metrics) = metrics_updater.as_deref() {
260                        metrics.observe_control_list_allow_update(true);
261                    }
262                    last_allow_success = Some(now);
263                    if let Err(e) = sender.send(Event::AllowListUpdated(vec_allow_peers)).await {
264                        debug!(target: TARGET, error = %e, "allow-list update dropped: channel closed");
265                    }
266                } else {
267                    if let Some(metrics) = metrics_updater.as_deref() {
268                        metrics.observe_control_list_allow_update(false);
269                    }
270                    warn!(target: TARGET, "allow-list not updated: no service responded successfully");
271                }
272
273                // If at least 1 update of the list was possible
274                if successful_block != 0 {
275                    if let Some(metrics) = metrics_updater.as_deref() {
276                        metrics.observe_control_list_block_update(true);
277                    }
278                    last_block_success = Some(now);
279                    if let Err(e) = sender.send(Event::BlockListUpdated(vec_block_peers)).await {
280                        debug!(target: TARGET, error = %e, "block-list update dropped: channel closed");
281                    }
282                } else {
283                    if let Some(metrics) = metrics_updater.as_deref() {
284                        metrics.observe_control_list_block_update(false);
285                    }
286                    warn!(target: TARGET, "block-list not updated: no service responded successfully");
287                }
288
289                if let Some(metrics) = metrics_updater.as_deref() {
290                    let allow_age = last_allow_success
291                        .map_or(-1, |t| now.duration_since(t).as_secs() as i64);
292                    metrics
293                        .set_control_list_allow_last_success_age_seconds(allow_age);
294
295                    let block_age = last_block_success
296                        .map_or(-1, |t| now.duration_since(t).as_secs() as i64);
297                    metrics
298                        .set_control_list_block_last_success_age_seconds(block_age);
299                }
300                    }
301                    _ = graceful_token.clone().cancelled_owned() => {
302                        debug!(target: TARGET, "control list updater stopped");
303                        break;
304                    }
305                    _ = crash_token.clone().cancelled_owned() => {
306                        debug!(target: TARGET, "control list updater stopped");
307                        break;
308                    }
309                };
310            }
311        });
312
313        Some(receiver)
314    } else {
315        None
316    }
317}
318
/// Network behaviour that enforces the allow/block control lists on
/// connections and closes connections to peers that stop being permitted.
#[derive(Default, Debug)]
pub struct Behaviour {
    /// Peers that may connect while the control list is enabled.
    allow_peers: HashSet<PeerId>,
    /// Peers that are always refused while the control list is enabled.
    block_peers: HashSet<PeerId>,
    /// Peers whose connections are queued to be closed; drained one per `poll`.
    close_connections: VecDeque<PeerId>,
    /// When `false`, no list is checked and every connection is accepted.
    enable: bool,
    /// Source of list updates applied during `poll`, if any.
    receiver: Option<Receiver<Event>>,
    /// Optional metrics sink.
    metrics: Option<Arc<NetworkMetrics>>,
}
328
329impl Behaviour {
330    /// Creates a new control list `Behaviour`.
331    pub fn new(
332        config: Config,
333        boot_nodes: &[RoutingNode],
334        receiver: Option<Receiver<Event>>,
335        metrics: Option<Arc<NetworkMetrics>>,
336    ) -> Self {
337        if config.enable {
338            let mut full_allow_list = config.allow_list.clone();
339            for node in boot_nodes {
340                full_allow_list.push(node.peer_id.clone());
341            }
342
343            let behaviour = Self {
344                enable: true,
345                allow_peers: HashSet::from_iter(
346                    full_allow_list
347                        .iter()
348                        .filter_map(|e| PeerId::from_str(e).ok()),
349                ),
350                block_peers: HashSet::from_iter(
351                    config
352                        .block_list
353                        .iter()
354                        .filter_map(|e| PeerId::from_str(e).ok()),
355                ),
356                receiver,
357                metrics,
358                ..Default::default()
359            };
360
361            if let Some(metrics) = behaviour.metrics.as_deref() {
362                metrics.set_control_list_allow_peers(
363                    behaviour.allow_peers.len() as i64,
364                );
365                metrics.set_control_list_block_peers(
366                    behaviour.block_peers.len() as i64,
367                );
368                metrics.set_control_list_allow_last_success_age_seconds(-1);
369                metrics.set_control_list_block_last_success_age_seconds(-1);
370            }
371
372            behaviour
373        } else {
374            let behaviour = Self {
375                metrics,
376                ..Default::default()
377            };
378
379            if let Some(metrics) = behaviour.metrics.as_deref() {
380                metrics.set_control_list_allow_peers(0);
381                metrics.set_control_list_block_peers(0);
382                metrics.set_control_list_allow_last_success_age_seconds(-1);
383                metrics.set_control_list_block_last_success_age_seconds(-1);
384            }
385
386            behaviour
387        }
388    }
389
390    /// Method that update allow list
391    fn update_allow_peers(&mut self, new_list: &[String]) {
392        // New hashset of allow list.
393        let new_list: HashSet<PeerId> = HashSet::from_iter(
394            new_list
395                .to_vec()
396                .iter()
397                .filter_map(|e| PeerId::from_str(e).ok()),
398        );
399
400        let close_peers: Vec<PeerId> =
401            self.allow_peers.difference(&new_list).cloned().collect();
402        self.close_connections.extend(close_peers);
403        self.allow_peers.clone_from(&new_list);
404        if let Some(metrics) = self.metrics.as_deref() {
405            metrics.inc_control_list_allow_apply();
406            metrics.set_control_list_allow_peers(self.allow_peers.len() as i64);
407        }
408    }
409
410    /// Method that update block list
411    fn update_block_peers(&mut self, new_list: &[String]) {
412        // New hashset of block list.
413        let new_list: HashSet<PeerId> = HashSet::from_iter(
414            new_list
415                .to_vec()
416                .iter()
417                .filter_map(|e| PeerId::from_str(e).ok()),
418        );
419
420        self.close_connections.extend(new_list.clone());
421        self.block_peers.clone_from(&new_list);
422        if let Some(metrics) = self.metrics.as_deref() {
423            metrics.inc_control_list_block_apply();
424            metrics.set_control_list_block_peers(self.block_peers.len() as i64);
425        }
426    }
427
428    /// Method that check if a peer is in allow list
429    fn check_allow(&self, peer: &PeerId) -> Result<(), ConnectionDenied> {
430        if self.allow_peers.contains(peer) {
431            return Ok(());
432        }
433
434        if let Some(metrics) = &self.metrics {
435            metrics.observe_control_list_denied("not_allowed");
436        }
437        debug!(target: TARGET, peer_id = %peer, "connection denied: peer not in allow list");
438        Err(ConnectionDenied::new(NotAllowed { peer: *peer }))
439    }
440
441    /// Method that check if a peer is in block list
442    fn check_block(&self, peer: &PeerId) -> Result<(), ConnectionDenied> {
443        if !self.block_peers.contains(peer) {
444            return Ok(());
445        }
446
447        if let Some(metrics) = &self.metrics {
448            metrics.observe_control_list_denied("blocked");
449        }
450        debug!(target: TARGET, peer_id = %peer, "connection denied: peer is blocked");
451        Err(ConnectionDenied::new(Blocked { peer: *peer }))
452    }
453
454    /// Method that check all List
455    fn check_lists(&self, peer: &PeerId) -> Result<(), ConnectionDenied> {
456        if self.enable {
457            self.check_block(peer)?;
458            self.check_allow(peer)?;
459        }
460
461        Ok(())
462    }
463}
464
465/// A connection to this peer is not explicitly allowed and was thus [`denied`](ConnectionDenied).
466#[derive(Debug)]
467pub struct NotAllowed {
468    peer: PeerId,
469}
470
471impl fmt::Display for NotAllowed {
472    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
473        write!(f, "peer {} is not in the allow list", self.peer)
474    }
475}
476
477impl std::error::Error for NotAllowed {}
478
479/// A connection to this peer was explicitly blocked and was thus [`denied`](ConnectionDenied).
480#[derive(Debug)]
481pub struct Blocked {
482    peer: PeerId,
483}
484
485impl fmt::Display for Blocked {
486    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
487        write!(f, "peer {} is in the block list", self.peer)
488    }
489}
490
491impl std::error::Error for Blocked {}
492
/// Events of the control list behaviour.
///
/// Sent by the list updater task over the channel and declared as the
/// behaviour's `ToSwarm` event type. Each variant carries the complete new
/// list as textual peer ids.
#[derive(Debug)]
pub enum Event {
    /// A fresh allow list was fetched.
    AllowListUpdated(Vec<String>),
    /// A fresh block list was fetched.
    BlockListUpdated(Vec<String>),
}
499
500impl NetworkBehaviour for Behaviour {
501    type ConnectionHandler = dummy::ConnectionHandler;
502    type ToSwarm = Event;
503
504    fn handle_established_inbound_connection(
505        &mut self,
506        _connection_id: libp2p::swarm::ConnectionId,
507        peer: PeerId,
508        _: &libp2p::Multiaddr,
509        _: &libp2p::Multiaddr,
510    ) -> Result<libp2p::swarm::THandler<Self>, ConnectionDenied> {
511        self.check_lists(&peer)?;
512
513        Ok(dummy::ConnectionHandler)
514    }
515
516    fn handle_pending_outbound_connection(
517        &mut self,
518        _: libp2p::swarm::ConnectionId,
519        peer: Option<PeerId>,
520        _: &[libp2p::Multiaddr],
521        _: libp2p::core::Endpoint,
522    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
523        if let Some(peer) = peer {
524            self.check_lists(&peer)?;
525        }
526
527        Ok(vec![])
528    }
529
530    fn handle_established_outbound_connection(
531        &mut self,
532        _: libp2p::swarm::ConnectionId,
533        peer: PeerId,
534        _: &libp2p::Multiaddr,
535        _: libp2p::core::Endpoint,
536        _: libp2p::core::transport::PortUse,
537    ) -> Result<libp2p::swarm::THandler<Self>, ConnectionDenied> {
538        self.check_lists(&peer)?;
539
540        Ok(dummy::ConnectionHandler)
541    }
542
543    fn on_swarm_event(&mut self, _: libp2p::swarm::FromSwarm) {}
544
545    fn on_connection_handler_event(
546        &mut self,
547        _: PeerId,
548        _: libp2p::swarm::ConnectionId,
549        _: libp2p::swarm::THandlerOutEvent<Self>,
550    ) {
551    }
552
553    fn poll(
554        &mut self,
555        cx: &mut std::task::Context<'_>,
556    ) -> std::task::Poll<
557        libp2p::swarm::ToSwarm<
558            Self::ToSwarm,
559            libp2p::swarm::THandlerInEvent<Self>,
560        >,
561    > {
562        let mut receiver_opt = self.receiver.take();
563        if let Some(mut rx) = receiver_opt.as_mut() {
564            let mut cx = std::task::Context::from_waker(cx.waker());
565            while let Poll::Ready(Some(event)) =
566                Pin::new(&mut rx).poll_recv(&mut cx)
567            {
568                match event {
569                    Event::AllowListUpdated(items) => {
570                        self.update_allow_peers(&items)
571                    }
572                    Event::BlockListUpdated(items) => {
573                        self.update_block_peers(&items)
574                    }
575                }
576            }
577        }
578
579        self.receiver = receiver_opt;
580
581        if let Some(peer) = self.close_connections.pop_front() {
582            return Poll::Ready(ToSwarm::CloseConnection {
583                peer_id: peer,
584                connection: CloseConnection::All,
585            });
586        }
587
588        Poll::Pending
589    }
590}
591
592#[cfg(test)]
593mod tests {
594    use futures::StreamExt;
595    use libp2p::{
596        Swarm,
597        swarm::{
598            ConnectionError, DialError, ListenError, SwarmEvent,
599            dial_opts::DialOpts,
600        },
601    };
602    use libp2p_swarm_test::SwarmExt;
603    use prometheus_client::{encoding::text::encode, registry::Registry};
604    use serial_test::serial;
605    use test_log::test;
606    use tokio::{io::AsyncWriteExt, net::TcpListener, time::timeout};
607
608    use super::*;
609
610    fn metric_value(metrics: &str, name: &str) -> f64 {
611        metrics
612            .lines()
613            .find_map(|line| {
614                if line.starts_with(name) {
615                    line.split_whitespace().nth(1)?.parse::<f64>().ok()
616                } else {
617                    None
618                }
619            })
620            .unwrap_or(0.0)
621    }
622
    // Test-only helpers that mutate the lists directly, bypassing the
    // updater channel.
    impl Behaviour {
        /// Adds `peer` to the block list and queues its connections for closing.
        pub fn block_peer(&mut self, peer: PeerId) {
            self.block_peers.insert(peer);
            self.close_connections.push_back(peer);
        }

        /// Adds `peer` to the allow list.
        pub fn allow_peer(&mut self, peer: PeerId) {
            self.allow_peers.insert(peer);
        }
        /// Turns list enforcement on or off.
        pub fn set_enable(&mut self, enable: bool) {
            self.enable = enable;
        }
    }
636
637    fn dial(
638        dialer: &mut Swarm<Behaviour>,
639        listener: &Swarm<Behaviour>,
640    ) -> Result<(), DialError> {
641        dialer.dial(
642            DialOpts::peer_id(*listener.local_peer_id())
643                .addresses(listener.external_addresses().cloned().collect())
644                .build(),
645        )
646    }
647
648    fn build_behaviours() -> (Swarm<Behaviour>, Swarm<Behaviour>) {
649        let mut behaviour = Behaviour::default();
650        behaviour.set_enable(true);
651        let dialer = Swarm::new_ephemeral_tokio(|_| behaviour);
652
653        let mut behaviour = Behaviour::default();
654        behaviour.set_enable(true);
655        let listener = Swarm::new_ephemeral_tokio(|_| behaviour);
656
657        (dialer, listener)
658    }
659
    /// Starts a local TCP server that answers every connection with an HTTP
    /// 200 response whose JSON body is `[]`, after waiting `delay`.
    ///
    /// Returns the service URL and a token; cancelling the token stops the
    /// accept loop. The server binds to an OS-assigned port on 127.0.0.1.
    async fn spawn_slow_json_service(
        delay: Duration,
    ) -> (String, CancellationToken) {
        let listener = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("bind slow service");
        let addr = listener.local_addr().expect("local addr");
        let stop = CancellationToken::new();
        let stop_task = stop.clone();

        tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = stop_task.cancelled() => break,
                    incoming = listener.accept() => {
                        let Ok((mut socket, _)) = incoming else {
                            break;
                        };
                        // Serve each connection in its own task so a slow
                        // response does not hold up further accepts.
                        tokio::spawn(async move {
                            tokio::time::sleep(delay).await;
                            // The request is never read; the canned response
                            // is written and the socket is shut down.
                            let response = b"HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: 2\r\nconnection: close\r\n\r\n[]";
                            let _ = socket.write_all(response).await;
                            let _ = socket.shutdown().await;
                        });
                    }
                }
            }
        });

        (format!("http://{addr}/list"), stop)
    }
691
    // Dialing a peer that is in the dialer's block list is refused locally
    // with a `Blocked` cause.
    #[test(tokio::test)]
    #[serial]
    async fn cannot_dial_blocked_peer() {
        let (mut dialer, mut listener) = build_behaviours();

        listener.listen().with_memory_addr_external().await;

        dialer.behaviour_mut().block_peer(*listener.local_peer_id());

        let DialError::Denied { cause } =
            dial(&mut dialer, &listener).unwrap_err()
        else {
            panic!("unexpected dial error")
        };
        assert!(cause.downcast::<Blocked>().is_ok());
    }
708
    // With enforcement on and an empty allow list, dialing any peer is
    // refused locally with a `NotAllowed` cause.
    #[test(tokio::test)]
    #[serial]
    async fn cannot_dial_not_allowed_peer() {
        let (mut dialer, mut listener) = build_behaviours();

        listener.listen().with_memory_addr_external().await;

        let DialError::Denied { cause } =
            dial(&mut dialer, &listener).unwrap_err()
        else {
            panic!("unexpected dial error")
        };
        assert!(cause.downcast::<NotAllowed>().is_ok());
    }
723
    // A peer that is allowed and not blocked can be dialed: the dial call
    // itself returns `Ok`.
    #[test(tokio::test)]
    #[serial]
    async fn can_dial_allowed_not_blocked_peer() {
        let (mut dialer, mut listener) = build_behaviours();

        listener.listen().with_memory_addr_external().await;

        dialer.behaviour_mut().allow_peer(*listener.local_peer_id());

        dial(&mut dialer, &listener).unwrap();
    }
735
    // The block list takes precedence: a peer present in both lists is
    // refused with a `Blocked` cause.
    #[test(tokio::test)]
    #[serial]
    async fn cannot_dial_allowed_blocked_peer() {
        let (mut dialer, mut listener) = build_behaviours();
        listener.listen().with_memory_addr_external().await;

        dialer.behaviour_mut().block_peer(*listener.local_peer_id());
        dialer.behaviour_mut().allow_peer(*listener.local_peer_id());

        let DialError::Denied { cause } =
            dial(&mut dialer, &listener).unwrap_err()
        else {
            panic!("unexpected dial error")
        };
        assert!(cause.downcast::<Blocked>().is_ok());
    }
752
    // The listener refuses an inbound connection from a peer on its block
    // list and reports a `Blocked` cause.
    #[test(tokio::test)]
    #[serial]
    async fn blocked_peer_cannot_dial_us() {
        let (mut dialer, mut listener) = build_behaviours();
        listener.listen().with_memory_addr_external().await;

        dialer.behaviour_mut().allow_peer(*listener.local_peer_id());
        listener.behaviour_mut().block_peer(*dialer.local_peer_id());

        dial(&mut dialer, &listener).unwrap();
        // Keep driving the dialer in the background so the dial progresses.
        tokio::spawn(dialer.loop_on_next());

        let cause = listener
            .wait(|e| match e {
                SwarmEvent::IncomingConnectionError {
                    error: ListenError::Denied { cause },
                    ..
                } => Some(cause),
                _ => None,
            })
            .await;
        assert!(cause.downcast::<Blocked>().is_ok());
    }
776
    // The listener refuses an inbound connection from a peer that is not on
    // its allow list and reports a `NotAllowed` cause.
    //
    // NOTE(review): `dialer_loop` is spawned and its handle is dropped, and
    // the test returns as soon as `listener_loop` finishes. A panic or failed
    // assertion inside `dialer_loop` therefore cannot fail this test. Await
    // the join handle if the dialer-side error string is meant to be enforced.
    #[test(tokio::test)]
    #[serial]
    async fn not_allowed_peer_cannot_dial_us() {
        let (mut dialer, mut listener) = build_behaviours();
        listener.listen().with_memory_addr_external().await;

        dialer.behaviour_mut().allow_peer(*listener.local_peer_id());

        dial(&mut dialer, &listener).unwrap();

        // Listener side: wait for the denied inbound connection.
        let listener_loop = async move {
            loop {
                match listener.select_next_some().await {
                    SwarmEvent::IncomingConnectionError { error, .. } => {
                        let ListenError::Denied { cause } = error else {
                            panic!("Invalid Error")
                        };
                        assert!(cause.downcast::<NotAllowed>().is_ok());
                        break;
                    }
                    _ => {}
                }
            }
        };

        // Dialer side: expects its connection to close with an I/O error.
        let dialer_loop = async move {
            loop {
                match dialer.select_next_some().await {
                    SwarmEvent::ConnectionClosed { cause, .. } => {
                        if let Some(error) = cause {
                            match error {
                                ConnectionError::IO(e) => {
                                    assert_eq!(
                                        e.to_string(),
                                        "Right(Io(Kind(BrokenPipe)))"
                                    );
                                    break;
                                }
                                _ => {
                                    panic!("Invalid error");
                                }
                            }
                        } else {
                            panic!("Missing error");
                        };
                    }
                    _ => {}
                }
            }
        };
        tokio::task::spawn(Box::pin(dialer_loop));
        listener_loop.await;
    }
830
    // An established connection is closed once the listener blocks the
    // dialer: the listener must observe `ConnectionClosed`.
    //
    // NOTE(review): `dialer_loop` is spawned and its handle is dropped, and
    // the test returns as soon as `listener_loop` finishes. A panic or failed
    // assertion inside `dialer_loop` therefore cannot fail this test. Await
    // the join handle if the dialer-side error string is meant to be enforced.
    #[test(tokio::test)]
    #[serial]
    async fn connections_get_closed_upon_disallow() {
        let (mut dialer, mut listener) = build_behaviours();
        listener.listen().with_memory_addr_external().await;

        dialer.behaviour_mut().allow_peer(*listener.local_peer_id());
        listener.behaviour_mut().allow_peer(*dialer.local_peer_id());
        let dialer_peer = *dialer.local_peer_id();

        dial(&mut dialer, &listener).unwrap();

        // Listener side: block the dialer once connected, then wait for the
        // connection to close.
        let listener_loop = async move {
            loop {
                match listener.select_next_some().await {
                    SwarmEvent::ConnectionEstablished { .. } => {
                        listener.behaviour_mut().block_peer(dialer_peer);
                    }
                    SwarmEvent::ConnectionClosed { .. } => {
                        break;
                    }
                    _ => {}
                }
            }
        };

        // Dialer side: expects the connection to close with an I/O error.
        let dialer_loop = async move {
            loop {
                match dialer.select_next_some().await {
                    SwarmEvent::ConnectionEstablished { .. } => {}
                    SwarmEvent::ConnectionClosed { cause, .. } => {
                        if let Some(error) = cause {
                            match error {
                                ConnectionError::IO(e) => {
                                    assert_eq!(e.to_string(), "Right(Closed)");
                                    break;
                                }
                                _ => {
                                    panic!("Invalid error");
                                }
                            }
                        } else {
                            panic!("Missing error");
                        };
                    }
                    _ => {}
                }
            }
        };

        tokio::task::spawn(Box::pin(dialer_loop));
        listener_loop.await;
    }
884
    // One blocked denial and one not-allowed denial must show up in the
    // total counter and in each per-reason counter.
    #[test]
    fn control_list_denied_metrics_by_reason() {
        let mut registry = Registry::default();
        let metrics = crate::metrics::register(&mut registry);

        let config = Config::default().with_enable(true);
        let behaviour = Behaviour::new(config, &[], None, Some(metrics));

        let blocked_peer = PeerId::random();
        let not_allowed_peer = PeerId::random();

        let mut behaviour = behaviour;
        behaviour.block_peers.insert(blocked_peer);

        // Results are ignored on purpose: only the metric side effects matter.
        let _ = behaviour.check_block(&blocked_peer);
        let _ = behaviour.check_allow(&not_allowed_peer);

        let mut text = String::new();
        encode(&mut text, &registry).expect("encode metrics");

        assert_eq!(
            metric_value(&text, "network_control_list_denied_total"),
            2.0
        );
        assert_eq!(
            metric_value(&text, "network_control_list_denied_blocked_total"),
            1.0
        );
        assert_eq!(
            metric_value(
                &text,
                "network_control_list_denied_not_allowed_total"
            ),
            1.0
        );
    }
921
922    #[test(tokio::test)]
923    #[serial]
924    async fn slow_services_timeout_without_emitting_updates() {
925        let (url, slow_server_stop) =
926            spawn_slow_json_service(Duration::from_millis(250)).await;
927        let mut registry = Registry::default();
928        let metrics = crate::metrics::register(&mut registry);
929        let cancel = CancellationToken::new();
930
931        let config = Config::default()
932            .with_enable(true)
933            .with_interval_request(Duration::from_millis(20))
934            .with_request_timeout(Duration::from_millis(30))
935            .with_max_concurrent_requests(1)
936            .with_service_allow_list(vec![url.clone()])
937            .with_service_block_list(vec![url]);
938
939        let mut receiver = build_control_lists_updaters(
940            &config,
941            cancel.clone(),
942            CancellationToken::new(),
943            Some(metrics),
944        )
945        .expect("control-list updater receiver");
946
947        tokio::time::sleep(Duration::from_millis(170)).await;
948
949        let next_event =
950            timeout(Duration::from_millis(50), receiver.recv()).await;
951        assert!(
952            next_event.is_err(),
953            "slow timed-out services should not emit list updates"
954        );
955
956        let mut text = String::new();
957        encode(&mut text, &registry).expect("encode metrics");
958
959        assert!(
960            metric_value(&text, "network_control_list_updater_runs_total")
961                >= 1.0
962        );
963        assert!(
964            metric_value(
965                &text,
966                "network_control_list_allow_update_failure_total"
967            ) >= 1.0
968        );
969        assert!(
970            metric_value(
971                &text,
972                "network_control_list_block_update_failure_total"
973            ) >= 1.0
974        );
975        assert_eq!(
976            metric_value(
977                &text,
978                "network_control_list_allow_update_success_total"
979            ),
980            0.0
981        );
982        assert_eq!(
983            metric_value(
984                &text,
985                "network_control_list_block_update_success_total"
986            ),
987            0.0
988        );
989
990        cancel.cancel();
991        slow_server_stop.cancel();
992    }
993
994    #[test(tokio::test)]
995    #[serial]
996    async fn zero_max_concurrent_requests_is_treated_as_one() {
997        let (url, server_stop) =
998            spawn_slow_json_service(Duration::from_millis(1)).await;
999        let mut registry = Registry::default();
1000        let metrics = crate::metrics::register(&mut registry);
1001        let cancel = CancellationToken::new();
1002
1003        let config = Config::default()
1004            .with_enable(true)
1005            .with_interval_request(Duration::from_millis(20))
1006            .with_request_timeout(Duration::from_millis(200))
1007            .with_max_concurrent_requests(0)
1008            .with_service_allow_list(vec![url.clone()])
1009            .with_service_block_list(vec![url]);
1010
1011        let mut receiver = build_control_lists_updaters(
1012            &config,
1013            cancel.clone(),
1014            CancellationToken::new(),
1015            Some(metrics),
1016        )
1017        .expect("control-list updater receiver");
1018
1019        let mut got_allow = false;
1020        let mut got_block = false;
1021        for _ in 0..20 {
1022            let event =
1023                timeout(Duration::from_millis(60), receiver.recv()).await;
1024            if let Ok(Some(event)) = event {
1025                match event {
1026                    Event::AllowListUpdated(_) => got_allow = true,
1027                    Event::BlockListUpdated(_) => got_block = true,
1028                }
1029            }
1030            if got_allow && got_block {
1031                break;
1032            }
1033        }
1034
1035        assert!(got_allow, "allow-list update should be emitted");
1036        assert!(got_block, "block-list update should be emitted");
1037
1038        let mut text = String::new();
1039        encode(&mut text, &registry).expect("encode metrics");
1040        assert!(
1041            metric_value(
1042                &text,
1043                "network_control_list_allow_update_success_total"
1044            ) >= 1.0
1045        );
1046        assert!(
1047            metric_value(
1048                &text,
1049                "network_control_list_block_update_success_total"
1050            ) >= 1.0
1051        );
1052
1053        cancel.cancel();
1054        server_stop.cancel();
1055    }
1056
1057    #[test(tokio::test)]
1058    #[serial]
1059    async fn cancellation_stops_updater_during_slow_requests() {
1060        let (url, server_stop) =
1061            spawn_slow_json_service(Duration::from_secs(2)).await;
1062        let cancel = CancellationToken::new();
1063
1064        let config = Config::default()
1065            .with_enable(true)
1066            .with_interval_request(Duration::from_millis(10))
1067            .with_request_timeout(Duration::from_secs(5))
1068            .with_max_concurrent_requests(1)
1069            .with_service_allow_list(vec![url.clone()])
1070            .with_service_block_list(vec![url]);
1071
1072        let mut receiver = build_control_lists_updaters(
1073            &config,
1074            cancel.clone(),
1075            CancellationToken::new(),
1076            None,
1077        )
1078        .expect("control-list updater receiver");
1079
1080        tokio::time::sleep(Duration::from_millis(40)).await;
1081        cancel.cancel();
1082
1083        let closed = timeout(Duration::from_secs(1), async {
1084            loop {
1085                if receiver.recv().await.is_none() {
1086                    break;
1087                }
1088            }
1089        })
1090        .await;
1091
1092        assert!(
1093            closed.is_ok(),
1094            "updater should stop and close channel after cancellation"
1095        );
1096
1097        server_stop.cancel();
1098    }
1099}