Skip to main content

scion_stack/path/
fetcher.rs

1// Copyright 2025 Anapaya Systems
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//   http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A [PathFetcher] is responsible for providing paths between two ISD-ASes.
16//!
17//! The default implementation [PathFetcherImpl] uses a [SegmentFetcher] to fetch path segments and
18//! combine them into end-to-end paths.
19//!
20//! The default [SegmentFetcher] implementation is [ConnectRpcSegmentFetcher], requesting segments
21//! from the Endhost API.
22
23use std::sync::Arc;
24
25use endhost_api_client::client::EndhostApiClient;
26use scion_proto::{
27    address::IsdAsn,
28    path::{self, Path},
29};
30
31use crate::path::fetcher::traits::{
32    PathFetchError, PathFetcher, SegmentFetchError, SegmentFetcher, Segments,
33};
34
35/// Path fetcher traits and types.
36pub mod traits {
37    use std::borrow::Cow;
38
39    use scion_proto::{
40        address::IsdAsn,
41        path::{Path, PathSegment},
42    };
43
44    use crate::types::ResFut;
45
46    /// Path fetcher trait.
47    pub trait PathFetcher: Send + Sync + 'static {
48        /// Fetch paths between source and destination ISD-AS.
49        fn fetch_paths(
50            &self,
51            src: IsdAsn,
52            dst: IsdAsn,
53        ) -> impl ResFut<'_, Vec<Path>, PathFetchError>;
54    }
55
56    /// Path fetch errors.
57    #[derive(Debug, thiserror::Error)]
58    pub enum PathFetchError {
59        /// Segment fetch failed.
60        #[error("failed to fetch segments: {0}")]
61        FetchSegments(#[from] SegmentFetchError),
62
63        /// No paths found.
64        #[error("no paths found")]
65        NoPathsFound,
66
67        /// Non network related internal error.
68        #[error("internal error: {0}")]
69        InternalError(Cow<'static, str>),
70    }
71
72    /// Segment fetcher trait.
73    #[async_trait::async_trait]
74    pub trait SegmentFetcher: Send + Sync + 'static {
75        /// Fetch path segments between src and dst.
76        async fn fetch_segments(
77            &self,
78            src: IsdAsn,
79            dst: IsdAsn,
80        ) -> Result<Segments, SegmentFetchError>;
81    }
82
83    /// Segment fetch error.
84    pub type SegmentFetchError = Box<dyn std::error::Error + Send + Sync>;
85
86    /// Path segments.
87    pub struct Segments {
88        /// Core segments.
89        pub core_segments: Vec<PathSegment>,
90        /// Non-core segments.
91        pub non_core_segments: Vec<PathSegment>,
92    }
93}
94
95/// Path fetcher.
96pub struct PathFetcherImpl {
97    segment_fetchers: Vec<Box<dyn SegmentFetcher>>,
98}
99
100impl PathFetcherImpl {
101    /// Creates a new path fetcher.
102    pub fn new(segment_fetchers: Vec<Box<dyn SegmentFetcher>>) -> Self {
103        Self { segment_fetchers }
104    }
105}
106
107impl PathFetcher for PathFetcherImpl {
108    async fn fetch_paths(&self, src: IsdAsn, dst: IsdAsn) -> Result<Vec<Path>, PathFetchError> {
109        let mut all_core_segments = Vec::new();
110        let mut all_non_core_segments = Vec::new();
111
112        // Fetch segments from all fetchers concurrently
113        let fetch_tasks: Vec<_> = self
114            .segment_fetchers
115            .iter()
116            .map(|fetcher| fetcher.fetch_segments(src, dst))
117            .collect();
118
119        let results = futures::future::join_all(fetch_tasks).await;
120
121        // Track errors and successes
122        let mut errors = Vec::new();
123
124        for (i, result) in results.into_iter().enumerate() {
125            match result {
126                Ok(Segments {
127                    core_segments,
128                    non_core_segments,
129                }) => {
130                    tracing::trace!(
131                        fetcher_index = i,
132                        n_core_segments = core_segments.len(),
133                        n_non_core_segments = non_core_segments.len(),
134                        %src,
135                        %dst,
136                        "Segment fetcher succeeded"
137                    );
138                    all_core_segments.extend(core_segments);
139                    all_non_core_segments.extend(non_core_segments);
140                }
141                Err(err) => {
142                    tracing::warn!(
143                        %src,
144                        %dst,
145                        %err,
146                        "Segment fetcher failed"
147                    );
148                    errors.push(err);
149                }
150            }
151        }
152
153        let paths = path::combinator::combine(src, dst, all_core_segments, all_non_core_segments);
154
155        // If there were errors but we still have paths, we still return the paths and only log the
156        // fetcher errors.
157        if !errors.is_empty() && paths.is_empty() {
158            return Err(PathFetchError::FetchSegments(
159                errors.into_iter().next().unwrap(),
160            ));
161        }
162
163        Ok(paths)
164    }
165}
166
167/// Connect RPC segment fetcher.
168pub struct ConnectRpcSegmentFetcher {
169    client: Arc<dyn EndhostApiClient>,
170}
171
172impl ConnectRpcSegmentFetcher {
173    /// Creates a new connect RPC segment fetcher.
174    pub fn new(client: Arc<dyn EndhostApiClient>) -> Self {
175        Self { client }
176    }
177}
178
179#[async_trait::async_trait]
180impl SegmentFetcher for ConnectRpcSegmentFetcher {
181    async fn fetch_segments(
182        &self,
183        src: IsdAsn,
184        dst: IsdAsn,
185    ) -> Result<Segments, SegmentFetchError> {
186        let resp = self
187            .client
188            .list_segments(src, dst, 128, "".to_string())
189            .await?;
190
191        tracing::debug!(
192            n_core=resp.segments.core_segments.len(),
193            n_up=resp.segments.up_segments.len(),
194            n_down=resp.segments.down_segments.len(),
195            src = %src,
196            dst = %dst,
197            "Received segments from control plane"
198        );
199
200        let (core_segments, non_core_segments) = resp.segments.split_parts();
201        Ok(Segments {
202            core_segments,
203            non_core_segments,
204        })
205    }
206}