1use libp2p::{
2 Multiaddr, PeerId,
3 swarm::{
4 CloseConnection, ConnectionDenied, NetworkBehaviour, ToSwarm, dummy,
5 },
6};
7use serde::{Deserialize, Deserializer, Serialize};
8use std::{
9 collections::{HashSet, VecDeque},
10 fmt,
11 pin::Pin,
12 str::FromStr,
13 sync::Arc,
14 task::Poll,
15 time::Duration,
16};
17use tokio::{
18 sync::mpsc::{self, Receiver},
19 time::{Instant, MissedTickBehavior, interval},
20};
21use tokio_util::sync::CancellationToken;
22use tracing::{debug, warn};
23
24use crate::{
25 RoutingNode, metrics::NetworkMetrics, utils::request_update_lists,
26};
27
28const TARGET: &str = "ave::network::control";
29const fn default_request_timeout() -> Duration {
30 Duration::from_secs(5)
31}
32const fn default_max_concurrent_requests() -> usize {
33 8
34}
35
36#[derive(Clone, Debug, Deserialize, Serialize)]
38#[serde(default)]
39pub struct Config {
40 enable: bool,
42
43 allow_list: Vec<String>,
45
46 block_list: Vec<String>,
48
49 service_allow_list: Vec<String>,
51
52 service_block_list: Vec<String>,
54
55 #[serde(deserialize_with = "deserialize_duration_secs")]
57 interval_request: Duration,
58
59 #[serde(
61 default = "default_request_timeout",
62 deserialize_with = "deserialize_duration_secs"
63 )]
64 request_timeout: Duration,
65
66 #[serde(default = "default_max_concurrent_requests")]
69 max_concurrent_requests: usize,
70}
71
72fn deserialize_duration_secs<'de, D>(
73 deserializer: D,
74) -> Result<Duration, D::Error>
75where
76 D: Deserializer<'de>,
77{
78 let u: u64 = u64::deserialize(deserializer)?;
79 Ok(Duration::from_secs(u))
80}
81
82impl Default for Config {
83 fn default() -> Self {
84 Self {
85 enable: Default::default(),
86 allow_list: Default::default(),
87 block_list: Default::default(),
88 service_allow_list: Default::default(),
89 service_block_list: Default::default(),
90 interval_request: Duration::from_secs(60),
91 request_timeout: default_request_timeout(),
92 max_concurrent_requests: default_max_concurrent_requests(),
93 }
94 }
95}
96
97impl Config {
99 pub const fn with_enable(mut self, enable: bool) -> Self {
101 self.enable = enable;
102 self
103 }
104
105 pub fn with_allow_list(mut self, allow_list: Vec<String>) -> Self {
107 self.allow_list = allow_list;
108 self
109 }
110
111 pub fn with_block_list(mut self, block_list: Vec<String>) -> Self {
113 self.block_list = block_list;
114 self
115 }
116
117 pub fn with_service_allow_list(
119 mut self,
120 service_allow_list: Vec<String>,
121 ) -> Self {
122 self.service_allow_list = service_allow_list;
123 self
124 }
125
126 pub fn with_service_block_list(
128 mut self,
129 service_block_list: Vec<String>,
130 ) -> Self {
131 self.service_block_list = service_block_list;
132 self
133 }
134
135 pub const fn with_interval_request(mut self, interval: Duration) -> Self {
137 self.interval_request = interval;
138 self
139 }
140
141 pub const fn with_request_timeout(mut self, timeout: Duration) -> Self {
143 self.request_timeout = timeout;
144 self
145 }
146
147 pub const fn with_max_concurrent_requests(mut self, value: usize) -> Self {
149 self.max_concurrent_requests = value;
150 self
151 }
152
153 pub const fn get_interval_request(&self) -> Duration {
155 self.interval_request
156 }
157
158 pub const fn get_request_timeout(&self) -> Duration {
160 self.request_timeout
161 }
162
163 pub const fn get_max_concurrent_requests(&self) -> usize {
166 self.max_concurrent_requests
167 }
168
169 pub const fn get_enable(&self) -> bool {
171 self.enable
172 }
173
174 pub fn get_allow_list(&self) -> Vec<String> {
176 self.allow_list.clone()
177 }
178
179 pub fn get_block_list(&self) -> Vec<String> {
181 self.block_list.clone()
182 }
183
184 pub fn get_service_allow_list(&self) -> Vec<String> {
186 self.service_allow_list.clone()
187 }
188 pub fn get_service_block_list(&self) -> Vec<String> {
190 self.service_block_list.clone()
191 }
192}
193
194pub fn build_control_lists_updaters(
195 config: &Config,
196 graceful_token: CancellationToken,
197 crash_token: CancellationToken,
198 metrics: Option<Arc<NetworkMetrics>>,
199) -> Option<Receiver<Event>> {
200 if config.enable {
201 debug!(target: TARGET, "control list enabled");
202
203 let (sender, receiver) = mpsc::channel(8);
204 let update_interval = config.interval_request;
205 let service_allow = config.service_allow_list.clone();
206 let service_block = config.service_block_list.clone();
207 let metrics_updater = metrics;
208 let request_timeout = config.request_timeout;
209 let max_concurrent_requests = config.max_concurrent_requests;
210
211 tokio::spawn(async move {
212 let client = match reqwest::Client::builder()
213 .connect_timeout(request_timeout)
214 .build()
215 {
216 Ok(client) => client,
217 Err(e) => {
218 warn!(target: TARGET, error = %e, "failed to build control-list http client, falling back to default client");
219 reqwest::Client::new()
220 }
221 };
222
223 let mut last_allow_success: Option<Instant> = None;
224 let mut last_block_success: Option<Instant> = None;
225 let mut ticker = interval(update_interval);
226 ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
227 ticker.tick().await;
229 loop {
230 tokio::select! {
231 _ = ticker.tick() => {
232 if let Some(metrics) = metrics_updater.as_deref() {
233 metrics.inc_control_list_updater_run();
234 }
235 let started_at = Instant::now();
236 let (
237 (vec_allow_peers, vec_block_peers),
238 (successful_allow, successful_block),
239 ) = request_update_lists(
240 client.clone(),
241 service_allow.clone(),
242 service_block.clone(),
243 request_timeout,
244 max_concurrent_requests,
245 graceful_token.clone(),
246 crash_token.clone()
247 )
248 .await;
249 if let Some(metrics) = metrics_updater.as_deref() {
250 metrics.observe_control_list_updater_duration_seconds(
251 started_at.elapsed().as_secs_f64(),
252 );
253 }
254
255 let now = Instant::now();
256
257 if successful_allow != 0 {
259 if let Some(metrics) = metrics_updater.as_deref() {
260 metrics.observe_control_list_allow_update(true);
261 }
262 last_allow_success = Some(now);
263 if let Err(e) = sender.send(Event::AllowListUpdated(vec_allow_peers)).await {
264 debug!(target: TARGET, error = %e, "allow-list update dropped: channel closed");
265 }
266 } else {
267 if let Some(metrics) = metrics_updater.as_deref() {
268 metrics.observe_control_list_allow_update(false);
269 }
270 warn!(target: TARGET, "allow-list not updated: no service responded successfully");
271 }
272
273 if successful_block != 0 {
275 if let Some(metrics) = metrics_updater.as_deref() {
276 metrics.observe_control_list_block_update(true);
277 }
278 last_block_success = Some(now);
279 if let Err(e) = sender.send(Event::BlockListUpdated(vec_block_peers)).await {
280 debug!(target: TARGET, error = %e, "block-list update dropped: channel closed");
281 }
282 } else {
283 if let Some(metrics) = metrics_updater.as_deref() {
284 metrics.observe_control_list_block_update(false);
285 }
286 warn!(target: TARGET, "block-list not updated: no service responded successfully");
287 }
288
289 if let Some(metrics) = metrics_updater.as_deref() {
290 let allow_age = last_allow_success
291 .map_or(-1, |t| now.duration_since(t).as_secs() as i64);
292 metrics
293 .set_control_list_allow_last_success_age_seconds(allow_age);
294
295 let block_age = last_block_success
296 .map_or(-1, |t| now.duration_since(t).as_secs() as i64);
297 metrics
298 .set_control_list_block_last_success_age_seconds(block_age);
299 }
300 }
301 _ = graceful_token.clone().cancelled_owned() => {
302 debug!(target: TARGET, "control list updater stopped");
303 break;
304 }
305 _ = crash_token.clone().cancelled_owned() => {
306 debug!(target: TARGET, "control list updater stopped");
307 break;
308 }
309 };
310 }
311 });
312
313 Some(receiver)
314 } else {
315 None
316 }
317}
318
319#[derive(Default, Debug)]
320pub struct Behaviour {
321 allow_peers: HashSet<PeerId>,
322 block_peers: HashSet<PeerId>,
323 close_connections: VecDeque<PeerId>,
324 enable: bool,
325 receiver: Option<Receiver<Event>>,
326 metrics: Option<Arc<NetworkMetrics>>,
327}
328
329impl Behaviour {
330 pub fn new(
332 config: Config,
333 boot_nodes: &[RoutingNode],
334 receiver: Option<Receiver<Event>>,
335 metrics: Option<Arc<NetworkMetrics>>,
336 ) -> Self {
337 if config.enable {
338 let mut full_allow_list = config.allow_list.clone();
339 for node in boot_nodes {
340 full_allow_list.push(node.peer_id.clone());
341 }
342
343 let behaviour = Self {
344 enable: true,
345 allow_peers: HashSet::from_iter(
346 full_allow_list
347 .iter()
348 .filter_map(|e| PeerId::from_str(e).ok()),
349 ),
350 block_peers: HashSet::from_iter(
351 config
352 .block_list
353 .iter()
354 .filter_map(|e| PeerId::from_str(e).ok()),
355 ),
356 receiver,
357 metrics,
358 ..Default::default()
359 };
360
361 if let Some(metrics) = behaviour.metrics.as_deref() {
362 metrics.set_control_list_allow_peers(
363 behaviour.allow_peers.len() as i64,
364 );
365 metrics.set_control_list_block_peers(
366 behaviour.block_peers.len() as i64,
367 );
368 metrics.set_control_list_allow_last_success_age_seconds(-1);
369 metrics.set_control_list_block_last_success_age_seconds(-1);
370 }
371
372 behaviour
373 } else {
374 let behaviour = Self {
375 metrics,
376 ..Default::default()
377 };
378
379 if let Some(metrics) = behaviour.metrics.as_deref() {
380 metrics.set_control_list_allow_peers(0);
381 metrics.set_control_list_block_peers(0);
382 metrics.set_control_list_allow_last_success_age_seconds(-1);
383 metrics.set_control_list_block_last_success_age_seconds(-1);
384 }
385
386 behaviour
387 }
388 }
389
390 fn update_allow_peers(&mut self, new_list: &[String]) {
392 let new_list: HashSet<PeerId> = HashSet::from_iter(
394 new_list
395 .to_vec()
396 .iter()
397 .filter_map(|e| PeerId::from_str(e).ok()),
398 );
399
400 let close_peers: Vec<PeerId> =
401 self.allow_peers.difference(&new_list).cloned().collect();
402 self.close_connections.extend(close_peers);
403 self.allow_peers.clone_from(&new_list);
404 if let Some(metrics) = self.metrics.as_deref() {
405 metrics.inc_control_list_allow_apply();
406 metrics.set_control_list_allow_peers(self.allow_peers.len() as i64);
407 }
408 }
409
410 fn update_block_peers(&mut self, new_list: &[String]) {
412 let new_list: HashSet<PeerId> = HashSet::from_iter(
414 new_list
415 .to_vec()
416 .iter()
417 .filter_map(|e| PeerId::from_str(e).ok()),
418 );
419
420 self.close_connections.extend(new_list.clone());
421 self.block_peers.clone_from(&new_list);
422 if let Some(metrics) = self.metrics.as_deref() {
423 metrics.inc_control_list_block_apply();
424 metrics.set_control_list_block_peers(self.block_peers.len() as i64);
425 }
426 }
427
428 fn check_allow(&self, peer: &PeerId) -> Result<(), ConnectionDenied> {
430 if self.allow_peers.contains(peer) {
431 return Ok(());
432 }
433
434 if let Some(metrics) = &self.metrics {
435 metrics.observe_control_list_denied("not_allowed");
436 }
437 debug!(target: TARGET, peer_id = %peer, "connection denied: peer not in allow list");
438 Err(ConnectionDenied::new(NotAllowed { peer: *peer }))
439 }
440
441 fn check_block(&self, peer: &PeerId) -> Result<(), ConnectionDenied> {
443 if !self.block_peers.contains(peer) {
444 return Ok(());
445 }
446
447 if let Some(metrics) = &self.metrics {
448 metrics.observe_control_list_denied("blocked");
449 }
450 debug!(target: TARGET, peer_id = %peer, "connection denied: peer is blocked");
451 Err(ConnectionDenied::new(Blocked { peer: *peer }))
452 }
453
454 fn check_lists(&self, peer: &PeerId) -> Result<(), ConnectionDenied> {
456 if self.enable {
457 self.check_block(peer)?;
458 self.check_allow(peer)?;
459 }
460
461 Ok(())
462 }
463}
464
465#[derive(Debug)]
467pub struct NotAllowed {
468 peer: PeerId,
469}
470
471impl fmt::Display for NotAllowed {
472 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
473 write!(f, "peer {} is not in the allow list", self.peer)
474 }
475}
476
477impl std::error::Error for NotAllowed {}
478
479#[derive(Debug)]
481pub struct Blocked {
482 peer: PeerId,
483}
484
485impl fmt::Display for Blocked {
486 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
487 write!(f, "peer {} is in the block list", self.peer)
488 }
489}
490
491impl std::error::Error for Blocked {}
492
493#[derive(Debug)]
495pub enum Event {
496 AllowListUpdated(Vec<String>),
497 BlockListUpdated(Vec<String>),
498}
499
500impl NetworkBehaviour for Behaviour {
501 type ConnectionHandler = dummy::ConnectionHandler;
502 type ToSwarm = Event;
503
504 fn handle_established_inbound_connection(
505 &mut self,
506 _connection_id: libp2p::swarm::ConnectionId,
507 peer: PeerId,
508 _: &libp2p::Multiaddr,
509 _: &libp2p::Multiaddr,
510 ) -> Result<libp2p::swarm::THandler<Self>, ConnectionDenied> {
511 self.check_lists(&peer)?;
512
513 Ok(dummy::ConnectionHandler)
514 }
515
516 fn handle_pending_outbound_connection(
517 &mut self,
518 _: libp2p::swarm::ConnectionId,
519 peer: Option<PeerId>,
520 _: &[libp2p::Multiaddr],
521 _: libp2p::core::Endpoint,
522 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
523 if let Some(peer) = peer {
524 self.check_lists(&peer)?;
525 }
526
527 Ok(vec![])
528 }
529
530 fn handle_established_outbound_connection(
531 &mut self,
532 _: libp2p::swarm::ConnectionId,
533 peer: PeerId,
534 _: &libp2p::Multiaddr,
535 _: libp2p::core::Endpoint,
536 _: libp2p::core::transport::PortUse,
537 ) -> Result<libp2p::swarm::THandler<Self>, ConnectionDenied> {
538 self.check_lists(&peer)?;
539
540 Ok(dummy::ConnectionHandler)
541 }
542
543 fn on_swarm_event(&mut self, _: libp2p::swarm::FromSwarm) {}
544
545 fn on_connection_handler_event(
546 &mut self,
547 _: PeerId,
548 _: libp2p::swarm::ConnectionId,
549 _: libp2p::swarm::THandlerOutEvent<Self>,
550 ) {
551 }
552
553 fn poll(
554 &mut self,
555 cx: &mut std::task::Context<'_>,
556 ) -> std::task::Poll<
557 libp2p::swarm::ToSwarm<
558 Self::ToSwarm,
559 libp2p::swarm::THandlerInEvent<Self>,
560 >,
561 > {
562 let mut receiver_opt = self.receiver.take();
563 if let Some(mut rx) = receiver_opt.as_mut() {
564 let mut cx = std::task::Context::from_waker(cx.waker());
565 while let Poll::Ready(Some(event)) =
566 Pin::new(&mut rx).poll_recv(&mut cx)
567 {
568 match event {
569 Event::AllowListUpdated(items) => {
570 self.update_allow_peers(&items)
571 }
572 Event::BlockListUpdated(items) => {
573 self.update_block_peers(&items)
574 }
575 }
576 }
577 }
578
579 self.receiver = receiver_opt;
580
581 if let Some(peer) = self.close_connections.pop_front() {
582 return Poll::Ready(ToSwarm::CloseConnection {
583 peer_id: peer,
584 connection: CloseConnection::All,
585 });
586 }
587
588 Poll::Pending
589 }
590}
591
592#[cfg(test)]
593mod tests {
594 use futures::StreamExt;
595 use libp2p::{
596 Swarm,
597 swarm::{
598 ConnectionError, DialError, ListenError, SwarmEvent,
599 dial_opts::DialOpts,
600 },
601 };
602 use libp2p_swarm_test::SwarmExt;
603 use prometheus_client::{encoding::text::encode, registry::Registry};
604 use serial_test::serial;
605 use test_log::test;
606 use tokio::{io::AsyncWriteExt, net::TcpListener, time::timeout};
607
608 use super::*;
609
610 fn metric_value(metrics: &str, name: &str) -> f64 {
611 metrics
612 .lines()
613 .find_map(|line| {
614 if line.starts_with(name) {
615 line.split_whitespace().nth(1)?.parse::<f64>().ok()
616 } else {
617 None
618 }
619 })
620 .unwrap_or(0.0)
621 }
622
623 impl Behaviour {
624 pub fn block_peer(&mut self, peer: PeerId) {
625 self.block_peers.insert(peer);
626 self.close_connections.push_back(peer);
627 }
628
629 pub fn allow_peer(&mut self, peer: PeerId) {
630 self.allow_peers.insert(peer);
631 }
632 pub fn set_enable(&mut self, enable: bool) {
633 self.enable = enable;
634 }
635 }
636
637 fn dial(
638 dialer: &mut Swarm<Behaviour>,
639 listener: &Swarm<Behaviour>,
640 ) -> Result<(), DialError> {
641 dialer.dial(
642 DialOpts::peer_id(*listener.local_peer_id())
643 .addresses(listener.external_addresses().cloned().collect())
644 .build(),
645 )
646 }
647
648 fn build_behaviours() -> (Swarm<Behaviour>, Swarm<Behaviour>) {
649 let mut behaviour = Behaviour::default();
650 behaviour.set_enable(true);
651 let dialer = Swarm::new_ephemeral_tokio(|_| behaviour);
652
653 let mut behaviour = Behaviour::default();
654 behaviour.set_enable(true);
655 let listener = Swarm::new_ephemeral_tokio(|_| behaviour);
656
657 (dialer, listener)
658 }
659
660 async fn spawn_slow_json_service(
661 delay: Duration,
662 ) -> (String, CancellationToken) {
663 let listener = TcpListener::bind("127.0.0.1:0")
664 .await
665 .expect("bind slow service");
666 let addr = listener.local_addr().expect("local addr");
667 let stop = CancellationToken::new();
668 let stop_task = stop.clone();
669
670 tokio::spawn(async move {
671 loop {
672 tokio::select! {
673 _ = stop_task.cancelled() => break,
674 incoming = listener.accept() => {
675 let Ok((mut socket, _)) = incoming else {
676 break;
677 };
678 tokio::spawn(async move {
679 tokio::time::sleep(delay).await;
680 let response = b"HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: 2\r\nconnection: close\r\n\r\n[]";
681 let _ = socket.write_all(response).await;
682 let _ = socket.shutdown().await;
683 });
684 }
685 }
686 }
687 });
688
689 (format!("http://{addr}/list"), stop)
690 }
691
692 #[test(tokio::test)]
693 #[serial]
694 async fn cannot_dial_blocked_peer() {
695 let (mut dialer, mut listener) = build_behaviours();
696
697 listener.listen().with_memory_addr_external().await;
698
699 dialer.behaviour_mut().block_peer(*listener.local_peer_id());
700
701 let DialError::Denied { cause } =
702 dial(&mut dialer, &listener).unwrap_err()
703 else {
704 panic!("unexpected dial error")
705 };
706 assert!(cause.downcast::<Blocked>().is_ok());
707 }
708
709 #[test(tokio::test)]
710 #[serial]
711 async fn cannot_dial_not_allowed_peer() {
712 let (mut dialer, mut listener) = build_behaviours();
713
714 listener.listen().with_memory_addr_external().await;
715
716 let DialError::Denied { cause } =
717 dial(&mut dialer, &listener).unwrap_err()
718 else {
719 panic!("unexpected dial error")
720 };
721 assert!(cause.downcast::<NotAllowed>().is_ok());
722 }
723
724 #[test(tokio::test)]
725 #[serial]
726 async fn can_dial_allowed_not_blocked_peer() {
727 let (mut dialer, mut listener) = build_behaviours();
728
729 listener.listen().with_memory_addr_external().await;
730
731 dialer.behaviour_mut().allow_peer(*listener.local_peer_id());
732
733 dial(&mut dialer, &listener).unwrap();
734 }
735
736 #[test(tokio::test)]
737 #[serial]
738 async fn cannot_dial_allowed_blocked_peer() {
739 let (mut dialer, mut listener) = build_behaviours();
740 listener.listen().with_memory_addr_external().await;
741
742 dialer.behaviour_mut().block_peer(*listener.local_peer_id());
743 dialer.behaviour_mut().allow_peer(*listener.local_peer_id());
744
745 let DialError::Denied { cause } =
746 dial(&mut dialer, &listener).unwrap_err()
747 else {
748 panic!("unexpected dial error")
749 };
750 assert!(cause.downcast::<Blocked>().is_ok());
751 }
752
753 #[test(tokio::test)]
754 #[serial]
755 async fn blocked_peer_cannot_dial_us() {
756 let (mut dialer, mut listener) = build_behaviours();
757 listener.listen().with_memory_addr_external().await;
758
759 dialer.behaviour_mut().allow_peer(*listener.local_peer_id());
760 listener.behaviour_mut().block_peer(*dialer.local_peer_id());
761
762 dial(&mut dialer, &listener).unwrap();
763 tokio::spawn(dialer.loop_on_next());
764
765 let cause = listener
766 .wait(|e| match e {
767 SwarmEvent::IncomingConnectionError {
768 error: ListenError::Denied { cause },
769 ..
770 } => Some(cause),
771 _ => None,
772 })
773 .await;
774 assert!(cause.downcast::<Blocked>().is_ok());
775 }
776
777 #[test(tokio::test)]
778 #[serial]
779 async fn not_allowed_peer_cannot_dial_us() {
780 let (mut dialer, mut listener) = build_behaviours();
781 listener.listen().with_memory_addr_external().await;
782
783 dialer.behaviour_mut().allow_peer(*listener.local_peer_id());
784
785 dial(&mut dialer, &listener).unwrap();
786
787 let listener_loop = async move {
788 loop {
789 match listener.select_next_some().await {
790 SwarmEvent::IncomingConnectionError { error, .. } => {
791 let ListenError::Denied { cause } = error else {
792 panic!("Invalid Error")
793 };
794 assert!(cause.downcast::<NotAllowed>().is_ok());
795 break;
796 }
797 _ => {}
798 }
799 }
800 };
801
802 let dialer_loop = async move {
803 loop {
804 match dialer.select_next_some().await {
805 SwarmEvent::ConnectionClosed { cause, .. } => {
806 if let Some(error) = cause {
807 match error {
808 ConnectionError::IO(e) => {
809 assert_eq!(
810 e.to_string(),
811 "Right(Io(Kind(BrokenPipe)))"
812 );
813 break;
814 }
815 _ => {
816 panic!("Invalid error");
817 }
818 }
819 } else {
820 panic!("Missing error");
821 };
822 }
823 _ => {}
824 }
825 }
826 };
827 tokio::task::spawn(Box::pin(dialer_loop));
828 listener_loop.await;
829 }
830
831 #[test(tokio::test)]
832 #[serial]
833 async fn connections_get_closed_upon_disallow() {
834 let (mut dialer, mut listener) = build_behaviours();
835 listener.listen().with_memory_addr_external().await;
836
837 dialer.behaviour_mut().allow_peer(*listener.local_peer_id());
838 listener.behaviour_mut().allow_peer(*dialer.local_peer_id());
839 let dialer_peer = *dialer.local_peer_id();
840
841 dial(&mut dialer, &listener).unwrap();
842
843 let listener_loop = async move {
844 loop {
845 match listener.select_next_some().await {
846 SwarmEvent::ConnectionEstablished { .. } => {
847 listener.behaviour_mut().block_peer(dialer_peer);
848 }
849 SwarmEvent::ConnectionClosed { .. } => {
850 break;
851 }
852 _ => {}
853 }
854 }
855 };
856
857 let dialer_loop = async move {
858 loop {
859 match dialer.select_next_some().await {
860 SwarmEvent::ConnectionEstablished { .. } => {}
861 SwarmEvent::ConnectionClosed { cause, .. } => {
862 if let Some(error) = cause {
863 match error {
864 ConnectionError::IO(e) => {
865 assert_eq!(e.to_string(), "Right(Closed)");
866 break;
867 }
868 _ => {
869 panic!("Invalid error");
870 }
871 }
872 } else {
873 panic!("Missing error");
874 };
875 }
876 _ => {}
877 }
878 }
879 };
880
881 tokio::task::spawn(Box::pin(dialer_loop));
882 listener_loop.await;
883 }
884
885 #[test]
886 fn control_list_denied_metrics_by_reason() {
887 let mut registry = Registry::default();
888 let metrics = crate::metrics::register(&mut registry);
889
890 let config = Config::default().with_enable(true);
891 let behaviour = Behaviour::new(config, &[], None, Some(metrics));
892
893 let blocked_peer = PeerId::random();
894 let not_allowed_peer = PeerId::random();
895
896 let mut behaviour = behaviour;
897 behaviour.block_peers.insert(blocked_peer);
898
899 let _ = behaviour.check_block(&blocked_peer);
900 let _ = behaviour.check_allow(¬_allowed_peer);
901
902 let mut text = String::new();
903 encode(&mut text, ®istry).expect("encode metrics");
904
905 assert_eq!(
906 metric_value(&text, "network_control_list_denied_total"),
907 2.0
908 );
909 assert_eq!(
910 metric_value(&text, "network_control_list_denied_blocked_total"),
911 1.0
912 );
913 assert_eq!(
914 metric_value(
915 &text,
916 "network_control_list_denied_not_allowed_total"
917 ),
918 1.0
919 );
920 }
921
922 #[test(tokio::test)]
923 #[serial]
924 async fn slow_services_timeout_without_emitting_updates() {
925 let (url, slow_server_stop) =
926 spawn_slow_json_service(Duration::from_millis(250)).await;
927 let mut registry = Registry::default();
928 let metrics = crate::metrics::register(&mut registry);
929 let cancel = CancellationToken::new();
930
931 let config = Config::default()
932 .with_enable(true)
933 .with_interval_request(Duration::from_millis(20))
934 .with_request_timeout(Duration::from_millis(30))
935 .with_max_concurrent_requests(1)
936 .with_service_allow_list(vec![url.clone()])
937 .with_service_block_list(vec![url]);
938
939 let mut receiver = build_control_lists_updaters(
940 &config,
941 cancel.clone(),
942 CancellationToken::new(),
943 Some(metrics),
944 )
945 .expect("control-list updater receiver");
946
947 tokio::time::sleep(Duration::from_millis(170)).await;
948
949 let next_event =
950 timeout(Duration::from_millis(50), receiver.recv()).await;
951 assert!(
952 next_event.is_err(),
953 "slow timed-out services should not emit list updates"
954 );
955
956 let mut text = String::new();
957 encode(&mut text, ®istry).expect("encode metrics");
958
959 assert!(
960 metric_value(&text, "network_control_list_updater_runs_total")
961 >= 1.0
962 );
963 assert!(
964 metric_value(
965 &text,
966 "network_control_list_allow_update_failure_total"
967 ) >= 1.0
968 );
969 assert!(
970 metric_value(
971 &text,
972 "network_control_list_block_update_failure_total"
973 ) >= 1.0
974 );
975 assert_eq!(
976 metric_value(
977 &text,
978 "network_control_list_allow_update_success_total"
979 ),
980 0.0
981 );
982 assert_eq!(
983 metric_value(
984 &text,
985 "network_control_list_block_update_success_total"
986 ),
987 0.0
988 );
989
990 cancel.cancel();
991 slow_server_stop.cancel();
992 }
993
994 #[test(tokio::test)]
995 #[serial]
996 async fn zero_max_concurrent_requests_is_treated_as_one() {
997 let (url, server_stop) =
998 spawn_slow_json_service(Duration::from_millis(1)).await;
999 let mut registry = Registry::default();
1000 let metrics = crate::metrics::register(&mut registry);
1001 let cancel = CancellationToken::new();
1002
1003 let config = Config::default()
1004 .with_enable(true)
1005 .with_interval_request(Duration::from_millis(20))
1006 .with_request_timeout(Duration::from_millis(200))
1007 .with_max_concurrent_requests(0)
1008 .with_service_allow_list(vec![url.clone()])
1009 .with_service_block_list(vec![url]);
1010
1011 let mut receiver = build_control_lists_updaters(
1012 &config,
1013 cancel.clone(),
1014 CancellationToken::new(),
1015 Some(metrics),
1016 )
1017 .expect("control-list updater receiver");
1018
1019 let mut got_allow = false;
1020 let mut got_block = false;
1021 for _ in 0..20 {
1022 let event =
1023 timeout(Duration::from_millis(60), receiver.recv()).await;
1024 if let Ok(Some(event)) = event {
1025 match event {
1026 Event::AllowListUpdated(_) => got_allow = true,
1027 Event::BlockListUpdated(_) => got_block = true,
1028 }
1029 }
1030 if got_allow && got_block {
1031 break;
1032 }
1033 }
1034
1035 assert!(got_allow, "allow-list update should be emitted");
1036 assert!(got_block, "block-list update should be emitted");
1037
1038 let mut text = String::new();
1039 encode(&mut text, ®istry).expect("encode metrics");
1040 assert!(
1041 metric_value(
1042 &text,
1043 "network_control_list_allow_update_success_total"
1044 ) >= 1.0
1045 );
1046 assert!(
1047 metric_value(
1048 &text,
1049 "network_control_list_block_update_success_total"
1050 ) >= 1.0
1051 );
1052
1053 cancel.cancel();
1054 server_stop.cancel();
1055 }
1056
1057 #[test(tokio::test)]
1058 #[serial]
1059 async fn cancellation_stops_updater_during_slow_requests() {
1060 let (url, server_stop) =
1061 spawn_slow_json_service(Duration::from_secs(2)).await;
1062 let cancel = CancellationToken::new();
1063
1064 let config = Config::default()
1065 .with_enable(true)
1066 .with_interval_request(Duration::from_millis(10))
1067 .with_request_timeout(Duration::from_secs(5))
1068 .with_max_concurrent_requests(1)
1069 .with_service_allow_list(vec![url.clone()])
1070 .with_service_block_list(vec![url]);
1071
1072 let mut receiver = build_control_lists_updaters(
1073 &config,
1074 cancel.clone(),
1075 CancellationToken::new(),
1076 None,
1077 )
1078 .expect("control-list updater receiver");
1079
1080 tokio::time::sleep(Duration::from_millis(40)).await;
1081 cancel.cancel();
1082
1083 let closed = timeout(Duration::from_secs(1), async {
1084 loop {
1085 if receiver.recv().await.is_none() {
1086 break;
1087 }
1088 }
1089 })
1090 .await;
1091
1092 assert!(
1093 closed.is_ok(),
1094 "updater should stop and close channel after cancellation"
1095 );
1096
1097 server_stop.cancel();
1098 }
1099}