1use arc_swap::ArcSwapOption;
3use dynamic_routing::{dynamic_route_provider::DynamicRouteProviderBuilder, node::Node};
4use std::{
5 future::Future,
6 str::FromStr,
7 sync::{
8 atomic::{AtomicUsize, Ordering},
9 Arc,
10 },
11 time::Duration,
12};
13use url::Url;
14
15use crate::agent::AgentError;
16
17use super::HttpService;
// The `dynamic_routing` module is declared twice with mutually exclusive
// `cfg` gates so that exactly one declaration is compiled:
// without the `_internal_dynamic-routing` feature it is visible only inside
// this crate.
#[cfg(not(feature = "_internal_dynamic-routing"))]
pub(crate) mod dynamic_routing;
// With the `_internal_dynamic-routing` feature enabled the same module is
// exported publicly.
#[cfg(feature = "_internal_dynamic-routing")]
pub mod dynamic_routing;
22
// Bare host names that `RoundRobinRouteProvider::new` substitutes for any
// URL whose domain ends with the matching `*_SUB_DOMAIN` suffix below.
const IC0_DOMAIN: &str = "ic0.app";
const ICP0_DOMAIN: &str = "icp0.io";
const ICP_API_DOMAIN: &str = "icp-api.io";
const LOCALHOST_DOMAIN: &str = "localhost";
// Domain suffixes (note the leading dot) used to recognise sub-domains of the
// hosts above. Because of the dot, the bare host itself does not match.
const IC0_SUB_DOMAIN: &str = ".ic0.app";
const ICP0_SUB_DOMAIN: &str = ".icp0.io";
const ICP_API_SUB_DOMAIN: &str = ".icp-api.io";
const LOCALHOST_SUB_DOMAIN: &str = ".localhost";
31
/// Counters describing the routes known to a [`RouteProvider`].
#[derive(Debug, PartialEq)]
pub struct RoutesStats {
    /// Number of routes the provider knows about.
    pub total: usize,

    /// Number of routes considered healthy, or `None` when the provider does
    /// not report a health count (the static providers in this file pass
    /// `None`).
    pub healthy: Option<usize>,
}

impl RoutesStats {
    /// Builds a [`RoutesStats`] from a total route count and an optional
    /// healthy-route count.
    pub fn new(total: usize, healthy: Option<usize>) -> Self {
        RoutesStats { total, healthy }
    }
}
50
/// A source of routing URLs.
///
/// Implementors must be `Debug`, `Send` and `Sync`; this file stores one
/// behind a `Box<dyn RouteProvider>`.
pub trait RouteProvider: std::fmt::Debug + Send + Sync {
    /// Produces a single routing URL.
    ///
    /// # Errors
    /// Returns an [`AgentError`] when no URL can be produced (for example the
    /// round-robin provider in this file fails when it has no routes).
    fn route(&self) -> Result<Url, AgentError>;

    /// Produces a list of routing URLs, with `n` being the number requested.
    ///
    /// The round-robin provider in this file returns at most `n` URLs and
    /// fewer when it has fewer routes.
    ///
    /// # Errors
    /// Returns an [`AgentError`] when the provider cannot produce the list.
    fn n_ordered_routes(&self, n: usize) -> Result<Vec<Url>, AgentError>;

    /// Reports route counters for this provider as a [`RoutesStats`].
    fn routes_stats(&self) -> RoutesStats;
}
70
/// A [`RouteProvider`] that hands out a fixed list of URLs in rotation.
#[derive(Debug)]
pub struct RoundRobinRouteProvider {
    /// The URLs to rotate through, in the order they were supplied.
    routes: Vec<Url>,
    /// Shared counter advanced on each request; it is reduced modulo
    /// `routes.len()` to select the next route.
    current_idx: AtomicUsize,
}
77
78impl RouteProvider for RoundRobinRouteProvider {
79 fn route(&self) -> Result<Url, AgentError> {
81 if self.routes.is_empty() {
82 return Err(AgentError::RouteProviderError(
83 "No routing urls provided".to_string(),
84 ));
85 }
86 let prev_idx = self.current_idx.fetch_add(1, Ordering::Relaxed);
88 Ok(self.routes[prev_idx % self.routes.len()].clone())
89 }
90
91 fn n_ordered_routes(&self, n: usize) -> Result<Vec<Url>, AgentError> {
92 if n == 0 {
93 return Ok(Vec::new());
94 }
95
96 if n >= self.routes.len() {
97 return Ok(self.routes.clone());
98 }
99
100 let idx = self.current_idx.fetch_add(n, Ordering::Relaxed) % self.routes.len();
101 let mut urls = Vec::with_capacity(n);
102
103 if self.routes.len() - idx >= n {
104 urls.extend_from_slice(&self.routes[idx..idx + n]);
105 } else {
106 urls.extend_from_slice(&self.routes[idx..]);
107 urls.extend_from_slice(&self.routes[..n - urls.len()]);
108 }
109
110 Ok(urls)
111 }
112
113 fn routes_stats(&self) -> RoutesStats {
114 RoutesStats::new(self.routes.len(), None)
115 }
116}
117
118impl RoundRobinRouteProvider {
119 pub fn new<T: AsRef<str>>(routes: Vec<T>) -> Result<Self, AgentError> {
121 let routes: Result<Vec<Url>, _> = routes
122 .into_iter()
123 .map(|url| {
124 Url::from_str(url.as_ref()).and_then(|mut url| {
125 if let Some(domain) = url.domain() {
127 if domain.ends_with(IC0_SUB_DOMAIN) {
128 url.set_host(Some(IC0_DOMAIN))?;
129 } else if domain.ends_with(ICP0_SUB_DOMAIN) {
130 url.set_host(Some(ICP0_DOMAIN))?;
131 } else if domain.ends_with(ICP_API_SUB_DOMAIN) {
132 url.set_host(Some(ICP_API_DOMAIN))?;
133 } else if domain.ends_with(LOCALHOST_SUB_DOMAIN) {
134 url.set_host(Some(LOCALHOST_DOMAIN))?;
135 }
136 }
137 Ok(url)
138 })
139 })
140 .collect();
141 Ok(Self {
142 routes: routes?,
143 current_idx: AtomicUsize::new(0),
144 })
145 }
146}
147
148impl RouteProvider for Url {
149 fn route(&self) -> Result<Url, AgentError> {
150 Ok(self.clone())
151 }
152 fn n_ordered_routes(&self, _: usize) -> Result<Vec<Url>, AgentError> {
153 Ok(vec![self.route()?])
154 }
155 fn routes_stats(&self) -> RoutesStats {
156 RoutesStats::new(1, None)
157 }
158}
159
/// A [`RouteProvider`] wrapper around a provider built with
/// `DynamicRouteProviderBuilder`; every trait method is forwarded to `inner`.
#[derive(Debug)]
pub struct DynamicRouteProvider {
    /// The type-erased provider that all calls are delegated to.
    inner: Box<dyn RouteProvider>,
}
165
166impl DynamicRouteProvider {
167 pub async fn run_in_background(
169 seed_domains: Vec<String>,
170 client: Arc<dyn HttpService>,
171 ) -> Result<Self, AgentError> {
172 let seed_nodes: Result<Vec<_>, _> = seed_domains.into_iter().map(Node::new).collect();
173 let provider = DynamicRouteProviderBuilder::new(seed_nodes?, client).build();
174 provider.start().await;
175
176 Ok(Self {
177 inner: Box::new(provider),
178 })
179 }
180 pub async fn run_in_background_with_intervals(
182 seed_domains: Vec<String>,
183 client: Arc<dyn HttpService>,
184 list_update_interval: Duration,
185 health_check_interval: Duration,
186 ) -> Result<Self, AgentError> {
187 let seed_nodes: Result<Vec<_>, _> = seed_domains.into_iter().map(Node::new).collect();
188 let provider = DynamicRouteProviderBuilder::new(seed_nodes?, client)
189 .with_fetch_period(list_update_interval)
190 .with_check_period(health_check_interval)
191 .build();
192 provider.start().await;
193
194 Ok(Self {
195 inner: Box::new(provider),
196 })
197 }
198}
199
200impl RouteProvider for DynamicRouteProvider {
201 fn route(&self) -> Result<Url, AgentError> {
202 self.inner.route()
203 }
204 fn n_ordered_routes(&self, n: usize) -> Result<Vec<Url>, AgentError> {
205 self.inner.n_ordered_routes(n)
206 }
207 fn routes_stats(&self) -> RoutesStats {
208 self.inner.routes_stats()
209 }
210}
211
/// A route provider that serves a fixed URL until a real router of type `R`
/// has been installed, after which routing is delegated to that router.
#[derive(Debug)]
pub(crate) struct UrlUntilReady<R> {
    /// Fallback URL used while `router` is still empty.
    url: Url,
    /// The router, once available; starts out empty and is filled in by the
    /// task spawned in `UrlUntilReady::new`.
    router: ArcSwapOption<R>,
}
217
218impl<R: RouteProvider + 'static> UrlUntilReady<R> {
219 pub(crate) fn new<
220 #[cfg(not(target_family = "wasm"))] F: Future<Output = R> + Send + 'static,
221 #[cfg(target_family = "wasm")] F: Future<Output = R> + 'static,
222 >(
223 url: Url,
224 fut: F,
225 ) -> Arc<Self> {
226 let s = Arc::new(Self {
227 url,
228 router: ArcSwapOption::empty(),
229 });
230 let weak = Arc::downgrade(&s);
231 crate::util::spawn(async move {
232 let router = fut.await;
233 if let Some(outer) = weak.upgrade() {
234 outer.router.store(Some(Arc::new(router)))
235 }
236 });
237 s
238 }
239}
240
241impl<R: RouteProvider> RouteProvider for UrlUntilReady<R> {
242 fn n_ordered_routes(&self, n: usize) -> Result<Vec<Url>, AgentError> {
243 if let Some(r) = &*self.router.load() {
244 r.n_ordered_routes(n)
245 } else {
246 self.url.n_ordered_routes(n)
247 }
248 }
249 fn route(&self) -> Result<Url, AgentError> {
250 if let Some(r) = &*self.router.load() {
251 r.route()
252 } else {
253 self.url.route()
254 }
255 }
256 fn routes_stats(&self) -> RoutesStats {
257 RoutesStats::new(1, None)
258 }
259}
260
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses every string into a `Url`, panicking on the first invalid one.
    fn parse_all(raw: &[&str]) -> Vec<Url> {
        raw.iter()
            .map(|url_str| Url::parse(url_str).expect("invalid URL"))
            .collect()
    }

    /// A provider with no routes must fail on `route()`.
    #[test]
    fn test_empty_routes() {
        let provider = RoundRobinRouteProvider::new::<&str>(Vec::new())
            .expect("failed to create a route provider");
        let err = provider.route().unwrap_err();
        let expected = AgentError::RouteProviderError("No routing urls provided".to_string());
        assert_eq!(err, expected);
    }

    /// Successive `route()` calls cycle through the configured URLs.
    #[test]
    fn test_routes_rotation() {
        let provider = RoundRobinRouteProvider::new(vec!["https://url1.com", "https://url2.com"])
            .expect("failed to create a route provider");

        let mut observed: Vec<Url> = Vec::with_capacity(3);
        for _ in 0..3 {
            observed.push(provider.route().expect("failed to get next url"));
        }

        let expected: Vec<Url> = ["https://url1.com", "https://url2.com", "https://url1.com"]
            .iter()
            .map(|url_str| Url::parse(url_str).expect("Invalid URL"))
            .collect();
        assert_eq!(expected, observed);
    }

    /// `n_ordered_routes` returns windows that continue where the previous
    /// call stopped and wrap around the end of the list.
    #[test]
    fn test_n_routes() {
        // No routes configured: nothing can be returned.
        let empty = RoundRobinRouteProvider::new(Vec::<&str>::new())
            .expect("failed to create a route provider");
        let none = empty.n_ordered_routes(1).expect("failed to get urls");
        assert!(none.is_empty());

        let all = [
            "https://url1.com",
            "https://url2.com",
            "https://url3.com",
            "https://url4.com",
            "https://url5.com",
        ];
        let provider =
            RoundRobinRouteProvider::new(all.to_vec()).expect("failed to create a route provider");

        let assert_next = |n: usize, expected: &[&str]| {
            let urls: Vec<_> = provider.n_ordered_routes(n).expect("failed to get urls");
            assert_eq!(urls, parse_all(expected));
        };

        // First window starts at the beginning of the list.
        assert_next(
            3,
            &["https://url1.com", "https://url2.com", "https://url3.com"],
        );
        // Second window wraps past the end.
        assert_next(
            3,
            &["https://url4.com", "https://url5.com", "https://url1.com"],
        );
        // Third window continues after the wrap.
        assert_next(2, &["https://url2.com", "https://url3.com"]);
        // Asking for every route returns the list in its stored order.
        assert_next(5, &all);
    }
}