1use std::{
59 collections::{HashMap, VecDeque, hash_map},
60 sync::{Arc, Mutex, Weak},
61 time::{Duration, SystemTime},
62};
63
64use scc::HashIndex;
65use scion_proto::{address::IsdAsn, path::Path, scmp::ScmpErrorMessage};
66use scion_sdk_utils::backoff::BackoffConfig;
67use tokio::sync::broadcast::{self};
68
69use crate::{
70 path::{
71 PathStrategy,
72 fetcher::{
73 PathFetcherImpl,
74 traits::{PathFetchError, PathFetcher},
75 },
76 manager::{
77 issues::{IssueKind, IssueMarker, IssueMarkerTarget, SendError},
78 pathset::{PathSet, PathSetHandle, PathSetTask},
79 traits::{PathManager, PathPrefetcher, PathWaitError, SyncPathManager},
80 },
81 types::PathManagerPath,
82 },
83 scionstack::{
84 ScionSocketSendError, scmp_handler::ScmpErrorReceiver, socket::SendErrorReceiver,
85 },
86};
87
88mod algo;
89mod issues;
92mod pathset;
94pub mod reliability;
96pub mod traits;
98
/// Tuning parameters for [`MultiPathManager`].
///
/// The fields are not `pub`; [`Default`] provides the standard values, and
/// `MultiPathManager::new` rejects inconsistent combinations via
/// `MultiPathManagerConfig::validate`. A copy of the config is handed to every
/// path set the manager creates.
#[derive(Debug, Clone, Copy)]
pub struct MultiPathManagerConfig {
    /// Maximum number of paths cached per (src, dst) pair.
    /// NOTE(review): enforced by the path-set module, not in this file.
    max_cached_paths_per_pair: usize,
    /// Interval between regular path refetches (presumably; consumed by the
    /// path-set task). Must be at least `min_refetch_delay`.
    refetch_interval: Duration,
    /// Lower bound on the delay before a refetch (presumably). Must not exceed
    /// `refetch_interval` or `min_expiry_threshold`.
    min_refetch_delay: Duration,
    /// Expiry threshold for cached paths.
    /// NOTE(review): semantics live in the path-set module — confirm there.
    min_expiry_threshold: Duration,
    /// How long a path set may go unused before it is removed.
    max_idle_period: Duration,
    /// Backoff settings for failed path fetches (presumably; consumed by the
    /// path-set task).
    fetch_failure_backoff: BackoffConfig,
    /// Maximum number of entries kept by the path issue cache.
    issue_cache_size: usize,
    /// Capacity of the broadcast channel that delivers issues to path sets.
    issue_broadcast_size: usize,
    /// Repeated reports of the same issue within this window are ignored.
    issue_deduplication_window: Duration,
    /// Score difference used when deciding whether to swap the active path.
    /// NOTE(review): consumed by the path-set selection logic — confirm.
    path_swap_score_threshold: f32,
}
123
124impl Default for MultiPathManagerConfig {
125 fn default() -> Self {
126 MultiPathManagerConfig {
127 max_cached_paths_per_pair: 50,
128 refetch_interval: Duration::from_secs(60 * 30), min_refetch_delay: Duration::from_secs(60),
130 min_expiry_threshold: Duration::from_secs(60 * 5), max_idle_period: Duration::from_secs(60 * 2), fetch_failure_backoff: BackoffConfig {
133 minimum_delay_secs: 60.0,
134 maximum_delay_secs: 300.0,
135 factor: 1.5,
136 jitter_secs: 5.0,
137 },
138 issue_cache_size: 100,
139 issue_broadcast_size: 10,
140 issue_deduplication_window: Duration::from_secs(10),
142 path_swap_score_threshold: 0.5,
143 }
144 }
145}
146
147impl MultiPathManagerConfig {
148 fn validate(&self) -> Result<(), &'static str> {
150 if self.min_refetch_delay > self.refetch_interval {
151 return Err("min_refetch_delay must be smaller than refetch_interval");
152 }
154
155 if self.min_refetch_delay > self.min_expiry_threshold {
156 return Err("min_refetch_delay must be smaller than min_expiry_threshold");
157 }
159
160 Ok(())
161 }
162}
163
/// Error returned when no path can be provided for a src-dst pair.
#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
pub enum GetPathError {
    /// No paths are currently available for the requested pair.
    #[error("no paths are available for the given src-dst pair")]
    NoPaths,
}
171
/// Manages path sets for (source, destination) ISD-AS pairs.
///
/// Clones share the same state through an [`Arc`]. Path sets are created on
/// first request and stored per pair. Dropping the last owning handle tears
/// down the shared state; [`MultiPathManagerRef`] is the non-owning
/// counterpart given to path sets.
pub struct MultiPathManager<F: PathFetcher = PathFetcherImpl>(Arc<MultiPathManagerInner<F>>);
174
175impl<F> Clone for MultiPathManager<F>
176where
177 F: PathFetcher,
178{
179 fn clone(&self) -> Self {
180 MultiPathManager(self.0.clone())
181 }
182}
183
/// Shared state behind [`MultiPathManager`], referenced weakly by
/// [`MultiPathManagerRef`].
struct MultiPathManagerInner<F: PathFetcher> {
    /// Validated configuration; copied into every path set that is created.
    config: MultiPathManagerConfig,
    /// Source of paths.
    /// NOTE(review): not called in this file; presumably used by path sets via
    /// the weak manager reference — confirm.
    fetcher: F,
    /// Path selection strategy.
    /// NOTE(review): not read in this file; presumably used by path sets.
    path_strategy: PathStrategy,
    /// Cache and broadcaster of reported path issues.
    issue_manager: Mutex<PathIssueManager>,
    /// Path sets keyed by `(src, dst)`, each stored with its handle and task.
    managed_paths: HashIndex<(IsdAsn, IsdAsn), (PathSetHandle, PathSetTask)>,
}
191
192impl<F: PathFetcher> MultiPathManager<F> {
193 pub fn new(
195 config: MultiPathManagerConfig,
196 fetcher: F,
197 path_strategy: PathStrategy,
198 ) -> Result<Self, &'static str> {
199 config.validate()?;
200
201 let issue_manager = Mutex::new(PathIssueManager::new(
202 config.issue_cache_size,
203 config.issue_broadcast_size,
204 config.issue_deduplication_window,
205 ));
206
207 Ok(MultiPathManager(Arc::new(MultiPathManagerInner {
208 config,
209 fetcher,
210 issue_manager,
211 path_strategy,
212 managed_paths: HashIndex::new(),
213 })))
214 }
215
216 pub fn try_path(&self, src: IsdAsn, dst: IsdAsn, now: SystemTime) -> Option<Path> {
222 let try_path = self
223 .0
224 .managed_paths
225 .peek_with(&(src, dst), |_, (handle, _)| {
226 handle.try_active_path().as_deref().map(|p| p.0.clone())
227 })
228 .flatten();
229
230 match try_path {
231 Some(active) => {
232 let expired = active.is_expired(now.into()).unwrap_or(true);
235 debug_assert!(!expired, "Returned expired path from try_get_path");
236
237 Some(active)
238 }
239 None => {
240 self.fast_ensure_managed_paths(src, dst);
242 None
243 }
244 }
245 }
246
247 pub async fn path(
254 &self,
255 src: IsdAsn,
256 dst: IsdAsn,
257 now: SystemTime,
258 ) -> Result<Path, Arc<PathFetchError>> {
259 let try_path = self
260 .0
261 .managed_paths
262 .peek_with(&(src, dst), |_, (handle, _)| {
263 handle.try_active_path().as_deref().map(|p| p.0.clone())
264 })
265 .flatten();
266
267 let res = match try_path {
268 Some(active) => Ok(active),
269 None => {
270 let path_set = self.ensure_managed_paths(src, dst);
272
273 let active = path_set.active_path().await.as_ref().map(|p| p.0.clone());
275
276 match active {
278 Some(active) => Ok(active),
279 None => {
280 let last_error = path_set.current_error();
282 match last_error {
283 Some(e) => Err(e),
284 None => {
285 Err(Arc::new(PathFetchError::NoPathsFound))
289 }
290 }
291 }
292 }
293 }
294 };
295
296 if let Ok(active) = &res {
297 let expired = active.is_expired(now.into()).unwrap_or(true);
300 debug_assert!(!expired, "Returned expired path from get_path");
301 }
302
303 res
304 }
305
306 pub fn weak_ref(&self) -> MultiPathManagerRef<F> {
308 MultiPathManagerRef(Arc::downgrade(&self.0))
309 }
310
311 fn fast_ensure_managed_paths(&self, src: IsdAsn, dst: IsdAsn) {
315 if self.0.managed_paths.contains(&(src, dst)) {
316 return;
317 }
318
319 self.ensure_managed_paths(src, dst);
320 }
321
322 fn ensure_managed_paths(&self, src: IsdAsn, dst: IsdAsn) -> PathSetHandle {
326 let entry = match self.0.managed_paths.entry_sync((src, dst)) {
327 scc::hash_index::Entry::Occupied(occupied) => {
328 tracing::trace!(%src, %dst, "Already managing paths for src-dst pair");
329 occupied
330 }
331 scc::hash_index::Entry::Vacant(vacant) => {
332 tracing::info!(%src, %dst, "Starting to manage paths for src-dst pair");
333 let managed = PathSet::new(
334 src,
335 dst,
336 self.weak_ref(),
337 self.0.config,
338 self.0.issue_manager.lock().unwrap().issues_subscriber(),
339 );
340
341 vacant.insert_entry(managed.manage())
342 }
343 };
344
345 entry.get().0.clone()
346 }
347
348 pub fn stop_managing_paths(&self, src: IsdAsn, dst: IsdAsn) {
350 if self.0.managed_paths.remove_sync(&(src, dst)) {
351 tracing::info!(%src, %dst, "Stopped managing paths for src-dst pair");
352 }
353 }
354
355 pub fn report_path_issue(&self, timestamp: SystemTime, issue: IssueKind, path: Option<&Path>) {
357 let Some(applies_to) = issue.target_type(path) else {
358 return;
360 };
361
362 if matches!(applies_to, IssueMarkerTarget::DestinationNetwork { .. }) {
363 return;
365 }
366
367 tracing::debug!(%issue, "New path issue");
368
369 let issue_marker = IssueMarker {
370 target: applies_to,
371 timestamp,
372 penalty: issue.penalty(),
373 };
374
375 {
377 let mut issues_guard = self.0.issue_manager.lock().unwrap();
378 issues_guard.add_issue(issue, issue_marker.clone());
379 }
380 }
381}
382
383impl<F: PathFetcher> ScmpErrorReceiver for MultiPathManager<F> {
384 fn report_scmp_error(&self, scmp_error: ScmpErrorMessage, path: &Path) {
385 self.report_path_issue(
386 SystemTime::now(),
387 IssueKind::Scmp { error: scmp_error },
388 Some(path),
389 );
390 }
391}
392
393impl<F: PathFetcher> SendErrorReceiver for MultiPathManager<F> {
394 fn report_send_error(&self, error: &ScionSocketSendError) {
395 if let Some(send_error) = SendError::from_socket_send_error(error) {
396 self.report_path_issue(
397 SystemTime::now(),
398 IssueKind::Socket { err: send_error },
399 None,
400 );
401 }
402 }
403}
404
405impl<F: PathFetcher> SyncPathManager for MultiPathManager<F> {
406 fn register_path(
407 &self,
408 _src: IsdAsn,
409 _dst: IsdAsn,
410 _now: chrono::DateTime<chrono::Utc>,
411 _path: Path<bytes::Bytes>,
412 ) {
413 }
417
418 fn try_cached_path(
419 &self,
420 src: IsdAsn,
421 dst: IsdAsn,
422 now: chrono::DateTime<chrono::Utc>,
423 ) -> std::io::Result<Option<Path<bytes::Bytes>>> {
424 Ok(self.try_path(src, dst, now.into()))
425 }
426}
427
428impl<F: PathFetcher> PathManager for MultiPathManager<F> {
429 fn path_wait(
430 &self,
431 src: IsdAsn,
432 dst: IsdAsn,
433 now: chrono::DateTime<chrono::Utc>,
434 ) -> impl crate::types::ResFut<'_, Path<bytes::Bytes>, PathWaitError> {
435 async move {
436 match self.path(src, dst, now.into()).await {
437 Ok(path) => Ok(path),
438 Err(e) => {
439 match &*e {
440 PathFetchError::FetchSegments(error) => {
441 Err(PathWaitError::FetchFailed(format!("{error}")))
442 }
443 PathFetchError::InternalError(msg) => {
444 Err(PathWaitError::FetchFailed(msg.to_string()))
445 }
446 PathFetchError::NoPathsFound => Err(PathWaitError::NoPathFound),
447 }
448 }
449 }
450 }
451 }
452}
453
454impl<F: PathFetcher> PathPrefetcher for MultiPathManager<F> {
455 fn prefetch_path(&self, src: IsdAsn, dst: IsdAsn) {
456 self.ensure_managed_paths(src, dst);
457 }
458}
459
/// Non-owning handle to a [`MultiPathManager`].
///
/// Wraps a [`Weak`] reference, so it does not keep the manager alive. Path sets
/// are created with one of these (see `MultiPathManager::weak_ref`); use
/// `MultiPathManagerRef::get` to obtain an owning handle while the manager
/// still exists.
pub struct MultiPathManagerRef<F: PathFetcher>(Weak<MultiPathManagerInner<F>>);
464
465impl<F: PathFetcher> Clone for MultiPathManagerRef<F> {
466 fn clone(&self) -> Self {
467 MultiPathManagerRef(self.0.clone())
468 }
469}
470
471impl<F: PathFetcher> MultiPathManagerRef<F> {
472 pub fn get(&self) -> Option<MultiPathManager<F>> {
474 self.0.upgrade().map(|arc| MultiPathManager(arc))
475 }
476}
477
/// Bounded, deduplicating cache of reported path issues.
///
/// Every accepted issue is also broadcast to subscribers (the path sets).
struct PathIssueManager {
    /// Cache size at which the oldest entries are evicted before inserting.
    max_entries: usize,
    /// Repeated reports of the same issue id within this window are dropped.
    deduplication_window: Duration,

    /// Latest marker per issue id (the id comes from `IssueKind::dedup_id`).
    cache: HashMap<u64, IssueMarker>,
    /// Insertion order as `(id, timestamp)`.
    ///
    /// This can hold stale entries for ids that were re-reported later;
    /// `pop_front` recognises those because their timestamp no longer matches
    /// the cached marker.
    fifo_issues: VecDeque<(u64, SystemTime)>,

    /// Sends `(id, marker)` for every accepted issue.
    issue_broadcast_tx: broadcast::Sender<(u64, IssueMarker)>,
}
495
496impl PathIssueManager {
497 fn new(max_entries: usize, broadcast_buffer: usize, deduplication_window: Duration) -> Self {
498 let (issue_broadcast_tx, _) = broadcast::channel(broadcast_buffer);
499 PathIssueManager {
500 max_entries,
501 deduplication_window,
502 cache: HashMap::new(),
503 fifo_issues: VecDeque::new(),
504 issue_broadcast_tx,
505 }
506 }
507
508 pub fn issues_subscriber(&self) -> broadcast::Receiver<(u64, IssueMarker)> {
510 self.issue_broadcast_tx.subscribe()
511 }
512
513 pub fn add_issue(&mut self, issue: IssueKind, marker: IssueMarker) {
522 let id = issue.dedup_id(&marker.target);
523
524 if let Some(existing_marker) = self.cache.get(&id) {
526 let time_since_last_seen = marker
527 .timestamp
528 .duration_since(existing_marker.timestamp)
529 .unwrap_or_else(|_| Duration::from_secs(0));
530
531 if time_since_last_seen < self.deduplication_window {
532 tracing::trace!(%id, ?time_since_last_seen, ?marker, %issue, "Ignoring duplicate path issue");
533 return;
535 }
536 }
537
538 self.issue_broadcast_tx.send((id, marker.clone())).ok();
540
541 if self.cache.len() >= self.max_entries {
542 self.pop_front();
543 }
544
545 self.fifo_issues.push_back((id, marker.timestamp)); self.cache.insert(id, marker);
548 }
549
550 pub fn apply_cached_issues(&self, entry: &mut PathManagerPath, now: SystemTime) -> bool {
558 let mut applied = false;
559 for issue in self.cache.values() {
560 if issue.target.matches_path(&entry.path, &entry.fingerprint) {
561 entry.reliability.update(issue.decayed_penalty(now), now);
562 applied = true;
563 }
564 }
565 applied
566 }
567
568 fn pop_front(&mut self) -> Option<IssueMarker> {
570 let (issue_id, timestamp) = self.fifo_issues.pop_front()?;
571
572 match self.cache.entry(issue_id) {
573 hash_map::Entry::Occupied(occupied_entry) => {
574 match occupied_entry.get().timestamp == timestamp {
576 true => Some(occupied_entry.remove()),
577 false => None, }
579 }
580 hash_map::Entry::Vacant(_) => {
581 debug_assert!(false, "Bad cache: issue ID not found in cache");
582 None
583 }
584 }
585 }
586}
587
588#[cfg(test)]
589mod tests {
590 use helpers::*;
591 use tokio::time::timeout;
592
593 use super::*;
594
/// A `try_path` miss on an unmanaged pair returns `None` but starts managing
/// the pair.
#[tokio::test]
#[test_log::test]
async fn should_create_pathset_on_request() {
    let cfg = base_config();
    let fetcher = MockFetcher::new(generate_responses(5, 0, BASE_TIME, DEFAULT_EXP_UNITS));

    let mgr = MultiPathManager::new(cfg, fetcher, PathStrategy::default())
        .expect("Should create manager");

    // Nothing is managed before the first request.
    assert!(mgr.0.managed_paths.is_empty());

    // No path set exists yet, so the non-blocking lookup misses...
    let path = mgr.try_path(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn(), BASE_TIME);
    assert!(path.is_none());

    // ...but the miss registers the pair for management.
    assert!(
        mgr.0
            .managed_paths
            .contains(&(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn()))
    );
}
620
/// A path set unused for longer than `max_idle_period` is removed, and its
/// outstanding handle reports an error that mentions idleness.
#[tokio::test]
#[test_log::test]
async fn should_remove_idle_pathsets() {
    let mut cfg = base_config();
    // Very short idle period so that removal happens within the test.
    cfg.max_idle_period = Duration::from_millis(10);
    let fetcher = MockFetcher::new(generate_responses(5, 0, BASE_TIME, DEFAULT_EXP_UNITS));

    let mgr = MultiPathManager::new(cfg, fetcher, PathStrategy::default())
        .expect("Should create manager");

    let handle = mgr.ensure_managed_paths(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn());

    assert!(
        mgr.0
            .managed_paths
            .contains(&(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn()))
    );

    // Nothing requests a path during this sleep, so the path set goes idle.
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

    let contains = mgr
        .0
        .managed_paths
        .contains(&(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn()));

    assert!(!contains, "Idle path set should be removed");

    // The handle outlives the removed path set and must explain why.
    let err = handle.current_error();
    assert!(
        err.is_some(),
        "Handle should report error after path set removal"
    );
    println!("Error after idle removal: {:?}", err);
    assert!(
        err.unwrap().to_string().contains("idle"),
        "Error message should indicate idle removal"
    );
}
665
666 #[tokio::test]
668 #[test_log::test]
669 async fn should_cancel_pathset_tasks_on_drop() {
670 let cfg: MultiPathManagerConfig = base_config();
671 let fetcher = MockFetcher::new(generate_responses(5, 0, BASE_TIME, DEFAULT_EXP_UNITS));
672
673 let mgr = MultiPathManager::new(cfg, fetcher, PathStrategy::default())
674 .expect("Should create manager");
675
676 let handle = mgr.ensure_managed_paths(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn());
678 handle.wait_initialized().await;
679
680 let mut set_entry = mgr
681 .0
682 .managed_paths
683 .get_sync(&(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn()))
684 .unwrap();
685
686 let task_handle = unsafe {
687 let swap_handle = tokio::spawn(async {});
690 std::mem::replace(&mut set_entry.get_mut().1._task, swap_handle)
691 };
692
693 let cancel_token = set_entry.get().1.cancel_token.clone();
694
695 let count = mgr.0.managed_paths.len();
696 assert_eq!(count, 1, "Should have 1 managed path set");
697
698 drop(mgr);
700 assert!(
702 cancel_token.is_cancelled(),
703 "Cancel token should be triggered"
704 );
705
706 timeout(Duration::from_millis(50), task_handle)
708 .await
709 .unwrap()
710 .unwrap();
711
712 let err = handle
713 .shared
714 .sync
715 .lock()
716 .unwrap()
717 .current_error
718 .clone()
719 .expect("Should have error after manager drop");
720
721 assert!(
724 err.to_string().contains("cancelled") || err.to_string().contains("dropped"),
725 "Error message should indicate cancellation or manager drop"
726 );
727 }
728
729 mod issue_handling {
730 use scc::HashIndex;
731 use scion_proto::address::{Asn, Isd};
732
733 use super::*;
734 use crate::path::{
735 manager::{MultiPathManagerInner, PathIssueManager, reliability::ReliabilityScore},
736 types::Score,
737 };
738
739 #[tokio::test]
742 #[test_log::test]
743 async fn should_ingest_issues_and_apply_to_existing_paths() {
744 let cfg = base_config();
745 let fetcher = MockFetcher::new(generate_responses(5, 0, BASE_TIME, DEFAULT_EXP_UNITS));
746 let (mgr, mut path_set) = manual_pathset(BASE_TIME, fetcher.clone(), cfg, None);
747
748 path_set.maintain(BASE_TIME, &mgr).await;
749
750 let first_path = &path_set.internal.cached_paths[0];
752 let first_fp = first_path.fingerprint;
753
754 let issue = IssueKind::Socket {
756 err: SendError::FirstHopUnreachable {
757 isd_asn: first_path.path.source(),
758 interface_id: first_path.path.first_hop_egress_interface().unwrap().id,
759 msg: "test".into(),
760 },
761 };
762
763 let penalty = Score::new_clamped(-0.3);
764 let marker = IssueMarker {
765 target: issue.target_type(Some(&first_path.path)).unwrap(),
766 timestamp: BASE_TIME,
767 penalty,
768 };
769
770 {
771 let mut issues_guard = mgr.0.issue_manager.lock().unwrap();
772 issues_guard.add_issue(issue, marker);
774 assert!(!issues_guard.cache.is_empty(), "Issue should be in cache");
776 }
777 let recv_result = path_set.internal.issue_rx.recv().await;
779 path_set.handle_issue_rx(BASE_TIME, recv_result, &mgr);
780
781 let updated_path = path_set
783 .internal
784 .cached_paths
785 .iter()
786 .find(|e| e.fingerprint == first_fp)
787 .expect("Path should still exist");
788
789 let updated_score = updated_path.reliability.score(BASE_TIME).value();
790
791 assert!(
792 updated_score == penalty.value(),
793 "Path score should be updated by penalty. Expected: {}, Got: {}",
794 penalty.value(),
795 updated_score
796 );
797
798 let later_time = BASE_TIME + Duration::from_secs(30);
800 let decayed_score = updated_path.reliability.score(later_time).value();
801 assert!(
802 decayed_score > updated_score,
803 "Path score should recover over time. Updated: {}, Decayed: {}",
804 updated_score,
805 decayed_score
806 );
807 }
808
809 #[tokio::test]
810 #[test_log::test]
811 async fn should_deduplicate_issues_within_window() {
812 let cfg = base_config();
813 let mgr_inner = MultiPathManagerInner {
814 config: cfg,
815 fetcher: MockFetcher::new(Ok(vec![])),
816 path_strategy: PathStrategy::default(),
817 issue_manager: Mutex::new(PathIssueManager::new(64, 64, Duration::from_secs(10))),
818 managed_paths: HashIndex::new(),
819 };
820 let mgr = MultiPathManager(Arc::new(mgr_inner));
821
822 let issue_marker = IssueMarker {
823 target: IssueMarkerTarget::FirstHop {
824 isd_asn: SRC_ADDR.isd_asn(),
825 egress_interface: 1,
826 },
827 timestamp: BASE_TIME,
828 penalty: Score::new_clamped(-0.3),
829 };
830
831 let issue = IssueKind::Socket {
832 err: SendError::FirstHopUnreachable {
833 isd_asn: SRC_ADDR.isd_asn(),
834 interface_id: 1,
835 msg: "test".into(),
836 },
837 };
838
839 mgr.0
841 .issue_manager
842 .lock()
843 .unwrap()
844 .add_issue(issue.clone(), issue_marker.clone());
845 let cache_size_1 = mgr.0.issue_manager.lock().unwrap().cache.len();
846 assert_eq!(cache_size_1, 1);
847
848 let issue_marker_2 = IssueMarker {
850 timestamp: BASE_TIME + Duration::from_secs(1), ..issue_marker.clone()
852 };
853 mgr.0
854 .issue_manager
855 .lock()
856 .unwrap()
857 .add_issue(issue.clone(), issue_marker_2);
858
859 let fifo_size = mgr.0.issue_manager.lock().unwrap().fifo_issues.len();
860 let cache_size_2 = mgr.0.issue_manager.lock().unwrap().cache.len();
861 assert_eq!(cache_size_2, 1, "Duplicate issue should be ignored");
862 assert_eq!(
863 fifo_size, 1,
864 "FIFO queue size should remain unchanged on duplicate issue"
865 );
866
867 let issue_marker_3 = IssueMarker {
869 timestamp: BASE_TIME + Duration::from_secs(11), ..issue_marker
871 };
872 mgr.0
873 .issue_manager
874 .lock()
875 .unwrap()
876 .add_issue(issue, issue_marker_3);
877
878 let fifo_size_3 = mgr.0.issue_manager.lock().unwrap().fifo_issues.len();
879 let cache_size_3 = mgr.0.issue_manager.lock().unwrap().cache.len();
880 assert_eq!(
881 cache_size_3, 1,
882 "Issue outside dedup window should update existing"
883 );
884 assert_eq!(
885 fifo_size_3, 2,
886 "FIFO queue size should increase for new issue outside dedup window"
887 );
888 }
889
890 #[tokio::test]
893 #[test_log::test]
894 async fn should_apply_issues_to_new_paths_on_fetch() {
895 let cfg = base_config();
896 let fetcher = MockFetcher::new(Ok(vec![]));
897 let (mgr, mut path_set) = manual_pathset(BASE_TIME, fetcher.clone(), cfg, None);
898
899 path_set.maintain(BASE_TIME, &mgr).await;
900
901 let issue_marker = IssueMarker {
903 target: IssueMarkerTarget::FirstHop {
904 isd_asn: SRC_ADDR.isd_asn(),
905 egress_interface: 1,
906 },
907 timestamp: BASE_TIME,
908 penalty: Score::new_clamped(-0.5),
909 };
910
911 let issue = IssueKind::Socket {
912 err: SendError::FirstHopUnreachable {
913 isd_asn: SRC_ADDR.isd_asn(),
914 interface_id: 1,
915 msg: "test".into(),
916 },
917 };
918
919 mgr.0
921 .issue_manager
922 .lock()
923 .unwrap()
924 .add_issue(issue, issue_marker);
925
926 path_set.drain_and_apply_issue_channel(BASE_TIME);
928
929 fetcher.lock().unwrap().set_response(generate_responses(
931 3,
932 0,
933 BASE_TIME + Duration::from_secs(1),
934 DEFAULT_EXP_UNITS,
935 ));
936
937 let next_refetch = path_set.internal.next_refetch;
938 path_set.maintain(next_refetch, &mgr).await;
939
940 let affected_path = path_set
942 .internal
943 .cached_paths
944 .first()
945 .expect("Path should exist");
946
947 let score = affected_path
948 .reliability
949 .score(BASE_TIME + Duration::from_secs(1))
950 .value();
951 assert!(
952 score < 0.0,
953 "Newly fetched path should have cached issue applied. Score: {}",
954 score
955 );
956 }
957
958 #[tokio::test]
960 #[test_log::test]
961 async fn should_trigger_active_path_reevaluation_on_issue() {
962 let cfg = base_config();
963 let fetcher = MockFetcher::new(generate_responses(5, 0, BASE_TIME, DEFAULT_EXP_UNITS));
964 let (mgr, mut path_set) = manual_pathset(BASE_TIME, fetcher.clone(), cfg, None);
965
966 path_set.maintain(BASE_TIME, &mgr).await;
967
968 let active_fp = path_set.shared.active_path.load().as_ref().unwrap().1;
969
970 let issue_marker = IssueMarker {
972 target: IssueMarkerTarget::FullPath {
973 fingerprint: active_fp,
974 },
975 timestamp: BASE_TIME,
976 penalty: Score::new_clamped(-1.0), };
978
979 let issue = IssueKind::Socket {
980 err: SendError::FirstHopUnreachable {
981 isd_asn: SRC_ADDR.isd_asn(),
982 interface_id: 1,
983 msg: "test".into(),
984 },
985 };
986
987 mgr.0
989 .issue_manager
990 .lock()
991 .unwrap()
992 .add_issue(issue, issue_marker);
993
994 let recv_result = path_set.internal.issue_rx.recv().await;
996 path_set.handle_issue_rx(BASE_TIME, recv_result, &mgr);
997
998 let new_active_fp = path_set.shared.active_path.load().as_ref().unwrap().1;
1000 assert_ne!(
1001 active_fp, new_active_fp,
1002 "Active path should change when severely penalized"
1003 );
1004 }
1005
1006 #[tokio::test]
1007 #[test_log::test]
1008 async fn should_swap_to_better_path_if_one_appears() {
1009 let cfg = base_config();
1010 let fetcher = MockFetcher::new(generate_responses(1, 0, BASE_TIME, DEFAULT_EXP_UNITS));
1011 let (mgr, mut path_set) = manual_pathset(BASE_TIME, fetcher.clone(), cfg, None);
1012
1013 path_set.maintain(BASE_TIME, &mgr).await;
1014
1015 path_set
1017 .shared
1018 .was_used_in_idle_period
1019 .store(true, std::sync::atomic::Ordering::Relaxed);
1020
1021 let active_fp = path_set.shared.active_path.load().as_ref().unwrap().1;
1022
1023 let issue_marker = IssueMarker {
1025 target: IssueMarkerTarget::FullPath {
1026 fingerprint: active_fp,
1027 },
1028 timestamp: BASE_TIME,
1029 penalty: Score::new_clamped(-0.8),
1030 };
1031
1032 mgr.0.issue_manager.lock().unwrap().add_issue(
1033 IssueKind::Socket {
1034 err: SendError::FirstHopUnreachable {
1035 isd_asn: SRC_ADDR.isd_asn(),
1036 interface_id: 1,
1037 msg: "test".into(),
1038 },
1039 },
1040 issue_marker,
1041 );
1042
1043 let active_fp_after_issue = path_set.shared.active_path.load().as_ref().unwrap().1;
1045 assert_eq!(
1046 active_fp, active_fp_after_issue,
1047 "Active path should remain the same if no better path exists"
1048 );
1049
1050 fetcher.lock().unwrap().set_response(generate_responses(
1052 1,
1053 100,
1054 BASE_TIME + Duration::from_secs(1),
1055 DEFAULT_EXP_UNITS,
1056 ));
1057
1058 path_set
1059 .maintain(path_set.internal.next_refetch, &mgr)
1060 .await;
1061 path_set
1063 .shared
1064 .was_used_in_idle_period
1065 .store(true, std::sync::atomic::Ordering::Relaxed);
1066
1067 let new_active_fp = path_set.shared.active_path.load().as_ref().unwrap().1;
1069 assert_ne!(
1070 active_fp, new_active_fp,
1071 "Active path should change when a better path appears"
1072 );
1073
1074 let positive_score = Score::new_clamped(0.8);
1076 let mut reliability = ReliabilityScore::new_with_time(path_set.internal.next_refetch);
1077 reliability.update(positive_score, path_set.internal.next_refetch);
1078
1079 path_set
1081 .internal
1082 .cached_paths
1083 .iter_mut()
1084 .find(|e| e.fingerprint == active_fp)
1085 .unwrap()
1086 .reliability = reliability;
1087
1088 path_set
1089 .maintain(path_set.internal.next_refetch, &mgr)
1090 .await;
1091
1092 assert_eq!(
1093 active_fp,
1094 path_set.shared.active_path.load().as_ref().unwrap().1,
1095 "Active path should change on positive score diff"
1096 );
1097 }
1098
1099 #[tokio::test]
1100 #[test_log::test]
1101 async fn should_keep_max_issue_cache_size() {
1102 let max_size = 10;
1103 let mut issue_mgr = PathIssueManager::new(max_size, 64, Duration::from_secs(10));
1104
1105 for i in 0..20u16 {
1107 let issue_marker = IssueMarker {
1108 target: IssueMarkerTarget::FirstHop {
1109 isd_asn: IsdAsn::new(Isd(1), Asn(1)),
1110 egress_interface: i,
1111 },
1112 timestamp: BASE_TIME + Duration::from_secs(i as u64),
1113 penalty: Score::new_clamped(-0.1),
1114 };
1115
1116 let issue = IssueKind::Socket {
1117 err: SendError::FirstHopUnreachable {
1118 isd_asn: IsdAsn::new(Isd(1), Asn(1)),
1119 interface_id: i,
1120 msg: "test".into(),
1121 },
1122 };
1123
1124 issue_mgr.add_issue(issue, issue_marker);
1125 }
1126
1127 assert!(
1129 issue_mgr.cache.len() <= max_size,
1130 "Cache size {} should not exceed max {}",
1131 issue_mgr.cache.len(),
1132 max_size
1133 );
1134
1135 assert_eq!(issue_mgr.cache.len(), issue_mgr.fifo_issues.len());
1137 }
1138 }
1139
1140 pub mod helpers {
1141 use std::{
1142 hash::{DefaultHasher, Hash, Hasher},
1143 net::{IpAddr, Ipv4Addr},
1144 sync::{Arc, Mutex},
1145 time::{Duration, SystemTime},
1146 };
1147
1148 use scion_proto::{
1149 address::{Asn, EndhostAddr, Isd, IsdAsn},
1150 path::{Path, test_builder::TestPathBuilder},
1151 };
1152 use tokio::sync::Notify;
1153
1154 use super::*;
1155 use crate::path::manager::{MultiPathManagerInner, PathIssueManager, pathset::PathSet};
1156
1157 pub const SRC_ADDR: EndhostAddr = EndhostAddr::new(
1158 IsdAsn::new(Isd(1), Asn(1)),
1159 IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
1160 );
1161 pub const DST_ADDR: EndhostAddr = EndhostAddr::new(
1162 IsdAsn::new(Isd(2), Asn(1)),
1163 IpAddr::V4(Ipv4Addr::new(127, 0, 0, 2)),
1164 );
1165
1166 pub const DEFAULT_EXP_UNITS: u8 = 100;
1167 pub const BASE_TIME: SystemTime = SystemTime::UNIX_EPOCH;
1168
1169 pub fn dummy_path(hop_count: u16, timestamp: u32, exp_units: u8, seed: u32) -> Path {
1170 let mut builder: TestPathBuilder = TestPathBuilder::new(SRC_ADDR, DST_ADDR)
1171 .using_info_timestamp(timestamp)
1172 .with_hop_expiry(exp_units)
1173 .up();
1174
1175 builder = builder.add_hop(0, 1);
1176
1177 for cnt in 0..hop_count {
1178 let mut hash = DefaultHasher::new();
1179 seed.hash(&mut hash);
1180 cnt.hash(&mut hash);
1181 let hash = hash.finish() as u32;
1182
1183 let hop = hash.saturating_sub(2) as u16; builder = builder.with_asn(hash).add_hop(hop + 1, hop + 2);
1185 }
1186
1187 builder = builder.add_hop(1, 0);
1188
1189 builder.build(timestamp).path()
1190 }
1191
1192 pub fn base_config() -> MultiPathManagerConfig {
1193 MultiPathManagerConfig {
1194 max_cached_paths_per_pair: 5,
1195 refetch_interval: Duration::from_secs(100),
1196 min_refetch_delay: Duration::from_secs(1),
1197 min_expiry_threshold: Duration::from_secs(5),
1198 max_idle_period: Duration::from_secs(30),
1199 fetch_failure_backoff: BackoffConfig {
1200 minimum_delay_secs: 1.0,
1201 maximum_delay_secs: 10.0,
1202 factor: 2.0,
1203 jitter_secs: 0.0,
1204 },
1205 issue_cache_size: 64,
1206 issue_broadcast_size: 64,
1207 issue_deduplication_window: Duration::from_secs(10),
1208 path_swap_score_threshold: 0.1,
1209 }
1210 }
1211
1212 pub fn generate_responses(
1213 path_count: u16,
1214 path_seed: u32,
1215 timestamp: SystemTime,
1216 exp_units: u8,
1217 ) -> Result<Vec<Path>, String> {
1218 let mut paths = Vec::new();
1219 for resp_id in 0..path_count {
1220 paths.push(dummy_path(
1221 2,
1222 timestamp
1223 .duration_since(SystemTime::UNIX_EPOCH)
1224 .unwrap()
1225 .as_secs() as u32,
1226 exp_units,
1227 path_seed + resp_id as u32,
1228 ));
1229 }
1230
1231 Ok(paths)
1232 }
1233
1234 pub struct MockFetcher {
1235 next_response: Result<Vec<Path>, String>,
1236 pub received_requests: usize,
1237 pub wait_till_notify: bool,
1238 pub notify_to_resolve: Arc<Notify>,
1239 }
1240 impl MockFetcher {
1241 pub fn new(response: Result<Vec<Path>, String>) -> Arc<Mutex<Self>> {
1242 Arc::new(Mutex::new(Self {
1243 next_response: response,
1244 received_requests: 0,
1245 wait_till_notify: false,
1246 notify_to_resolve: Arc::new(Notify::new()),
1247 }))
1248 }
1249
1250 pub fn set_response(&mut self, response: Result<Vec<Path>, String>) {
1251 self.next_response = response;
1252 }
1253
1254 pub fn wait_till_notify(&mut self, wait: bool) {
1255 self.wait_till_notify = wait;
1256 }
1257
1258 pub fn notify(&self) {
1259 self.notify_to_resolve.notify_waiters();
1260 }
1261 }
1262
1263 impl PathFetcher for Arc<Mutex<MockFetcher>> {
1264 async fn fetch_paths(
1265 &self,
1266 _src: IsdAsn,
1267 _dst: IsdAsn,
1268 ) -> Result<Vec<Path>, PathFetchError> {
1269 let response;
1270 let notify = {
1272 let mut guard = self.lock().unwrap();
1273
1274 guard.received_requests += 1;
1275 response = guard.next_response.clone();
1276
1277 if guard.wait_till_notify {
1279 let notif = guard.notify_to_resolve.clone().notified_owned();
1280 Some(notif)
1281 } else {
1282 None
1283 }
1284 };
1285
1286 if let Some(notif) = notify {
1287 notif.await;
1288 }
1289
1290 match response {
1291 Ok(paths) if paths.is_empty() => Err(PathFetchError::NoPathsFound),
1292 Ok(paths) => Ok(paths),
1293 Err(e) => Err(PathFetchError::InternalError(e.into())),
1294 }
1295 }
1296 }
1297
1298 pub fn manual_pathset<F: PathFetcher>(
1299 now: SystemTime,
1300 fetcher: F,
1301 cfg: MultiPathManagerConfig,
1302 strategy: Option<PathStrategy>,
1303 ) -> (MultiPathManager<F>, PathSet<F>) {
1304 let mgr_inner = MultiPathManagerInner {
1305 config: cfg,
1306 fetcher,
1307 path_strategy: strategy.unwrap_or_else(|| {
1308 let mut ps = PathStrategy::default();
1309 ps.scoring.use_default_scorers();
1310 ps
1311 }),
1312 issue_manager: Mutex::new(PathIssueManager::new(64, 64, Duration::from_secs(10))),
1313 managed_paths: HashIndex::new(),
1314 };
1315 let mgr = MultiPathManager(Arc::new(mgr_inner));
1316 let issue_rx = mgr.0.issue_manager.lock().unwrap().issues_subscriber();
1317 let mgr_ref = mgr.weak_ref();
1318 (
1319 mgr,
1320 PathSet::new_with_time(
1321 SRC_ADDR.isd_asn(),
1322 DST_ADDR.isd_asn(),
1323 mgr_ref,
1324 cfg,
1325 issue_rx,
1326 now,
1327 ),
1328 )
1329 }
1330 }
1331}