scion_stack/path/
fetcher.rs1use std::sync::Arc;
21
22use endhost_api_client::client::EndhostApiClient;
23use scion_proto::{
24 address::IsdAsn,
25 path::{self, Path},
26};
27
28use crate::path::fetcher::traits::{
29 PathFetchError, PathFetcher, SegmentFetchError, SegmentFetcher, Segments,
30};
31
32pub mod traits {
34 use std::borrow::Cow;
35
36 use scion_proto::{
37 address::IsdAsn,
38 path::{Path, PathSegment},
39 };
40
41 use crate::types::ResFut;
42
43 pub trait PathFetcher: Send + Sync + 'static {
45 fn fetch_paths(
47 &self,
48 src: IsdAsn,
49 dst: IsdAsn,
50 ) -> impl ResFut<'_, Vec<Path>, PathFetchError>;
51 }
52
53 #[derive(Debug, thiserror::Error)]
55 pub enum PathFetchError {
56 #[error("failed to fetch segments: {0}")]
58 FetchSegments(#[from] SegmentFetchError),
59
60 #[error("no paths found")]
62 NoPathsFound,
63
64 #[error("internal error: {0}")]
66 InternalError(Cow<'static, str>),
67 }
68
69 #[async_trait::async_trait]
71 pub trait SegmentFetcher: Send + Sync + 'static {
72 async fn fetch_segments(
74 &self,
75 src: IsdAsn,
76 dst: IsdAsn,
77 ) -> Result<Segments, SegmentFetchError>;
78 }
79
80 pub type SegmentFetchError = Box<dyn std::error::Error + Send + Sync>;
82
83 #[derive(Debug)]
85 pub struct Segments {
86 pub core_segments: Vec<PathSegment>,
88 pub non_core_segments: Vec<PathSegment>,
90 }
91}
92
93pub struct PathFetcherImpl {
95 segment_fetchers: Vec<(String, Box<dyn SegmentFetcher>)>,
96 timeout: std::time::Duration,
99}
100
101impl PathFetcherImpl {
102 pub fn new(
104 segment_fetchers: Vec<(String, Box<dyn SegmentFetcher>)>,
105 timeout: std::time::Duration,
106 ) -> Self {
107 Self {
108 segment_fetchers,
109 timeout,
110 }
111 }
112}
113
114impl PathFetcher for PathFetcherImpl {
115 async fn fetch_paths(&self, src: IsdAsn, dst: IsdAsn) -> Result<Vec<Path>, PathFetchError> {
116 let mut all_core_segments = Vec::new();
117 let mut all_non_core_segments = Vec::new();
118
119 let fetch_tasks: Vec<_> = self
121 .segment_fetchers
122 .iter()
123 .map(|(_, fetcher)| {
124 tokio::time::timeout(self.timeout, fetcher.fetch_segments(src, dst))
125 })
126 .collect();
127
128 let results = futures::future::join_all(fetch_tasks).await;
129
130 let mut errors = Vec::new();
132
133 for (i, result) in results.into_iter().enumerate() {
134 let fetcher_name = &self.segment_fetchers[i].0;
135 match result {
136 Ok(res) => {
137 match res {
138 Ok(segments) => {
139 tracing::info!(
140 name = %fetcher_name,
141 n_core_segments = segments.core_segments.len(),
142 n_non_core_segments = segments.non_core_segments.len(),
143 %src,
144 %dst,
145 "Segment fetcher succeeded"
146 );
147 all_core_segments.extend(segments.core_segments);
148 all_non_core_segments.extend(segments.non_core_segments);
149 }
150 Err(e) => {
151 errors.push((fetcher_name.clone(), e));
152 }
153 }
154 }
155 Err(e) => {
156 errors.push((
157 fetcher_name.clone(),
158 Box::new(e) as Box<dyn std::error::Error + Send + Sync>,
159 ));
160 }
161 }
162 }
163
164 let paths = path::combinator::combine(src, dst, all_core_segments, all_non_core_segments);
165
166 for (fetcher_name, error) in errors.iter() {
167 tracing::warn!(
168 name = %fetcher_name,
169 %error,
170 %src,
171 %dst,
172 "Segment fetcher failed"
173 );
174 }
175
176 if let Some((_name, err)) = errors.into_iter().next()
179 && paths.is_empty()
180 {
181 return Err(PathFetchError::FetchSegments(err));
182 }
183
184 Ok(paths)
185 }
186}
187
188pub struct EndhostApiSegmentFetcher {
190 client: Arc<dyn EndhostApiClient>,
191}
192
193impl EndhostApiSegmentFetcher {
194 pub fn new(client: Arc<dyn EndhostApiClient>) -> Self {
196 Self { client }
197 }
198}
199
200#[async_trait::async_trait]
201impl SegmentFetcher for EndhostApiSegmentFetcher {
202 async fn fetch_segments(
203 &self,
204 src: IsdAsn,
205 dst: IsdAsn,
206 ) -> Result<Segments, SegmentFetchError> {
207 let resp = self
208 .client
209 .list_segments(src.into(), dst.into(), 128, "".to_string())
210 .await?;
211
212 tracing::trace!(
213 n_core=resp.segments.core_segments.len(),
214 n_up=resp.segments.up_segments.len(),
215 n_down=resp.segments.down_segments.len(),
216 src = %src,
217 dst = %dst,
218 "Received segments from endhost API"
219 );
220
221 let (core_segments, non_core_segments) = resp.segments.split_parts();
222 Ok(Segments {
223 core_segments: core_segments.into_iter().map(Into::into).collect(),
224 non_core_segments: non_core_segments.into_iter().map(Into::into).collect(),
225 })
226 }
227}