Skip to main content

scion_stack/path/
manager.rs

1// Copyright 2025 Anapaya Systems
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//   http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Multipath manager for SCION path selection.
16//!
17//! Runs one task per (src,dst) pair. Each task fetches paths, filters them, applies issue
18//! penalties, ranks candidates, and picks an active path.
19//!
//! Tasks track expiry, refetch intervals, and backoff after failures, and drop entries that go
//! idle. The active path for a (src,dst) pair is exposed lock-free via `ArcSwap`.
22//!
23//! All path data comes from the provided `PathFetcher`. Issue reports feed
24//! into reliability scoring and can trigger immediate re-ranking.
25//!
26//! ## Issue Handling & Penalties
27//!
28//! Incoming issues are applied to cached paths immediately and can trigger an active-path
29//! switch. Issues are cached with a timestamp and applied to newly fetched paths.
30//!
//! Penalties on individual paths and on individual cached issues decay over time, allowing
//! paths to recover.
33//!
34//! ## Active Path Switching
35//!
36//! If no active path exists, the highest-ranked valid path is selected.
//! The active path is replaced when it expires, nears expiry, or falls behind the best
//! candidate by a configured score margin.
39
40// Internal:
41//
42// ## Core components
43//
44// MultiPathManager: Central entry point. Holds configuration, the global issue manager, and a
45// concurrent map from (src, dst) to worker. Spawns a worker on first access and provides lock-free
46// reads to workers.
47//
48// PathSet: Per-tuple worker. Fetches paths, filters them, applies issue penalties, ranks
49// candidates, and maintains an active path. Runs a periodic maintenance loop handling refetch,
50// backoff, and idle shutdown.
51//
// PathIssueManager: Global issue cache and broadcast system. Deduplicates issues and notifies all
// workers of incoming issues.
54//
55// IssueKind / IssueMarker: Describe concrete path problems (SCMP, socket errors). Compute the
56// affected hop or full path, assign a penalty, and support deduplication and decay.
57
58use std::{
59    collections::{HashMap, VecDeque, hash_map},
60    sync::{Arc, Mutex, Weak},
61    time::{Duration, SystemTime},
62};
63
64use scc::HashIndex;
65use scion_proto::{address::IsdAsn, path::Path, scmp::ScmpErrorMessage};
66use scion_sdk_utils::backoff::BackoffConfig;
67use tokio::sync::broadcast::{self};
68
69use crate::{
70    path::{
71        PathStrategy,
72        fetcher::{
73            PathFetcherImpl,
74            traits::{PathFetchError, PathFetcher},
75        },
76        manager::{
77            issues::{IssueKind, IssueMarker, IssueMarkerTarget, SendError},
78            pathset::{PathSet, PathSetHandle, PathSetTask},
79            traits::{PathManager, PathPrefetcher, PathWaitError, SyncPathManager},
80        },
81        types::PathManagerPath,
82    },
83    scionstack::{
84        ScionSocketSendError, scmp_handler::ScmpErrorReceiver, socket::SendErrorReceiver,
85    },
86};
87
88mod algo;
89/// Path issue definitions, including mapping issues to affected targets and their respective
90/// penalties
91mod issues;
92/// Pathsets manage paths for a specific src-dst pair.
93mod pathset;
94/// Path reliability tracking
95pub mod reliability;
/// Path manager traits and types.
97pub mod traits;
98
99/// Configuration for the MultiPathManager.
100#[derive(Debug, Clone, Copy)]
101pub struct MultiPathManagerConfig {
102    /// Maximum number of cached paths per src-dst pair.
103    max_cached_paths_per_pair: usize,
104    /// Interval between path refetches
105    refetch_interval: Duration,
106    /// Minimum duration between path refetches.
107    min_refetch_delay: Duration,
108    /// Minimum remaining expiry before refetching paths.
109    min_expiry_threshold: Duration,
110    /// Maximum idle period before the managed paths are removed.
111    max_idle_period: Duration,
112    /// Backoff configuration for path fetch failures.
113    fetch_failure_backoff: BackoffConfig,
114    /// Count of issues to be cached
115    issue_cache_size: usize,
116    /// Size of the issue cache broadcast channel
117    issue_broadcast_size: usize,
118    /// Time window to ignore duplicate issues
119    issue_deduplication_window: Duration,
120    /// Score difference after which active path should be replaced
121    path_swap_score_threshold: f32,
122}
123
124impl Default for MultiPathManagerConfig {
125    fn default() -> Self {
126        MultiPathManagerConfig {
127            max_cached_paths_per_pair: 50,
128            refetch_interval: Duration::from_secs(60 * 30), // 30 minutes
129            min_refetch_delay: Duration::from_secs(60),
130            min_expiry_threshold: Duration::from_secs(60 * 5), // 5 minutes
131            max_idle_period: Duration::from_secs(60 * 2),      // 2 minutes
132            fetch_failure_backoff: BackoffConfig {
133                minimum_delay_secs: 60.0,
134                maximum_delay_secs: 300.0,
135                factor: 1.5,
136                jitter_secs: 5.0,
137            },
138            issue_cache_size: 100,
139            issue_broadcast_size: 10,
140            // Same issue within 10s is duplicate
141            issue_deduplication_window: Duration::from_secs(10),
142            path_swap_score_threshold: 0.5,
143        }
144    }
145}
146
147impl MultiPathManagerConfig {
148    /// Validates the configuration.
149    fn validate(&self) -> Result<(), &'static str> {
150        if self.min_refetch_delay > self.refetch_interval {
151            return Err("min_refetch_delay must be smaller than refetch_interval");
152            // Otherwise, refetch interval makes no sense
153        }
154
155        if self.min_refetch_delay > self.min_expiry_threshold {
156            return Err("min_refetch_delay must be smaller than min_expiry_threshold");
157            // Otherwise, very unlikely, we have paths expiring before we can refetch
158        }
159
160        Ok(())
161    }
162}
163
164/// Errors that can occur when getting a path.
165#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
166pub enum GetPathError {
167    /// No paths are available for the given src-dst pair.
168    #[error("no paths are available for the given src-dst pair")]
169    NoPaths,
170}
171
172/// Path manager managing multiple paths per src-dst pair.
173pub struct MultiPathManager<F: PathFetcher = PathFetcherImpl>(Arc<MultiPathManagerInner<F>>);
174
175impl<F> Clone for MultiPathManager<F>
176where
177    F: PathFetcher,
178{
179    fn clone(&self) -> Self {
180        MultiPathManager(self.0.clone())
181    }
182}
183
184struct MultiPathManagerInner<F: PathFetcher> {
185    config: MultiPathManagerConfig,
186    fetcher: F,
187    path_strategy: PathStrategy,
188    issue_manager: Mutex<PathIssueManager>,
189    managed_paths: HashIndex<(IsdAsn, IsdAsn), (PathSetHandle, PathSetTask)>,
190}
191
192impl<F: PathFetcher> MultiPathManager<F> {
193    /// Creates a new [`MultiPathManager`].
194    pub fn new(
195        config: MultiPathManagerConfig,
196        fetcher: F,
197        path_strategy: PathStrategy,
198    ) -> Result<Self, &'static str> {
199        config.validate()?;
200
201        let issue_manager = Mutex::new(PathIssueManager::new(
202            config.issue_cache_size,
203            config.issue_broadcast_size,
204            config.issue_deduplication_window,
205        ));
206
207        Ok(MultiPathManager(Arc::new(MultiPathManagerInner {
208            config,
209            fetcher,
210            issue_manager,
211            path_strategy,
212            managed_paths: HashIndex::new(),
213        })))
214    }
215
216    /// Tries to get the active path for the given src-dst pair.
217    ///
218    /// If no active path is set, returns None.
219    ///
220    /// If the src-dst pair is not yet managed, starts managing it.
221    pub fn try_path(&self, src: IsdAsn, dst: IsdAsn, now: SystemTime) -> Option<Path> {
222        let try_path = self
223            .0
224            .managed_paths
225            .peek_with(&(src, dst), |_, (handle, _)| {
226                handle.try_active_path().as_deref().map(|p| p.0.clone())
227            })
228            .flatten();
229
230        match try_path {
231            Some(active) => {
232                // XXX(ake): Since the Paths are actively managed, they should never be expired
233                // here.
234                let expired = active.is_expired(now.into()).unwrap_or(true);
235                debug_assert!(!expired, "Returned expired path from try_get_path");
236
237                Some(active)
238            }
239            None => {
240                // Start managing paths for the src-dst pair
241                self.fast_ensure_managed_paths(src, dst);
242                None
243            }
244        }
245    }
246
247    /// Gets the active path for the given src-dst pair.
248    ///
249    /// If the src-dst pair is not yet managed, starts managing it, possibly waiting for the first
250    /// path fetch.
251    ///
252    /// Returns an error if no path is available after waiting.
253    pub async fn path(
254        &self,
255        src: IsdAsn,
256        dst: IsdAsn,
257        now: SystemTime,
258    ) -> Result<Path, Arc<PathFetchError>> {
259        let try_path = self
260            .0
261            .managed_paths
262            .peek_with(&(src, dst), |_, (handle, _)| {
263                handle.try_active_path().as_deref().map(|p| p.0.clone())
264            })
265            .flatten();
266
267        let res = match try_path {
268            Some(active) => Ok(active),
269            None => {
270                // Ensure paths are being managed
271                let path_set = self.ensure_managed_paths(src, dst);
272
273                // Try to get active path, possibly waiting for initialization/update
274                let active = path_set.active_path().await.as_ref().map(|p| p.0.clone());
275
276                // Check active path after waiting
277                match active {
278                    Some(active) => Ok(active),
279                    None => {
280                        // No active path even after waiting, return last error if any
281                        let last_error = path_set.current_error();
282                        match last_error {
283                            Some(e) => Err(e),
284                            None => {
285                                // There is a chance for a race here, where the error was cleared
286                                // between the wait and now. In that case, we assume no paths were
287                                // found.
288                                Err(Arc::new(PathFetchError::NoPathsFound))
289                            }
290                        }
291                    }
292                }
293            }
294        };
295
296        if let Ok(active) = &res {
297            // XXX(ake): Since the Paths are actively managed, they should never be expired
298            // here.
299            let expired = active.is_expired(now.into()).unwrap_or(true);
300            debug_assert!(!expired, "Returned expired path from get_path");
301        }
302
303        res
304    }
305
306    /// Creates a weak reference to this MultiPathManager.
307    pub fn weak_ref(&self) -> MultiPathManagerRef<F> {
308        MultiPathManagerRef(Arc::downgrade(&self.0))
309    }
310
311    /// Quickly ensures that paths are being managed for the given src-dst pair.
312    ///
313    /// Does nothing if paths are already being managed.
314    fn fast_ensure_managed_paths(&self, src: IsdAsn, dst: IsdAsn) {
315        if self.0.managed_paths.contains(&(src, dst)) {
316            return;
317        }
318
319        self.ensure_managed_paths(src, dst);
320    }
321
322    /// Starts managing paths for the given src-dst pair.
323    ///
324    /// Returns a reference to the managed paths.
325    fn ensure_managed_paths(&self, src: IsdAsn, dst: IsdAsn) -> PathSetHandle {
326        let entry = match self.0.managed_paths.entry_sync((src, dst)) {
327            scc::hash_index::Entry::Occupied(occupied) => {
328                tracing::trace!(%src, %dst, "Already managing paths for src-dst pair");
329                occupied
330            }
331            scc::hash_index::Entry::Vacant(vacant) => {
332                tracing::info!(%src, %dst, "Starting to manage paths for src-dst pair");
333                let managed = PathSet::new(
334                    src,
335                    dst,
336                    self.weak_ref(),
337                    self.0.config,
338                    self.0.issue_manager.lock().unwrap().issues_subscriber(),
339                );
340
341                vacant.insert_entry(managed.manage())
342            }
343        };
344
345        entry.get().0.clone()
346    }
347
348    /// Stops managing paths for the given src-dst pair.
349    pub fn stop_managing_paths(&self, src: IsdAsn, dst: IsdAsn) {
350        if self.0.managed_paths.remove_sync(&(src, dst)) {
351            tracing::info!(%src, %dst, "Stopped managing paths for src-dst pair");
352        }
353    }
354
355    /// report error
356    pub fn report_path_issue(&self, timestamp: SystemTime, issue: IssueKind, path: Option<&Path>) {
357        let Some(applies_to) = issue.target_type(path) else {
358            // Not a path issue we care about
359            return;
360        };
361
362        if matches!(applies_to, IssueMarkerTarget::DestinationNetwork { .. }) {
363            // We can't handle dst network issues in a global path manager
364            return;
365        }
366
367        tracing::debug!(%issue, "New path issue");
368
369        let issue_marker = IssueMarker {
370            target: applies_to,
371            timestamp,
372            penalty: issue.penalty(),
373        };
374
375        // Push to issues cache
376        {
377            let mut issues_guard = self.0.issue_manager.lock().unwrap();
378            issues_guard.add_issue(issue, issue_marker.clone());
379        }
380    }
381}
382
383impl<F: PathFetcher> ScmpErrorReceiver for MultiPathManager<F> {
384    fn report_scmp_error(&self, scmp_error: ScmpErrorMessage, path: &Path) {
385        self.report_path_issue(
386            SystemTime::now(),
387            IssueKind::Scmp { error: scmp_error },
388            Some(path),
389        );
390    }
391}
392
393impl<F: PathFetcher> SendErrorReceiver for MultiPathManager<F> {
394    fn report_send_error(&self, error: &ScionSocketSendError) {
395        if let Some(send_error) = SendError::from_socket_send_error(error) {
396            self.report_path_issue(
397                SystemTime::now(),
398                IssueKind::Socket { err: send_error },
399                None,
400            );
401        }
402    }
403}
404
405impl<F: PathFetcher> SyncPathManager for MultiPathManager<F> {
406    fn register_path(
407        &self,
408        _src: IsdAsn,
409        _dst: IsdAsn,
410        _now: chrono::DateTime<chrono::Utc>,
411        _path: Path<bytes::Bytes>,
412    ) {
413        // No-op
414        // Based on discussions we do not support externally registered paths in the PathManager
415        // Likely we will handle path mirroring in Connection Based Protocols instead
416    }
417
418    fn try_cached_path(
419        &self,
420        src: IsdAsn,
421        dst: IsdAsn,
422        now: chrono::DateTime<chrono::Utc>,
423    ) -> std::io::Result<Option<Path<bytes::Bytes>>> {
424        Ok(self.try_path(src, dst, now.into()))
425    }
426}
427
428impl<F: PathFetcher> PathManager for MultiPathManager<F> {
429    fn path_wait(
430        &self,
431        src: IsdAsn,
432        dst: IsdAsn,
433        now: chrono::DateTime<chrono::Utc>,
434    ) -> impl crate::types::ResFut<'_, Path<bytes::Bytes>, PathWaitError> {
435        async move {
436            match self.path(src, dst, now.into()).await {
437                Ok(path) => Ok(path),
438                Err(e) => {
439                    match &*e {
440                        PathFetchError::FetchSegments(error) => {
441                            Err(PathWaitError::FetchFailed(format!("{error}")))
442                        }
443                        PathFetchError::InternalError(msg) => {
444                            Err(PathWaitError::FetchFailed(msg.to_string()))
445                        }
446                        PathFetchError::NoPathsFound => Err(PathWaitError::NoPathFound),
447                    }
448                }
449            }
450        }
451    }
452}
453
454impl<F: PathFetcher> PathPrefetcher for MultiPathManager<F> {
455    fn prefetch_path(&self, src: IsdAsn, dst: IsdAsn) {
456        self.ensure_managed_paths(src, dst);
457    }
458}
459
460/// Weak reference to a MultiPathManager.
461///
462/// Can be upgraded to a strong reference using [`get`](Self::get).
463pub struct MultiPathManagerRef<F: PathFetcher>(Weak<MultiPathManagerInner<F>>);
464
465impl<F: PathFetcher> Clone for MultiPathManagerRef<F> {
466    fn clone(&self) -> Self {
467        MultiPathManagerRef(self.0.clone())
468    }
469}
470
471impl<F: PathFetcher> MultiPathManagerRef<F> {
472    /// Attempts to upgrade the weak reference to a strong reference.
473    pub fn get(&self) -> Option<MultiPathManager<F>> {
474        self.0.upgrade().map(|arc| MultiPathManager(arc))
475    }
476}
477
478/// Path Issue manager
479///
480/// Receives reported issues, deduplicates them, and broadcasts them to all path sets.
481struct PathIssueManager {
482    // Config
483    max_entries: usize,
484    deduplication_window: Duration,
485
486    // Mutable
487    /// Map of issue ID to issue marker
488    cache: HashMap<u64, IssueMarker>,
489    // FiFo queue of issue IDs and their timestamps
490    fifo_issues: VecDeque<(u64, SystemTime)>,
491
492    /// Channel for broadcasting issues
493    issue_broadcast_tx: broadcast::Sender<(u64, IssueMarker)>,
494}
495
496impl PathIssueManager {
497    fn new(max_entries: usize, broadcast_buffer: usize, deduplication_window: Duration) -> Self {
498        let (issue_broadcast_tx, _) = broadcast::channel(broadcast_buffer);
499        PathIssueManager {
500            max_entries,
501            deduplication_window,
502            cache: HashMap::new(),
503            fifo_issues: VecDeque::new(),
504            issue_broadcast_tx,
505        }
506    }
507
508    /// Returns a subscriber to the issue broadcast channel.
509    pub fn issues_subscriber(&self) -> broadcast::Receiver<(u64, IssueMarker)> {
510        self.issue_broadcast_tx.subscribe()
511    }
512
513    /// Adds a new issue to the manager.
514    ///
515    /// Issues might cause the Active path to change immediately.
516    ///
517    /// All issues get cached to be applied to newly fetched paths.
518    ///
519    /// If a similar issue, applying to the same Path is seen in the deduplication window, it will
520    /// be ignored.
521    pub fn add_issue(&mut self, issue: IssueKind, marker: IssueMarker) {
522        let id = issue.dedup_id(&marker.target);
523
524        // Check if we already have this issue
525        if let Some(existing_marker) = self.cache.get(&id) {
526            let time_since_last_seen = marker
527                .timestamp
528                .duration_since(existing_marker.timestamp)
529                .unwrap_or_else(|_| Duration::from_secs(0));
530
531            if time_since_last_seen < self.deduplication_window {
532                tracing::trace!(%id, ?time_since_last_seen, ?marker, %issue, "Ignoring duplicate path issue");
533                // Too soon since last seen, ignore
534                return;
535            }
536        }
537
538        // Broadcast issue
539        self.issue_broadcast_tx.send((id, marker.clone())).ok();
540
541        if self.cache.len() >= self.max_entries {
542            self.pop_front();
543        }
544
545        // Insert issue
546        self.fifo_issues.push_back((id, marker.timestamp)); // Store timestamp for matching on removal
547        self.cache.insert(id, marker);
548    }
549
550    /// Applies all cached issues to the given path.
551    ///
552    /// This is called when a path is fetched, to ensure that issues affecting it are applied.
553    /// Should only be called on fresh paths.
554    ///
555    /// Returns true if any issues were applied.
556    /// Returns the max
557    pub fn apply_cached_issues(&self, entry: &mut PathManagerPath, now: SystemTime) -> bool {
558        let mut applied = false;
559        for issue in self.cache.values() {
560            if issue.target.matches_path(&entry.path, &entry.fingerprint) {
561                entry.reliability.update(issue.decayed_penalty(now), now);
562                applied = true;
563            }
564        }
565        applied
566    }
567
568    /// Pops the oldest issue from the cache.
569    fn pop_front(&mut self) -> Option<IssueMarker> {
570        let (issue_id, timestamp) = self.fifo_issues.pop_front()?;
571
572        match self.cache.entry(issue_id) {
573            hash_map::Entry::Occupied(occupied_entry) => {
574                // Only remove if timestamps match
575                match occupied_entry.get().timestamp == timestamp {
576                    true => Some(occupied_entry.remove()),
577                    false => None, // Entry was updated, do not remove
578                }
579            }
580            hash_map::Entry::Vacant(_) => {
581                debug_assert!(false, "Bad cache: issue ID not found in cache");
582                None
583            }
584        }
585    }
586}
587
588#[cfg(test)]
589mod tests {
590    use helpers::*;
591    use tokio::time::timeout;
592
593    use super::*;
594
595    // The manager should create path sets on request
596    #[tokio::test]
597    #[test_log::test]
598    async fn should_create_pathset_on_request() {
599        let cfg = base_config();
600        let fetcher = MockFetcher::new(generate_responses(5, 0, BASE_TIME, DEFAULT_EXP_UNITS));
601
602        let mgr = MultiPathManager::new(cfg, fetcher, PathStrategy::default())
603            .expect("Should create manager");
604
605        // Initially no managed paths
606        assert!(mgr.0.managed_paths.is_empty());
607
608        // Request a path - should create path set
609        let path = mgr.try_path(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn(), BASE_TIME);
610        // First call returns None (not yet initialized)
611        assert!(path.is_none());
612
613        // But path set should be created
614        assert!(
615            mgr.0
616                .managed_paths
617                .contains(&(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn()))
618        );
619    }
620
621    // The manager should remove idle path sets
622    #[tokio::test]
623    #[test_log::test]
624    async fn should_remove_idle_pathsets() {
625        let mut cfg = base_config();
626        cfg.max_idle_period = Duration::from_millis(10); // Short idle period for testing
627
628        let fetcher = MockFetcher::new(generate_responses(5, 0, BASE_TIME, DEFAULT_EXP_UNITS));
629
630        let mgr = MultiPathManager::new(cfg, fetcher, PathStrategy::default())
631            .expect("Should create manager");
632
633        // Create path set
634        let handle = mgr.ensure_managed_paths(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn());
635
636        // Should exist
637        assert!(
638            mgr.0
639                .managed_paths
640                .contains(&(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn()))
641        );
642
643        // Wait for idle timeout plus some margin
644        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
645
646        // Path set should be removed by idle check
647        let contains = mgr
648            .0
649            .managed_paths
650            .contains(&(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn()));
651
652        assert!(!contains, "Idle path set should be removed");
653
654        let err = handle.current_error();
655        assert!(
656            err.is_some(),
657            "Handle should report error after path set removal"
658        );
659        println!("Error after idle removal: {:?}", err);
660        assert!(
661            err.unwrap().to_string().contains("idle"),
662            "Error message should indicate idle removal"
663        );
664    }
665
666    // Dropping the manager should cancel all path set maintenance tasks
667    #[tokio::test]
668    #[test_log::test]
669    async fn should_cancel_pathset_tasks_on_drop() {
670        let cfg: MultiPathManagerConfig = base_config();
671        let fetcher = MockFetcher::new(generate_responses(5, 0, BASE_TIME, DEFAULT_EXP_UNITS));
672
673        let mgr = MultiPathManager::new(cfg, fetcher, PathStrategy::default())
674            .expect("Should create manager");
675
676        // ensure path set exists and initialized
677        let handle = mgr.ensure_managed_paths(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn());
678        handle.wait_initialized().await;
679
680        let mut set_entry = mgr
681            .0
682            .managed_paths
683            .get_sync(&(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn()))
684            .unwrap();
685
686        let task_handle = unsafe {
687            // swap join handle with a fake one, only possible since the manager doesn't use
688            // the handle
689            let swap_handle = tokio::spawn(async {});
690            std::mem::replace(&mut set_entry.get_mut().1._task, swap_handle)
691        };
692
693        let cancel_token = set_entry.get().1.cancel_token.clone();
694
695        let count = mgr.0.managed_paths.len();
696        assert_eq!(count, 1, "Should have 1 managed path set");
697
698        // Drop the manager
699        drop(mgr);
700        // Cancel token should be triggered
701        assert!(
702            cancel_token.is_cancelled(),
703            "Cancel token should be triggered"
704        );
705
706        // Give tasks time to detect manager drop and exit
707        timeout(Duration::from_millis(50), task_handle)
708            .await
709            .unwrap()
710            .unwrap();
711
712        let err = handle
713            .shared
714            .sync
715            .lock()
716            .unwrap()
717            .current_error
718            .clone()
719            .expect("Should have error after manager drop");
720
721        // XXX(ake): exit reason may vary between "cancelled" and "manager dropped" because
722        // of select!
723        assert!(
724            err.to_string().contains("cancelled") || err.to_string().contains("dropped"),
725            "Error message should indicate cancellation or manager drop"
726        );
727    }
728
729    mod issue_handling {
730        use scc::HashIndex;
731        use scion_proto::address::{Asn, Isd};
732
733        use super::*;
734        use crate::path::{
735            manager::{MultiPathManagerInner, PathIssueManager, reliability::ReliabilityScore},
736            types::Score,
737        };
738
739        // When an issue is ingested, affected paths should have their reliability scores
740        // updated appropriately The issue should be in the issue cache
741        #[tokio::test]
742        #[test_log::test]
743        async fn should_ingest_issues_and_apply_to_existing_paths() {
744            let cfg = base_config();
745            let fetcher = MockFetcher::new(generate_responses(5, 0, BASE_TIME, DEFAULT_EXP_UNITS));
746            let (mgr, mut path_set) = manual_pathset(BASE_TIME, fetcher.clone(), cfg, None);
747
748            path_set.maintain(BASE_TIME, &mgr).await;
749
750            // Get the first path to create an issue for
751            let first_path = &path_set.internal.cached_paths[0];
752            let first_fp = first_path.fingerprint;
753
754            // Create an issue targeting the first hop of the first path
755            let issue = IssueKind::Socket {
756                err: SendError::FirstHopUnreachable {
757                    isd_asn: first_path.path.source(),
758                    interface_id: first_path.path.first_hop_egress_interface().unwrap().id,
759                    msg: "test".into(),
760                },
761            };
762
763            let penalty = Score::new_clamped(-0.3);
764            let marker = IssueMarker {
765                target: issue.target_type(Some(&first_path.path)).unwrap(),
766                timestamp: BASE_TIME,
767                penalty,
768            };
769
770            {
771                let mut issues_guard = mgr.0.issue_manager.lock().unwrap();
772                // Add issue to manager
773                issues_guard.add_issue(issue, marker);
774                // Check issue is in cache
775                assert!(!issues_guard.cache.is_empty(), "Issue should be in cache");
776            }
777            // Handle the issue in path_set
778            let recv_result = path_set.internal.issue_rx.recv().await;
779            path_set.handle_issue_rx(BASE_TIME, recv_result, &mgr);
780
781            // Check that the path's score was updated
782            let updated_path = path_set
783                .internal
784                .cached_paths
785                .iter()
786                .find(|e| e.fingerprint == first_fp)
787                .expect("Path should still exist");
788
789            let updated_score = updated_path.reliability.score(BASE_TIME).value();
790
791            assert!(
792                updated_score == penalty.value(),
793                "Path score should be updated by penalty. Expected: {}, Got: {}",
794                penalty.value(),
795                updated_score
796            );
797
798            // Should decay over time
799            let later_time = BASE_TIME + Duration::from_secs(30);
800            let decayed_score = updated_path.reliability.score(later_time).value();
801            assert!(
802                decayed_score > updated_score,
803                "Path score should recover over time. Updated: {}, Decayed: {}",
804                updated_score,
805                decayed_score
806            );
807        }
808
809        #[tokio::test]
810        #[test_log::test]
811        async fn should_deduplicate_issues_within_window() {
812            let cfg = base_config();
813            let mgr_inner = MultiPathManagerInner {
814                config: cfg,
815                fetcher: MockFetcher::new(Ok(vec![])),
816                path_strategy: PathStrategy::default(),
817                issue_manager: Mutex::new(PathIssueManager::new(64, 64, Duration::from_secs(10))),
818                managed_paths: HashIndex::new(),
819            };
820            let mgr = MultiPathManager(Arc::new(mgr_inner));
821
822            let issue_marker = IssueMarker {
823                target: IssueMarkerTarget::FirstHop {
824                    isd_asn: SRC_ADDR.isd_asn(),
825                    egress_interface: 1,
826                },
827                timestamp: BASE_TIME,
828                penalty: Score::new_clamped(-0.3),
829            };
830
831            let issue = IssueKind::Socket {
832                err: SendError::FirstHopUnreachable {
833                    isd_asn: SRC_ADDR.isd_asn(),
834                    interface_id: 1,
835                    msg: "test".into(),
836                },
837            };
838
839            // Add issue first time
840            mgr.0
841                .issue_manager
842                .lock()
843                .unwrap()
844                .add_issue(issue.clone(), issue_marker.clone());
845            let cache_size_1 = mgr.0.issue_manager.lock().unwrap().cache.len();
846            assert_eq!(cache_size_1, 1);
847
848            // Add same issue within dedup window (should be ignored)
849            let issue_marker_2 = IssueMarker {
850                timestamp: BASE_TIME + Duration::from_secs(1), // Within 10s window
851                ..issue_marker.clone()
852            };
853            mgr.0
854                .issue_manager
855                .lock()
856                .unwrap()
857                .add_issue(issue.clone(), issue_marker_2);
858
859            let fifo_size = mgr.0.issue_manager.lock().unwrap().fifo_issues.len();
860            let cache_size_2 = mgr.0.issue_manager.lock().unwrap().cache.len();
861            assert_eq!(cache_size_2, 1, "Duplicate issue should be ignored");
862            assert_eq!(
863                fifo_size, 1,
864                "FIFO queue size should remain unchanged on duplicate issue"
865            );
866
867            // Add same issue outside dedup window (should be added)
868            let issue_marker_3 = IssueMarker {
869                timestamp: BASE_TIME + Duration::from_secs(11), // Outside 10s window
870                ..issue_marker
871            };
872            mgr.0
873                .issue_manager
874                .lock()
875                .unwrap()
876                .add_issue(issue, issue_marker_3);
877
878            let fifo_size_3 = mgr.0.issue_manager.lock().unwrap().fifo_issues.len();
879            let cache_size_3 = mgr.0.issue_manager.lock().unwrap().cache.len();
880            assert_eq!(
881                cache_size_3, 1,
882                "Issue outside dedup window should update existing"
883            );
884            assert_eq!(
885                fifo_size_3, 2,
886                "FIFO queue size should increase for new issue outside dedup window"
887            );
888        }
889
890        // When new paths are fetched, existing issues in the issue cache should be applied to
891        // them
892        #[tokio::test]
893        #[test_log::test]
894        async fn should_apply_issues_to_new_paths_on_fetch() {
895            let cfg = base_config();
896            let fetcher = MockFetcher::new(Ok(vec![]));
897            let (mgr, mut path_set) = manual_pathset(BASE_TIME, fetcher.clone(), cfg, None);
898
899            path_set.maintain(BASE_TIME, &mgr).await;
900
901            // Create an issue
902            let issue_marker = IssueMarker {
903                target: IssueMarkerTarget::FirstHop {
904                    isd_asn: SRC_ADDR.isd_asn(),
905                    egress_interface: 1,
906                },
907                timestamp: BASE_TIME,
908                penalty: Score::new_clamped(-0.5),
909            };
910
911            let issue = IssueKind::Socket {
912                err: SendError::FirstHopUnreachable {
913                    isd_asn: SRC_ADDR.isd_asn(),
914                    interface_id: 1,
915                    msg: "test".into(),
916                },
917            };
918
919            // Add to manager's issue cache
920            mgr.0
921                .issue_manager
922                .lock()
923                .unwrap()
924                .add_issue(issue, issue_marker);
925
926            // Drain issue channel so no issues are pending
927            path_set.drain_and_apply_issue_channel(BASE_TIME);
928
929            // Now fetch paths again - the issue should be applied to the newly fetched path
930            fetcher.lock().unwrap().set_response(generate_responses(
931                3,
932                0,
933                BASE_TIME + Duration::from_secs(1),
934                DEFAULT_EXP_UNITS,
935            ));
936
937            let next_refetch = path_set.internal.next_refetch;
938            path_set.maintain(next_refetch, &mgr).await;
939
940            // The newly fetched path should have the penalty applied
941            let affected_path = path_set
942                .internal
943                .cached_paths
944                .first()
945                .expect("Path should exist");
946
947            let score = affected_path
948                .reliability
949                .score(BASE_TIME + Duration::from_secs(1))
950                .value();
951            assert!(
952                score < 0.0,
953                "Newly fetched path should have cached issue applied. Score: {}",
954                score
955            );
956        }
957
958        // If the active path is affected by an issue, it should be re-evaluated
959        #[tokio::test]
960        #[test_log::test]
961        async fn should_trigger_active_path_reevaluation_on_issue() {
962            let cfg = base_config();
963            let fetcher = MockFetcher::new(generate_responses(5, 0, BASE_TIME, DEFAULT_EXP_UNITS));
964            let (mgr, mut path_set) = manual_pathset(BASE_TIME, fetcher.clone(), cfg, None);
965
966            path_set.maintain(BASE_TIME, &mgr).await;
967
968            let active_fp = path_set.shared.active_path.load().as_ref().unwrap().1;
969
970            // Create a severe issue targeting the active path
971            let issue_marker = IssueMarker {
972                target: IssueMarkerTarget::FullPath {
973                    fingerprint: active_fp,
974                },
975                timestamp: BASE_TIME,
976                penalty: Score::new_clamped(-1.0), // Severe penalty
977            };
978
979            let issue = IssueKind::Socket {
980                err: SendError::FirstHopUnreachable {
981                    isd_asn: SRC_ADDR.isd_asn(),
982                    interface_id: 1,
983                    msg: "test".into(),
984                },
985            };
986
987            // Add issue
988            mgr.0
989                .issue_manager
990                .lock()
991                .unwrap()
992                .add_issue(issue, issue_marker);
993
994            // Handle issue
995            let recv_result = path_set.internal.issue_rx.recv().await;
996            path_set.handle_issue_rx(BASE_TIME, recv_result, &mgr);
997
998            // Active path should have changed
999            let new_active_fp = path_set.shared.active_path.load().as_ref().unwrap().1;
1000            assert_ne!(
1001                active_fp, new_active_fp,
1002                "Active path should change when severely penalized"
1003            );
1004        }
1005
1006        #[tokio::test]
1007        #[test_log::test]
1008        async fn should_swap_to_better_path_if_one_appears() {
1009            let cfg = base_config();
1010            let fetcher = MockFetcher::new(generate_responses(1, 0, BASE_TIME, DEFAULT_EXP_UNITS));
1011            let (mgr, mut path_set) = manual_pathset(BASE_TIME, fetcher.clone(), cfg, None);
1012
1013            path_set.maintain(BASE_TIME, &mgr).await;
1014
1015            // mark as used to prevent idle removal
1016            path_set
1017                .shared
1018                .was_used_in_idle_period
1019                .store(true, std::sync::atomic::Ordering::Relaxed);
1020
1021            let active_fp = path_set.shared.active_path.load().as_ref().unwrap().1;
1022
1023            // add issue to active path to lower its score
1024            let issue_marker = IssueMarker {
1025                target: IssueMarkerTarget::FullPath {
1026                    fingerprint: active_fp,
1027                },
1028                timestamp: BASE_TIME,
1029                penalty: Score::new_clamped(-0.8),
1030            };
1031
1032            mgr.0.issue_manager.lock().unwrap().add_issue(
1033                IssueKind::Socket {
1034                    err: SendError::FirstHopUnreachable {
1035                        isd_asn: SRC_ADDR.isd_asn(),
1036                        interface_id: 1,
1037                        msg: "test".into(),
1038                    },
1039                },
1040                issue_marker,
1041            );
1042
1043            // active path should be the same
1044            let active_fp_after_issue = path_set.shared.active_path.load().as_ref().unwrap().1;
1045            assert_eq!(
1046                active_fp, active_fp_after_issue,
1047                "Active path should remain the same if no better path exists"
1048            );
1049
1050            // Now fetch a better path
1051            fetcher.lock().unwrap().set_response(generate_responses(
1052                1,
1053                100,
1054                BASE_TIME + Duration::from_secs(1),
1055                DEFAULT_EXP_UNITS,
1056            ));
1057
1058            path_set
1059                .maintain(path_set.internal.next_refetch, &mgr)
1060                .await;
1061            // mark as used to prevent idle removal
1062            path_set
1063                .shared
1064                .was_used_in_idle_period
1065                .store(true, std::sync::atomic::Ordering::Relaxed);
1066
1067            // Active path should have changed
1068            let new_active_fp = path_set.shared.active_path.load().as_ref().unwrap().1;
1069            assert_ne!(
1070                active_fp, new_active_fp,
1071                "Active path should change when a better path appears"
1072            );
1073
1074            // Should also work for positive score changes
1075            let positive_score = Score::new_clamped(0.8);
1076            let mut reliability = ReliabilityScore::new_with_time(path_set.internal.next_refetch);
1077            reliability.update(positive_score, path_set.internal.next_refetch);
1078
1079            // Change old paths reliability to be better
1080            path_set
1081                .internal
1082                .cached_paths
1083                .iter_mut()
1084                .find(|e| e.fingerprint == active_fp)
1085                .unwrap()
1086                .reliability = reliability;
1087
1088            path_set
1089                .maintain(path_set.internal.next_refetch, &mgr)
1090                .await;
1091
1092            assert_eq!(
1093                active_fp,
1094                path_set.shared.active_path.load().as_ref().unwrap().1,
1095                "Active path should change on positive score diff"
1096            );
1097        }
1098
1099        #[tokio::test]
1100        #[test_log::test]
1101        async fn should_keep_max_issue_cache_size() {
1102            let max_size = 10;
1103            let mut issue_mgr = PathIssueManager::new(max_size, 64, Duration::from_secs(10));
1104
1105            // Add more issues than max_size
1106            for i in 0..20u16 {
1107                let issue_marker = IssueMarker {
1108                    target: IssueMarkerTarget::FirstHop {
1109                        isd_asn: IsdAsn::new(Isd(1), Asn(1)),
1110                        egress_interface: i,
1111                    },
1112                    timestamp: BASE_TIME + Duration::from_secs(i as u64),
1113                    penalty: Score::new_clamped(-0.1),
1114                };
1115
1116                let issue = IssueKind::Socket {
1117                    err: SendError::FirstHopUnreachable {
1118                        isd_asn: IsdAsn::new(Isd(1), Asn(1)),
1119                        interface_id: i,
1120                        msg: "test".into(),
1121                    },
1122                };
1123
1124                issue_mgr.add_issue(issue, issue_marker);
1125            }
1126
1127            // Cache should not exceed max_size
1128            assert!(
1129                issue_mgr.cache.len() <= max_size,
1130                "Cache size {} should not exceed max {}",
1131                issue_mgr.cache.len(),
1132                max_size
1133            );
1134
1135            // FIFO queue should match cache size
1136            assert_eq!(issue_mgr.cache.len(), issue_mgr.fifo_issues.len());
1137        }
1138    }
1139
1140    pub mod helpers {
1141        use std::{
1142            hash::{DefaultHasher, Hash, Hasher},
1143            net::{IpAddr, Ipv4Addr},
1144            sync::{Arc, Mutex},
1145            time::{Duration, SystemTime},
1146        };
1147
1148        use scion_proto::{
1149            address::{Asn, EndhostAddr, Isd, IsdAsn},
1150            path::{Path, test_builder::TestPathBuilder},
1151        };
1152        use tokio::sync::Notify;
1153
1154        use super::*;
1155        use crate::path::manager::{MultiPathManagerInner, PathIssueManager, pathset::PathSet};
1156
1157        pub const SRC_ADDR: EndhostAddr = EndhostAddr::new(
1158            IsdAsn::new(Isd(1), Asn(1)),
1159            IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
1160        );
1161        pub const DST_ADDR: EndhostAddr = EndhostAddr::new(
1162            IsdAsn::new(Isd(2), Asn(1)),
1163            IpAddr::V4(Ipv4Addr::new(127, 0, 0, 2)),
1164        );
1165
1166        pub const DEFAULT_EXP_UNITS: u8 = 100;
1167        pub const BASE_TIME: SystemTime = SystemTime::UNIX_EPOCH;
1168
1169        pub fn dummy_path(hop_count: u16, timestamp: u32, exp_units: u8, seed: u32) -> Path {
1170            let mut builder: TestPathBuilder = TestPathBuilder::new(SRC_ADDR, DST_ADDR)
1171                .using_info_timestamp(timestamp)
1172                .with_hop_expiry(exp_units)
1173                .up();
1174
1175            builder = builder.add_hop(0, 1);
1176
1177            for cnt in 0..hop_count {
1178                let mut hash = DefaultHasher::new();
1179                seed.hash(&mut hash);
1180                cnt.hash(&mut hash);
1181                let hash = hash.finish() as u32;
1182
1183                let hop = hash.saturating_sub(2) as u16; // ensure no underflow or overflow
1184                builder = builder.with_asn(hash).add_hop(hop + 1, hop + 2);
1185            }
1186
1187            builder = builder.add_hop(1, 0);
1188
1189            builder.build(timestamp).path()
1190        }
1191
1192        pub fn base_config() -> MultiPathManagerConfig {
1193            MultiPathManagerConfig {
1194                max_cached_paths_per_pair: 5,
1195                refetch_interval: Duration::from_secs(100),
1196                min_refetch_delay: Duration::from_secs(1),
1197                min_expiry_threshold: Duration::from_secs(5),
1198                max_idle_period: Duration::from_secs(30),
1199                fetch_failure_backoff: BackoffConfig {
1200                    minimum_delay_secs: 1.0,
1201                    maximum_delay_secs: 10.0,
1202                    factor: 2.0,
1203                    jitter_secs: 0.0,
1204                },
1205                issue_cache_size: 64,
1206                issue_broadcast_size: 64,
1207                issue_deduplication_window: Duration::from_secs(10),
1208                path_swap_score_threshold: 0.1,
1209            }
1210        }
1211
1212        pub fn generate_responses(
1213            path_count: u16,
1214            path_seed: u32,
1215            timestamp: SystemTime,
1216            exp_units: u8,
1217        ) -> Result<Vec<Path>, String> {
1218            let mut paths = Vec::new();
1219            for resp_id in 0..path_count {
1220                paths.push(dummy_path(
1221                    2,
1222                    timestamp
1223                        .duration_since(SystemTime::UNIX_EPOCH)
1224                        .unwrap()
1225                        .as_secs() as u32,
1226                    exp_units,
1227                    path_seed + resp_id as u32,
1228                ));
1229            }
1230
1231            Ok(paths)
1232        }
1233
1234        pub struct MockFetcher {
1235            next_response: Result<Vec<Path>, String>,
1236            pub received_requests: usize,
1237            pub wait_till_notify: bool,
1238            pub notify_to_resolve: Arc<Notify>,
1239        }
1240        impl MockFetcher {
1241            pub fn new(response: Result<Vec<Path>, String>) -> Arc<Mutex<Self>> {
1242                Arc::new(Mutex::new(Self {
1243                    next_response: response,
1244                    received_requests: 0,
1245                    wait_till_notify: false,
1246                    notify_to_resolve: Arc::new(Notify::new()),
1247                }))
1248            }
1249
1250            pub fn set_response(&mut self, response: Result<Vec<Path>, String>) {
1251                self.next_response = response;
1252            }
1253
1254            pub fn wait_till_notify(&mut self, wait: bool) {
1255                self.wait_till_notify = wait;
1256            }
1257
1258            pub fn notify(&self) {
1259                self.notify_to_resolve.notify_waiters();
1260            }
1261        }
1262
1263        impl PathFetcher for Arc<Mutex<MockFetcher>> {
1264            async fn fetch_paths(
1265                &self,
1266                _src: IsdAsn,
1267                _dst: IsdAsn,
1268            ) -> Result<Vec<Path>, PathFetchError> {
1269                let response;
1270                // Wait for notification if needed
1271                let notify = {
1272                    let mut guard = self.lock().unwrap();
1273
1274                    guard.received_requests += 1;
1275                    response = guard.next_response.clone();
1276
1277                    // maybe wait till notified
1278                    if guard.wait_till_notify {
1279                        let notif = guard.notify_to_resolve.clone().notified_owned();
1280                        Some(notif)
1281                    } else {
1282                        None
1283                    }
1284                };
1285
1286                if let Some(notif) = notify {
1287                    notif.await;
1288                }
1289
1290                match response {
1291                    Ok(paths) if paths.is_empty() => Err(PathFetchError::NoPathsFound),
1292                    Ok(paths) => Ok(paths),
1293                    Err(e) => Err(PathFetchError::InternalError(e.into())),
1294                }
1295            }
1296        }
1297
1298        pub fn manual_pathset<F: PathFetcher>(
1299            now: SystemTime,
1300            fetcher: F,
1301            cfg: MultiPathManagerConfig,
1302            strategy: Option<PathStrategy>,
1303        ) -> (MultiPathManager<F>, PathSet<F>) {
1304            let mgr_inner = MultiPathManagerInner {
1305                config: cfg,
1306                fetcher,
1307                path_strategy: strategy.unwrap_or_else(|| {
1308                    let mut ps = PathStrategy::default();
1309                    ps.scoring.use_default_scorers();
1310                    ps
1311                }),
1312                issue_manager: Mutex::new(PathIssueManager::new(64, 64, Duration::from_secs(10))),
1313                managed_paths: HashIndex::new(),
1314            };
1315            let mgr = MultiPathManager(Arc::new(mgr_inner));
1316            let issue_rx = mgr.0.issue_manager.lock().unwrap().issues_subscriber();
1317            let mgr_ref = mgr.weak_ref();
1318            (
1319                mgr,
1320                PathSet::new_with_time(
1321                    SRC_ADDR.isd_asn(),
1322                    DST_ADDR.isd_asn(),
1323                    mgr_ref,
1324                    cfg,
1325                    issue_rx,
1326                    now,
1327                ),
1328            )
1329        }
1330    }
1331}