1use std::{
59 collections::{HashMap, VecDeque, hash_map},
60 sync::{Arc, Mutex, Weak},
61 time::{Duration, SystemTime},
62};
63
64use scc::HashIndex;
65use scion_proto::{address::IsdAsn, path::Path, scmp::ScmpErrorMessage};
66use scion_sdk_utils::backoff::BackoffConfig;
67use tokio::sync::broadcast::{self};
68
69use crate::{
70 path::{
71 PathStrategy,
72 fetcher::{
73 PathFetcherImpl,
74 traits::{PathFetchError, PathFetcher},
75 },
76 manager::{
77 issues::{IssueKind, IssueMarker, IssueMarkerTarget, SendError},
78 pathset::{PathSet, PathSetHandle, PathSetTask},
79 traits::{PathManager, PathPrefetcher, PathWaitError, SyncPathManager},
80 },
81 types::PathManagerPath,
82 },
83 scionstack::{
84 ScionSocketSendError, scmp_handler::ScmpErrorReceiver, socket::SendErrorReceiver,
85 },
86};
87
88mod algo;
89mod issues;
92mod pathset;
94pub mod reliability;
96pub mod traits;
98
99#[derive(Debug, Clone, Copy)]
101pub struct MultiPathManagerConfig {
102 max_cached_paths_per_pair: usize,
104 refetch_interval: Duration,
106 min_refetch_delay: Duration,
108 min_expiry_threshold: Duration,
110 max_idle_period: Duration,
112 fetch_failure_backoff: BackoffConfig,
114 issue_cache_size: usize,
116 issue_broadcast_size: usize,
118 issue_deduplication_window: Duration,
120 path_swap_score_threshold: f32,
122}
123
124impl Default for MultiPathManagerConfig {
125 fn default() -> Self {
126 MultiPathManagerConfig {
127 max_cached_paths_per_pair: 50,
128 refetch_interval: Duration::from_secs(60 * 30), min_refetch_delay: Duration::from_secs(60),
130 min_expiry_threshold: Duration::from_secs(60 * 5), max_idle_period: Duration::from_secs(60 * 2), fetch_failure_backoff: BackoffConfig {
133 minimum_delay_secs: 60.0,
134 maximum_delay_secs: 300.0,
135 factor: 1.5,
136 jitter_secs: 5.0,
137 },
138 issue_cache_size: 100,
139 issue_broadcast_size: 10,
140 issue_deduplication_window: Duration::from_secs(10),
142 path_swap_score_threshold: 0.5,
143 }
144 }
145}
146
147impl MultiPathManagerConfig {
148 fn validate(&self) -> Result<(), &'static str> {
150 if self.min_refetch_delay > self.refetch_interval {
151 return Err("min_refetch_delay must be smaller than refetch_interval");
152 }
154
155 if self.min_refetch_delay > self.min_expiry_threshold {
156 return Err("min_refetch_delay must be smaller than min_expiry_threshold");
157 }
159
160 Ok(())
161 }
162}
163
164pub struct MultiPathManager<F: PathFetcher = PathFetcherImpl>(Arc<MultiPathManagerInner<F>>);
166
167impl<F> Clone for MultiPathManager<F>
168where
169 F: PathFetcher,
170{
171 fn clone(&self) -> Self {
172 MultiPathManager(self.0.clone())
173 }
174}
175
176struct MultiPathManagerInner<F: PathFetcher> {
177 config: MultiPathManagerConfig,
178 fetcher: F,
179 path_strategy: PathStrategy,
180 issue_manager: Mutex<PathIssueManager>,
181 managed_paths: HashIndex<(IsdAsn, IsdAsn), (PathSetHandle, PathSetTask)>,
182}
183
184impl<F: PathFetcher> MultiPathManager<F> {
185 pub fn new(
187 config: MultiPathManagerConfig,
188 fetcher: F,
189 path_strategy: PathStrategy,
190 ) -> Result<Self, &'static str> {
191 config.validate()?;
192
193 let issue_manager = Mutex::new(PathIssueManager::new(
194 config.issue_cache_size,
195 config.issue_broadcast_size,
196 config.issue_deduplication_window,
197 ));
198
199 Ok(MultiPathManager(Arc::new(MultiPathManagerInner {
200 config,
201 fetcher,
202 issue_manager,
203 path_strategy,
204 managed_paths: HashIndex::new(),
205 })))
206 }
207
208 pub fn try_path(&self, src: IsdAsn, dst: IsdAsn, now: SystemTime) -> Option<Path> {
214 let try_path = self
215 .0
216 .managed_paths
217 .peek_with(&(src, dst), |_, (handle, _)| {
218 handle.try_active_path().as_deref().map(|p| p.0.clone())
219 })
220 .flatten();
221
222 match try_path {
223 Some(active) => {
224 let expired = active.is_expired(now.into()).unwrap_or(true);
227 debug_assert!(!expired, "Returned expired path from try_get_path");
228
229 Some(active)
230 }
231 None => {
232 self.fast_ensure_managed_paths(src, dst);
234 None
235 }
236 }
237 }
238
239 pub async fn path(
246 &self,
247 src: IsdAsn,
248 dst: IsdAsn,
249 now: SystemTime,
250 ) -> Result<Path, Arc<PathFetchError>> {
251 let try_path = self
252 .0
253 .managed_paths
254 .peek_with(&(src, dst), |_, (handle, _)| {
255 handle.try_active_path().as_deref().map(|p| p.0.clone())
256 })
257 .flatten();
258
259 let res = match try_path {
260 Some(active) => Ok(active),
261 None => {
262 let path_set = self.ensure_managed_paths(src, dst);
264
265 let active = path_set.active_path().await.as_ref().map(|p| p.0.clone());
267
268 match active {
270 Some(active) => Ok(active),
271 None => {
272 let last_error = path_set.current_error();
274 match last_error {
275 Some(e) => Err(e),
276 None => {
277 Err(Arc::new(PathFetchError::NoPathsFound))
281 }
282 }
283 }
284 }
285 }
286 };
287
288 if let Ok(active) = &res {
289 let expired = active.is_expired(now.into()).unwrap_or(true);
292 debug_assert!(!expired, "Returned expired path from get_path");
293 }
294
295 res
296 }
297
298 pub fn weak_ref(&self) -> MultiPathManagerRef<F> {
300 MultiPathManagerRef(Arc::downgrade(&self.0))
301 }
302
303 fn fast_ensure_managed_paths(&self, src: IsdAsn, dst: IsdAsn) {
307 if self.0.managed_paths.contains(&(src, dst)) {
308 return;
309 }
310
311 self.ensure_managed_paths(src, dst);
312 }
313
314 fn ensure_managed_paths(&self, src: IsdAsn, dst: IsdAsn) -> PathSetHandle {
318 let entry = match self.0.managed_paths.entry_sync((src, dst)) {
319 scc::hash_index::Entry::Occupied(occupied) => {
320 tracing::trace!(%src, %dst, "Already managing paths for src-dst pair");
321 occupied
322 }
323 scc::hash_index::Entry::Vacant(vacant) => {
324 tracing::info!(%src, %dst, "Starting to manage paths for src-dst pair");
325 let managed = PathSet::new(
326 src,
327 dst,
328 self.weak_ref(),
329 self.0.config,
330 self.0.issue_manager.lock().unwrap().issues_subscriber(),
331 );
332
333 vacant.insert_entry(managed.manage())
334 }
335 };
336
337 entry.get().0.clone()
338 }
339
340 pub fn stop_managing_paths(&self, src: IsdAsn, dst: IsdAsn) {
342 if self.0.managed_paths.remove_sync(&(src, dst)) {
343 tracing::info!(%src, %dst, "Stopped managing paths for src-dst pair");
344 }
345 }
346
347 pub fn report_path_issue(&self, timestamp: SystemTime, issue: IssueKind, path: Option<&Path>) {
349 let Some(applies_to) = issue.target_type(path) else {
350 return;
352 };
353
354 if matches!(applies_to, IssueMarkerTarget::DestinationNetwork { .. }) {
355 return;
357 }
358
359 tracing::debug!(%issue, "New path issue");
360
361 let issue_marker = IssueMarker {
362 target: applies_to,
363 timestamp,
364 penalty: issue.penalty(),
365 };
366
367 {
369 let mut issues_guard = self.0.issue_manager.lock().unwrap();
370 issues_guard.add_issue(issue, issue_marker.clone());
371 }
372 }
373}
374
375impl<F: PathFetcher> ScmpErrorReceiver for MultiPathManager<F> {
376 fn report_scmp_error(&self, scmp_error: ScmpErrorMessage, path: &Path) {
377 self.report_path_issue(
378 SystemTime::now(),
379 IssueKind::Scmp { error: scmp_error },
380 Some(path),
381 );
382 }
383}
384
385impl<F: PathFetcher> SendErrorReceiver for MultiPathManager<F> {
386 fn report_send_error(&self, error: &ScionSocketSendError) {
387 if let Some(send_error) = SendError::from_socket_send_error(error) {
388 self.report_path_issue(
389 SystemTime::now(),
390 IssueKind::Socket { err: send_error },
391 None,
392 );
393 }
394 }
395}
396
397impl<F: PathFetcher> SyncPathManager for MultiPathManager<F> {
398 fn register_path(
399 &self,
400 _src: IsdAsn,
401 _dst: IsdAsn,
402 _now: chrono::DateTime<chrono::Utc>,
403 _path: Path<bytes::Bytes>,
404 ) {
405 }
409
410 fn try_cached_path(
411 &self,
412 src: IsdAsn,
413 dst: IsdAsn,
414 now: chrono::DateTime<chrono::Utc>,
415 ) -> std::io::Result<Option<Path<bytes::Bytes>>> {
416 Ok(self.try_path(src, dst, now.into()))
417 }
418}
419
420impl<F: PathFetcher> PathManager for MultiPathManager<F> {
421 fn path_wait(
422 &self,
423 src: IsdAsn,
424 dst: IsdAsn,
425 now: chrono::DateTime<chrono::Utc>,
426 ) -> impl crate::types::ResFut<'_, Path<bytes::Bytes>, PathWaitError> {
427 async move {
428 match self.path(src, dst, now.into()).await {
429 Ok(path) => Ok(path),
430 Err(e) => {
431 match &*e {
432 PathFetchError::FetchSegments(error) => {
433 Err(PathWaitError::FetchFailed(format!("{error}")))
434 }
435 PathFetchError::InternalError(msg) => {
436 Err(PathWaitError::FetchFailed(msg.to_string()))
437 }
438 PathFetchError::NoPathsFound => Err(PathWaitError::NoPathFound),
439 }
440 }
441 }
442 }
443 }
444}
445
446impl<F: PathFetcher> PathPrefetcher for MultiPathManager<F> {
447 fn prefetch_path(&self, src: IsdAsn, dst: IsdAsn) {
448 self.ensure_managed_paths(src, dst);
449 }
450}
451
452pub struct MultiPathManagerRef<F: PathFetcher>(Weak<MultiPathManagerInner<F>>);
456
457impl<F: PathFetcher> Clone for MultiPathManagerRef<F> {
458 fn clone(&self) -> Self {
459 MultiPathManagerRef(self.0.clone())
460 }
461}
462
463impl<F: PathFetcher> MultiPathManagerRef<F> {
464 pub fn get(&self) -> Option<MultiPathManager<F>> {
466 self.0.upgrade().map(|arc| MultiPathManager(arc))
467 }
468}
469
470struct PathIssueManager {
474 max_entries: usize,
476 deduplication_window: Duration,
477
478 cache: HashMap<u64, IssueMarker>,
481 fifo_issues: VecDeque<(u64, SystemTime)>,
483
484 issue_broadcast_tx: broadcast::Sender<(u64, IssueMarker)>,
486}
487
488impl PathIssueManager {
489 fn new(max_entries: usize, broadcast_buffer: usize, deduplication_window: Duration) -> Self {
490 let (issue_broadcast_tx, _) = broadcast::channel(broadcast_buffer);
491 PathIssueManager {
492 max_entries,
493 deduplication_window,
494 cache: HashMap::new(),
495 fifo_issues: VecDeque::new(),
496 issue_broadcast_tx,
497 }
498 }
499
500 pub fn issues_subscriber(&self) -> broadcast::Receiver<(u64, IssueMarker)> {
502 self.issue_broadcast_tx.subscribe()
503 }
504
505 pub fn add_issue(&mut self, issue: IssueKind, marker: IssueMarker) {
514 let id = issue.dedup_id(&marker.target);
515
516 if let Some(existing_marker) = self.cache.get(&id) {
518 let time_since_last_seen = marker
519 .timestamp
520 .duration_since(existing_marker.timestamp)
521 .unwrap_or_else(|_| Duration::from_secs(0));
522
523 if time_since_last_seen < self.deduplication_window {
524 tracing::trace!(%id, ?time_since_last_seen, ?marker, %issue, "Ignoring duplicate path issue");
525 return;
527 }
528 }
529
530 self.issue_broadcast_tx.send((id, marker.clone())).ok();
532
533 if self.cache.len() >= self.max_entries {
534 self.pop_front();
535 }
536
537 self.fifo_issues.push_back((id, marker.timestamp)); self.cache.insert(id, marker);
540 }
541
542 pub fn apply_cached_issues(&self, entry: &mut PathManagerPath, now: SystemTime) -> bool {
550 let mut applied = false;
551 for issue in self.cache.values() {
552 if issue.target.matches_path(&entry.path, &entry.fingerprint) {
553 entry.reliability.update(issue.decayed_penalty(now), now);
554 applied = true;
555 }
556 }
557 applied
558 }
559
560 fn pop_front(&mut self) -> Option<IssueMarker> {
562 let (issue_id, timestamp) = self.fifo_issues.pop_front()?;
563
564 match self.cache.entry(issue_id) {
565 hash_map::Entry::Occupied(occupied_entry) => {
566 match occupied_entry.get().timestamp == timestamp {
568 true => Some(occupied_entry.remove()),
569 false => None, }
571 }
572 hash_map::Entry::Vacant(_) => {
573 debug_assert!(false, "Bad cache: issue ID not found in cache");
574 None
575 }
576 }
577 }
578}
579
580#[cfg(test)]
581mod tests {
582 use helpers::*;
583 use tokio::time::timeout;
584
585 use super::*;
586
587 #[tokio::test]
589 #[test_log::test]
590 async fn should_create_pathset_on_request() {
591 let cfg = base_config();
592 let fetcher = MockFetcher::new(generate_responses(5, 0, BASE_TIME, DEFAULT_EXP_UNITS));
593
594 let mgr = MultiPathManager::new(cfg, fetcher, PathStrategy::default())
595 .expect("Should create manager");
596
597 assert!(mgr.0.managed_paths.is_empty());
599
600 let path = mgr.try_path(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn(), BASE_TIME);
602 assert!(path.is_none());
604
605 assert!(
607 mgr.0
608 .managed_paths
609 .contains(&(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn()))
610 );
611 }
612
613 #[tokio::test]
615 #[test_log::test]
616 async fn should_remove_idle_pathsets() {
617 let mut cfg = base_config();
618 cfg.max_idle_period = Duration::from_millis(10); let fetcher = MockFetcher::new(generate_responses(5, 0, BASE_TIME, DEFAULT_EXP_UNITS));
621
622 let mgr = MultiPathManager::new(cfg, fetcher, PathStrategy::default())
623 .expect("Should create manager");
624
625 let handle = mgr.ensure_managed_paths(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn());
627
628 assert!(
630 mgr.0
631 .managed_paths
632 .contains(&(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn()))
633 );
634
635 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
637
638 let contains = mgr
640 .0
641 .managed_paths
642 .contains(&(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn()));
643
644 assert!(!contains, "Idle path set should be removed");
645
646 let err = handle.current_error();
647 assert!(
648 err.is_some(),
649 "Handle should report error after path set removal"
650 );
651 println!("Error after idle removal: {:?}", err);
652 assert!(
653 err.unwrap().to_string().contains("idle"),
654 "Error message should indicate idle removal"
655 );
656 }
657
658 #[tokio::test]
660 #[test_log::test]
661 async fn should_cancel_pathset_tasks_on_drop() {
662 let cfg: MultiPathManagerConfig = base_config();
663 let fetcher = MockFetcher::new(generate_responses(5, 0, BASE_TIME, DEFAULT_EXP_UNITS));
664
665 let mgr = MultiPathManager::new(cfg, fetcher, PathStrategy::default())
666 .expect("Should create manager");
667
668 let handle = mgr.ensure_managed_paths(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn());
670 handle.wait_initialized().await;
671
672 let mut set_entry = mgr
673 .0
674 .managed_paths
675 .get_sync(&(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn()))
676 .unwrap();
677
678 let task_handle = unsafe {
679 let swap_handle = tokio::spawn(async {});
682 std::mem::replace(&mut set_entry.get_mut().1._task, swap_handle)
683 };
684
685 let cancel_token = set_entry.get().1.cancel_token.clone();
686
687 let count = mgr.0.managed_paths.len();
688 assert_eq!(count, 1, "Should have 1 managed path set");
689
690 drop(mgr);
692 assert!(
694 cancel_token.is_cancelled(),
695 "Cancel token should be triggered"
696 );
697
698 timeout(Duration::from_millis(50), task_handle)
700 .await
701 .unwrap()
702 .unwrap();
703
704 let err = handle
705 .shared
706 .sync
707 .lock()
708 .unwrap()
709 .current_error
710 .clone()
711 .expect("Should have error after manager drop");
712
713 assert!(
716 err.to_string().contains("cancelled") || err.to_string().contains("dropped"),
717 "Error message should indicate cancellation or manager drop"
718 );
719 }
720
721 mod issue_handling {
722 use scc::HashIndex;
723 use scion_proto::address::{Asn, Isd};
724
725 use super::*;
726 use crate::path::{
727 manager::{MultiPathManagerInner, PathIssueManager, reliability::ReliabilityScore},
728 types::Score,
729 };
730
731 #[tokio::test]
734 #[test_log::test]
735 async fn should_ingest_issues_and_apply_to_existing_paths() {
736 let cfg = base_config();
737 let fetcher = MockFetcher::new(generate_responses(5, 0, BASE_TIME, DEFAULT_EXP_UNITS));
738 let (mgr, mut path_set) = manual_pathset(BASE_TIME, fetcher.clone(), cfg, None);
739
740 path_set.maintain(BASE_TIME, &mgr).await;
741
742 let first_path = &path_set.internal.cached_paths[0];
744 let first_fp = first_path.fingerprint;
745
746 let issue = IssueKind::Socket {
748 err: SendError::FirstHopUnreachable {
749 isd_asn: first_path.path.source(),
750 interface_id: first_path.path.first_hop_egress_interface().unwrap().id,
751 address: None,
752 msg: "test".into(),
753 },
754 };
755
756 let penalty = Score::new_clamped(-0.3);
757 let marker = IssueMarker {
758 target: issue.target_type(Some(&first_path.path)).unwrap(),
759 timestamp: BASE_TIME,
760 penalty,
761 };
762
763 {
764 let mut issues_guard = mgr.0.issue_manager.lock().unwrap();
765 issues_guard.add_issue(issue, marker);
767 assert!(!issues_guard.cache.is_empty(), "Issue should be in cache");
769 }
770 let recv_result = path_set.internal.issue_rx.recv().await;
772 path_set.handle_issue_rx(BASE_TIME, recv_result, &mgr);
773
774 let updated_path = path_set
776 .internal
777 .cached_paths
778 .iter()
779 .find(|e| e.fingerprint == first_fp)
780 .expect("Path should still exist");
781
782 let updated_score = updated_path.reliability.score(BASE_TIME).value();
783
784 assert!(
785 updated_score == penalty.value(),
786 "Path score should be updated by penalty. Expected: {}, Got: {}",
787 penalty.value(),
788 updated_score
789 );
790
791 let later_time = BASE_TIME + Duration::from_secs(30);
793 let decayed_score = updated_path.reliability.score(later_time).value();
794 assert!(
795 decayed_score > updated_score,
796 "Path score should recover over time. Updated: {}, Decayed: {}",
797 updated_score,
798 decayed_score
799 );
800 }
801
802 #[tokio::test]
803 #[test_log::test]
804 async fn should_deduplicate_issues_within_window() {
805 let cfg = base_config();
806 let mgr_inner = MultiPathManagerInner {
807 config: cfg,
808 fetcher: MockFetcher::new(Ok(vec![])),
809 path_strategy: PathStrategy::default(),
810 issue_manager: Mutex::new(PathIssueManager::new(64, 64, Duration::from_secs(10))),
811 managed_paths: HashIndex::new(),
812 };
813 let mgr = MultiPathManager(Arc::new(mgr_inner));
814
815 let issue_marker = IssueMarker {
816 target: IssueMarkerTarget::FirstHop {
817 isd_asn: SRC_ADDR.isd_asn(),
818 egress_interface: 1,
819 },
820 timestamp: BASE_TIME,
821 penalty: Score::new_clamped(-0.3),
822 };
823
824 let issue = IssueKind::Socket {
825 err: SendError::FirstHopUnreachable {
826 isd_asn: SRC_ADDR.isd_asn(),
827 interface_id: 1,
828 address: None,
829 msg: "test".into(),
830 },
831 };
832
833 mgr.0
835 .issue_manager
836 .lock()
837 .unwrap()
838 .add_issue(issue.clone(), issue_marker.clone());
839 let cache_size_1 = mgr.0.issue_manager.lock().unwrap().cache.len();
840 assert_eq!(cache_size_1, 1);
841
842 let issue_marker_2 = IssueMarker {
844 timestamp: BASE_TIME + Duration::from_secs(1), ..issue_marker.clone()
846 };
847 mgr.0
848 .issue_manager
849 .lock()
850 .unwrap()
851 .add_issue(issue.clone(), issue_marker_2);
852
853 let fifo_size = mgr.0.issue_manager.lock().unwrap().fifo_issues.len();
854 let cache_size_2 = mgr.0.issue_manager.lock().unwrap().cache.len();
855 assert_eq!(cache_size_2, 1, "Duplicate issue should be ignored");
856 assert_eq!(
857 fifo_size, 1,
858 "FIFO queue size should remain unchanged on duplicate issue"
859 );
860
861 let issue_marker_3 = IssueMarker {
863 timestamp: BASE_TIME + Duration::from_secs(11), ..issue_marker
865 };
866 mgr.0
867 .issue_manager
868 .lock()
869 .unwrap()
870 .add_issue(issue, issue_marker_3);
871
872 let fifo_size_3 = mgr.0.issue_manager.lock().unwrap().fifo_issues.len();
873 let cache_size_3 = mgr.0.issue_manager.lock().unwrap().cache.len();
874 assert_eq!(
875 cache_size_3, 1,
876 "Issue outside dedup window should update existing"
877 );
878 assert_eq!(
879 fifo_size_3, 2,
880 "FIFO queue size should increase for new issue outside dedup window"
881 );
882 }
883
884 #[tokio::test]
887 #[test_log::test]
888 async fn should_apply_issues_to_new_paths_on_fetch() {
889 let cfg = base_config();
890 let fetcher = MockFetcher::new(Ok(vec![]));
891 let (mgr, mut path_set) = manual_pathset(BASE_TIME, fetcher.clone(), cfg, None);
892
893 path_set.maintain(BASE_TIME, &mgr).await;
894
895 let issue_marker = IssueMarker {
897 target: IssueMarkerTarget::FirstHop {
898 isd_asn: SRC_ADDR.isd_asn(),
899 egress_interface: 1,
900 },
901 timestamp: BASE_TIME,
902 penalty: Score::new_clamped(-0.5),
903 };
904
905 let issue = IssueKind::Socket {
906 err: SendError::FirstHopUnreachable {
907 isd_asn: SRC_ADDR.isd_asn(),
908 interface_id: 1,
909 address: None,
910 msg: "test".into(),
911 },
912 };
913
914 mgr.0
916 .issue_manager
917 .lock()
918 .unwrap()
919 .add_issue(issue, issue_marker);
920
921 path_set.drain_and_apply_issue_channel(BASE_TIME);
923
924 fetcher.lock().unwrap().set_response(generate_responses(
926 3,
927 0,
928 BASE_TIME + Duration::from_secs(1),
929 DEFAULT_EXP_UNITS,
930 ));
931
932 let next_refetch = path_set.internal.next_refetch;
933 path_set.maintain(next_refetch, &mgr).await;
934
935 let affected_path = path_set
937 .internal
938 .cached_paths
939 .first()
940 .expect("Path should exist");
941
942 let score = affected_path
943 .reliability
944 .score(BASE_TIME + Duration::from_secs(1))
945 .value();
946 assert!(
947 score < 0.0,
948 "Newly fetched path should have cached issue applied. Score: {}",
949 score
950 );
951 }
952
953 #[tokio::test]
955 #[test_log::test]
956 async fn should_trigger_active_path_reevaluation_on_issue() {
957 let cfg = base_config();
958 let fetcher = MockFetcher::new(generate_responses(5, 0, BASE_TIME, DEFAULT_EXP_UNITS));
959 let (mgr, mut path_set) = manual_pathset(BASE_TIME, fetcher.clone(), cfg, None);
960
961 path_set.maintain(BASE_TIME, &mgr).await;
962
963 let active_fp = path_set.shared.active_path.load().as_ref().unwrap().1;
964
965 let issue_marker = IssueMarker {
967 target: IssueMarkerTarget::FullPath {
968 fingerprint: active_fp,
969 },
970 timestamp: BASE_TIME,
971 penalty: Score::new_clamped(-1.0), };
973
974 let issue = IssueKind::Socket {
975 err: SendError::FirstHopUnreachable {
976 isd_asn: SRC_ADDR.isd_asn(),
977 interface_id: 1,
978 address: None,
979 msg: "test".into(),
980 },
981 };
982
983 mgr.0
985 .issue_manager
986 .lock()
987 .unwrap()
988 .add_issue(issue, issue_marker);
989
990 let recv_result = path_set.internal.issue_rx.recv().await;
992 path_set.handle_issue_rx(BASE_TIME, recv_result, &mgr);
993
994 let new_active_fp = path_set.shared.active_path.load().as_ref().unwrap().1;
996 assert_ne!(
997 active_fp, new_active_fp,
998 "Active path should change when severely penalized"
999 );
1000 }
1001
1002 #[tokio::test]
1003 #[test_log::test]
1004 async fn should_swap_to_better_path_if_one_appears() {
1005 let cfg = base_config();
1006 let fetcher = MockFetcher::new(generate_responses(1, 0, BASE_TIME, DEFAULT_EXP_UNITS));
1007 let (mgr, mut path_set) = manual_pathset(BASE_TIME, fetcher.clone(), cfg, None);
1008
1009 path_set.maintain(BASE_TIME, &mgr).await;
1010
1011 path_set
1013 .shared
1014 .was_used_in_idle_period
1015 .store(true, std::sync::atomic::Ordering::Relaxed);
1016
1017 let active_fp = path_set.shared.active_path.load().as_ref().unwrap().1;
1018
1019 let issue_marker = IssueMarker {
1021 target: IssueMarkerTarget::FullPath {
1022 fingerprint: active_fp,
1023 },
1024 timestamp: BASE_TIME,
1025 penalty: Score::new_clamped(-0.8),
1026 };
1027
1028 mgr.0.issue_manager.lock().unwrap().add_issue(
1029 IssueKind::Socket {
1030 err: SendError::FirstHopUnreachable {
1031 isd_asn: SRC_ADDR.isd_asn(),
1032 interface_id: 1,
1033 address: None,
1034 msg: "test".into(),
1035 },
1036 },
1037 issue_marker,
1038 );
1039
1040 let active_fp_after_issue = path_set.shared.active_path.load().as_ref().unwrap().1;
1042 assert_eq!(
1043 active_fp, active_fp_after_issue,
1044 "Active path should remain the same if no better path exists"
1045 );
1046
1047 fetcher.lock().unwrap().set_response(generate_responses(
1049 1,
1050 100,
1051 BASE_TIME + Duration::from_secs(1),
1052 DEFAULT_EXP_UNITS,
1053 ));
1054
1055 path_set
1056 .maintain(path_set.internal.next_refetch, &mgr)
1057 .await;
1058 path_set
1060 .shared
1061 .was_used_in_idle_period
1062 .store(true, std::sync::atomic::Ordering::Relaxed);
1063
1064 let new_active_fp = path_set.shared.active_path.load().as_ref().unwrap().1;
1066 assert_ne!(
1067 active_fp, new_active_fp,
1068 "Active path should change when a better path appears"
1069 );
1070
1071 let positive_score = Score::new_clamped(0.8);
1073 let mut reliability = ReliabilityScore::new_with_time(path_set.internal.next_refetch);
1074 reliability.update(positive_score, path_set.internal.next_refetch);
1075
1076 path_set
1078 .internal
1079 .cached_paths
1080 .iter_mut()
1081 .find(|e| e.fingerprint == active_fp)
1082 .unwrap()
1083 .reliability = reliability;
1084
1085 path_set
1086 .maintain(path_set.internal.next_refetch, &mgr)
1087 .await;
1088
1089 assert_eq!(
1090 active_fp,
1091 path_set.shared.active_path.load().as_ref().unwrap().1,
1092 "Active path should change on positive score diff"
1093 );
1094 }
1095
1096 #[tokio::test]
1097 #[test_log::test]
1098 async fn should_keep_max_issue_cache_size() {
1099 let max_size = 10;
1100 let mut issue_mgr = PathIssueManager::new(max_size, 64, Duration::from_secs(10));
1101
1102 for i in 0..20u16 {
1104 let issue_marker = IssueMarker {
1105 target: IssueMarkerTarget::FirstHop {
1106 isd_asn: IsdAsn::new(Isd(1), Asn(1)),
1107 egress_interface: i,
1108 },
1109 timestamp: BASE_TIME + Duration::from_secs(i as u64),
1110 penalty: Score::new_clamped(-0.1),
1111 };
1112
1113 let issue = IssueKind::Socket {
1114 err: SendError::FirstHopUnreachable {
1115 isd_asn: IsdAsn::new(Isd(1), Asn(1)),
1116 interface_id: i,
1117 address: None,
1118 msg: "test".into(),
1119 },
1120 };
1121
1122 issue_mgr.add_issue(issue, issue_marker);
1123 }
1124
1125 assert!(
1127 issue_mgr.cache.len() <= max_size,
1128 "Cache size {} should not exceed max {}",
1129 issue_mgr.cache.len(),
1130 max_size
1131 );
1132
1133 assert_eq!(issue_mgr.cache.len(), issue_mgr.fifo_issues.len());
1135 }
1136 }
1137
1138 pub mod helpers {
1139 use std::{
1140 hash::{DefaultHasher, Hash, Hasher},
1141 net::{IpAddr, Ipv4Addr},
1142 sync::{Arc, Mutex},
1143 time::{Duration, SystemTime},
1144 };
1145
1146 use scion_proto::{
1147 address::{Asn, EndhostAddr, Isd, IsdAsn},
1148 path::{Path, test_builder::TestPathBuilder},
1149 };
1150 use tokio::sync::Notify;
1151
1152 use super::*;
1153 use crate::path::manager::{MultiPathManagerInner, PathIssueManager, pathset::PathSet};
1154
1155 pub const SRC_ADDR: EndhostAddr = EndhostAddr::new(
1156 IsdAsn::new(Isd(1), Asn(1)),
1157 IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
1158 );
1159 pub const DST_ADDR: EndhostAddr = EndhostAddr::new(
1160 IsdAsn::new(Isd(2), Asn(1)),
1161 IpAddr::V4(Ipv4Addr::new(127, 0, 0, 2)),
1162 );
1163
1164 pub const DEFAULT_EXP_UNITS: u8 = 100;
1165 pub const BASE_TIME: SystemTime = SystemTime::UNIX_EPOCH;
1166
1167 pub fn dummy_path(hop_count: u16, timestamp: u32, exp_units: u8, seed: u32) -> Path {
1168 let mut builder: TestPathBuilder = TestPathBuilder::new(SRC_ADDR, DST_ADDR)
1169 .using_info_timestamp(timestamp)
1170 .with_hop_expiry(exp_units)
1171 .up();
1172
1173 builder = builder.add_hop(0, 1);
1174
1175 for cnt in 0..hop_count {
1176 let mut hash = DefaultHasher::new();
1177 seed.hash(&mut hash);
1178 cnt.hash(&mut hash);
1179 let hash = hash.finish() as u32;
1180
1181 let hop = hash.saturating_sub(2) as u16; builder = builder.with_asn(hash).add_hop(hop + 1, hop + 2);
1183 }
1184
1185 builder = builder.add_hop(1, 0);
1186
1187 builder.build(timestamp).path()
1188 }
1189
1190 pub fn base_config() -> MultiPathManagerConfig {
1191 MultiPathManagerConfig {
1192 max_cached_paths_per_pair: 5,
1193 refetch_interval: Duration::from_secs(100),
1194 min_refetch_delay: Duration::from_secs(1),
1195 min_expiry_threshold: Duration::from_secs(5),
1196 max_idle_period: Duration::from_secs(30),
1197 fetch_failure_backoff: BackoffConfig {
1198 minimum_delay_secs: 1.0,
1199 maximum_delay_secs: 10.0,
1200 factor: 2.0,
1201 jitter_secs: 0.0,
1202 },
1203 issue_cache_size: 64,
1204 issue_broadcast_size: 64,
1205 issue_deduplication_window: Duration::from_secs(10),
1206 path_swap_score_threshold: 0.1,
1207 }
1208 }
1209
1210 pub fn generate_responses(
1211 path_count: u16,
1212 path_seed: u32,
1213 timestamp: SystemTime,
1214 exp_units: u8,
1215 ) -> Result<Vec<Path>, String> {
1216 let mut paths = Vec::new();
1217 for resp_id in 0..path_count {
1218 paths.push(dummy_path(
1219 2,
1220 timestamp
1221 .duration_since(SystemTime::UNIX_EPOCH)
1222 .unwrap()
1223 .as_secs() as u32,
1224 exp_units,
1225 path_seed + resp_id as u32,
1226 ));
1227 }
1228
1229 Ok(paths)
1230 }
1231
1232 pub struct MockFetcher {
1233 next_response: Result<Vec<Path>, String>,
1234 pub received_requests: usize,
1235 pub wait_till_notify: bool,
1236 pub notify_to_resolve: Arc<Notify>,
1237 }
1238 impl MockFetcher {
1239 pub fn new(response: Result<Vec<Path>, String>) -> Arc<Mutex<Self>> {
1240 Arc::new(Mutex::new(Self {
1241 next_response: response,
1242 received_requests: 0,
1243 wait_till_notify: false,
1244 notify_to_resolve: Arc::new(Notify::new()),
1245 }))
1246 }
1247
1248 pub fn set_response(&mut self, response: Result<Vec<Path>, String>) {
1249 self.next_response = response;
1250 }
1251
1252 pub fn wait_till_notify(&mut self, wait: bool) {
1253 self.wait_till_notify = wait;
1254 }
1255
1256 pub fn notify(&self) {
1257 self.notify_to_resolve.notify_waiters();
1258 }
1259 }
1260
1261 impl PathFetcher for Arc<Mutex<MockFetcher>> {
1262 async fn fetch_paths(
1263 &self,
1264 _src: IsdAsn,
1265 _dst: IsdAsn,
1266 ) -> Result<Vec<Path>, PathFetchError> {
1267 let response;
1268 let notify = {
1270 let mut guard = self.lock().unwrap();
1271
1272 guard.received_requests += 1;
1273 response = guard.next_response.clone();
1274
1275 if guard.wait_till_notify {
1277 let notif = guard.notify_to_resolve.clone().notified_owned();
1278 Some(notif)
1279 } else {
1280 None
1281 }
1282 };
1283
1284 if let Some(notif) = notify {
1285 notif.await;
1286 }
1287
1288 match response {
1289 Ok(paths) if paths.is_empty() => Err(PathFetchError::NoPathsFound),
1290 Ok(paths) => Ok(paths),
1291 Err(e) => Err(PathFetchError::InternalError(e.into())),
1292 }
1293 }
1294 }
1295
1296 pub fn manual_pathset<F: PathFetcher>(
1297 now: SystemTime,
1298 fetcher: F,
1299 cfg: MultiPathManagerConfig,
1300 strategy: Option<PathStrategy>,
1301 ) -> (MultiPathManager<F>, PathSet<F>) {
1302 let mgr_inner = MultiPathManagerInner {
1303 config: cfg,
1304 fetcher,
1305 path_strategy: strategy.unwrap_or_else(|| {
1306 let mut ps = PathStrategy::default();
1307 ps.scoring.use_default_scorers();
1308 ps
1309 }),
1310 issue_manager: Mutex::new(PathIssueManager::new(64, 64, Duration::from_secs(10))),
1311 managed_paths: HashIndex::new(),
1312 };
1313 let mgr = MultiPathManager(Arc::new(mgr_inner));
1314 let issue_rx = mgr.0.issue_manager.lock().unwrap().issues_subscriber();
1315 let mgr_ref = mgr.weak_ref();
1316 (
1317 mgr,
1318 PathSet::new_with_time(
1319 SRC_ADDR.isd_asn(),
1320 DST_ADDR.isd_asn(),
1321 mgr_ref,
1322 cfg,
1323 issue_rx,
1324 now,
1325 ),
1326 )
1327 }
1328 }
1329}