1use arc_swap::ArcSwapOption;
3use dynamic_routing::{
4 dynamic_route_provider::DynamicRouteProviderBuilder,
5 node::Node,
6 snapshot::{
7 latency_based_routing::LatencyRoutingSnapshot,
8 round_robin_routing::RoundRobinRoutingSnapshot,
9 },
10};
11use std::{
12 future::Future,
13 str::FromStr,
14 sync::{
15 atomic::{AtomicUsize, Ordering},
16 Arc,
17 },
18 time::Duration,
19};
20use url::Url;
21
22use crate::agent::AgentError;
23
24use super::HttpService;
25#[cfg(not(feature = "_internal_dynamic-routing"))]
26pub(crate) mod dynamic_routing;
27#[cfg(feature = "_internal_dynamic-routing")]
28pub mod dynamic_routing;
29
30const IC0_DOMAIN: &str = "ic0.app";
31const ICP0_DOMAIN: &str = "icp0.io";
32const ICP_API_DOMAIN: &str = "icp-api.io";
33const LOCALHOST_DOMAIN: &str = "localhost";
34const IC0_SUB_DOMAIN: &str = ".ic0.app";
35const ICP0_SUB_DOMAIN: &str = ".icp0.io";
36const ICP_API_SUB_DOMAIN: &str = ".icp-api.io";
37const LOCALHOST_SUB_DOMAIN: &str = ".localhost";
38
39pub trait RouteProvider: std::fmt::Debug + Send + Sync {
41 fn route(&self) -> Result<Url, AgentError>;
46
47 fn n_ordered_routes(&self, n: usize) -> Result<Vec<Url>, AgentError>;
54}
55
56#[derive(Debug)]
58pub struct RoundRobinRouteProvider {
59 routes: Vec<Url>,
60 current_idx: AtomicUsize,
61}
62
63impl RouteProvider for RoundRobinRouteProvider {
64 fn route(&self) -> Result<Url, AgentError> {
66 if self.routes.is_empty() {
67 return Err(AgentError::RouteProviderError(
68 "No routing urls provided".to_string(),
69 ));
70 }
71 let prev_idx = self.current_idx.fetch_add(1, Ordering::Relaxed);
73 Ok(self.routes[prev_idx % self.routes.len()].clone())
74 }
75
76 fn n_ordered_routes(&self, n: usize) -> Result<Vec<Url>, AgentError> {
77 if n == 0 {
78 return Ok(Vec::new());
79 }
80
81 if n >= self.routes.len() {
82 return Ok(self.routes.clone());
83 }
84
85 let idx = self.current_idx.fetch_add(n, Ordering::Relaxed) % self.routes.len();
86 let mut urls = Vec::with_capacity(n);
87
88 if self.routes.len() - idx >= n {
89 urls.extend_from_slice(&self.routes[idx..idx + n]);
90 } else {
91 urls.extend_from_slice(&self.routes[idx..]);
92 urls.extend_from_slice(&self.routes[..n - urls.len()]);
93 }
94
95 Ok(urls)
96 }
97}
98
99impl RoundRobinRouteProvider {
100 pub fn new<T: AsRef<str>>(routes: Vec<T>) -> Result<Self, AgentError> {
102 let routes: Result<Vec<Url>, _> = routes
103 .into_iter()
104 .map(|url| {
105 Url::from_str(url.as_ref()).and_then(|mut url| {
106 if let Some(domain) = url.domain() {
108 if domain.ends_with(IC0_SUB_DOMAIN) {
109 url.set_host(Some(IC0_DOMAIN))?;
110 } else if domain.ends_with(ICP0_SUB_DOMAIN) {
111 url.set_host(Some(ICP0_DOMAIN))?;
112 } else if domain.ends_with(ICP_API_SUB_DOMAIN) {
113 url.set_host(Some(ICP_API_DOMAIN))?;
114 } else if domain.ends_with(LOCALHOST_SUB_DOMAIN) {
115 url.set_host(Some(LOCALHOST_DOMAIN))?;
116 }
117 }
118 Ok(url)
119 })
120 })
121 .collect();
122 Ok(Self {
123 routes: routes?,
124 current_idx: AtomicUsize::new(0),
125 })
126 }
127}
128
129impl RouteProvider for Url {
130 fn route(&self) -> Result<Url, AgentError> {
131 Ok(self.clone())
132 }
133 fn n_ordered_routes(&self, _: usize) -> Result<Vec<Url>, AgentError> {
134 Ok(vec![self.route()?])
135 }
136}
137
138#[derive(Debug)]
140pub struct DynamicRouteProvider {
141 inner: Box<dyn RouteProvider>,
142}
143
144impl DynamicRouteProvider {
145 pub async fn run_in_background(
147 seed_domains: Vec<String>,
148 client: Arc<dyn HttpService>,
149 strategy: DynamicRoutingStrategy,
150 ) -> Result<Self, AgentError> {
151 let seed_nodes: Result<Vec<_>, _> = seed_domains.into_iter().map(Node::new).collect();
152 let boxed = match strategy {
153 DynamicRoutingStrategy::ByLatency => Box::new(
154 DynamicRouteProviderBuilder::new(
155 LatencyRoutingSnapshot::new(),
156 seed_nodes?,
157 client,
158 )
159 .build()
160 .await,
161 ) as Box<dyn RouteProvider>,
162 DynamicRoutingStrategy::RoundRobin => Box::new(
163 DynamicRouteProviderBuilder::new(
164 RoundRobinRoutingSnapshot::new(),
165 seed_nodes?,
166 client,
167 )
168 .build()
169 .await,
170 ),
171 };
172 Ok(Self { inner: boxed })
173 }
174 pub async fn run_in_background_with_intervals(
176 seed_domains: Vec<String>,
177 client: Arc<dyn HttpService>,
178 strategy: DynamicRoutingStrategy,
179 list_update_interval: Duration,
180 health_check_interval: Duration,
181 ) -> Result<Self, AgentError> {
182 let seed_nodes: Result<Vec<_>, _> = seed_domains.into_iter().map(Node::new).collect();
183 let boxed = match strategy {
184 DynamicRoutingStrategy::ByLatency => Box::new(
185 DynamicRouteProviderBuilder::new(
186 LatencyRoutingSnapshot::new(),
187 seed_nodes?,
188 client,
189 )
190 .with_fetch_period(list_update_interval)
191 .with_check_period(health_check_interval)
192 .build()
193 .await,
194 ) as Box<dyn RouteProvider>,
195 DynamicRoutingStrategy::RoundRobin => Box::new(
196 DynamicRouteProviderBuilder::new(
197 RoundRobinRoutingSnapshot::new(),
198 seed_nodes?,
199 client,
200 )
201 .with_fetch_period(list_update_interval)
202 .with_check_period(health_check_interval)
203 .build()
204 .await,
205 ),
206 };
207 Ok(Self { inner: boxed })
208 }
209}
210
211impl RouteProvider for DynamicRouteProvider {
212 fn route(&self) -> Result<Url, AgentError> {
213 self.inner.route()
214 }
215 fn n_ordered_routes(&self, n: usize) -> Result<Vec<Url>, AgentError> {
216 self.inner.n_ordered_routes(n)
217 }
218}
219
220#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
222pub enum DynamicRoutingStrategy {
223 ByLatency,
225 RoundRobin,
227}
228
229#[derive(Debug)]
230pub(crate) struct UrlUntilReady<R> {
231 url: Url,
232 router: ArcSwapOption<R>,
233}
234
235impl<R: RouteProvider + 'static> UrlUntilReady<R> {
236 pub(crate) fn new<
237 #[cfg(not(target_family = "wasm"))] F: Future<Output = R> + Send + 'static,
238 #[cfg(target_family = "wasm")] F: Future<Output = R> + 'static,
239 >(
240 url: Url,
241 fut: F,
242 ) -> Arc<Self> {
243 let s = Arc::new(Self {
244 url,
245 router: ArcSwapOption::empty(),
246 });
247 let weak = Arc::downgrade(&s);
248 crate::util::spawn(async move {
249 let router = fut.await;
250 if let Some(outer) = weak.upgrade() {
251 outer.router.store(Some(Arc::new(router)))
252 }
253 });
254 s
255 }
256}
257
258impl<R: RouteProvider> RouteProvider for UrlUntilReady<R> {
259 fn n_ordered_routes(&self, n: usize) -> Result<Vec<Url>, AgentError> {
260 if let Some(r) = &*self.router.load() {
261 r.n_ordered_routes(n)
262 } else {
263 self.url.n_ordered_routes(n)
264 }
265 }
266 fn route(&self) -> Result<Url, AgentError> {
267 if let Some(r) = &*self.router.load() {
268 r.route()
269 } else {
270 self.url.route()
271 }
272 }
273}
274
275#[cfg(test)]
276mod tests {
277 use super::*;
278
279 #[test]
280 fn test_empty_routes() {
281 let provider = RoundRobinRouteProvider::new::<&str>(vec![])
282 .expect("failed to create a route provider");
283 let result = provider.route().unwrap_err();
284 assert_eq!(
285 result,
286 AgentError::RouteProviderError("No routing urls provided".to_string())
287 );
288 }
289
290 #[test]
291 fn test_routes_rotation() {
292 let provider = RoundRobinRouteProvider::new(vec!["https://url1.com", "https://url2.com"])
293 .expect("failed to create a route provider");
294 let url_strings = ["https://url1.com", "https://url2.com", "https://url1.com"];
295 let expected_urls: Vec<Url> = url_strings
296 .iter()
297 .map(|url_str| Url::parse(url_str).expect("Invalid URL"))
298 .collect();
299 let urls: Vec<Url> = (0..3)
300 .map(|_| provider.route().expect("failed to get next url"))
301 .collect();
302 assert_eq!(expected_urls, urls);
303 }
304
305 #[test]
306 fn test_n_routes() {
307 let provider = RoundRobinRouteProvider::new(Vec::<&str>::new())
309 .expect("failed to create a route provider");
310 let urls_iter = provider.n_ordered_routes(1).expect("failed to get urls");
311 assert!(urls_iter.is_empty());
312 let provider = RoundRobinRouteProvider::new(vec![
314 "https://url1.com",
315 "https://url2.com",
316 "https://url3.com",
317 "https://url4.com",
318 "https://url5.com",
319 ])
320 .expect("failed to create a route provider");
321 let urls: Vec<_> = provider.n_ordered_routes(3).expect("failed to get urls");
323 let expected_urls: Vec<Url> = ["https://url1.com", "https://url2.com", "https://url3.com"]
324 .iter()
325 .map(|url_str| Url::parse(url_str).expect("invalid URL"))
326 .collect();
327 assert_eq!(urls, expected_urls);
328 let urls: Vec<_> = provider.n_ordered_routes(3).expect("failed to get urls");
330 let expected_urls: Vec<Url> = ["https://url4.com", "https://url5.com", "https://url1.com"]
331 .iter()
332 .map(|url_str| Url::parse(url_str).expect("invalid URL"))
333 .collect();
334 assert_eq!(urls, expected_urls);
335 let urls: Vec<_> = provider.n_ordered_routes(2).expect("failed to get urls");
337 let expected_urls: Vec<Url> = ["https://url2.com", "https://url3.com"]
338 .iter()
339 .map(|url_str| Url::parse(url_str).expect("invalid URL"))
340 .collect();
341 assert_eq!(urls, expected_urls);
342 let urls: Vec<_> = provider.n_ordered_routes(5).expect("failed to get urls");
344 let expected_urls: Vec<Url> = [
345 "https://url1.com",
346 "https://url2.com",
347 "https://url3.com",
348 "https://url4.com",
349 "https://url5.com",
350 ]
351 .iter()
352 .map(|url_str| Url::parse(url_str).expect("invalid URL"))
353 .collect();
354 assert_eq!(urls, expected_urls);
355 }
356}