Skip to main content

scion_stack/path/
fetcher.rs

1// Copyright 2025 Anapaya Systems
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//   http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A [PathFetcher] is responsible for providing paths between two ISD-ASes.
16//!
17//! The default implementation [PathFetcherImpl] uses a [SegmentFetcher] to fetch path segments and
18//! combine them into end-to-end paths.
19
20use std::sync::Arc;
21
22use endhost_api_client::client::EndhostApiClient;
23use scion_proto::{
24    address::IsdAsn,
25    path::{self, Path},
26};
27
28use crate::path::fetcher::traits::{
29    PathFetchError, PathFetcher, SegmentFetchError, SegmentFetcher, Segments,
30};
31
32/// Path fetcher traits and types.
33pub mod traits {
34    use std::borrow::Cow;
35
36    use scion_proto::{
37        address::IsdAsn,
38        path::{Path, PathSegment},
39    };
40
41    use crate::types::ResFut;
42
43    /// Path fetcher trait.
44    pub trait PathFetcher: Send + Sync + 'static {
45        /// Fetch paths between source and destination ISD-AS.
46        fn fetch_paths(
47            &self,
48            src: IsdAsn,
49            dst: IsdAsn,
50        ) -> impl ResFut<'_, Vec<Path>, PathFetchError>;
51    }
52
53    /// Path fetch errors.
54    #[derive(Debug, thiserror::Error)]
55    pub enum PathFetchError {
56        /// Segment fetch failed.
57        #[error("failed to fetch segments: {0}")]
58        FetchSegments(#[from] SegmentFetchError),
59
60        /// No paths found.
61        #[error("no paths found")]
62        NoPathsFound,
63
64        /// Non network related internal error.
65        #[error("internal error: {0}")]
66        InternalError(Cow<'static, str>),
67    }
68
69    /// Segment fetcher trait.
70    #[async_trait::async_trait]
71    pub trait SegmentFetcher: Send + Sync + 'static {
72        /// Fetch path segments between src and dst.
73        async fn fetch_segments(
74            &self,
75            src: IsdAsn,
76            dst: IsdAsn,
77        ) -> Result<Segments, SegmentFetchError>;
78    }
79
80    /// Segment fetch error.
81    pub type SegmentFetchError = Box<dyn std::error::Error + Send + Sync>;
82
83    /// Path segments.
84    #[derive(Debug)]
85    pub struct Segments {
86        /// Core segments.
87        pub core_segments: Vec<PathSegment>,
88        /// Non-core segments.
89        pub non_core_segments: Vec<PathSegment>,
90    }
91}
92
93/// Path fetcher.
94pub struct PathFetcherImpl {
95    segment_fetchers: Vec<(String, Box<dyn SegmentFetcher>)>,
96    // Timeout for each segment fetcher to avoid waiting indefinitely for slow or unresponsive
97    // fetchers.
98    timeout: std::time::Duration,
99}
100
101impl PathFetcherImpl {
102    /// Creates a new path fetcher.
103    pub fn new(
104        segment_fetchers: Vec<(String, Box<dyn SegmentFetcher>)>,
105        timeout: std::time::Duration,
106    ) -> Self {
107        Self {
108            segment_fetchers,
109            timeout,
110        }
111    }
112}
113
114impl PathFetcher for PathFetcherImpl {
115    async fn fetch_paths(&self, src: IsdAsn, dst: IsdAsn) -> Result<Vec<Path>, PathFetchError> {
116        let mut all_core_segments = Vec::new();
117        let mut all_non_core_segments = Vec::new();
118
119        // Fetch segments from all fetchers concurrently.
120        let fetch_tasks: Vec<_> = self
121            .segment_fetchers
122            .iter()
123            .map(|(_, fetcher)| {
124                tokio::time::timeout(self.timeout, fetcher.fetch_segments(src, dst))
125            })
126            .collect();
127
128        let results = futures::future::join_all(fetch_tasks).await;
129
130        // Track errors and successes
131        let mut errors = Vec::new();
132
133        for (i, result) in results.into_iter().enumerate() {
134            let fetcher_name = &self.segment_fetchers[i].0;
135            match result {
136                Ok(res) => {
137                    match res {
138                        Ok(segments) => {
139                            tracing::info!(
140                                name = %fetcher_name,
141                                n_core_segments = segments.core_segments.len(),
142                                n_non_core_segments = segments.non_core_segments.len(),
143                                %src,
144                                %dst,
145                                "Segment fetcher succeeded"
146                            );
147                            all_core_segments.extend(segments.core_segments);
148                            all_non_core_segments.extend(segments.non_core_segments);
149                        }
150                        Err(e) => {
151                            errors.push((fetcher_name.clone(), e));
152                        }
153                    }
154                }
155                Err(e) => {
156                    errors.push((
157                        fetcher_name.clone(),
158                        Box::new(e) as Box<dyn std::error::Error + Send + Sync>,
159                    ));
160                }
161            }
162        }
163
164        let paths = path::combinator::combine(src, dst, all_core_segments, all_non_core_segments);
165
166        for (fetcher_name, error) in errors.iter() {
167            tracing::warn!(
168                name = %fetcher_name,
169                %error,
170                %src,
171                %dst,
172                "Segment fetcher failed"
173            );
174        }
175
176        // If there were errors but we still have paths, we still return the paths and only log the
177        // fetcher errors.
178        if let Some((_name, err)) = errors.into_iter().next()
179            && paths.is_empty()
180        {
181            return Err(PathFetchError::FetchSegments(err));
182        }
183
184        Ok(paths)
185    }
186}
187
188/// Segment fetcher that uses the endhost API via Connect-RPC to fetch segments.
189pub struct EndhostApiSegmentFetcher {
190    client: Arc<dyn EndhostApiClient>,
191}
192
193impl EndhostApiSegmentFetcher {
194    /// Creates a new endhost API segment fetcher.
195    pub fn new(client: Arc<dyn EndhostApiClient>) -> Self {
196        Self { client }
197    }
198}
199
200#[async_trait::async_trait]
201impl SegmentFetcher for EndhostApiSegmentFetcher {
202    async fn fetch_segments(
203        &self,
204        src: IsdAsn,
205        dst: IsdAsn,
206    ) -> Result<Segments, SegmentFetchError> {
207        let resp = self
208            .client
209            .list_segments(src.into(), dst.into(), 128, "".to_string())
210            .await?;
211
212        tracing::trace!(
213            n_core=resp.segments.core_segments.len(),
214            n_up=resp.segments.up_segments.len(),
215            n_down=resp.segments.down_segments.len(),
216            src = %src,
217            dst = %dst,
218            "Received segments from endhost API"
219        );
220
221        let (core_segments, non_core_segments) = resp.segments.split_parts();
222        Ok(Segments {
223            core_segments: core_segments.into_iter().map(Into::into).collect(),
224            non_core_segments: non_core_segments.into_iter().map(Into::into).collect(),
225        })
226    }
227}