scion_stack/path/
fetcher.rs1use std::sync::Arc;
21
22use endhost_api_client::client::EndhostApiClient;
23use scion_proto::{
24 address::IsdAsn,
25 path::{self, Path},
26};
27
28use crate::path::fetcher::traits::{
29 PathFetchError, PathFetcher, SegmentFetchError, SegmentFetcher, Segments,
30};
31
32pub mod traits {
34 use std::borrow::Cow;
35
36 use scion_proto::{
37 address::IsdAsn,
38 path::{Path, PathSegment},
39 };
40
41 use crate::types::ResFut;
42
43 pub trait PathFetcher: Send + Sync + 'static {
45 fn fetch_paths(
47 &self,
48 src: IsdAsn,
49 dst: IsdAsn,
50 ) -> impl ResFut<'_, Vec<Path>, PathFetchError>;
51 }
52
53 #[derive(Debug, thiserror::Error)]
55 pub enum PathFetchError {
56 #[error("failed to fetch segments: {0}")]
58 FetchSegments(#[from] SegmentFetchError),
59
60 #[error("no paths found")]
62 NoPathsFound,
63
64 #[error("internal error: {0}")]
66 InternalError(Cow<'static, str>),
67 }
68
69 #[async_trait::async_trait]
71 pub trait SegmentFetcher: Send + Sync + 'static {
72 async fn fetch_segments(
74 &self,
75 src: IsdAsn,
76 dst: IsdAsn,
77 ) -> Result<Segments, SegmentFetchError>;
78 }
79
80 pub type SegmentFetchError = Box<dyn std::error::Error + Send + Sync>;
82
83 #[derive(Debug)]
85 pub struct Segments {
86 pub core_segments: Vec<PathSegment>,
88 pub non_core_segments: Vec<PathSegment>,
90 }
91}
92
93pub struct PathFetcherImpl {
95 segment_fetchers: Vec<Box<dyn SegmentFetcher>>,
96}
97
98impl PathFetcherImpl {
99 pub fn new(segment_fetchers: Vec<Box<dyn SegmentFetcher>>) -> Self {
101 Self { segment_fetchers }
102 }
103}
104
105impl PathFetcher for PathFetcherImpl {
106 async fn fetch_paths(&self, src: IsdAsn, dst: IsdAsn) -> Result<Vec<Path>, PathFetchError> {
107 let mut all_core_segments = Vec::new();
108 let mut all_non_core_segments = Vec::new();
109
110 let fetch_tasks: Vec<_> = self
112 .segment_fetchers
113 .iter()
114 .map(|fetcher| fetcher.fetch_segments(src, dst))
115 .collect();
116
117 let results = futures::future::join_all(fetch_tasks).await;
118
119 let mut errors = Vec::new();
121
122 for (i, result) in results.into_iter().enumerate() {
123 match result {
124 Ok(Segments {
125 core_segments,
126 non_core_segments,
127 }) => {
128 tracing::trace!(
129 fetcher_index = i,
130 n_core_segments = core_segments.len(),
131 n_non_core_segments = non_core_segments.len(),
132 %src,
133 %dst,
134 "Segment fetcher succeeded"
135 );
136 all_core_segments.extend(core_segments);
137 all_non_core_segments.extend(non_core_segments);
138 }
139 Err(e) => {
140 errors.push(e);
141 }
142 }
143 }
144
145 let paths = path::combinator::combine(src, dst, all_core_segments, all_non_core_segments);
146
147 for (i, error) in errors.iter().enumerate() {
148 tracing::warn!(
149 error_index = i,
150 %error,
151 %src,
152 %dst,
153 "Segment fetcher failed"
154 );
155 }
156
157 if !errors.is_empty() && paths.is_empty() {
160 return Err(PathFetchError::FetchSegments(
161 errors.into_iter().next().unwrap(),
162 ));
163 }
164
165 Ok(paths)
166 }
167}
168
169pub struct EndhostApiSegmentFetcher {
171 client: Arc<dyn EndhostApiClient>,
172}
173
174impl EndhostApiSegmentFetcher {
175 pub fn new(client: Arc<dyn EndhostApiClient>) -> Self {
177 Self { client }
178 }
179}
180
181#[async_trait::async_trait]
182impl SegmentFetcher for EndhostApiSegmentFetcher {
183 async fn fetch_segments(
184 &self,
185 src: IsdAsn,
186 dst: IsdAsn,
187 ) -> Result<Segments, SegmentFetchError> {
188 let resp = self
189 .client
190 .list_segments(src, dst, 128, "".to_string())
191 .await?;
192
193 tracing::trace!(
194 n_core=resp.segments.core_segments.len(),
195 n_up=resp.segments.up_segments.len(),
196 n_down=resp.segments.down_segments.len(),
197 src = %src,
198 dst = %dst,
199 "Received segments from endhost API"
200 );
201
202 let (core_segments, non_core_segments) = resp.segments.split_parts();
203 Ok(Segments {
204 core_segments,
205 non_core_segments,
206 })
207 }
208}