1use std::{
59 collections::{HashMap, VecDeque, hash_map},
60 sync::{Arc, Mutex, Weak},
61 time::{Duration, SystemTime},
62};
63
64use scc::HashIndex;
65use scion_proto::{address::IsdAsn, packet::ByEndpoint, path::Path, scmp::ScmpErrorMessage};
66use scion_sdk_utils::backoff::BackoffConfig;
67use tokio::sync::broadcast::{self};
68
69use crate::{
70 path::{
71 PathStrategy,
72 fetcher::{
73 PathFetcherImpl,
74 traits::{PathFetchError, PathFetcher},
75 },
76 manager::{
77 issues::{IssueKind, IssueMarker, IssueMarkerTarget, SendError},
78 pathset::{PathSet, PathSetHandle, PathSetTask},
79 traits::{PathManager, PathPrefetcher, PathWaitError, SyncPathManager},
80 },
81 types::PathManagerPath,
82 },
83 scionstack::{
84 ScionSocketSendError, scmp_handler::ScmpErrorReceiver, socket::SendErrorReceiver,
85 },
86};
87
88mod algo;
89mod issues;
92mod pathset;
94pub mod reliability;
96pub mod traits;
98
/// Configuration for a [`MultiPathManager`].
///
/// The configuration is checked by `validate` when a manager is created through
/// [`MultiPathManager::new`]. A copy of it is handed to every path set the manager creates.
#[derive(Debug, Clone, Copy)]
pub struct MultiPathManagerConfig {
    // NOTE(review): presumably the upper bound on paths kept per src-dst pair; it is consumed
    // outside this file — confirm against the path set implementation.
    max_cached_paths_per_pair: usize,
    // `validate` rejects configurations where this is smaller than `min_refetch_delay`.
    refetch_interval: Duration,
    // `validate` requires this to be no larger than `refetch_interval` and no larger than
    // `min_expiry_threshold`.
    min_refetch_delay: Duration,
    // `validate` rejects configurations where this is smaller than `min_refetch_delay`.
    min_expiry_threshold: Duration,
    // NOTE(review): presumably how long a path set may stay unused before it is dropped;
    // consumed outside this file — confirm.
    max_idle_period: Duration,
    // NOTE(review): presumably the backoff applied after failed fetches; consumed outside this
    // file — confirm.
    fetch_failure_backoff: BackoffConfig,
    // Passed to `PathIssueManager::new` as `max_entries` (size limit of the issue cache).
    issue_cache_size: usize,
    // Passed to `PathIssueManager::new` as the buffer size of the issue broadcast channel.
    issue_broadcast_size: usize,
    // Passed to `PathIssueManager::new`; a repeated issue seen within this window is ignored.
    issue_deduplication_window: Duration,
    // NOTE(review): presumably the score difference required before the active path is
    // swapped; consumed outside this file — confirm.
    path_swap_score_threshold: f32,
}
123
124impl Default for MultiPathManagerConfig {
125 fn default() -> Self {
126 MultiPathManagerConfig {
127 max_cached_paths_per_pair: 50,
128 refetch_interval: Duration::from_secs(60 * 30), min_refetch_delay: Duration::from_secs(60),
130 min_expiry_threshold: Duration::from_secs(60 * 5), max_idle_period: Duration::from_secs(60 * 2), fetch_failure_backoff: BackoffConfig {
133 minimum_delay_secs: 60.0,
134 maximum_delay_secs: 300.0,
135 factor: 1.5,
136 jitter_secs: 5.0,
137 },
138 issue_cache_size: 100,
139 issue_broadcast_size: 10,
140 issue_deduplication_window: Duration::from_secs(10),
142 path_swap_score_threshold: 0.5,
143 }
144 }
145}
146
147impl MultiPathManagerConfig {
148 fn validate(&self) -> Result<(), &'static str> {
150 if self.min_refetch_delay > self.refetch_interval {
151 return Err("min_refetch_delay must be smaller than refetch_interval");
152 }
154
155 if self.min_refetch_delay > self.min_expiry_threshold {
156 return Err("min_refetch_delay must be smaller than min_expiry_threshold");
157 }
159
160 Ok(())
161 }
162}
163
/// Handle to a path manager that keeps one managed path set per (source, destination) pair.
///
/// This is a thin wrapper around an `Arc` of the shared state, so cloning it yields another
/// handle to the same manager. `F` is the path fetcher implementation and defaults to
/// [`PathFetcherImpl`].
pub struct MultiPathManager<F: PathFetcher = PathFetcherImpl>(Arc<MultiPathManagerInner<F>>);
166
167impl<F> Clone for MultiPathManager<F>
168where
169 F: PathFetcher,
170{
171 fn clone(&self) -> Self {
172 MultiPathManager(self.0.clone())
173 }
174}
175
/// State shared by all handles of one [`MultiPathManager`].
struct MultiPathManagerInner<F: PathFetcher> {
    // Configuration the manager was created with; a copy is passed to every new path set.
    config: MultiPathManagerConfig,
    // Path fetcher supplied at construction.
    // NOTE(review): not read in this file; presumably used by the path sets through their
    // manager reference — confirm.
    fetcher: F,
    // Path strategy supplied at construction.
    // NOTE(review): not read in this file; presumably used when selecting paths — confirm.
    path_strategy: PathStrategy,
    // Issue cache and broadcaster; locked whenever an issue is added or a subscriber is created.
    issue_manager: Mutex<PathIssueManager>,
    // Managed path sets keyed by `(source, destination)`, stored as handle plus task.
    managed_paths: HashIndex<(IsdAsn, IsdAsn), (PathSetHandle, PathSetTask)>,
}
183
184impl<F: PathFetcher> MultiPathManager<F> {
185 pub fn new(
187 config: MultiPathManagerConfig,
188 fetcher: F,
189 path_strategy: PathStrategy,
190 ) -> Result<Self, &'static str> {
191 config.validate()?;
192
193 let issue_manager = Mutex::new(PathIssueManager::new(
194 config.issue_cache_size,
195 config.issue_broadcast_size,
196 config.issue_deduplication_window,
197 ));
198
199 Ok(MultiPathManager(Arc::new(MultiPathManagerInner {
200 config,
201 fetcher,
202 issue_manager,
203 path_strategy,
204 managed_paths: HashIndex::new(),
205 })))
206 }
207
208 pub fn try_path(&self, src: IsdAsn, dst: IsdAsn, now: SystemTime) -> Option<Path> {
214 let try_path = self
215 .0
216 .managed_paths
217 .peek_with(&(src, dst), |_, (handle, _)| {
218 handle.try_active_path().as_deref().map(|p| p.0.clone())
219 })
220 .flatten();
221
222 match try_path {
223 Some(active) => {
224 let expired = active.is_expired(now.into()).unwrap_or(true);
227 debug_assert!(!expired, "Returned expired path from try_get_path");
228
229 Some(active)
230 }
231 None => {
232 self.fast_ensure_managed_paths(src, dst);
234 None
235 }
236 }
237 }
238
239 pub async fn path(
246 &self,
247 src: IsdAsn,
248 dst: IsdAsn,
249 now: SystemTime,
250 ) -> Result<Path, Arc<PathFetchError>> {
251 if src == dst {
252 return Ok(Path::empty(ByEndpoint {
253 source: src,
254 destination: dst,
255 }));
256 }
257
258 let try_path = self
259 .0
260 .managed_paths
261 .peek_with(&(src, dst), |_, (handle, _)| {
262 handle.try_active_path().as_deref().map(|p| p.0.clone())
263 })
264 .flatten();
265
266 let res = match try_path {
267 Some(active) => Ok(active),
268 None => {
269 let path_set = self.ensure_managed_paths(src, dst);
271
272 let active = path_set.active_path().await.as_ref().map(|p| p.0.clone());
274
275 match active {
277 Some(active) => Ok(active),
278 None => {
279 let last_error = path_set.current_error();
281 match last_error {
282 Some(e) => Err(e),
283 None => {
284 Err(Arc::new(PathFetchError::NoPathsFound))
288 }
289 }
290 }
291 }
292 }
293 };
294
295 if let Ok(active) = &res {
296 let expired = active.is_expired(now.into()).unwrap_or(true);
299 debug_assert!(!expired, "Returned expired path from get_path");
300 }
301
302 res
303 }
304
305 pub fn weak_ref(&self) -> MultiPathManagerRef<F> {
307 MultiPathManagerRef(Arc::downgrade(&self.0))
308 }
309
310 fn fast_ensure_managed_paths(&self, src: IsdAsn, dst: IsdAsn) {
314 if self.0.managed_paths.contains(&(src, dst)) {
315 return;
316 }
317
318 self.ensure_managed_paths(src, dst);
319 }
320
321 fn ensure_managed_paths(&self, src: IsdAsn, dst: IsdAsn) -> PathSetHandle {
325 let entry = match self.0.managed_paths.entry_sync((src, dst)) {
326 scc::hash_index::Entry::Occupied(occupied) => {
327 tracing::trace!(%src, %dst, "Already managing paths for src-dst pair");
328 occupied
329 }
330 scc::hash_index::Entry::Vacant(vacant) => {
331 tracing::info!(%src, %dst, "Starting to manage paths for src-dst pair");
332 let managed = PathSet::new(
333 src,
334 dst,
335 self.weak_ref(),
336 self.0.config,
337 self.0
338 .issue_manager
339 .lock()
340 .expect("lock poisoned")
341 .issues_subscriber(),
342 );
343
344 vacant.insert_entry(managed.manage())
345 }
346 };
347
348 entry.get().0.clone()
349 }
350
351 pub fn stop_managing_paths(&self, src: IsdAsn, dst: IsdAsn) {
353 if self.0.managed_paths.remove_sync(&(src, dst)) {
354 tracing::info!(%src, %dst, "Stopped managing paths for src-dst pair");
355 }
356 }
357
358 pub fn report_path_issue(&self, timestamp: SystemTime, issue: IssueKind, path: Option<&Path>) {
360 let Some(applies_to) = issue.target_type(path) else {
361 return;
363 };
364
365 if matches!(applies_to, IssueMarkerTarget::DestinationNetwork { .. }) {
366 return;
368 }
369
370 tracing::debug!(%issue, "New path issue");
371
372 let issue_marker = IssueMarker {
373 target: applies_to,
374 timestamp,
375 penalty: issue.penalty(),
376 };
377
378 {
380 let mut issues_guard = self.0.issue_manager.lock().expect("lock poisoned");
381 issues_guard.add_issue(issue, issue_marker.clone());
382 }
383 }
384}
385
386impl<F: PathFetcher> ScmpErrorReceiver for MultiPathManager<F> {
387 fn report_scmp_error(&self, scmp_error: ScmpErrorMessage, path: &Path) {
388 self.report_path_issue(
389 SystemTime::now(),
390 IssueKind::Scmp { error: scmp_error },
391 Some(path),
392 );
393 }
394}
395
396impl<F: PathFetcher> SendErrorReceiver for MultiPathManager<F> {
397 fn report_send_error(&self, error: &ScionSocketSendError) {
398 if let Some(send_error) = SendError::from_socket_send_error(error) {
399 self.report_path_issue(
400 SystemTime::now(),
401 IssueKind::Socket { err: send_error },
402 None,
403 );
404 }
405 }
406}
407
408impl<F: PathFetcher> SyncPathManager for MultiPathManager<F> {
409 fn register_path(
410 &self,
411 _src: IsdAsn,
412 _dst: IsdAsn,
413 _now: chrono::DateTime<chrono::Utc>,
414 _path: Path<bytes::Bytes>,
415 ) {
416 }
420
421 fn try_cached_path(
422 &self,
423 src: IsdAsn,
424 dst: IsdAsn,
425 now: chrono::DateTime<chrono::Utc>,
426 ) -> std::io::Result<Option<Path<bytes::Bytes>>> {
427 Ok(self.try_path(src, dst, now.into()))
428 }
429}
430
431impl<F: PathFetcher> PathManager for MultiPathManager<F> {
432 fn path_wait(
433 &self,
434 src: IsdAsn,
435 dst: IsdAsn,
436 now: chrono::DateTime<chrono::Utc>,
437 ) -> impl crate::types::ResFut<'_, Path<bytes::Bytes>, PathWaitError> {
438 async move {
439 match self.path(src, dst, now.into()).await {
440 Ok(path) => Ok(path),
441 Err(e) => {
442 match &*e {
443 PathFetchError::FetchSegments(error) => {
444 Err(PathWaitError::FetchFailed(format!("{error}")))
445 }
446 PathFetchError::InternalError(msg) => {
447 Err(PathWaitError::FetchFailed(msg.to_string()))
448 }
449 PathFetchError::NoPathsFound => Err(PathWaitError::NoPathFound),
450 }
451 }
452 }
453 }
454 }
455}
456
457impl<F: PathFetcher> PathPrefetcher for MultiPathManager<F> {
458 fn prefetch_path(&self, src: IsdAsn, dst: IsdAsn) {
459 self.ensure_managed_paths(src, dst);
460 }
461}
462
/// Weak counterpart of [`MultiPathManager`].
///
/// Holds a `Weak` pointer to the shared state, so it does not keep the manager alive. Use
/// [`MultiPathManagerRef::get`] to obtain a usable manager while one still exists.
pub struct MultiPathManagerRef<F: PathFetcher>(Weak<MultiPathManagerInner<F>>);
467
468impl<F: PathFetcher> Clone for MultiPathManagerRef<F> {
469 fn clone(&self) -> Self {
470 MultiPathManagerRef(self.0.clone())
471 }
472}
473
474impl<F: PathFetcher> MultiPathManagerRef<F> {
475 pub fn get(&self) -> Option<MultiPathManager<F>> {
477 self.0.upgrade().map(|arc| MultiPathManager(arc))
478 }
479}
480
/// Cache of recently reported path issues, plus a broadcast channel announcing new ones.
struct PathIssueManager {
    // Cache size at which `add_issue` starts evicting the oldest recorded issue.
    max_entries: usize,
    // A repeated issue with the same ID seen within this window is ignored.
    deduplication_window: Duration,

    // Latest marker per issue ID.
    cache: HashMap<u64, IssueMarker>,
    // Insertion order of issues as `(issue ID, marker timestamp)`; used for eviction. An ID
    // may appear more than once when its issue was recorded again after the window elapsed.
    fifo_issues: VecDeque<(u64, SystemTime)>,

    // Sender side of the broadcast on which every accepted issue is published.
    issue_broadcast_tx: broadcast::Sender<(u64, IssueMarker)>,
}
498
499impl PathIssueManager {
500 fn new(max_entries: usize, broadcast_buffer: usize, deduplication_window: Duration) -> Self {
501 let (issue_broadcast_tx, _) = broadcast::channel(broadcast_buffer);
502 PathIssueManager {
503 max_entries,
504 deduplication_window,
505 cache: HashMap::new(),
506 fifo_issues: VecDeque::new(),
507 issue_broadcast_tx,
508 }
509 }
510
511 pub fn issues_subscriber(&self) -> broadcast::Receiver<(u64, IssueMarker)> {
513 self.issue_broadcast_tx.subscribe()
514 }
515
516 pub fn add_issue(&mut self, issue: IssueKind, marker: IssueMarker) {
525 let id = issue.dedup_id(&marker.target);
526
527 if let Some(existing_marker) = self.cache.get(&id) {
529 let time_since_last_seen = marker
530 .timestamp
531 .duration_since(existing_marker.timestamp)
532 .unwrap_or_else(|_| Duration::from_secs(0));
533
534 if time_since_last_seen < self.deduplication_window {
535 tracing::trace!(%id, ?time_since_last_seen, ?marker, %issue, "Ignoring duplicate path issue");
536 return;
538 }
539 }
540
541 self.issue_broadcast_tx.send((id, marker.clone())).ok();
543
544 if self.cache.len() >= self.max_entries {
545 self.pop_front();
546 }
547
548 self.fifo_issues.push_back((id, marker.timestamp)); self.cache.insert(id, marker);
551 }
552
553 pub fn apply_cached_issues(&self, entry: &mut PathManagerPath, now: SystemTime) -> bool {
561 let mut applied = false;
562 for issue in self.cache.values() {
563 if issue.target.matches_path(&entry.path, &entry.fingerprint) {
564 entry.reliability.update(issue.decayed_penalty(now), now);
565 applied = true;
566 }
567 }
568 applied
569 }
570
571 fn pop_front(&mut self) -> Option<IssueMarker> {
573 let (issue_id, timestamp) = self.fifo_issues.pop_front()?;
574
575 match self.cache.entry(issue_id) {
576 hash_map::Entry::Occupied(occupied_entry) => {
577 match occupied_entry.get().timestamp == timestamp {
579 true => Some(occupied_entry.remove()),
580 false => None, }
582 }
583 hash_map::Entry::Vacant(_) => {
584 debug_assert!(false, "Bad cache: issue ID not found in cache");
585 None
586 }
587 }
588 }
589}
590
591#[cfg(test)]
592mod tests {
593 use helpers::*;
594 use tokio::time::timeout;
595
596 use super::*;
597
598 #[tokio::test]
600 #[test_log::test]
601 async fn should_create_pathset_on_request() {
602 let cfg = base_config();
603 let fetcher = MockFetcher::new(generate_responses(5, 0, BASE_TIME, DEFAULT_EXP_UNITS));
604
605 let mgr = MultiPathManager::new(cfg, fetcher, PathStrategy::default())
606 .expect("Should create manager");
607
608 assert!(mgr.0.managed_paths.is_empty());
610
611 let path = mgr.try_path(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn(), BASE_TIME);
613 assert!(path.is_none());
615
616 assert!(
618 mgr.0
619 .managed_paths
620 .contains(&(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn()))
621 );
622 }
623
624 #[tokio::test]
626 #[test_log::test]
627 async fn should_remove_idle_pathsets() {
628 let mut cfg = base_config();
629 cfg.max_idle_period = Duration::from_millis(10); let fetcher = MockFetcher::new(generate_responses(5, 0, BASE_TIME, DEFAULT_EXP_UNITS));
632
633 let mgr = MultiPathManager::new(cfg, fetcher, PathStrategy::default())
634 .expect("Should create manager");
635
636 let handle = mgr.ensure_managed_paths(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn());
638
639 assert!(
641 mgr.0
642 .managed_paths
643 .contains(&(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn()))
644 );
645
646 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
648
649 let contains = mgr
651 .0
652 .managed_paths
653 .contains(&(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn()));
654
655 assert!(!contains, "Idle path set should be removed");
656
657 let err = handle.current_error();
658 assert!(
659 err.is_some(),
660 "Handle should report error after path set removal"
661 );
662 println!("Error after idle removal: {:?}", err);
663 assert!(
664 err.unwrap().to_string().contains("idle"),
665 "Error message should indicate idle removal"
666 );
667 }
668
    /// Dropping the manager must cancel the path set's cancel token, let its task finish,
    /// and leave an error on handles that are still around.
    #[tokio::test]
    #[test_log::test]
    async fn should_cancel_pathset_tasks_on_drop() {
        let cfg: MultiPathManagerConfig = base_config();
        let fetcher = MockFetcher::new(generate_responses(5, 0, BASE_TIME, DEFAULT_EXP_UNITS));

        let mgr = MultiPathManager::new(cfg, fetcher, PathStrategy::default())
            .expect("Should create manager");

        let handle = mgr.ensure_managed_paths(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn());
        handle.wait_initialized().await;

        let mut set_entry = mgr
            .0
            .managed_paths
            .get_sync(&(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn()))
            .unwrap();

        // Swap the stored task handle for a no-op task so the real task handle can be
        // awaited after the manager is gone.
        // NOTE(review): the `unsafe` block is presumably needed because `get_mut` on the
        // entry is an unsafe API — confirm against the `scc` documentation and add a
        // `SAFETY:` justification.
        let task_handle = unsafe {
            let swap_handle = tokio::spawn(async {});
            std::mem::replace(&mut set_entry.get_mut().1._task, swap_handle)
        };

        // Keep a clone of the token so it can be inspected after the drop.
        let cancel_token = set_entry.get().1.cancel_token.clone();

        let count = mgr.0.managed_paths.len();
        assert_eq!(count, 1, "Should have 1 managed path set");

        drop(mgr);
        assert!(
            cancel_token.is_cancelled(),
            "Cancel token should be triggered"
        );

        // The original task must complete shortly after cancellation, without panicking.
        timeout(Duration::from_millis(50), task_handle)
            .await
            .unwrap()
            .unwrap();

        let err = handle
            .shared
            .sync
            .lock()
            .unwrap()
            .current_error
            .clone()
            .expect("Should have error after manager drop");

        assert!(
            err.to_string().contains("cancelled") || err.to_string().contains("dropped"),
            "Error message should indicate cancellation or manager drop"
        );
    }
731
732 mod issue_handling {
733 use scc::HashIndex;
734 use scion_proto::address::{Asn, Isd};
735
736 use super::*;
737 use crate::path::{
738 manager::{MultiPathManagerInner, PathIssueManager, reliability::ReliabilityScore},
739 types::Score,
740 };
741
        /// An issue added to the issue manager reaches the path set through the broadcast
        /// and lowers the score of the matching cached path; the score recovers over time.
        #[tokio::test]
        #[test_log::test]
        async fn should_ingest_issues_and_apply_to_existing_paths() {
            let cfg = base_config();
            let fetcher = MockFetcher::new(generate_responses(5, 0, BASE_TIME, DEFAULT_EXP_UNITS));
            let (mgr, mut path_set) = manual_pathset(BASE_TIME, fetcher.clone(), cfg, None);

            // First maintenance run; the assertions below rely on it filling `cached_paths`.
            path_set.maintain(BASE_TIME, &mgr).await;

            let first_path = &path_set.internal.cached_paths[0];
            let first_fp = first_path.fingerprint;

            // Build an issue that targets the first hop of that path.
            let issue = IssueKind::Socket {
                err: SendError::FirstHopUnreachable {
                    isd_asn: first_path.path.source(),
                    interface_id: first_path.path.first_hop_egress_interface().unwrap().id,
                    address: None,
                    msg: "test".into(),
                },
            };

            let penalty = Score::new_clamped(-0.3);
            let marker = IssueMarker {
                target: issue.target_type(Some(&first_path.path)).unwrap(),
                timestamp: BASE_TIME,
                penalty,
            };

            {
                let mut issues_guard = mgr.0.issue_manager.lock().unwrap();
                issues_guard.add_issue(issue, marker);
                assert!(!issues_guard.cache.is_empty(), "Issue should be in cache");
            }
            // Deliver the broadcast issue to the path set by hand.
            let recv_result = path_set.internal.issue_rx.recv().await;
            path_set.handle_issue_rx(BASE_TIME, recv_result, &mgr);

            let updated_path = path_set
                .internal
                .cached_paths
                .iter()
                .find(|e| e.fingerprint == first_fp)
                .expect("Path should still exist");

            let updated_score = updated_path.reliability.score(BASE_TIME).value();

            // Evaluated at the issue's own timestamp, the score is expected to equal the penalty.
            assert!(
                updated_score == penalty.value(),
                "Path score should be updated by penalty. Expected: {}, Got: {}",
                penalty.value(),
                updated_score
            );

            // Later on, the score must have moved back up.
            let later_time = BASE_TIME + Duration::from_secs(30);
            let decayed_score = updated_path.reliability.score(later_time).value();
            assert!(
                decayed_score > updated_score,
                "Path score should recover over time. Updated: {}, Decayed: {}",
                updated_score,
                decayed_score
            );
        }
812
813 #[tokio::test]
814 #[test_log::test]
815 async fn should_deduplicate_issues_within_window() {
816 let cfg = base_config();
817 let mgr_inner = MultiPathManagerInner {
818 config: cfg,
819 fetcher: MockFetcher::new(Ok(vec![])),
820 path_strategy: PathStrategy::default(),
821 issue_manager: Mutex::new(PathIssueManager::new(64, 64, Duration::from_secs(10))),
822 managed_paths: HashIndex::new(),
823 };
824 let mgr = MultiPathManager(Arc::new(mgr_inner));
825
826 let issue_marker = IssueMarker {
827 target: IssueMarkerTarget::FirstHop {
828 isd_asn: SRC_ADDR.isd_asn(),
829 egress_interface: 1,
830 },
831 timestamp: BASE_TIME,
832 penalty: Score::new_clamped(-0.3),
833 };
834
835 let issue = IssueKind::Socket {
836 err: SendError::FirstHopUnreachable {
837 isd_asn: SRC_ADDR.isd_asn(),
838 interface_id: 1,
839 address: None,
840 msg: "test".into(),
841 },
842 };
843
844 mgr.0
846 .issue_manager
847 .lock()
848 .unwrap()
849 .add_issue(issue.clone(), issue_marker.clone());
850 let cache_size_1 = mgr.0.issue_manager.lock().unwrap().cache.len();
851 assert_eq!(cache_size_1, 1);
852
853 let issue_marker_2 = IssueMarker {
855 timestamp: BASE_TIME + Duration::from_secs(1), ..issue_marker.clone()
857 };
858 mgr.0
859 .issue_manager
860 .lock()
861 .unwrap()
862 .add_issue(issue.clone(), issue_marker_2);
863
864 let fifo_size = mgr.0.issue_manager.lock().unwrap().fifo_issues.len();
865 let cache_size_2 = mgr.0.issue_manager.lock().unwrap().cache.len();
866 assert_eq!(cache_size_2, 1, "Duplicate issue should be ignored");
867 assert_eq!(
868 fifo_size, 1,
869 "FIFO queue size should remain unchanged on duplicate issue"
870 );
871
872 let issue_marker_3 = IssueMarker {
874 timestamp: BASE_TIME + Duration::from_secs(11), ..issue_marker
876 };
877 mgr.0
878 .issue_manager
879 .lock()
880 .unwrap()
881 .add_issue(issue, issue_marker_3);
882
883 let fifo_size_3 = mgr.0.issue_manager.lock().unwrap().fifo_issues.len();
884 let cache_size_3 = mgr.0.issue_manager.lock().unwrap().cache.len();
885 assert_eq!(
886 cache_size_3, 1,
887 "Issue outside dedup window should update existing"
888 );
889 assert_eq!(
890 fifo_size_3, 2,
891 "FIFO queue size should increase for new issue outside dedup window"
892 );
893 }
894
895 #[tokio::test]
898 #[test_log::test]
899 async fn should_apply_issues_to_new_paths_on_fetch() {
900 let cfg = base_config();
901 let fetcher = MockFetcher::new(Ok(vec![]));
902 let (mgr, mut path_set) = manual_pathset(BASE_TIME, fetcher.clone(), cfg, None);
903
904 path_set.maintain(BASE_TIME, &mgr).await;
905
906 let issue_marker = IssueMarker {
908 target: IssueMarkerTarget::FirstHop {
909 isd_asn: SRC_ADDR.isd_asn(),
910 egress_interface: 1,
911 },
912 timestamp: BASE_TIME,
913 penalty: Score::new_clamped(-0.5),
914 };
915
916 let issue = IssueKind::Socket {
917 err: SendError::FirstHopUnreachable {
918 isd_asn: SRC_ADDR.isd_asn(),
919 interface_id: 1,
920 address: None,
921 msg: "test".into(),
922 },
923 };
924
925 mgr.0
927 .issue_manager
928 .lock()
929 .unwrap()
930 .add_issue(issue, issue_marker);
931
932 path_set.drain_and_apply_issue_channel(BASE_TIME);
934
935 fetcher.lock().unwrap().set_response(generate_responses(
937 3,
938 0,
939 BASE_TIME + Duration::from_secs(1),
940 DEFAULT_EXP_UNITS,
941 ));
942
943 let next_refetch = path_set.internal.next_refetch;
944 path_set.maintain(next_refetch, &mgr).await;
945
946 let affected_path = path_set
948 .internal
949 .cached_paths
950 .first()
951 .expect("Path should exist");
952
953 let score = affected_path
954 .reliability
955 .score(BASE_TIME + Duration::from_secs(1))
956 .value();
957 assert!(
958 score < 0.0,
959 "Newly fetched path should have cached issue applied. Score: {}",
960 score
961 );
962 }
963
964 #[tokio::test]
966 #[test_log::test]
967 async fn should_trigger_active_path_reevaluation_on_issue() {
968 let cfg = base_config();
969 let fetcher = MockFetcher::new(generate_responses(5, 0, BASE_TIME, DEFAULT_EXP_UNITS));
970 let (mgr, mut path_set) = manual_pathset(BASE_TIME, fetcher.clone(), cfg, None);
971
972 path_set.maintain(BASE_TIME, &mgr).await;
973
974 let active_fp = path_set.shared.active_path.load().as_ref().unwrap().1;
975
976 let issue_marker = IssueMarker {
978 target: IssueMarkerTarget::FullPath {
979 fingerprint: active_fp,
980 },
981 timestamp: BASE_TIME,
982 penalty: Score::new_clamped(-1.0), };
984
985 let issue = IssueKind::Socket {
986 err: SendError::FirstHopUnreachable {
987 isd_asn: SRC_ADDR.isd_asn(),
988 interface_id: 1,
989 address: None,
990 msg: "test".into(),
991 },
992 };
993
994 mgr.0
996 .issue_manager
997 .lock()
998 .unwrap()
999 .add_issue(issue, issue_marker);
1000
1001 let recv_result = path_set.internal.issue_rx.recv().await;
1003 path_set.handle_issue_rx(BASE_TIME, recv_result, &mgr);
1004
1005 let new_active_fp = path_set.shared.active_path.load().as_ref().unwrap().1;
1007 assert_ne!(
1008 active_fp, new_active_fp,
1009 "Active path should change when severely penalized"
1010 );
1011 }
1012
        /// With a single, penalized path the active path stays put; once a better path is
        /// fetched the path set switches to it, and it switches back when the original
        /// path's reliability is raised again.
        #[tokio::test]
        #[test_log::test]
        async fn should_swap_to_better_path_if_one_appears() {
            let cfg = base_config();
            let fetcher = MockFetcher::new(generate_responses(1, 0, BASE_TIME, DEFAULT_EXP_UNITS));
            let (mgr, mut path_set) = manual_pathset(BASE_TIME, fetcher.clone(), cfg, None);

            path_set.maintain(BASE_TIME, &mgr).await;

            // Mark the path set as used.
            // NOTE(review): presumably keeps the idle handling from interfering — confirm.
            path_set
                .shared
                .was_used_in_idle_period
                .store(true, std::sync::atomic::Ordering::Relaxed);

            let active_fp = path_set.shared.active_path.load().as_ref().unwrap().1;

            // Penalize the only known path.
            let issue_marker = IssueMarker {
                target: IssueMarkerTarget::FullPath {
                    fingerprint: active_fp,
                },
                timestamp: BASE_TIME,
                penalty: Score::new_clamped(-0.8),
            };

            mgr.0.issue_manager.lock().unwrap().add_issue(
                IssueKind::Socket {
                    err: SendError::FirstHopUnreachable {
                        isd_asn: SRC_ADDR.isd_asn(),
                        interface_id: 1,
                        address: None,
                        msg: "test".into(),
                    },
                },
                issue_marker,
            );

            let active_fp_after_issue = path_set.shared.active_path.load().as_ref().unwrap().1;
            assert_eq!(
                active_fp, active_fp_after_issue,
                "Active path should remain the same if no better path exists"
            );

            // Offer a different path (different seed) on the next fetch.
            fetcher.lock().unwrap().set_response(generate_responses(
                1,
                100,
                BASE_TIME + Duration::from_secs(1),
                DEFAULT_EXP_UNITS,
            ));

            path_set
                .maintain(path_set.internal.next_refetch, &mgr)
                .await;
            path_set
                .shared
                .was_used_in_idle_period
                .store(true, std::sync::atomic::Ordering::Relaxed);

            let new_active_fp = path_set.shared.active_path.load().as_ref().unwrap().1;
            assert_ne!(
                active_fp, new_active_fp,
                "Active path should change when a better path appears"
            );

            // Give the original path a clearly positive reliability again.
            let positive_score = Score::new_clamped(0.8);
            let mut reliability = ReliabilityScore::new_with_time(path_set.internal.next_refetch);
            reliability.update(positive_score, path_set.internal.next_refetch);

            path_set
                .internal
                .cached_paths
                .iter_mut()
                .find(|e| e.fingerprint == active_fp)
                .unwrap()
                .reliability = reliability;

            path_set
                .maintain(path_set.internal.next_refetch, &mgr)
                .await;

            // The original path is expected to be active again.
            assert_eq!(
                active_fp,
                path_set.shared.active_path.load().as_ref().unwrap().1,
                "Active path should change on positive score diff"
            );
        }
1106
1107 #[tokio::test]
1108 #[test_log::test]
1109 async fn should_keep_max_issue_cache_size() {
1110 let max_size = 10;
1111 let mut issue_mgr = PathIssueManager::new(max_size, 64, Duration::from_secs(10));
1112
1113 for i in 0..20u16 {
1115 let issue_marker = IssueMarker {
1116 target: IssueMarkerTarget::FirstHop {
1117 isd_asn: IsdAsn::new(Isd(1), Asn(1)),
1118 egress_interface: i,
1119 },
1120 timestamp: BASE_TIME + Duration::from_secs(i as u64),
1121 penalty: Score::new_clamped(-0.1),
1122 };
1123
1124 let issue = IssueKind::Socket {
1125 err: SendError::FirstHopUnreachable {
1126 isd_asn: IsdAsn::new(Isd(1), Asn(1)),
1127 interface_id: i,
1128 address: None,
1129 msg: "test".into(),
1130 },
1131 };
1132
1133 issue_mgr.add_issue(issue, issue_marker);
1134 }
1135
1136 assert!(
1138 issue_mgr.cache.len() <= max_size,
1139 "Cache size {} should not exceed max {}",
1140 issue_mgr.cache.len(),
1141 max_size
1142 );
1143
1144 assert_eq!(issue_mgr.cache.len(), issue_mgr.fifo_issues.len());
1146 }
1147 }
1148
1149 pub mod helpers {
1150 use std::{
1151 hash::{DefaultHasher, Hash, Hasher},
1152 net::{IpAddr, Ipv4Addr},
1153 sync::{Arc, Mutex},
1154 time::{Duration, SystemTime},
1155 };
1156
1157 use scion_proto::{
1158 address::{Asn, EndhostAddr, Isd, IsdAsn},
1159 path::{Path, test_builder::TestPathBuilder},
1160 };
1161 use tokio::sync::Notify;
1162
1163 use super::*;
1164 use crate::path::manager::{MultiPathManagerInner, PathIssueManager, pathset::PathSet};
1165
        /// Source endhost used by the tests: ISD 1, AS 1, at 127.0.0.1.
        pub const SRC_ADDR: EndhostAddr = EndhostAddr::new(
            IsdAsn::new(Isd(1), Asn(1)),
            IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
        );
        /// Destination endhost used by the tests: ISD 2, AS 1, at 127.0.0.2.
        pub const DST_ADDR: EndhostAddr = EndhostAddr::new(
            IsdAsn::new(Isd(2), Asn(1)),
            IpAddr::V4(Ipv4Addr::new(127, 0, 0, 2)),
        );

        /// Hop expiry value the tests pass to the path generators.
        pub const DEFAULT_EXP_UNITS: u8 = 100;
        /// Reference "now" of the tests: the Unix epoch.
        pub const BASE_TIME: SystemTime = SystemTime::UNIX_EPOCH;
1177
1178 pub fn dummy_path(hop_count: u16, timestamp: u32, exp_units: u8, seed: u32) -> Path {
1179 let mut builder: TestPathBuilder =
1180 TestPathBuilder::new(SRC_ADDR.into(), DST_ADDR.into())
1181 .using_info_timestamp(timestamp)
1182 .with_hop_expiry(exp_units)
1183 .up();
1184
1185 builder = builder.add_hop(0, 1);
1186
1187 for cnt in 0..hop_count {
1188 let mut hash = DefaultHasher::new();
1189 seed.hash(&mut hash);
1190 cnt.hash(&mut hash);
1191 let hash = hash.finish() as u32;
1192
1193 let hop = hash.saturating_sub(2) as u16; builder = builder.with_asn(hash).add_hop(hop + 1, hop + 2);
1195 }
1196
1197 builder = builder.add_hop(1, 0);
1198
1199 builder.build(timestamp).path()
1200 }
1201
1202 pub fn base_config() -> MultiPathManagerConfig {
1203 MultiPathManagerConfig {
1204 max_cached_paths_per_pair: 5,
1205 refetch_interval: Duration::from_secs(100),
1206 min_refetch_delay: Duration::from_secs(1),
1207 min_expiry_threshold: Duration::from_secs(5),
1208 max_idle_period: Duration::from_secs(30),
1209 fetch_failure_backoff: BackoffConfig {
1210 minimum_delay_secs: 1.0,
1211 maximum_delay_secs: 10.0,
1212 factor: 2.0,
1213 jitter_secs: 0.0,
1214 },
1215 issue_cache_size: 64,
1216 issue_broadcast_size: 64,
1217 issue_deduplication_window: Duration::from_secs(10),
1218 path_swap_score_threshold: 0.1,
1219 }
1220 }
1221
1222 pub fn generate_responses(
1223 path_count: u16,
1224 path_seed: u32,
1225 timestamp: SystemTime,
1226 exp_units: u8,
1227 ) -> Result<Vec<Path>, String> {
1228 let mut paths = Vec::new();
1229 for resp_id in 0..path_count {
1230 paths.push(dummy_path(
1231 2,
1232 timestamp
1233 .duration_since(SystemTime::UNIX_EPOCH)
1234 .unwrap()
1235 .as_secs() as u32,
1236 exp_units,
1237 path_seed + resp_id as u32,
1238 ));
1239 }
1240
1241 Ok(paths)
1242 }
1243
        /// Test double for a path fetcher that replays a canned response.
        pub struct MockFetcher {
            // Result cloned and returned by every fetch until replaced via `set_response`.
            next_response: Result<Vec<Path>, String>,
            // Number of fetch calls received so far.
            pub received_requests: usize,
            // When `true`, a fetch waits on `notify_to_resolve` before it returns.
            pub wait_till_notify: bool,
            // Notifier that releases fetches held back by `wait_till_notify`.
            pub notify_to_resolve: Arc<Notify>,
        }
1250 impl MockFetcher {
1251 pub fn new(response: Result<Vec<Path>, String>) -> Arc<Mutex<Self>> {
1252 Arc::new(Mutex::new(Self {
1253 next_response: response,
1254 received_requests: 0,
1255 wait_till_notify: false,
1256 notify_to_resolve: Arc::new(Notify::new()),
1257 }))
1258 }
1259
1260 pub fn set_response(&mut self, response: Result<Vec<Path>, String>) {
1261 self.next_response = response;
1262 }
1263
1264 pub fn wait_till_notify(&mut self, wait: bool) {
1265 self.wait_till_notify = wait;
1266 }
1267
1268 pub fn notify(&self) {
1269 self.notify_to_resolve.notify_waiters();
1270 }
1271 }
1272
1273 impl PathFetcher for Arc<Mutex<MockFetcher>> {
1274 async fn fetch_paths(
1275 &self,
1276 _src: IsdAsn,
1277 _dst: IsdAsn,
1278 ) -> Result<Vec<Path>, PathFetchError> {
1279 let response;
1280 let notify = {
1282 let mut guard = self.lock().unwrap();
1283
1284 guard.received_requests += 1;
1285 response = guard.next_response.clone();
1286
1287 if guard.wait_till_notify {
1289 let notif = guard.notify_to_resolve.clone().notified_owned();
1290 Some(notif)
1291 } else {
1292 None
1293 }
1294 };
1295
1296 if let Some(notif) = notify {
1297 notif.await;
1298 }
1299
1300 match response {
1301 Ok(paths) if paths.is_empty() => Err(PathFetchError::NoPathsFound),
1302 Ok(paths) => Ok(paths),
1303 Err(e) => Err(PathFetchError::InternalError(e.into())),
1304 }
1305 }
1306 }
1307
1308 pub fn manual_pathset<F: PathFetcher>(
1309 now: SystemTime,
1310 fetcher: F,
1311 cfg: MultiPathManagerConfig,
1312 strategy: Option<PathStrategy>,
1313 ) -> (MultiPathManager<F>, PathSet<F>) {
1314 let mgr_inner = MultiPathManagerInner {
1315 config: cfg,
1316 fetcher,
1317 path_strategy: strategy.unwrap_or_else(|| {
1318 let mut ps = PathStrategy::default();
1319 ps.scoring.use_default_scorers();
1320 ps
1321 }),
1322 issue_manager: Mutex::new(PathIssueManager::new(64, 64, Duration::from_secs(10))),
1323 managed_paths: HashIndex::new(),
1324 };
1325 let mgr = MultiPathManager(Arc::new(mgr_inner));
1326 let issue_rx = mgr.0.issue_manager.lock().unwrap().issues_subscriber();
1327 let mgr_ref = mgr.weak_ref();
1328 (
1329 mgr,
1330 PathSet::new_with_time(
1331 SRC_ADDR.isd_asn(),
1332 DST_ADDR.isd_asn(),
1333 mgr_ref,
1334 cfg,
1335 issue_rx,
1336 now,
1337 ),
1338 )
1339 }
1340 }
1341}