scion_stack/path/
fetcher.rs1use std::sync::Arc;
24
25use endhost_api_client::client::EndhostApiClient;
26use scion_proto::{
27 address::IsdAsn,
28 path::{self, Path},
29};
30
31use crate::path::fetcher::traits::{
32 PathFetchError, PathFetcher, SegmentFetchError, SegmentFetcher, Segments,
33};
34
35pub mod traits {
37 use std::borrow::Cow;
38
39 use scion_proto::{
40 address::IsdAsn,
41 path::{Path, PathSegment},
42 };
43
44 use crate::types::ResFut;
45
46 pub trait PathFetcher: Send + Sync + 'static {
48 fn fetch_paths(
50 &self,
51 src: IsdAsn,
52 dst: IsdAsn,
53 ) -> impl ResFut<'_, Vec<Path>, PathFetchError>;
54 }
55
56 #[derive(Debug, thiserror::Error)]
58 pub enum PathFetchError {
59 #[error("failed to fetch segments: {0}")]
61 FetchSegments(#[from] SegmentFetchError),
62
63 #[error("no paths found")]
65 NoPathsFound,
66
67 #[error("internal error: {0}")]
69 InternalError(Cow<'static, str>),
70 }
71
72 #[async_trait::async_trait]
74 pub trait SegmentFetcher: Send + Sync + 'static {
75 async fn fetch_segments(
77 &self,
78 src: IsdAsn,
79 dst: IsdAsn,
80 ) -> Result<Segments, SegmentFetchError>;
81 }
82
83 pub type SegmentFetchError = Box<dyn std::error::Error + Send + Sync>;
85
86 pub struct Segments {
88 pub core_segments: Vec<PathSegment>,
90 pub non_core_segments: Vec<PathSegment>,
92 }
93}
94
95pub struct PathFetcherImpl {
97 segment_fetchers: Vec<Box<dyn SegmentFetcher>>,
98}
99
100impl PathFetcherImpl {
101 pub fn new(segment_fetchers: Vec<Box<dyn SegmentFetcher>>) -> Self {
103 Self { segment_fetchers }
104 }
105}
106
107impl PathFetcher for PathFetcherImpl {
108 async fn fetch_paths(&self, src: IsdAsn, dst: IsdAsn) -> Result<Vec<Path>, PathFetchError> {
109 let mut all_core_segments = Vec::new();
110 let mut all_non_core_segments = Vec::new();
111
112 let fetch_tasks: Vec<_> = self
114 .segment_fetchers
115 .iter()
116 .map(|fetcher| fetcher.fetch_segments(src, dst))
117 .collect();
118
119 let results = futures::future::join_all(fetch_tasks).await;
120
121 let mut errors = Vec::new();
123
124 for (i, result) in results.into_iter().enumerate() {
125 match result {
126 Ok(Segments {
127 core_segments,
128 non_core_segments,
129 }) => {
130 tracing::trace!(
131 fetcher_index = i,
132 n_core_segments = core_segments.len(),
133 n_non_core_segments = non_core_segments.len(),
134 %src,
135 %dst,
136 "Segment fetcher succeeded"
137 );
138 all_core_segments.extend(core_segments);
139 all_non_core_segments.extend(non_core_segments);
140 }
141 Err(err) => {
142 tracing::warn!(
143 %src,
144 %dst,
145 %err,
146 "Segment fetcher failed"
147 );
148 errors.push(err);
149 }
150 }
151 }
152
153 let paths = path::combinator::combine(src, dst, all_core_segments, all_non_core_segments);
154
155 if !errors.is_empty() && paths.is_empty() {
158 return Err(PathFetchError::FetchSegments(
159 errors.into_iter().next().unwrap(),
160 ));
161 }
162
163 Ok(paths)
164 }
165}
166
167pub struct ConnectRpcSegmentFetcher {
169 client: Arc<dyn EndhostApiClient>,
170}
171
172impl ConnectRpcSegmentFetcher {
173 pub fn new(client: Arc<dyn EndhostApiClient>) -> Self {
175 Self { client }
176 }
177}
178
179#[async_trait::async_trait]
180impl SegmentFetcher for ConnectRpcSegmentFetcher {
181 async fn fetch_segments(
182 &self,
183 src: IsdAsn,
184 dst: IsdAsn,
185 ) -> Result<Segments, SegmentFetchError> {
186 let resp = self
187 .client
188 .list_segments(src, dst, 128, "".to_string())
189 .await?;
190
191 tracing::debug!(
192 n_core=resp.segments.core_segments.len(),
193 n_up=resp.segments.up_segments.len(),
194 n_down=resp.segments.down_segments.len(),
195 src = %src,
196 dst = %dst,
197 "Received segments from control plane"
198 );
199
200 let (core_segments, non_core_segments) = resp.segments.split_parts();
201 Ok(Segments {
202 core_segments,
203 non_core_segments,
204 })
205 }
206}