Skip to main content

scion_stack/path/
manager.rs

1// Copyright 2025 Anapaya Systems
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//   http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Multipath manager for SCION path selection.
16//!
17//! Runs one task per (src,dst) pair. Each task fetches paths, filters them, applies issue
18//! penalties, ranks candidates, and picks an active path.
19//!
20//! Tasks track expiry, refetch intervals, backoff after failures, and drop entries that go
21//! idle. The Active path for a (src,dst) pair is exposed lock-free via `ArcSwap`.
22//!
23//! All path data comes from the provided `PathFetcher`. Issue reports feed
24//! into reliability scoring and can trigger immediate re-ranking.
25//!
26//! ## Issue Handling & Penalties
27//!
28//! Incoming issues are applied to cached paths immediately and can trigger an active-path
29//! switch. Issues are cached with a timestamp and applied to newly fetched paths.
30//!
//! Penalties on individual paths and individual cached issues decay over time, allowing paths
//! to recover.
33//!
34//! ## Active Path Switching
35//!
36//! If no active path exists, the highest-ranked valid path is selected.
37//! Active path is replaced when it expires, nears expiry, or falls behind the best candidate
38//! by a configured score margin.
39
40// Internal:
41//
42// ## Core components
43//
44// MultiPathManager: Central entry point. Holds configuration, the global issue manager, and a
45// concurrent map from (src, dst) to worker. Spawns a worker on first access and provides lock-free
46// reads to workers.
47//
48// PathSet: Per-tuple worker. Fetches paths, filters them, applies issue penalties, ranks
49// candidates, and maintains an active path. Runs a periodic maintenance loop handling refetch,
50// backoff, and idle shutdown.
51//
52// PathIssueManager:  Global issue cache and broadcast system. Deduplicates issues and notifies all
53// workers of incoming issues.
54//
55// IssueKind / IssueMarker: Describe concrete path problems (SCMP, socket errors). Compute the
56// affected hop or full path, assign a penalty, and support deduplication and decay.
57
58use std::{
59    collections::{HashMap, VecDeque, hash_map},
60    sync::{Arc, Mutex, Weak},
61    time::{Duration, SystemTime},
62};
63
64use scc::HashIndex;
65use scion_proto::{address::IsdAsn, packet::ByEndpoint, path::Path, scmp::ScmpErrorMessage};
66use scion_sdk_utils::backoff::BackoffConfig;
67use tokio::sync::broadcast::{self};
68
69use crate::{
70    path::{
71        PathStrategy,
72        fetcher::{
73            PathFetcherImpl,
74            traits::{PathFetchError, PathFetcher},
75        },
76        manager::{
77            issues::{IssueKind, IssueMarker, IssueMarkerTarget, SendError},
78            pathset::{PathSet, PathSetHandle, PathSetTask},
79            traits::{PathManager, PathPrefetcher, PathWaitError, SyncPathManager},
80        },
81        types::PathManagerPath,
82    },
83    scionstack::{
84        ScionSocketSendError, scmp_handler::ScmpErrorReceiver, socket::SendErrorReceiver,
85    },
86};
87
88mod algo;
89/// Path issue definitions, including mapping issues to affected targets and their respective
90/// penalties
91mod issues;
92/// Pathsets manage paths for a specific src-dst pair.
93mod pathset;
94/// Path reliability tracking
95pub mod reliability;
96/// Path fetcher traits and types.
97pub mod traits;
98
99/// Configuration for the MultiPathManager.
100#[derive(Debug, Clone, Copy)]
101pub struct MultiPathManagerConfig {
102    /// Maximum number of cached paths per src-dst pair.
103    max_cached_paths_per_pair: usize,
104    /// Interval between path refetches
105    refetch_interval: Duration,
106    /// Minimum duration between path refetches.
107    min_refetch_delay: Duration,
108    /// Minimum remaining expiry before refetching paths.
109    min_expiry_threshold: Duration,
110    /// Maximum idle period before the managed paths are removed.
111    max_idle_period: Duration,
112    /// Backoff configuration for path fetch failures.
113    fetch_failure_backoff: BackoffConfig,
114    /// Count of issues to be cached
115    issue_cache_size: usize,
116    /// Size of the issue cache broadcast channel
117    issue_broadcast_size: usize,
118    /// Time window to ignore duplicate issues
119    issue_deduplication_window: Duration,
120    /// Score difference after which active path should be replaced
121    path_swap_score_threshold: f32,
122}
123
124impl Default for MultiPathManagerConfig {
125    fn default() -> Self {
126        MultiPathManagerConfig {
127            max_cached_paths_per_pair: 50,
128            refetch_interval: Duration::from_secs(60 * 30), // 30 minutes
129            min_refetch_delay: Duration::from_secs(60),
130            min_expiry_threshold: Duration::from_secs(60 * 5), // 5 minutes
131            max_idle_period: Duration::from_secs(60 * 2),      // 2 minutes
132            fetch_failure_backoff: BackoffConfig {
133                minimum_delay_secs: 60.0,
134                maximum_delay_secs: 300.0,
135                factor: 1.5,
136                jitter_secs: 5.0,
137            },
138            issue_cache_size: 100,
139            issue_broadcast_size: 10,
140            // Same issue within 10s is duplicate
141            issue_deduplication_window: Duration::from_secs(10),
142            path_swap_score_threshold: 0.5,
143        }
144    }
145}
146
147impl MultiPathManagerConfig {
148    /// Validates the configuration.
149    fn validate(&self) -> Result<(), &'static str> {
150        if self.min_refetch_delay > self.refetch_interval {
151            return Err("min_refetch_delay must be smaller than refetch_interval");
152            // Otherwise, refetch interval makes no sense
153        }
154
155        if self.min_refetch_delay > self.min_expiry_threshold {
156            return Err("min_refetch_delay must be smaller than min_expiry_threshold");
157            // Otherwise, very unlikely, we have paths expiring before we can refetch
158        }
159
160        Ok(())
161    }
162}
163
164/// Path manager managing multiple paths per src-dst pair.
165pub struct MultiPathManager<F: PathFetcher = PathFetcherImpl>(Arc<MultiPathManagerInner<F>>);
166
167impl<F> Clone for MultiPathManager<F>
168where
169    F: PathFetcher,
170{
171    fn clone(&self) -> Self {
172        MultiPathManager(self.0.clone())
173    }
174}
175
176struct MultiPathManagerInner<F: PathFetcher> {
177    config: MultiPathManagerConfig,
178    fetcher: F,
179    path_strategy: PathStrategy,
180    issue_manager: Mutex<PathIssueManager>,
181    managed_paths: HashIndex<(IsdAsn, IsdAsn), (PathSetHandle, PathSetTask)>,
182}
183
184impl<F: PathFetcher> MultiPathManager<F> {
185    /// Creates a new [`MultiPathManager`].
186    pub fn new(
187        config: MultiPathManagerConfig,
188        fetcher: F,
189        path_strategy: PathStrategy,
190    ) -> Result<Self, &'static str> {
191        config.validate()?;
192
193        let issue_manager = Mutex::new(PathIssueManager::new(
194            config.issue_cache_size,
195            config.issue_broadcast_size,
196            config.issue_deduplication_window,
197        ));
198
199        Ok(MultiPathManager(Arc::new(MultiPathManagerInner {
200            config,
201            fetcher,
202            issue_manager,
203            path_strategy,
204            managed_paths: HashIndex::new(),
205        })))
206    }
207
208    /// Tries to get the active path for the given src-dst pair.
209    ///
210    /// If no active path is set, returns None.
211    ///
212    /// If the src-dst pair is not yet managed, starts managing it.
213    pub fn try_path(&self, src: IsdAsn, dst: IsdAsn, now: SystemTime) -> Option<Path> {
214        let try_path = self
215            .0
216            .managed_paths
217            .peek_with(&(src, dst), |_, (handle, _)| {
218                handle.try_active_path().as_deref().map(|p| p.0.clone())
219            })
220            .flatten();
221
222        match try_path {
223            Some(active) => {
224                // XXX(ake): Since the Paths are actively managed, they should never be expired
225                // here.
226                let expired = active.is_expired(now.into()).unwrap_or(true);
227                debug_assert!(!expired, "Returned expired path from try_get_path");
228
229                Some(active)
230            }
231            None => {
232                // Start managing paths for the src-dst pair
233                self.fast_ensure_managed_paths(src, dst);
234                None
235            }
236        }
237    }
238
239    /// Gets the active path for the given src-dst pair.
240    ///
241    /// If the src-dst pair is not yet managed, starts managing it, possibly waiting for the first
242    /// path fetch.
243    ///
244    /// Returns an error if no path is available after waiting.
245    pub async fn path(
246        &self,
247        src: IsdAsn,
248        dst: IsdAsn,
249        now: SystemTime,
250    ) -> Result<Path, Arc<PathFetchError>> {
251        if src == dst {
252            return Ok(Path::empty(ByEndpoint {
253                source: src,
254                destination: dst,
255            }));
256        }
257
258        let try_path = self
259            .0
260            .managed_paths
261            .peek_with(&(src, dst), |_, (handle, _)| {
262                handle.try_active_path().as_deref().map(|p| p.0.clone())
263            })
264            .flatten();
265
266        let res = match try_path {
267            Some(active) => Ok(active),
268            None => {
269                // Ensure paths are being managed
270                let path_set = self.ensure_managed_paths(src, dst);
271
272                // Try to get active path, possibly waiting for initialization/update
273                let active = path_set.active_path().await.as_ref().map(|p| p.0.clone());
274
275                // Check active path after waiting
276                match active {
277                    Some(active) => Ok(active),
278                    None => {
279                        // No active path even after waiting, return last error if any
280                        let last_error = path_set.current_error();
281                        match last_error {
282                            Some(e) => Err(e),
283                            None => {
284                                // There is a chance for a race here, where the error was cleared
285                                // between the wait and now. In that case, we assume no paths were
286                                // found.
287                                Err(Arc::new(PathFetchError::NoPathsFound))
288                            }
289                        }
290                    }
291                }
292            }
293        };
294
295        if let Ok(active) = &res {
296            // XXX(ake): Since the Paths are actively managed, they should never be expired
297            // here.
298            let expired = active.is_expired(now.into()).unwrap_or(true);
299            debug_assert!(!expired, "Returned expired path from get_path");
300        }
301
302        res
303    }
304
305    /// Creates a weak reference to this MultiPathManager.
306    pub fn weak_ref(&self) -> MultiPathManagerRef<F> {
307        MultiPathManagerRef(Arc::downgrade(&self.0))
308    }
309
310    /// Quickly ensures that paths are being managed for the given src-dst pair.
311    ///
312    /// Does nothing if paths are already being managed.
313    fn fast_ensure_managed_paths(&self, src: IsdAsn, dst: IsdAsn) {
314        if self.0.managed_paths.contains(&(src, dst)) {
315            return;
316        }
317
318        self.ensure_managed_paths(src, dst);
319    }
320
321    /// Starts managing paths for the given src-dst pair.
322    ///
323    /// Returns a reference to the managed paths.
324    fn ensure_managed_paths(&self, src: IsdAsn, dst: IsdAsn) -> PathSetHandle {
325        let entry = match self.0.managed_paths.entry_sync((src, dst)) {
326            scc::hash_index::Entry::Occupied(occupied) => {
327                tracing::trace!(%src, %dst, "Already managing paths for src-dst pair");
328                occupied
329            }
330            scc::hash_index::Entry::Vacant(vacant) => {
331                tracing::info!(%src, %dst, "Starting to manage paths for src-dst pair");
332                let managed = PathSet::new(
333                    src,
334                    dst,
335                    self.weak_ref(),
336                    self.0.config,
337                    self.0
338                        .issue_manager
339                        .lock()
340                        .expect("lock poisoned")
341                        .issues_subscriber(),
342                );
343
344                vacant.insert_entry(managed.manage())
345            }
346        };
347
348        entry.get().0.clone()
349    }
350
351    /// Stops managing paths for the given src-dst pair.
352    pub fn stop_managing_paths(&self, src: IsdAsn, dst: IsdAsn) {
353        if self.0.managed_paths.remove_sync(&(src, dst)) {
354            tracing::info!(%src, %dst, "Stopped managing paths for src-dst pair");
355        }
356    }
357
358    /// report error
359    pub fn report_path_issue(&self, timestamp: SystemTime, issue: IssueKind, path: Option<&Path>) {
360        let Some(applies_to) = issue.target_type(path) else {
361            // Not a path issue we care about
362            return;
363        };
364
365        if matches!(applies_to, IssueMarkerTarget::DestinationNetwork { .. }) {
366            // We can't handle dst network issues in a global path manager
367            return;
368        }
369
370        tracing::debug!(%issue, "New path issue");
371
372        let issue_marker = IssueMarker {
373            target: applies_to,
374            timestamp,
375            penalty: issue.penalty(),
376        };
377
378        // Push to issues cache
379        {
380            let mut issues_guard = self.0.issue_manager.lock().expect("lock poisoned");
381            issues_guard.add_issue(issue, issue_marker.clone());
382        }
383    }
384}
385
386impl<F: PathFetcher> ScmpErrorReceiver for MultiPathManager<F> {
387    fn report_scmp_error(&self, scmp_error: ScmpErrorMessage, path: &Path) {
388        self.report_path_issue(
389            SystemTime::now(),
390            IssueKind::Scmp { error: scmp_error },
391            Some(path),
392        );
393    }
394}
395
396impl<F: PathFetcher> SendErrorReceiver for MultiPathManager<F> {
397    fn report_send_error(&self, error: &ScionSocketSendError) {
398        if let Some(send_error) = SendError::from_socket_send_error(error) {
399            self.report_path_issue(
400                SystemTime::now(),
401                IssueKind::Socket { err: send_error },
402                None,
403            );
404        }
405    }
406}
407
408impl<F: PathFetcher> SyncPathManager for MultiPathManager<F> {
409    fn register_path(
410        &self,
411        _src: IsdAsn,
412        _dst: IsdAsn,
413        _now: chrono::DateTime<chrono::Utc>,
414        _path: Path<bytes::Bytes>,
415    ) {
416        // No-op
417        // Based on discussions we do not support externally registered paths in the PathManager
418        // Likely we will handle path mirroring in Connection Based Protocols instead
419    }
420
421    fn try_cached_path(
422        &self,
423        src: IsdAsn,
424        dst: IsdAsn,
425        now: chrono::DateTime<chrono::Utc>,
426    ) -> std::io::Result<Option<Path<bytes::Bytes>>> {
427        Ok(self.try_path(src, dst, now.into()))
428    }
429}
430
431impl<F: PathFetcher> PathManager for MultiPathManager<F> {
432    fn path_wait(
433        &self,
434        src: IsdAsn,
435        dst: IsdAsn,
436        now: chrono::DateTime<chrono::Utc>,
437    ) -> impl crate::types::ResFut<'_, Path<bytes::Bytes>, PathWaitError> {
438        async move {
439            match self.path(src, dst, now.into()).await {
440                Ok(path) => Ok(path),
441                Err(e) => {
442                    match &*e {
443                        PathFetchError::FetchSegments(error) => {
444                            Err(PathWaitError::FetchFailed(format!("{error}")))
445                        }
446                        PathFetchError::InternalError(msg) => {
447                            Err(PathWaitError::FetchFailed(msg.to_string()))
448                        }
449                        PathFetchError::NoPathsFound => Err(PathWaitError::NoPathFound),
450                    }
451                }
452            }
453        }
454    }
455}
456
457impl<F: PathFetcher> PathPrefetcher for MultiPathManager<F> {
458    fn prefetch_path(&self, src: IsdAsn, dst: IsdAsn) {
459        self.ensure_managed_paths(src, dst);
460    }
461}
462
463/// Weak reference to a MultiPathManager.
464///
465/// Can be upgraded to a strong reference using [`get`](Self::get).
466pub struct MultiPathManagerRef<F: PathFetcher>(Weak<MultiPathManagerInner<F>>);
467
468impl<F: PathFetcher> Clone for MultiPathManagerRef<F> {
469    fn clone(&self) -> Self {
470        MultiPathManagerRef(self.0.clone())
471    }
472}
473
474impl<F: PathFetcher> MultiPathManagerRef<F> {
475    /// Attempts to upgrade the weak reference to a strong reference.
476    pub fn get(&self) -> Option<MultiPathManager<F>> {
477        self.0.upgrade().map(|arc| MultiPathManager(arc))
478    }
479}
480
481/// Path Issue manager
482///
483/// Receives reported issues, deduplicates them, and broadcasts them to all path sets.
484struct PathIssueManager {
485    // Config
486    max_entries: usize,
487    deduplication_window: Duration,
488
489    // Mutable
490    /// Map of issue ID to issue marker
491    cache: HashMap<u64, IssueMarker>,
492    // FiFo queue of issue IDs and their timestamps
493    fifo_issues: VecDeque<(u64, SystemTime)>,
494
495    /// Channel for broadcasting issues
496    issue_broadcast_tx: broadcast::Sender<(u64, IssueMarker)>,
497}
498
499impl PathIssueManager {
500    fn new(max_entries: usize, broadcast_buffer: usize, deduplication_window: Duration) -> Self {
501        let (issue_broadcast_tx, _) = broadcast::channel(broadcast_buffer);
502        PathIssueManager {
503            max_entries,
504            deduplication_window,
505            cache: HashMap::new(),
506            fifo_issues: VecDeque::new(),
507            issue_broadcast_tx,
508        }
509    }
510
511    /// Returns a subscriber to the issue broadcast channel.
512    pub fn issues_subscriber(&self) -> broadcast::Receiver<(u64, IssueMarker)> {
513        self.issue_broadcast_tx.subscribe()
514    }
515
516    /// Adds a new issue to the manager.
517    ///
518    /// Issues might cause the Active path to change immediately.
519    ///
520    /// All issues get cached to be applied to newly fetched paths.
521    ///
522    /// If a similar issue, applying to the same Path is seen in the deduplication window, it will
523    /// be ignored.
524    pub fn add_issue(&mut self, issue: IssueKind, marker: IssueMarker) {
525        let id = issue.dedup_id(&marker.target);
526
527        // Check if we already have this issue
528        if let Some(existing_marker) = self.cache.get(&id) {
529            let time_since_last_seen = marker
530                .timestamp
531                .duration_since(existing_marker.timestamp)
532                .unwrap_or_else(|_| Duration::from_secs(0));
533
534            if time_since_last_seen < self.deduplication_window {
535                tracing::trace!(%id, ?time_since_last_seen, ?marker, %issue, "Ignoring duplicate path issue");
536                // Too soon since last seen, ignore
537                return;
538            }
539        }
540
541        // Broadcast issue
542        self.issue_broadcast_tx.send((id, marker.clone())).ok();
543
544        if self.cache.len() >= self.max_entries {
545            self.pop_front();
546        }
547
548        // Insert issue
549        self.fifo_issues.push_back((id, marker.timestamp)); // Store timestamp for matching on removal
550        self.cache.insert(id, marker);
551    }
552
553    /// Applies all cached issues to the given path.
554    ///
555    /// This is called when a path is fetched, to ensure that issues affecting it are applied.
556    /// Should only be called on fresh paths.
557    ///
558    /// Returns true if any issues were applied.
559    /// Returns the max
560    pub fn apply_cached_issues(&self, entry: &mut PathManagerPath, now: SystemTime) -> bool {
561        let mut applied = false;
562        for issue in self.cache.values() {
563            if issue.target.matches_path(&entry.path, &entry.fingerprint) {
564                entry.reliability.update(issue.decayed_penalty(now), now);
565                applied = true;
566            }
567        }
568        applied
569    }
570
571    /// Pops the oldest issue from the cache.
572    fn pop_front(&mut self) -> Option<IssueMarker> {
573        let (issue_id, timestamp) = self.fifo_issues.pop_front()?;
574
575        match self.cache.entry(issue_id) {
576            hash_map::Entry::Occupied(occupied_entry) => {
577                // Only remove if timestamps match
578                match occupied_entry.get().timestamp == timestamp {
579                    true => Some(occupied_entry.remove()),
580                    false => None, // Entry was updated, do not remove
581                }
582            }
583            hash_map::Entry::Vacant(_) => {
584                debug_assert!(false, "Bad cache: issue ID not found in cache");
585                None
586            }
587        }
588    }
589}
590
591#[cfg(test)]
592mod tests {
593    use helpers::*;
594    use tokio::time::timeout;
595
596    use super::*;
597
598    // The manager should create path sets on request
599    #[tokio::test]
600    #[test_log::test]
601    async fn should_create_pathset_on_request() {
602        let cfg = base_config();
603        let fetcher = MockFetcher::new(generate_responses(5, 0, BASE_TIME, DEFAULT_EXP_UNITS));
604
605        let mgr = MultiPathManager::new(cfg, fetcher, PathStrategy::default())
606            .expect("Should create manager");
607
608        // Initially no managed paths
609        assert!(mgr.0.managed_paths.is_empty());
610
611        // Request a path - should create path set
612        let path = mgr.try_path(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn(), BASE_TIME);
613        // First call returns None (not yet initialized)
614        assert!(path.is_none());
615
616        // But path set should be created
617        assert!(
618            mgr.0
619                .managed_paths
620                .contains(&(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn()))
621        );
622    }
623
624    // The manager should remove idle path sets
625    #[tokio::test]
626    #[test_log::test]
627    async fn should_remove_idle_pathsets() {
628        let mut cfg = base_config();
629        cfg.max_idle_period = Duration::from_millis(10); // Short idle period for testing
630
631        let fetcher = MockFetcher::new(generate_responses(5, 0, BASE_TIME, DEFAULT_EXP_UNITS));
632
633        let mgr = MultiPathManager::new(cfg, fetcher, PathStrategy::default())
634            .expect("Should create manager");
635
636        // Create path set
637        let handle = mgr.ensure_managed_paths(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn());
638
639        // Should exist
640        assert!(
641            mgr.0
642                .managed_paths
643                .contains(&(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn()))
644        );
645
646        // Wait for idle timeout plus some margin
647        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
648
649        // Path set should be removed by idle check
650        let contains = mgr
651            .0
652            .managed_paths
653            .contains(&(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn()));
654
655        assert!(!contains, "Idle path set should be removed");
656
657        let err = handle.current_error();
658        assert!(
659            err.is_some(),
660            "Handle should report error after path set removal"
661        );
662        println!("Error after idle removal: {:?}", err);
663        assert!(
664            err.unwrap().to_string().contains("idle"),
665            "Error message should indicate idle removal"
666        );
667    }
668
669    // Dropping the manager should cancel all path set maintenance tasks
670    #[tokio::test]
671    #[test_log::test]
672    async fn should_cancel_pathset_tasks_on_drop() {
673        let cfg: MultiPathManagerConfig = base_config();
674        let fetcher = MockFetcher::new(generate_responses(5, 0, BASE_TIME, DEFAULT_EXP_UNITS));
675
676        let mgr = MultiPathManager::new(cfg, fetcher, PathStrategy::default())
677            .expect("Should create manager");
678
679        // ensure path set exists and initialized
680        let handle = mgr.ensure_managed_paths(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn());
681        handle.wait_initialized().await;
682
683        let mut set_entry = mgr
684            .0
685            .managed_paths
686            .get_sync(&(SRC_ADDR.isd_asn(), DST_ADDR.isd_asn()))
687            .unwrap();
688
689        let task_handle = unsafe {
690            // swap join handle with a fake one, only possible since the manager doesn't use
691            // the handle
692            let swap_handle = tokio::spawn(async {});
693            std::mem::replace(&mut set_entry.get_mut().1._task, swap_handle)
694        };
695
696        let cancel_token = set_entry.get().1.cancel_token.clone();
697
698        let count = mgr.0.managed_paths.len();
699        assert_eq!(count, 1, "Should have 1 managed path set");
700
701        // Drop the manager
702        drop(mgr);
703        // Cancel token should be triggered
704        assert!(
705            cancel_token.is_cancelled(),
706            "Cancel token should be triggered"
707        );
708
709        // Give tasks time to detect manager drop and exit
710        timeout(Duration::from_millis(50), task_handle)
711            .await
712            .unwrap()
713            .unwrap();
714
715        let err = handle
716            .shared
717            .sync
718            .lock()
719            .unwrap()
720            .current_error
721            .clone()
722            .expect("Should have error after manager drop");
723
724        // XXX(ake): exit reason may vary between "cancelled" and "manager dropped" because
725        // of select!
726        assert!(
727            err.to_string().contains("cancelled") || err.to_string().contains("dropped"),
728            "Error message should indicate cancellation or manager drop"
729        );
730    }
731
732    mod issue_handling {
733        use scc::HashIndex;
734        use scion_proto::address::{Asn, Isd};
735
736        use super::*;
737        use crate::path::{
738            manager::{MultiPathManagerInner, PathIssueManager, reliability::ReliabilityScore},
739            types::Score,
740        };
741
742        // When an issue is ingested, affected paths should have their reliability scores
743        // updated appropriately The issue should be in the issue cache
744        #[tokio::test]
745        #[test_log::test]
746        async fn should_ingest_issues_and_apply_to_existing_paths() {
747            let cfg = base_config();
748            let fetcher = MockFetcher::new(generate_responses(5, 0, BASE_TIME, DEFAULT_EXP_UNITS));
749            let (mgr, mut path_set) = manual_pathset(BASE_TIME, fetcher.clone(), cfg, None);
750
751            path_set.maintain(BASE_TIME, &mgr).await;
752
753            // Get the first path to create an issue for
754            let first_path = &path_set.internal.cached_paths[0];
755            let first_fp = first_path.fingerprint;
756
757            // Create an issue targeting the first hop of the first path
758            let issue = IssueKind::Socket {
759                err: SendError::FirstHopUnreachable {
760                    isd_asn: first_path.path.source(),
761                    interface_id: first_path.path.first_hop_egress_interface().unwrap().id,
762                    address: None,
763                    msg: "test".into(),
764                },
765            };
766
767            let penalty = Score::new_clamped(-0.3);
768            let marker = IssueMarker {
769                target: issue.target_type(Some(&first_path.path)).unwrap(),
770                timestamp: BASE_TIME,
771                penalty,
772            };
773
774            {
775                let mut issues_guard = mgr.0.issue_manager.lock().unwrap();
776                // Add issue to manager
777                issues_guard.add_issue(issue, marker);
778                // Check issue is in cache
779                assert!(!issues_guard.cache.is_empty(), "Issue should be in cache");
780            }
781            // Handle the issue in path_set
782            let recv_result = path_set.internal.issue_rx.recv().await;
783            path_set.handle_issue_rx(BASE_TIME, recv_result, &mgr);
784
785            // Check that the path's score was updated
786            let updated_path = path_set
787                .internal
788                .cached_paths
789                .iter()
790                .find(|e| e.fingerprint == first_fp)
791                .expect("Path should still exist");
792
793            let updated_score = updated_path.reliability.score(BASE_TIME).value();
794
795            assert!(
796                updated_score == penalty.value(),
797                "Path score should be updated by penalty. Expected: {}, Got: {}",
798                penalty.value(),
799                updated_score
800            );
801
802            // Should decay over time
803            let later_time = BASE_TIME + Duration::from_secs(30);
804            let decayed_score = updated_path.reliability.score(later_time).value();
805            assert!(
806                decayed_score > updated_score,
807                "Path score should recover over time. Updated: {}, Decayed: {}",
808                updated_score,
809                decayed_score
810            );
811        }
812
813        #[tokio::test]
814        #[test_log::test]
815        async fn should_deduplicate_issues_within_window() {
816            let cfg = base_config();
817            let mgr_inner = MultiPathManagerInner {
818                config: cfg,
819                fetcher: MockFetcher::new(Ok(vec![])),
820                path_strategy: PathStrategy::default(),
821                issue_manager: Mutex::new(PathIssueManager::new(64, 64, Duration::from_secs(10))),
822                managed_paths: HashIndex::new(),
823            };
824            let mgr = MultiPathManager(Arc::new(mgr_inner));
825
826            let issue_marker = IssueMarker {
827                target: IssueMarkerTarget::FirstHop {
828                    isd_asn: SRC_ADDR.isd_asn(),
829                    egress_interface: 1,
830                },
831                timestamp: BASE_TIME,
832                penalty: Score::new_clamped(-0.3),
833            };
834
835            let issue = IssueKind::Socket {
836                err: SendError::FirstHopUnreachable {
837                    isd_asn: SRC_ADDR.isd_asn(),
838                    interface_id: 1,
839                    address: None,
840                    msg: "test".into(),
841                },
842            };
843
844            // Add issue first time
845            mgr.0
846                .issue_manager
847                .lock()
848                .unwrap()
849                .add_issue(issue.clone(), issue_marker.clone());
850            let cache_size_1 = mgr.0.issue_manager.lock().unwrap().cache.len();
851            assert_eq!(cache_size_1, 1);
852
853            // Add same issue within dedup window (should be ignored)
854            let issue_marker_2 = IssueMarker {
855                timestamp: BASE_TIME + Duration::from_secs(1), // Within 10s window
856                ..issue_marker.clone()
857            };
858            mgr.0
859                .issue_manager
860                .lock()
861                .unwrap()
862                .add_issue(issue.clone(), issue_marker_2);
863
864            let fifo_size = mgr.0.issue_manager.lock().unwrap().fifo_issues.len();
865            let cache_size_2 = mgr.0.issue_manager.lock().unwrap().cache.len();
866            assert_eq!(cache_size_2, 1, "Duplicate issue should be ignored");
867            assert_eq!(
868                fifo_size, 1,
869                "FIFO queue size should remain unchanged on duplicate issue"
870            );
871
872            // Add same issue outside dedup window (should be added)
873            let issue_marker_3 = IssueMarker {
874                timestamp: BASE_TIME + Duration::from_secs(11), // Outside 10s window
875                ..issue_marker
876            };
877            mgr.0
878                .issue_manager
879                .lock()
880                .unwrap()
881                .add_issue(issue, issue_marker_3);
882
883            let fifo_size_3 = mgr.0.issue_manager.lock().unwrap().fifo_issues.len();
884            let cache_size_3 = mgr.0.issue_manager.lock().unwrap().cache.len();
885            assert_eq!(
886                cache_size_3, 1,
887                "Issue outside dedup window should update existing"
888            );
889            assert_eq!(
890                fifo_size_3, 2,
891                "FIFO queue size should increase for new issue outside dedup window"
892            );
893        }
894
895        // When new paths are fetched, existing issues in the issue cache should be applied to
896        // them
897        #[tokio::test]
898        #[test_log::test]
899        async fn should_apply_issues_to_new_paths_on_fetch() {
900            let cfg = base_config();
901            let fetcher = MockFetcher::new(Ok(vec![]));
902            let (mgr, mut path_set) = manual_pathset(BASE_TIME, fetcher.clone(), cfg, None);
903
904            path_set.maintain(BASE_TIME, &mgr).await;
905
906            // Create an issue
907            let issue_marker = IssueMarker {
908                target: IssueMarkerTarget::FirstHop {
909                    isd_asn: SRC_ADDR.isd_asn(),
910                    egress_interface: 1,
911                },
912                timestamp: BASE_TIME,
913                penalty: Score::new_clamped(-0.5),
914            };
915
916            let issue = IssueKind::Socket {
917                err: SendError::FirstHopUnreachable {
918                    isd_asn: SRC_ADDR.isd_asn(),
919                    interface_id: 1,
920                    address: None,
921                    msg: "test".into(),
922                },
923            };
924
925            // Add to manager's issue cache
926            mgr.0
927                .issue_manager
928                .lock()
929                .unwrap()
930                .add_issue(issue, issue_marker);
931
932            // Drain issue channel so no issues are pending
933            path_set.drain_and_apply_issue_channel(BASE_TIME);
934
935            // Now fetch paths again - the issue should be applied to the newly fetched path
936            fetcher.lock().unwrap().set_response(generate_responses(
937                3,
938                0,
939                BASE_TIME + Duration::from_secs(1),
940                DEFAULT_EXP_UNITS,
941            ));
942
943            let next_refetch = path_set.internal.next_refetch;
944            path_set.maintain(next_refetch, &mgr).await;
945
946            // The newly fetched path should have the penalty applied
947            let affected_path = path_set
948                .internal
949                .cached_paths
950                .first()
951                .expect("Path should exist");
952
953            let score = affected_path
954                .reliability
955                .score(BASE_TIME + Duration::from_secs(1))
956                .value();
957            assert!(
958                score < 0.0,
959                "Newly fetched path should have cached issue applied. Score: {}",
960                score
961            );
962        }
963
964        // If the active path is affected by an issue, it should be re-evaluated
965        #[tokio::test]
966        #[test_log::test]
967        async fn should_trigger_active_path_reevaluation_on_issue() {
968            let cfg = base_config();
969            let fetcher = MockFetcher::new(generate_responses(5, 0, BASE_TIME, DEFAULT_EXP_UNITS));
970            let (mgr, mut path_set) = manual_pathset(BASE_TIME, fetcher.clone(), cfg, None);
971
972            path_set.maintain(BASE_TIME, &mgr).await;
973
974            let active_fp = path_set.shared.active_path.load().as_ref().unwrap().1;
975
976            // Create a severe issue targeting the active path
977            let issue_marker = IssueMarker {
978                target: IssueMarkerTarget::FullPath {
979                    fingerprint: active_fp,
980                },
981                timestamp: BASE_TIME,
982                penalty: Score::new_clamped(-1.0), // Severe penalty
983            };
984
985            let issue = IssueKind::Socket {
986                err: SendError::FirstHopUnreachable {
987                    isd_asn: SRC_ADDR.isd_asn(),
988                    interface_id: 1,
989                    address: None,
990                    msg: "test".into(),
991                },
992            };
993
994            // Add issue
995            mgr.0
996                .issue_manager
997                .lock()
998                .unwrap()
999                .add_issue(issue, issue_marker);
1000
1001            // Handle issue
1002            let recv_result = path_set.internal.issue_rx.recv().await;
1003            path_set.handle_issue_rx(BASE_TIME, recv_result, &mgr);
1004
1005            // Active path should have changed
1006            let new_active_fp = path_set.shared.active_path.load().as_ref().unwrap().1;
1007            assert_ne!(
1008                active_fp, new_active_fp,
1009                "Active path should change when severely penalized"
1010            );
1011        }
1012
1013        #[tokio::test]
1014        #[test_log::test]
1015        async fn should_swap_to_better_path_if_one_appears() {
1016            let cfg = base_config();
1017            let fetcher = MockFetcher::new(generate_responses(1, 0, BASE_TIME, DEFAULT_EXP_UNITS));
1018            let (mgr, mut path_set) = manual_pathset(BASE_TIME, fetcher.clone(), cfg, None);
1019
1020            path_set.maintain(BASE_TIME, &mgr).await;
1021
1022            // mark as used to prevent idle removal
1023            path_set
1024                .shared
1025                .was_used_in_idle_period
1026                .store(true, std::sync::atomic::Ordering::Relaxed);
1027
1028            let active_fp = path_set.shared.active_path.load().as_ref().unwrap().1;
1029
1030            // add issue to active path to lower its score
1031            let issue_marker = IssueMarker {
1032                target: IssueMarkerTarget::FullPath {
1033                    fingerprint: active_fp,
1034                },
1035                timestamp: BASE_TIME,
1036                penalty: Score::new_clamped(-0.8),
1037            };
1038
1039            mgr.0.issue_manager.lock().unwrap().add_issue(
1040                IssueKind::Socket {
1041                    err: SendError::FirstHopUnreachable {
1042                        isd_asn: SRC_ADDR.isd_asn(),
1043                        interface_id: 1,
1044                        address: None,
1045                        msg: "test".into(),
1046                    },
1047                },
1048                issue_marker,
1049            );
1050
1051            // active path should be the same
1052            let active_fp_after_issue = path_set.shared.active_path.load().as_ref().unwrap().1;
1053            assert_eq!(
1054                active_fp, active_fp_after_issue,
1055                "Active path should remain the same if no better path exists"
1056            );
1057
1058            // Now fetch a better path
1059            fetcher.lock().unwrap().set_response(generate_responses(
1060                1,
1061                100,
1062                BASE_TIME + Duration::from_secs(1),
1063                DEFAULT_EXP_UNITS,
1064            ));
1065
1066            path_set
1067                .maintain(path_set.internal.next_refetch, &mgr)
1068                .await;
1069            // mark as used to prevent idle removal
1070            path_set
1071                .shared
1072                .was_used_in_idle_period
1073                .store(true, std::sync::atomic::Ordering::Relaxed);
1074
1075            // Active path should have changed
1076            let new_active_fp = path_set.shared.active_path.load().as_ref().unwrap().1;
1077            assert_ne!(
1078                active_fp, new_active_fp,
1079                "Active path should change when a better path appears"
1080            );
1081
1082            // Should also work for positive score changes
1083            let positive_score = Score::new_clamped(0.8);
1084            let mut reliability = ReliabilityScore::new_with_time(path_set.internal.next_refetch);
1085            reliability.update(positive_score, path_set.internal.next_refetch);
1086
1087            // Change old paths reliability to be better
1088            path_set
1089                .internal
1090                .cached_paths
1091                .iter_mut()
1092                .find(|e| e.fingerprint == active_fp)
1093                .unwrap()
1094                .reliability = reliability;
1095
1096            path_set
1097                .maintain(path_set.internal.next_refetch, &mgr)
1098                .await;
1099
1100            assert_eq!(
1101                active_fp,
1102                path_set.shared.active_path.load().as_ref().unwrap().1,
1103                "Active path should change on positive score diff"
1104            );
1105        }
1106
1107        #[tokio::test]
1108        #[test_log::test]
1109        async fn should_keep_max_issue_cache_size() {
1110            let max_size = 10;
1111            let mut issue_mgr = PathIssueManager::new(max_size, 64, Duration::from_secs(10));
1112
1113            // Add more issues than max_size
1114            for i in 0..20u16 {
1115                let issue_marker = IssueMarker {
1116                    target: IssueMarkerTarget::FirstHop {
1117                        isd_asn: IsdAsn::new(Isd(1), Asn(1)),
1118                        egress_interface: i,
1119                    },
1120                    timestamp: BASE_TIME + Duration::from_secs(i as u64),
1121                    penalty: Score::new_clamped(-0.1),
1122                };
1123
1124                let issue = IssueKind::Socket {
1125                    err: SendError::FirstHopUnreachable {
1126                        isd_asn: IsdAsn::new(Isd(1), Asn(1)),
1127                        interface_id: i,
1128                        address: None,
1129                        msg: "test".into(),
1130                    },
1131                };
1132
1133                issue_mgr.add_issue(issue, issue_marker);
1134            }
1135
1136            // Cache should not exceed max_size
1137            assert!(
1138                issue_mgr.cache.len() <= max_size,
1139                "Cache size {} should not exceed max {}",
1140                issue_mgr.cache.len(),
1141                max_size
1142            );
1143
1144            // FIFO queue should match cache size
1145            assert_eq!(issue_mgr.cache.len(), issue_mgr.fifo_issues.len());
1146        }
1147    }
1148
1149    pub mod helpers {
1150        use std::{
1151            hash::{DefaultHasher, Hash, Hasher},
1152            net::{IpAddr, Ipv4Addr},
1153            sync::{Arc, Mutex},
1154            time::{Duration, SystemTime},
1155        };
1156
1157        use scion_proto::{
1158            address::{Asn, EndhostAddr, Isd, IsdAsn},
1159            path::{Path, test_builder::TestPathBuilder},
1160        };
1161        use tokio::sync::Notify;
1162
1163        use super::*;
1164        use crate::path::manager::{MultiPathManagerInner, PathIssueManager, pathset::PathSet};
1165
        /// Source endhost used by the test helpers: ISD 1, AS 1, IPv4 127.0.0.1.
        pub const SRC_ADDR: EndhostAddr = EndhostAddr::new(
            IsdAsn::new(Isd(1), Asn(1)),
            IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
        );
        /// Destination endhost used by the test helpers: ISD 2, AS 1, IPv4 127.0.0.2.
        pub const DST_ADDR: EndhostAddr = EndhostAddr::new(
            IsdAsn::new(Isd(2), Asn(1)),
            IpAddr::V4(Ipv4Addr::new(127, 0, 0, 2)),
        );

        /// Hop expiry value tests pass as `exp_units` to `generate_responses` / `dummy_path`.
        pub const DEFAULT_EXP_UNITS: u8 = 100;
        /// Reference point in time for tests: the Unix epoch.
        pub const BASE_TIME: SystemTime = SystemTime::UNIX_EPOCH;
1177
1178        pub fn dummy_path(hop_count: u16, timestamp: u32, exp_units: u8, seed: u32) -> Path {
1179            let mut builder: TestPathBuilder =
1180                TestPathBuilder::new(SRC_ADDR.into(), DST_ADDR.into())
1181                    .using_info_timestamp(timestamp)
1182                    .with_hop_expiry(exp_units)
1183                    .up();
1184
1185            builder = builder.add_hop(0, 1);
1186
1187            for cnt in 0..hop_count {
1188                let mut hash = DefaultHasher::new();
1189                seed.hash(&mut hash);
1190                cnt.hash(&mut hash);
1191                let hash = hash.finish() as u32;
1192
1193                let hop = hash.saturating_sub(2) as u16; // ensure no underflow or overflow
1194                builder = builder.with_asn(hash).add_hop(hop + 1, hop + 2);
1195            }
1196
1197            builder = builder.add_hop(1, 0);
1198
1199            builder.build(timestamp).path()
1200        }
1201
1202        pub fn base_config() -> MultiPathManagerConfig {
1203            MultiPathManagerConfig {
1204                max_cached_paths_per_pair: 5,
1205                refetch_interval: Duration::from_secs(100),
1206                min_refetch_delay: Duration::from_secs(1),
1207                min_expiry_threshold: Duration::from_secs(5),
1208                max_idle_period: Duration::from_secs(30),
1209                fetch_failure_backoff: BackoffConfig {
1210                    minimum_delay_secs: 1.0,
1211                    maximum_delay_secs: 10.0,
1212                    factor: 2.0,
1213                    jitter_secs: 0.0,
1214                },
1215                issue_cache_size: 64,
1216                issue_broadcast_size: 64,
1217                issue_deduplication_window: Duration::from_secs(10),
1218                path_swap_score_threshold: 0.1,
1219            }
1220        }
1221
1222        pub fn generate_responses(
1223            path_count: u16,
1224            path_seed: u32,
1225            timestamp: SystemTime,
1226            exp_units: u8,
1227        ) -> Result<Vec<Path>, String> {
1228            let mut paths = Vec::new();
1229            for resp_id in 0..path_count {
1230                paths.push(dummy_path(
1231                    2,
1232                    timestamp
1233                        .duration_since(SystemTime::UNIX_EPOCH)
1234                        .unwrap()
1235                        .as_secs() as u32,
1236                    exp_units,
1237                    path_seed + resp_id as u32,
1238                ));
1239            }
1240
1241            Ok(paths)
1242        }
1243
1244        pub struct MockFetcher {
1245            next_response: Result<Vec<Path>, String>,
1246            pub received_requests: usize,
1247            pub wait_till_notify: bool,
1248            pub notify_to_resolve: Arc<Notify>,
1249        }
1250        impl MockFetcher {
1251            pub fn new(response: Result<Vec<Path>, String>) -> Arc<Mutex<Self>> {
1252                Arc::new(Mutex::new(Self {
1253                    next_response: response,
1254                    received_requests: 0,
1255                    wait_till_notify: false,
1256                    notify_to_resolve: Arc::new(Notify::new()),
1257                }))
1258            }
1259
1260            pub fn set_response(&mut self, response: Result<Vec<Path>, String>) {
1261                self.next_response = response;
1262            }
1263
1264            pub fn wait_till_notify(&mut self, wait: bool) {
1265                self.wait_till_notify = wait;
1266            }
1267
1268            pub fn notify(&self) {
1269                self.notify_to_resolve.notify_waiters();
1270            }
1271        }
1272
1273        impl PathFetcher for Arc<Mutex<MockFetcher>> {
1274            async fn fetch_paths(
1275                &self,
1276                _src: IsdAsn,
1277                _dst: IsdAsn,
1278            ) -> Result<Vec<Path>, PathFetchError> {
1279                let response;
1280                // Wait for notification if needed
1281                let notify = {
1282                    let mut guard = self.lock().unwrap();
1283
1284                    guard.received_requests += 1;
1285                    response = guard.next_response.clone();
1286
1287                    // maybe wait till notified
1288                    if guard.wait_till_notify {
1289                        let notif = guard.notify_to_resolve.clone().notified_owned();
1290                        Some(notif)
1291                    } else {
1292                        None
1293                    }
1294                };
1295
1296                if let Some(notif) = notify {
1297                    notif.await;
1298                }
1299
1300                match response {
1301                    Ok(paths) if paths.is_empty() => Err(PathFetchError::NoPathsFound),
1302                    Ok(paths) => Ok(paths),
1303                    Err(e) => Err(PathFetchError::InternalError(e.into())),
1304                }
1305            }
1306        }
1307
1308        pub fn manual_pathset<F: PathFetcher>(
1309            now: SystemTime,
1310            fetcher: F,
1311            cfg: MultiPathManagerConfig,
1312            strategy: Option<PathStrategy>,
1313        ) -> (MultiPathManager<F>, PathSet<F>) {
1314            let mgr_inner = MultiPathManagerInner {
1315                config: cfg,
1316                fetcher,
1317                path_strategy: strategy.unwrap_or_else(|| {
1318                    let mut ps = PathStrategy::default();
1319                    ps.scoring.use_default_scorers();
1320                    ps
1321                }),
1322                issue_manager: Mutex::new(PathIssueManager::new(64, 64, Duration::from_secs(10))),
1323                managed_paths: HashIndex::new(),
1324            };
1325            let mgr = MultiPathManager(Arc::new(mgr_inner));
1326            let issue_rx = mgr.0.issue_manager.lock().unwrap().issues_subscriber();
1327            let mgr_ref = mgr.weak_ref();
1328            (
1329                mgr,
1330                PathSet::new_with_time(
1331                    SRC_ADDR.isd_asn(),
1332                    DST_ADDR.isd_asn(),
1333                    mgr_ref,
1334                    cfg,
1335                    issue_rx,
1336                    now,
1337                ),
1338            )
1339        }
1340    }
1341}