1use arc_swap::ArcSwapOption;
3use dynamic_routing::{
4 dynamic_route_provider::DynamicRouteProviderBuilder,
5 node::Node,
6 snapshot::{
7 latency_based_routing::LatencyRoutingSnapshot,
8 round_robin_routing::RoundRobinRoutingSnapshot,
9 },
10};
11use std::{
12 future::Future,
13 str::FromStr,
14 sync::{
15 atomic::{AtomicUsize, Ordering},
16 Arc,
17 },
18 time::Duration,
19};
20use url::Url;
21
22use crate::agent::AgentError;
23
24use super::HttpService;
25#[cfg(not(feature = "_internal_dynamic-routing"))]
26pub(crate) mod dynamic_routing;
27#[cfg(feature = "_internal_dynamic-routing")]
28pub mod dynamic_routing;
29
// Canonical hosts, each followed by the `.`-prefixed suffix that identifies one of its
// subdomains. `RoundRobinRouteProvider::new` rewrites any host ending in a suffix to the
// matching canonical host.
const IC0_DOMAIN: &str = "ic0.app";
const IC0_SUB_DOMAIN: &str = ".ic0.app";

const ICP0_DOMAIN: &str = "icp0.io";
const ICP0_SUB_DOMAIN: &str = ".icp0.io";

const ICP_API_DOMAIN: &str = "icp-api.io";
const ICP_API_SUB_DOMAIN: &str = ".icp-api.io";

const LOCALHOST_DOMAIN: &str = "localhost";
const LOCALHOST_SUB_DOMAIN: &str = ".localhost";
38
/// Summary of the routes known to a route provider.
///
/// This is plain data (two integers), so it is cheap to copy and supports full
/// equality comparison.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RoutesStats {
    /// Total number of routes the provider knows about.
    pub total: usize,

    /// Number of routes currently considered healthy.
    ///
    /// `None` means the provider does not report health information; the static
    /// providers in this module always pass `None`.
    pub healthy: Option<usize>,
}

impl RoutesStats {
    /// Creates a new `RoutesStats` from the total route count and the optional
    /// healthy route count.
    pub const fn new(total: usize, healthy: Option<usize>) -> Self {
        Self { total, healthy }
    }
}
57
58pub trait RouteProvider: std::fmt::Debug + Send + Sync {
60 fn route(&self) -> Result<Url, AgentError>;
65
66 fn n_ordered_routes(&self, n: usize) -> Result<Vec<Url>, AgentError>;
73
74 fn routes_stats(&self) -> RoutesStats;
76}
77
78#[derive(Debug)]
80pub struct RoundRobinRouteProvider {
81 routes: Vec<Url>,
82 current_idx: AtomicUsize,
83}
84
85impl RouteProvider for RoundRobinRouteProvider {
86 fn route(&self) -> Result<Url, AgentError> {
88 if self.routes.is_empty() {
89 return Err(AgentError::RouteProviderError(
90 "No routing urls provided".to_string(),
91 ));
92 }
93 let prev_idx = self.current_idx.fetch_add(1, Ordering::Relaxed);
95 Ok(self.routes[prev_idx % self.routes.len()].clone())
96 }
97
98 fn n_ordered_routes(&self, n: usize) -> Result<Vec<Url>, AgentError> {
99 if n == 0 {
100 return Ok(Vec::new());
101 }
102
103 if n >= self.routes.len() {
104 return Ok(self.routes.clone());
105 }
106
107 let idx = self.current_idx.fetch_add(n, Ordering::Relaxed) % self.routes.len();
108 let mut urls = Vec::with_capacity(n);
109
110 if self.routes.len() - idx >= n {
111 urls.extend_from_slice(&self.routes[idx..idx + n]);
112 } else {
113 urls.extend_from_slice(&self.routes[idx..]);
114 urls.extend_from_slice(&self.routes[..n - urls.len()]);
115 }
116
117 Ok(urls)
118 }
119
120 fn routes_stats(&self) -> RoutesStats {
121 RoutesStats::new(self.routes.len(), None)
122 }
123}
124
125impl RoundRobinRouteProvider {
126 pub fn new<T: AsRef<str>>(routes: Vec<T>) -> Result<Self, AgentError> {
128 let routes: Result<Vec<Url>, _> = routes
129 .into_iter()
130 .map(|url| {
131 Url::from_str(url.as_ref()).and_then(|mut url| {
132 if let Some(domain) = url.domain() {
134 if domain.ends_with(IC0_SUB_DOMAIN) {
135 url.set_host(Some(IC0_DOMAIN))?;
136 } else if domain.ends_with(ICP0_SUB_DOMAIN) {
137 url.set_host(Some(ICP0_DOMAIN))?;
138 } else if domain.ends_with(ICP_API_SUB_DOMAIN) {
139 url.set_host(Some(ICP_API_DOMAIN))?;
140 } else if domain.ends_with(LOCALHOST_SUB_DOMAIN) {
141 url.set_host(Some(LOCALHOST_DOMAIN))?;
142 }
143 }
144 Ok(url)
145 })
146 })
147 .collect();
148 Ok(Self {
149 routes: routes?,
150 current_idx: AtomicUsize::new(0),
151 })
152 }
153}
154
155impl RouteProvider for Url {
156 fn route(&self) -> Result<Url, AgentError> {
157 Ok(self.clone())
158 }
159 fn n_ordered_routes(&self, _: usize) -> Result<Vec<Url>, AgentError> {
160 Ok(vec![self.route()?])
161 }
162 fn routes_stats(&self) -> RoutesStats {
163 RoutesStats::new(1, None)
164 }
165}
166
167#[derive(Debug)]
169pub struct DynamicRouteProvider {
170 inner: Box<dyn RouteProvider>,
171}
172
173impl DynamicRouteProvider {
174 pub async fn run_in_background(
176 seed_domains: Vec<String>,
177 client: Arc<dyn HttpService>,
178 strategy: DynamicRoutingStrategy,
179 ) -> Result<Self, AgentError> {
180 let seed_nodes: Result<Vec<_>, _> = seed_domains.into_iter().map(Node::new).collect();
181 let boxed = match strategy {
182 DynamicRoutingStrategy::ByLatency => Box::new(
183 DynamicRouteProviderBuilder::new(
184 LatencyRoutingSnapshot::new(),
185 seed_nodes?,
186 client,
187 )
188 .build()
189 .await,
190 ) as Box<dyn RouteProvider>,
191 DynamicRoutingStrategy::RoundRobin => Box::new(
192 DynamicRouteProviderBuilder::new(
193 RoundRobinRoutingSnapshot::new(),
194 seed_nodes?,
195 client,
196 )
197 .build()
198 .await,
199 ),
200 };
201 Ok(Self { inner: boxed })
202 }
203 pub async fn run_in_background_with_intervals(
205 seed_domains: Vec<String>,
206 client: Arc<dyn HttpService>,
207 strategy: DynamicRoutingStrategy,
208 list_update_interval: Duration,
209 health_check_interval: Duration,
210 ) -> Result<Self, AgentError> {
211 let seed_nodes: Result<Vec<_>, _> = seed_domains.into_iter().map(Node::new).collect();
212 let boxed = match strategy {
213 DynamicRoutingStrategy::ByLatency => Box::new(
214 DynamicRouteProviderBuilder::new(
215 LatencyRoutingSnapshot::new(),
216 seed_nodes?,
217 client,
218 )
219 .with_fetch_period(list_update_interval)
220 .with_check_period(health_check_interval)
221 .build()
222 .await,
223 ) as Box<dyn RouteProvider>,
224 DynamicRoutingStrategy::RoundRobin => Box::new(
225 DynamicRouteProviderBuilder::new(
226 RoundRobinRoutingSnapshot::new(),
227 seed_nodes?,
228 client,
229 )
230 .with_fetch_period(list_update_interval)
231 .with_check_period(health_check_interval)
232 .build()
233 .await,
234 ),
235 };
236 Ok(Self { inner: boxed })
237 }
238}
239
240impl RouteProvider for DynamicRouteProvider {
241 fn route(&self) -> Result<Url, AgentError> {
242 self.inner.route()
243 }
244 fn n_ordered_routes(&self, n: usize) -> Result<Vec<Url>, AgentError> {
245 self.inner.n_ordered_routes(n)
246 }
247 fn routes_stats(&self) -> RoutesStats {
248 self.inner.routes_stats()
249 }
250}
251
/// Selects which routing snapshot a `DynamicRouteProvider` is built with.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum DynamicRoutingStrategy {
    /// Build the provider with a latency-based routing snapshot.
    ByLatency,
    /// Build the provider with a round-robin routing snapshot.
    RoundRobin,
}
260
261#[derive(Debug)]
262pub(crate) struct UrlUntilReady<R> {
263 url: Url,
264 router: ArcSwapOption<R>,
265}
266
267impl<R: RouteProvider + 'static> UrlUntilReady<R> {
268 pub(crate) fn new<
269 #[cfg(not(target_family = "wasm"))] F: Future<Output = R> + Send + 'static,
270 #[cfg(target_family = "wasm")] F: Future<Output = R> + 'static,
271 >(
272 url: Url,
273 fut: F,
274 ) -> Arc<Self> {
275 let s = Arc::new(Self {
276 url,
277 router: ArcSwapOption::empty(),
278 });
279 let weak = Arc::downgrade(&s);
280 crate::util::spawn(async move {
281 let router = fut.await;
282 if let Some(outer) = weak.upgrade() {
283 outer.router.store(Some(Arc::new(router)))
284 }
285 });
286 s
287 }
288}
289
290impl<R: RouteProvider> RouteProvider for UrlUntilReady<R> {
291 fn n_ordered_routes(&self, n: usize) -> Result<Vec<Url>, AgentError> {
292 if let Some(r) = &*self.router.load() {
293 r.n_ordered_routes(n)
294 } else {
295 self.url.n_ordered_routes(n)
296 }
297 }
298 fn route(&self) -> Result<Url, AgentError> {
299 if let Some(r) = &*self.router.load() {
300 r.route()
301 } else {
302 self.url.route()
303 }
304 }
305 fn routes_stats(&self) -> RoutesStats {
306 RoutesStats::new(1, None)
307 }
308}
309
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty provider can be constructed, but asking it for a route is an error.
    #[test]
    fn test_empty_routes() {
        let no_urls: Vec<&str> = Vec::new();
        let provider =
            RoundRobinRouteProvider::new(no_urls).expect("failed to create a route provider");
        let result = provider.route().unwrap_err();
        let expected = AgentError::RouteProviderError("No routing urls provided".to_string());
        assert_eq!(result, expected);
    }

    /// `route` walks the list in order and wraps back to the first URL.
    #[test]
    fn test_routes_rotation() {
        let provider = RoundRobinRouteProvider::new(vec!["https://url1.com", "https://url2.com"])
            .expect("failed to create a route provider");

        let mut expected_urls: Vec<Url> = Vec::new();
        for url_str in ["https://url1.com", "https://url2.com", "https://url1.com"].iter() {
            expected_urls.push(Url::parse(url_str).expect("Invalid URL"));
        }

        let mut urls: Vec<Url> = Vec::new();
        for _ in 0..3 {
            urls.push(provider.route().expect("failed to get next url"));
        }

        assert_eq!(expected_urls, urls);
    }

    /// `n_ordered_routes` returns a rotating window, or the whole list when `n`
    /// covers every URL.
    #[test]
    fn test_n_routes() {
        fn parse_all(raw: &[&str]) -> Vec<Url> {
            raw.iter()
                .map(|url_str| Url::parse(url_str).expect("invalid URL"))
                .collect()
        }

        // No URLs configured: any request yields an empty list.
        let provider = RoundRobinRouteProvider::new(Vec::<&str>::new())
            .expect("failed to create a route provider");
        let urls_iter = provider.n_ordered_routes(1).expect("failed to get urls");
        assert!(urls_iter.is_empty());

        // Five URLs configured.
        let provider = RoundRobinRouteProvider::new(vec![
            "https://url1.com",
            "https://url2.com",
            "https://url3.com",
            "https://url4.com",
            "https://url5.com",
        ])
        .expect("failed to create a route provider");

        // First window starts at the beginning of the list.
        let urls: Vec<_> = provider.n_ordered_routes(3).expect("failed to get urls");
        let expected_urls = parse_all(&["https://url1.com", "https://url2.com", "https://url3.com"]);
        assert_eq!(urls, expected_urls);

        // Second window continues where the first stopped and wraps around.
        let urls: Vec<_> = provider.n_ordered_routes(3).expect("failed to get urls");
        let expected_urls = parse_all(&["https://url4.com", "https://url5.com", "https://url1.com"]);
        assert_eq!(urls, expected_urls);

        // Third window continues after the wrap.
        let urls: Vec<_> = provider.n_ordered_routes(2).expect("failed to get urls");
        let expected_urls = parse_all(&["https://url2.com", "https://url3.com"]);
        assert_eq!(urls, expected_urls);

        // Asking for every URL returns the list in its stored order.
        let urls: Vec<_> = provider.n_ordered_routes(5).expect("failed to get urls");
        let expected_urls = parse_all(&[
            "https://url1.com",
            "https://url2.com",
            "https://url3.com",
            "https://url4.com",
            "https://url5.com",
        ]);
        assert_eq!(urls, expected_urls);
    }
}