1use libp2p::{
2 Multiaddr, PeerId,
3 swarm::{
4 CloseConnection, ConnectionDenied, NetworkBehaviour, ToSwarm, dummy,
5 },
6};
7use serde::{Deserialize, Deserializer, Serialize};
8use std::{
9 collections::{HashSet, VecDeque},
10 fmt,
11 pin::Pin,
12 str::FromStr,
13 sync::Arc,
14 task::Poll,
15 time::Duration,
16};
17use tokio::{
18 sync::mpsc::{self, Receiver},
19 time::{Instant, MissedTickBehavior, interval},
20};
21use tokio_util::sync::CancellationToken;
22use tracing::{debug, warn};
23
24use crate::{
25 RoutingNode, metrics::NetworkMetrics, utils::request_update_lists,
26};
27
/// Tracing target attached to every log event emitted by this module.
const TARGET: &str = "ave::network::control";
/// Serde default for `Config::request_timeout`: five seconds.
const fn default_request_timeout() -> Duration {
    Duration::from_millis(5_000)
}
/// Serde default for `Config::max_concurrent_requests`.
const fn default_max_concurrent_requests() -> usize {
    const DEFAULT_MAX_CONCURRENT_REQUESTS: usize = 8;
    DEFAULT_MAX_CONCURRENT_REQUESTS
}
35
/// Configuration of the peer control lists and of the task that refreshes them.
///
/// The container-level `#[serde(default)]` makes every missing field fall back
/// to the value provided by the `Default` implementation of this struct, unless
/// the field names its own default function.
///
/// NOTE(review): `Serialize` is derived, so durations are written with serde's
/// built-in `Duration` representation, while deserialization reads a plain
/// integer number of seconds — confirm that round-tripping is not required.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(default)]
pub struct Config {
    /// Whether the control lists are enforced.
    enable: bool,

    /// Statically configured allowed peers, as peer-id strings.
    allow_list: Vec<String>,

    /// Statically configured blocked peers, as peer-id strings.
    block_list: Vec<String>,

    /// Services queried to refresh the allow list (presumably URLs — the tests
    /// in this file pass `http://…` addresses).
    service_allow_list: Vec<String>,

    /// Services queried to refresh the block list (presumably URLs — the tests
    /// in this file pass `http://…` addresses).
    service_block_list: Vec<String>,

    /// Period between two refreshes; deserialized from whole seconds.
    #[serde(deserialize_with = "deserialize_duration_secs")]
    interval_request: Duration,

    /// Timeout applied to the refresh requests; deserialized from whole seconds.
    #[serde(
        default = "default_request_timeout",
        deserialize_with = "deserialize_duration_secs"
    )]
    request_timeout: Duration,

    /// Concurrency limit handed to `request_update_lists`.
    #[serde(default = "default_max_concurrent_requests")]
    max_concurrent_requests: usize,
}
71
72fn deserialize_duration_secs<'de, D>(
73 deserializer: D,
74) -> Result<Duration, D::Error>
75where
76 D: Deserializer<'de>,
77{
78 let u: u64 = u64::deserialize(deserializer)?;
79 Ok(Duration::from_secs(u))
80}
81
82impl Default for Config {
83 fn default() -> Self {
84 Self {
85 enable: Default::default(),
86 allow_list: Default::default(),
87 block_list: Default::default(),
88 service_allow_list: Default::default(),
89 service_block_list: Default::default(),
90 interval_request: Duration::from_secs(60),
91 request_timeout: default_request_timeout(),
92 max_concurrent_requests: default_max_concurrent_requests(),
93 }
94 }
95}
96
97impl Config {
99 pub const fn with_enable(mut self, enable: bool) -> Self {
101 self.enable = enable;
102 self
103 }
104
105 pub fn with_allow_list(mut self, allow_list: Vec<String>) -> Self {
107 self.allow_list = allow_list;
108 self
109 }
110
111 pub fn with_block_list(mut self, block_list: Vec<String>) -> Self {
113 self.block_list = block_list;
114 self
115 }
116
117 pub fn with_service_allow_list(
119 mut self,
120 service_allow_list: Vec<String>,
121 ) -> Self {
122 self.service_allow_list = service_allow_list;
123 self
124 }
125
126 pub fn with_service_block_list(
128 mut self,
129 service_block_list: Vec<String>,
130 ) -> Self {
131 self.service_block_list = service_block_list;
132 self
133 }
134
135 pub const fn with_interval_request(mut self, interval: Duration) -> Self {
137 self.interval_request = interval;
138 self
139 }
140
141 pub const fn with_request_timeout(mut self, timeout: Duration) -> Self {
143 self.request_timeout = timeout;
144 self
145 }
146
147 pub const fn with_max_concurrent_requests(mut self, value: usize) -> Self {
149 self.max_concurrent_requests = value;
150 self
151 }
152
153 pub const fn get_interval_request(&self) -> Duration {
155 self.interval_request
156 }
157
158 pub const fn get_request_timeout(&self) -> Duration {
160 self.request_timeout
161 }
162
163 pub const fn get_max_concurrent_requests(&self) -> usize {
166 self.max_concurrent_requests
167 }
168
169 pub const fn get_enable(&self) -> bool {
171 self.enable
172 }
173
174 pub fn get_allow_list(&self) -> Vec<String> {
176 self.allow_list.clone()
177 }
178
179 pub fn get_block_list(&self) -> Vec<String> {
181 self.block_list.clone()
182 }
183
184 pub fn get_service_allow_list(&self) -> Vec<String> {
186 self.service_allow_list.clone()
187 }
188 pub fn get_service_block_list(&self) -> Vec<String> {
190 self.service_block_list.clone()
191 }
192}
193
194pub fn build_control_lists_updaters(
195 config: &Config,
196 graceful_token: CancellationToken,
197 crash_token: CancellationToken,
198 metrics: Option<Arc<NetworkMetrics>>,
199) -> Option<Receiver<Event>> {
200 if config.enable {
201 debug!(target: TARGET, "control list enabled");
202
203 let (sender, receiver) = mpsc::channel(8);
204 let update_interval = config.interval_request;
205 let service_allow = config.service_allow_list.clone();
206 let service_block = config.service_block_list.clone();
207 let metrics_updater = metrics;
208 let request_timeout = config.request_timeout;
209 let max_concurrent_requests = config.max_concurrent_requests;
210
211 tokio::spawn(async move {
212 let client = match reqwest::Client::builder()
213 .connect_timeout(request_timeout)
214 .build()
215 {
216 Ok(client) => client,
217 Err(e) => {
218 warn!(target: TARGET, error = %e, "failed to build control-list http client, falling back to default client");
219 reqwest::Client::new()
220 }
221 };
222
223 let mut last_allow_success: Option<Instant> = None;
224 let mut last_block_success: Option<Instant> = None;
225 let mut ticker = interval(update_interval);
226 ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
227 ticker.tick().await;
229 loop {
230 tokio::select! {
231 _ = ticker.tick() => {
232 let started_at = Instant::now();
233 let (
234 (vec_allow_peers, vec_block_peers),
235 (successful_allow, successful_block),
236 ) = request_update_lists(
237 client.clone(),
238 service_allow.clone(),
239 service_block.clone(),
240 request_timeout,
241 max_concurrent_requests,
242 graceful_token.clone(),
243 crash_token.clone()
244 )
245 .await;
246 if let Some(metrics) = metrics_updater.as_deref() {
247 metrics.observe_control_list_updater_duration_seconds(
248 started_at.elapsed().as_secs_f64(),
249 );
250 }
251
252 let now = Instant::now();
253
254 if successful_allow != 0 {
256 if let Some(metrics) = metrics_updater.as_deref() {
257 metrics.observe_control_list_allow_update(true);
258 }
259 last_allow_success = Some(now);
260 if let Err(e) = sender.send(Event::AllowListUpdated(vec_allow_peers)).await {
261 debug!(target: TARGET, error = %e, "allow-list update dropped: channel closed");
262 }
263 } else {
264 if let Some(metrics) = metrics_updater.as_deref() {
265 metrics.observe_control_list_allow_update(false);
266 }
267 warn!(target: TARGET, "allow-list not updated: no service responded successfully");
268 }
269
270 if successful_block != 0 {
272 if let Some(metrics) = metrics_updater.as_deref() {
273 metrics.observe_control_list_block_update(true);
274 }
275 last_block_success = Some(now);
276 if let Err(e) = sender.send(Event::BlockListUpdated(vec_block_peers)).await {
277 debug!(target: TARGET, error = %e, "block-list update dropped: channel closed");
278 }
279 } else {
280 if let Some(metrics) = metrics_updater.as_deref() {
281 metrics.observe_control_list_block_update(false);
282 }
283 warn!(target: TARGET, "block-list not updated: no service responded successfully");
284 }
285
286 if let Some(metrics) = metrics_updater.as_deref() {
287 let allow_age = last_allow_success
288 .map_or(-1, |t| now.duration_since(t).as_secs() as i64);
289 metrics
290 .set_control_list_allow_last_success_age_seconds(allow_age);
291
292 let block_age = last_block_success
293 .map_or(-1, |t| now.duration_since(t).as_secs() as i64);
294 metrics
295 .set_control_list_block_last_success_age_seconds(block_age);
296 }
297 }
298 _ = graceful_token.clone().cancelled_owned() => {
299 debug!(target: TARGET, "control list updater stopped");
300 break;
301 }
302 _ = crash_token.clone().cancelled_owned() => {
303 debug!(target: TARGET, "control list updater stopped");
304 break;
305 }
306 };
307 }
308 });
309
310 Some(receiver)
311 } else {
312 None
313 }
314}
315
/// Network behaviour that enforces the peer allow and block lists.
///
/// The derived `Default` yields a disabled behaviour with empty lists, no
/// update channel and no metrics.
#[derive(Default, Debug)]
pub struct Behaviour {
    /// Peers that may connect while the lists are enabled.
    allow_peers: HashSet<PeerId>,
    /// Peers whose connections are always denied while the lists are enabled.
    block_peers: HashSet<PeerId>,
    /// Peers whose connections still have to be closed; `poll` pops one entry
    /// per call and asks the swarm to close all of that peer's connections.
    close_connections: VecDeque<PeerId>,
    /// Whether the lists are checked at all.
    enable: bool,
    /// Channel on which list updates arrive, if any.
    receiver: Option<Receiver<Event>>,
    /// Optional metrics sink.
    metrics: Option<Arc<NetworkMetrics>>,
}
325
326impl Behaviour {
327 pub fn new(
329 config: Config,
330 boot_nodes: &[RoutingNode],
331 receiver: Option<Receiver<Event>>,
332 metrics: Option<Arc<NetworkMetrics>>,
333 ) -> Self {
334 if config.enable {
335 let mut full_allow_list = config.allow_list.clone();
336 for node in boot_nodes {
337 full_allow_list.push(node.peer_id.clone());
338 }
339
340 let behaviour = Self {
341 enable: true,
342 allow_peers: HashSet::from_iter(
343 full_allow_list
344 .iter()
345 .filter_map(|e| PeerId::from_str(e).ok()),
346 ),
347 block_peers: HashSet::from_iter(
348 config
349 .block_list
350 .iter()
351 .filter_map(|e| PeerId::from_str(e).ok()),
352 ),
353 receiver,
354 metrics,
355 ..Default::default()
356 };
357
358 if let Some(metrics) = behaviour.metrics.as_deref() {
359 metrics.set_control_list_allow_peers(
360 behaviour.allow_peers.len() as i64,
361 );
362 metrics.set_control_list_block_peers(
363 behaviour.block_peers.len() as i64,
364 );
365 metrics.set_control_list_allow_last_success_age_seconds(-1);
366 metrics.set_control_list_block_last_success_age_seconds(-1);
367 }
368
369 behaviour
370 } else {
371 let behaviour = Self {
372 metrics,
373 ..Default::default()
374 };
375
376 if let Some(metrics) = behaviour.metrics.as_deref() {
377 metrics.set_control_list_allow_peers(0);
378 metrics.set_control_list_block_peers(0);
379 metrics.set_control_list_allow_last_success_age_seconds(-1);
380 metrics.set_control_list_block_last_success_age_seconds(-1);
381 }
382
383 behaviour
384 }
385 }
386
387 fn update_allow_peers(&mut self, new_list: &[String]) {
389 let new_list: HashSet<PeerId> = HashSet::from_iter(
391 new_list
392 .to_vec()
393 .iter()
394 .filter_map(|e| PeerId::from_str(e).ok()),
395 );
396
397 let close_peers: Vec<PeerId> =
398 self.allow_peers.difference(&new_list).cloned().collect();
399 self.close_connections.extend(close_peers);
400 self.allow_peers.clone_from(&new_list);
401 if let Some(metrics) = self.metrics.as_deref() {
402 metrics.inc_control_list_allow_apply();
403 metrics.set_control_list_allow_peers(self.allow_peers.len() as i64);
404 }
405 }
406
407 fn update_block_peers(&mut self, new_list: &[String]) {
409 let new_list: HashSet<PeerId> = HashSet::from_iter(
411 new_list
412 .to_vec()
413 .iter()
414 .filter_map(|e| PeerId::from_str(e).ok()),
415 );
416
417 self.close_connections.extend(new_list.clone());
418 self.block_peers.clone_from(&new_list);
419 if let Some(metrics) = self.metrics.as_deref() {
420 metrics.inc_control_list_block_apply();
421 metrics.set_control_list_block_peers(self.block_peers.len() as i64);
422 }
423 }
424
425 fn check_allow(&self, peer: &PeerId) -> Result<(), ConnectionDenied> {
427 if self.allow_peers.contains(peer) {
428 return Ok(());
429 }
430
431 if let Some(metrics) = &self.metrics {
432 metrics.observe_control_list_denied("not_allowed");
433 }
434 debug!(target: TARGET, peer_id = %peer, "connection denied: peer not in allow list");
435 Err(ConnectionDenied::new(NotAllowed { peer: *peer }))
436 }
437
438 fn check_block(&self, peer: &PeerId) -> Result<(), ConnectionDenied> {
440 if !self.block_peers.contains(peer) {
441 return Ok(());
442 }
443
444 if let Some(metrics) = &self.metrics {
445 metrics.observe_control_list_denied("blocked");
446 }
447 debug!(target: TARGET, peer_id = %peer, "connection denied: peer is blocked");
448 Err(ConnectionDenied::new(Blocked { peer: *peer }))
449 }
450
451 fn check_lists(&self, peer: &PeerId) -> Result<(), ConnectionDenied> {
453 if self.enable {
454 self.check_block(peer)?;
455 self.check_allow(peer)?;
456 }
457
458 Ok(())
459 }
460}
461
462#[derive(Debug)]
464pub struct NotAllowed {
465 peer: PeerId,
466}
467
468impl fmt::Display for NotAllowed {
469 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
470 write!(f, "peer {} is not in the allow list", self.peer)
471 }
472}
473
474impl std::error::Error for NotAllowed {}
475
476#[derive(Debug)]
478pub struct Blocked {
479 peer: PeerId,
480}
481
482impl fmt::Display for Blocked {
483 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
484 write!(f, "peer {} is in the block list", self.peer)
485 }
486}
487
488impl std::error::Error for Blocked {}
489
/// List update sent by the updater task to [`Behaviour`].
///
/// The payloads are peer ids in string form; `Behaviour` parses them with
/// `PeerId::from_str` when applying the update.
#[derive(Debug)]
pub enum Event {
    /// New full content of the allow list.
    AllowListUpdated(Vec<String>),
    /// New full content of the block list.
    BlockListUpdated(Vec<String>),
}
496
497impl NetworkBehaviour for Behaviour {
498 type ConnectionHandler = dummy::ConnectionHandler;
499 type ToSwarm = Event;
500
501 fn handle_established_inbound_connection(
502 &mut self,
503 _connection_id: libp2p::swarm::ConnectionId,
504 peer: PeerId,
505 _: &libp2p::Multiaddr,
506 _: &libp2p::Multiaddr,
507 ) -> Result<libp2p::swarm::THandler<Self>, ConnectionDenied> {
508 self.check_lists(&peer)?;
509
510 Ok(dummy::ConnectionHandler)
511 }
512
513 fn handle_pending_outbound_connection(
514 &mut self,
515 _: libp2p::swarm::ConnectionId,
516 peer: Option<PeerId>,
517 _: &[libp2p::Multiaddr],
518 _: libp2p::core::Endpoint,
519 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
520 if let Some(peer) = peer {
521 self.check_lists(&peer)?;
522 }
523
524 Ok(vec![])
525 }
526
527 fn handle_established_outbound_connection(
528 &mut self,
529 _: libp2p::swarm::ConnectionId,
530 peer: PeerId,
531 _: &libp2p::Multiaddr,
532 _: libp2p::core::Endpoint,
533 _: libp2p::core::transport::PortUse,
534 ) -> Result<libp2p::swarm::THandler<Self>, ConnectionDenied> {
535 self.check_lists(&peer)?;
536
537 Ok(dummy::ConnectionHandler)
538 }
539
540 fn on_swarm_event(&mut self, _: libp2p::swarm::FromSwarm) {}
541
542 fn on_connection_handler_event(
543 &mut self,
544 _: PeerId,
545 _: libp2p::swarm::ConnectionId,
546 _: libp2p::swarm::THandlerOutEvent<Self>,
547 ) {
548 }
549
550 fn poll(
551 &mut self,
552 cx: &mut std::task::Context<'_>,
553 ) -> std::task::Poll<
554 libp2p::swarm::ToSwarm<
555 Self::ToSwarm,
556 libp2p::swarm::THandlerInEvent<Self>,
557 >,
558 > {
559 let mut receiver_opt = self.receiver.take();
560 if let Some(mut rx) = receiver_opt.as_mut() {
561 let mut cx = std::task::Context::from_waker(cx.waker());
562 while let Poll::Ready(Some(event)) =
563 Pin::new(&mut rx).poll_recv(&mut cx)
564 {
565 match event {
566 Event::AllowListUpdated(items) => {
567 self.update_allow_peers(&items)
568 }
569 Event::BlockListUpdated(items) => {
570 self.update_block_peers(&items)
571 }
572 }
573 }
574 }
575
576 self.receiver = receiver_opt;
577
578 if let Some(peer) = self.close_connections.pop_front() {
579 return Poll::Ready(ToSwarm::CloseConnection {
580 peer_id: peer,
581 connection: CloseConnection::All,
582 });
583 }
584
585 Poll::Pending
586 }
587}
588
589#[cfg(test)]
590mod tests {
591 use futures::StreamExt;
592 use libp2p::{
593 Swarm,
594 swarm::{
595 ConnectionError, DialError, ListenError, SwarmEvent,
596 dial_opts::DialOpts,
597 },
598 };
599 use libp2p_swarm_test::SwarmExt;
600 use prometheus_client::{encoding::text::encode, registry::Registry};
601 use serial_test::serial;
602 use test_log::test;
603 use tokio::{io::AsyncWriteExt, net::TcpListener, time::timeout};
604
605 use super::*;
606
607 fn metric_value(metrics: &str, name: &str) -> f64 {
608 metrics
609 .lines()
610 .find_map(|line| {
611 if line.starts_with(name) {
612 line.split_whitespace().nth(1)?.parse::<f64>().ok()
613 } else {
614 None
615 }
616 })
617 .unwrap_or(0.0)
618 }
619
620 impl Behaviour {
621 pub fn block_peer(&mut self, peer: PeerId) {
622 self.block_peers.insert(peer);
623 self.close_connections.push_back(peer);
624 }
625
626 pub fn allow_peer(&mut self, peer: PeerId) {
627 self.allow_peers.insert(peer);
628 }
629 pub fn set_enable(&mut self, enable: bool) {
630 self.enable = enable;
631 }
632 }
633
634 fn dial(
635 dialer: &mut Swarm<Behaviour>,
636 listener: &Swarm<Behaviour>,
637 ) -> Result<(), DialError> {
638 dialer.dial(
639 DialOpts::peer_id(*listener.local_peer_id())
640 .addresses(listener.external_addresses().cloned().collect())
641 .build(),
642 )
643 }
644
645 fn build_behaviours() -> (Swarm<Behaviour>, Swarm<Behaviour>) {
646 let mut behaviour = Behaviour::default();
647 behaviour.set_enable(true);
648 let dialer = Swarm::new_ephemeral_tokio(|_| behaviour);
649
650 let mut behaviour = Behaviour::default();
651 behaviour.set_enable(true);
652 let listener = Swarm::new_ephemeral_tokio(|_| behaviour);
653
654 (dialer, listener)
655 }
656
    /// Starts a minimal HTTP server on an ephemeral local port that waits
    /// `delay` before answering every connection with an empty JSON array.
    ///
    /// Returns the URL to query and a token that stops the accept loop when
    /// cancelled.
    async fn spawn_slow_json_service(
        delay: Duration,
    ) -> (String, CancellationToken) {
        let listener = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("bind slow service");
        let addr = listener.local_addr().expect("local addr");
        let stop = CancellationToken::new();
        let stop_task = stop.clone();

        tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = stop_task.cancelled() => break,
                    incoming = listener.accept() => {
                        let Ok((mut socket, _)) = incoming else {
                            break;
                        };
                        // One task per connection so a slow answer does not
                        // hold up the accept loop.
                        tokio::spawn(async move {
                            tokio::time::sleep(delay).await;
                            // The request is never read; a fixed response is
                            // written after the delay.
                            let response = b"HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: 2\r\nconnection: close\r\n\r\n[]";
                            let _ = socket.write_all(response).await;
                            let _ = socket.shutdown().await;
                        });
                    }
                }
            }
        });

        (format!("http://{addr}/list"), stop)
    }
688
689 #[test(tokio::test)]
690 #[serial]
691 async fn cannot_dial_blocked_peer() {
692 let (mut dialer, mut listener) = build_behaviours();
693
694 listener.listen().with_memory_addr_external().await;
695
696 dialer.behaviour_mut().block_peer(*listener.local_peer_id());
697
698 let DialError::Denied { cause } =
699 dial(&mut dialer, &listener).unwrap_err()
700 else {
701 panic!("unexpected dial error")
702 };
703 assert!(cause.downcast::<Blocked>().is_ok());
704 }
705
706 #[test(tokio::test)]
707 #[serial]
708 async fn cannot_dial_not_allowed_peer() {
709 let (mut dialer, mut listener) = build_behaviours();
710
711 listener.listen().with_memory_addr_external().await;
712
713 let DialError::Denied { cause } =
714 dial(&mut dialer, &listener).unwrap_err()
715 else {
716 panic!("unexpected dial error")
717 };
718 assert!(cause.downcast::<NotAllowed>().is_ok());
719 }
720
721 #[test(tokio::test)]
722 #[serial]
723 async fn can_dial_allowed_not_blocked_peer() {
724 let (mut dialer, mut listener) = build_behaviours();
725
726 listener.listen().with_memory_addr_external().await;
727
728 dialer.behaviour_mut().allow_peer(*listener.local_peer_id());
729
730 dial(&mut dialer, &listener).unwrap();
731 }
732
733 #[test(tokio::test)]
734 #[serial]
735 async fn cannot_dial_allowed_blocked_peer() {
736 let (mut dialer, mut listener) = build_behaviours();
737 listener.listen().with_memory_addr_external().await;
738
739 dialer.behaviour_mut().block_peer(*listener.local_peer_id());
740 dialer.behaviour_mut().allow_peer(*listener.local_peer_id());
741
742 let DialError::Denied { cause } =
743 dial(&mut dialer, &listener).unwrap_err()
744 else {
745 panic!("unexpected dial error")
746 };
747 assert!(cause.downcast::<Blocked>().is_ok());
748 }
749
    /// The listener denies an incoming connection from a peer it has blocked.
    #[test(tokio::test)]
    #[serial]
    async fn blocked_peer_cannot_dial_us() {
        let (mut dialer, mut listener) = build_behaviours();
        listener.listen().with_memory_addr_external().await;

        // The dialer itself allows the listener, so the dial leaves the
        // dialer; only the listener side rejects it.
        dialer.behaviour_mut().allow_peer(*listener.local_peer_id());
        listener.behaviour_mut().block_peer(*dialer.local_peer_id());

        dial(&mut dialer, &listener).unwrap();
        // Keep the dialer swarm running in the background.
        tokio::spawn(dialer.loop_on_next());

        let cause = listener
            .wait(|e| match e {
                SwarmEvent::IncomingConnectionError {
                    error: ListenError::Denied { cause },
                    ..
                } => Some(cause),
                _ => None,
            })
            .await;
        assert!(cause.downcast::<Blocked>().is_ok());
    }
773
    /// The listener denies an incoming connection from a peer that is not on
    /// its allow list.
    #[test(tokio::test)]
    #[serial]
    async fn not_allowed_peer_cannot_dial_us() {
        let (mut dialer, mut listener) = build_behaviours();
        listener.listen().with_memory_addr_external().await;

        // Only the dialer allows its counterpart; the listener's allow list
        // stays empty.
        dialer.behaviour_mut().allow_peer(*listener.local_peer_id());

        dial(&mut dialer, &listener).unwrap();

        // Listener side: wait for the denial and check its cause.
        let listener_loop = async move {
            loop {
                match listener.select_next_some().await {
                    SwarmEvent::IncomingConnectionError { error, .. } => {
                        let ListenError::Denied { cause } = error else {
                            panic!("Invalid Error")
                        };
                        assert!(cause.downcast::<NotAllowed>().is_ok());
                        break;
                    }
                    _ => {}
                }
            }
        };

        // Dialer side: the connection is expected to close with a broken-pipe
        // I/O error. This loop runs in a detached task, so its panics do not
        // fail the test directly.
        let dialer_loop = async move {
            loop {
                match dialer.select_next_some().await {
                    SwarmEvent::ConnectionClosed { cause, .. } => {
                        if let Some(error) = cause {
                            match error {
                                ConnectionError::IO(e) => {
                                    assert_eq!(
                                        e.to_string(),
                                        "Right(Io(Kind(BrokenPipe)))"
                                    );
                                    break;
                                }
                                _ => {
                                    panic!("Invalid error");
                                }
                            }
                        } else {
                            panic!("Missing error");
                        };
                    }
                    _ => {}
                }
            }
        };
        tokio::task::spawn(Box::pin(dialer_loop));
        listener_loop.await;
    }
827
    /// An established connection is closed once the remote peer gets blocked.
    #[test(tokio::test)]
    #[serial]
    async fn connections_get_closed_upon_disallow() {
        let (mut dialer, mut listener) = build_behaviours();
        listener.listen().with_memory_addr_external().await;

        // Both sides allow each other so the connection can be established.
        dialer.behaviour_mut().allow_peer(*listener.local_peer_id());
        listener.behaviour_mut().allow_peer(*dialer.local_peer_id());
        let dialer_peer = *dialer.local_peer_id();

        dial(&mut dialer, &listener).unwrap();

        // Listener side: block the dialer as soon as the connection is up,
        // then wait for the connection to be closed.
        let listener_loop = async move {
            loop {
                match listener.select_next_some().await {
                    SwarmEvent::ConnectionEstablished { .. } => {
                        listener.behaviour_mut().block_peer(dialer_peer);
                    }
                    SwarmEvent::ConnectionClosed { .. } => {
                        break;
                    }
                    _ => {}
                }
            }
        };

        // Dialer side: the close is expected to surface as a "Closed" I/O
        // error. This loop runs in a detached task, so its panics do not fail
        // the test directly.
        let dialer_loop = async move {
            loop {
                match dialer.select_next_some().await {
                    SwarmEvent::ConnectionEstablished { .. } => {}
                    SwarmEvent::ConnectionClosed { cause, .. } => {
                        if let Some(error) = cause {
                            match error {
                                ConnectionError::IO(e) => {
                                    assert_eq!(e.to_string(), "Right(Closed)");
                                    break;
                                }
                                _ => {
                                    panic!("Invalid error");
                                }
                            }
                        } else {
                            panic!("Missing error");
                        };
                    }
                    _ => {}
                }
            }
        };

        tokio::task::spawn(Box::pin(dialer_loop));
        listener_loop.await;
    }
881
882 #[test]
883 fn control_list_denied_metrics_by_reason() {
884 let mut registry = Registry::default();
885 let metrics = crate::metrics::register(&mut registry);
886
887 let config = Config::default().with_enable(true);
888 let behaviour = Behaviour::new(config, &[], None, Some(metrics));
889
890 let blocked_peer = PeerId::random();
891 let not_allowed_peer = PeerId::random();
892
893 let mut behaviour = behaviour;
894 behaviour.block_peers.insert(blocked_peer);
895
896 let _ = behaviour.check_block(&blocked_peer);
897 let _ = behaviour.check_allow(¬_allowed_peer);
898
899 let mut text = String::new();
900 encode(&mut text, ®istry).expect("encode metrics");
901
902 assert_eq!(
903 metric_value(
904 &text,
905 "network_control_list_denied_total{reason=\"blocked\"}"
906 ),
907 1.0
908 );
909 assert_eq!(
910 metric_value(
911 &text,
912 "network_control_list_denied_total{reason=\"not_allowed\"}"
913 ),
914 1.0
915 );
916 }
917
918 #[test(tokio::test)]
919 #[serial]
920 async fn slow_services_timeout_without_emitting_updates() {
921 let (url, slow_server_stop) =
922 spawn_slow_json_service(Duration::from_millis(250)).await;
923 let mut registry = Registry::default();
924 let metrics = crate::metrics::register(&mut registry);
925 let cancel = CancellationToken::new();
926
927 let config = Config::default()
928 .with_enable(true)
929 .with_interval_request(Duration::from_millis(20))
930 .with_request_timeout(Duration::from_millis(30))
931 .with_max_concurrent_requests(1)
932 .with_service_allow_list(vec![url.clone()])
933 .with_service_block_list(vec![url]);
934
935 let mut receiver = build_control_lists_updaters(
936 &config,
937 cancel.clone(),
938 CancellationToken::new(),
939 Some(metrics),
940 )
941 .expect("control-list updater receiver");
942
943 tokio::time::sleep(Duration::from_millis(170)).await;
944
945 let next_event =
946 timeout(Duration::from_millis(50), receiver.recv()).await;
947 assert!(
948 next_event.is_err(),
949 "slow timed-out services should not emit list updates"
950 );
951
952 let mut text = String::new();
953 encode(&mut text, ®istry).expect("encode metrics");
954
955 assert!(
956 metric_value(
957 &text,
958 "network_control_list_updates_total{list=\"allow\",result=\"failure\"}"
959 ) >= 1.0
960 );
961 assert!(
962 metric_value(
963 &text,
964 "network_control_list_updates_total{list=\"block\",result=\"failure\"}"
965 ) >= 1.0
966 );
967 assert_eq!(
968 metric_value(
969 &text,
970 "network_control_list_updates_total{list=\"allow\",result=\"success\"}"
971 ),
972 0.0
973 );
974 assert_eq!(
975 metric_value(
976 &text,
977 "network_control_list_updates_total{list=\"block\",result=\"success\"}"
978 ),
979 0.0
980 );
981
982 cancel.cancel();
983 slow_server_stop.cancel();
984 }
985
986 #[test(tokio::test)]
987 #[serial]
988 async fn zero_max_concurrent_requests_is_treated_as_one() {
989 let (url, server_stop) =
990 spawn_slow_json_service(Duration::from_millis(1)).await;
991 let mut registry = Registry::default();
992 let metrics = crate::metrics::register(&mut registry);
993 let cancel = CancellationToken::new();
994
995 let config = Config::default()
996 .with_enable(true)
997 .with_interval_request(Duration::from_millis(20))
998 .with_request_timeout(Duration::from_millis(200))
999 .with_max_concurrent_requests(0)
1000 .with_service_allow_list(vec![url.clone()])
1001 .with_service_block_list(vec![url]);
1002
1003 let mut receiver = build_control_lists_updaters(
1004 &config,
1005 cancel.clone(),
1006 CancellationToken::new(),
1007 Some(metrics),
1008 )
1009 .expect("control-list updater receiver");
1010
1011 let mut got_allow = false;
1012 let mut got_block = false;
1013 for _ in 0..20 {
1014 let event =
1015 timeout(Duration::from_millis(60), receiver.recv()).await;
1016 if let Ok(Some(event)) = event {
1017 match event {
1018 Event::AllowListUpdated(_) => got_allow = true,
1019 Event::BlockListUpdated(_) => got_block = true,
1020 }
1021 }
1022 if got_allow && got_block {
1023 break;
1024 }
1025 }
1026
1027 assert!(got_allow, "allow-list update should be emitted");
1028 assert!(got_block, "block-list update should be emitted");
1029
1030 let mut text = String::new();
1031 encode(&mut text, ®istry).expect("encode metrics");
1032 assert!(
1033 metric_value(
1034 &text,
1035 "network_control_list_updates_total{list=\"allow\",result=\"success\"}"
1036 ) >= 1.0
1037 );
1038 assert!(
1039 metric_value(
1040 &text,
1041 "network_control_list_updates_total{list=\"block\",result=\"success\"}"
1042 ) >= 1.0
1043 );
1044
1045 cancel.cancel();
1046 server_stop.cancel();
1047 }
1048
    /// Cancelling the graceful token stops the updater even while requests to
    /// a slow service are still in flight.
    #[test(tokio::test)]
    #[serial]
    async fn cancellation_stops_updater_during_slow_requests() {
        // The service needs 2 s to answer and the request timeout is 5 s, so
        // the requests cannot finish within the 1 s wait below.
        let (url, server_stop) =
            spawn_slow_json_service(Duration::from_secs(2)).await;
        let cancel = CancellationToken::new();

        let config = Config::default()
            .with_enable(true)
            .with_interval_request(Duration::from_millis(10))
            .with_request_timeout(Duration::from_secs(5))
            .with_max_concurrent_requests(1)
            .with_service_allow_list(vec![url.clone()])
            .with_service_block_list(vec![url]);

        let mut receiver = build_control_lists_updaters(
            &config,
            cancel.clone(),
            CancellationToken::new(),
            None,
        )
        .expect("control-list updater receiver");

        // Give the updater time to start its first refresh, then cancel.
        tokio::time::sleep(Duration::from_millis(40)).await;
        cancel.cancel();

        // `recv` yields `None` once the updater task has dropped its sender.
        let closed = timeout(Duration::from_secs(1), async {
            loop {
                if receiver.recv().await.is_none() {
                    break;
                }
            }
        })
        .await;

        assert!(
            closed.is_ok(),
            "updater should stop and close channel after cancellation"
        );

        server_stop.cancel();
    }
1091}