use arc_swap::ArcSwapOption;
use dynamic_routing::{
dynamic_route_provider::DynamicRouteProviderBuilder,
node::Node,
snapshot::{
latency_based_routing::LatencyRoutingSnapshot,
round_robin_routing::RoundRobinRoutingSnapshot,
},
};
use std::{
future::Future,
str::FromStr,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
use url::Url;
use crate::agent::AgentError;
use super::HttpService;
// The `dynamic_routing` module is declared exactly once, with its visibility chosen by a
// feature flag: crate-private by default, and publicly exported only when the
// `_internal_dynamic-routing` feature is enabled.
#[cfg(not(feature = "_internal_dynamic-routing"))]
pub(crate) mod dynamic_routing;
#[cfg(feature = "_internal_dynamic-routing")]
pub mod dynamic_routing;
// Canonical host names. `RoundRobinRouteProvider::new` rewrites the host of any URL whose
// domain ends with one of the sub-domain suffixes below to the corresponding entry here.
const IC0_DOMAIN: &str = "ic0.app";
const ICP0_DOMAIN: &str = "icp0.io";
const ICP_API_DOMAIN: &str = "icp-api.io";
const LOCALHOST_DOMAIN: &str = "localhost";
// Sub-domain suffixes. The leading dot means only strict sub-domains match the suffix test,
// never the bare canonical host itself.
const IC0_SUB_DOMAIN: &str = ".ic0.app";
const ICP0_SUB_DOMAIN: &str = ".icp0.io";
const ICP_API_SUB_DOMAIN: &str = ".icp-api.io";
const LOCALHOST_SUB_DOMAIN: &str = ".localhost";
/// A source of routing URLs.
///
/// Implementors must be `Debug + Send + Sync`, so a provider can be shared across threads.
pub trait RouteProvider: std::fmt::Debug + Send + Sync {
/// Produces a single routing URL, or an [`AgentError`] if the provider cannot supply one.
fn route(&self) -> Result<Url, AgentError>;
/// Produces an ordered list of routing URLs for a request of size `n`.
///
/// The implementations in this file never return more URLs than they hold and may return
/// fewer than `n`.
fn n_ordered_routes(&self, n: usize) -> Result<Vec<Url>, AgentError>;
}
/// A [`RouteProvider`] that hands out a fixed list of URLs in rotation.
#[derive(Debug)]
pub struct RoundRobinRouteProvider {
// The URLs to rotate through, in the order they were supplied to `new`.
routes: Vec<Url>,
// Shared rotation counter; it is reduced modulo `routes.len()` to pick the next URL.
current_idx: AtomicUsize,
}
impl RouteProvider for RoundRobinRouteProvider {
    /// Returns the next URL in the rotation and advances the shared counter by one.
    ///
    /// # Errors
    ///
    /// Returns [`AgentError::RouteProviderError`] when the provider holds no URLs.
    fn route(&self) -> Result<Url, AgentError> {
        let total = self.routes.len();
        if total == 0 {
            return Err(AgentError::RouteProviderError(
                "No routing urls provided".to_string(),
            ));
        }
        // The counter is advanced unconditionally; the modulo maps it onto a valid slot.
        let ticket = self.current_idx.fetch_add(1, Ordering::Relaxed);
        Ok(self.routes[ticket % total].clone())
    }

    /// Returns up to `n` consecutive URLs from the rotation.
    ///
    /// * `n == 0` yields an empty list.
    /// * `n >= routes.len()` yields every URL in its original order and leaves the rotation
    ///   counter untouched.
    /// * Otherwise `n` URLs are taken starting at the current position, wrapping around the
    ///   end of the list, and the counter is advanced by `n`.
    fn n_ordered_routes(&self, n: usize) -> Result<Vec<Url>, AgentError> {
        if n == 0 {
            return Ok(Vec::new());
        }
        let total = self.routes.len();
        if n >= total {
            return Ok(self.routes.clone());
        }
        let start = self.current_idx.fetch_add(n, Ordering::Relaxed) % total;
        // Walking an endlessly repeating view of the list handles the wrap-around case
        // without having to stitch two slices together.
        let picked: Vec<Url> = self
            .routes
            .iter()
            .cycle()
            .skip(start)
            .take(n)
            .cloned()
            .collect();
        Ok(picked)
    }
}
impl RoundRobinRouteProvider {
    /// Builds a round-robin provider from a list of URL strings.
    ///
    /// Each entry is parsed as a [`Url`]. If the parsed URL's domain ends with one of the
    /// known sub-domain suffixes, its host is replaced by the matching canonical host.
    ///
    /// # Errors
    ///
    /// Fails on the first entry that cannot be parsed or whose host cannot be rewritten.
    pub fn new<T: AsRef<str>>(routes: Vec<T>) -> Result<Self, AgentError> {
        // (sub-domain suffix, canonical host) pairs, checked in this order.
        const CANONICAL_HOSTS: [(&str, &str); 4] = [
            (IC0_SUB_DOMAIN, IC0_DOMAIN),
            (ICP0_SUB_DOMAIN, ICP0_DOMAIN),
            (ICP_API_SUB_DOMAIN, ICP_API_DOMAIN),
            (LOCALHOST_SUB_DOMAIN, LOCALHOST_DOMAIN),
        ];

        let mut parsed = Vec::with_capacity(routes.len());
        for raw in routes {
            let mut url = Url::from_str(raw.as_ref())?;
            // Resolve the replacement first so the borrow of `url` ends before mutating it.
            let replacement = url.domain().and_then(|domain| {
                CANONICAL_HOSTS
                    .iter()
                    .find(|(suffix, _)| domain.ends_with(*suffix))
                    .map(|(_, host)| *host)
            });
            if let Some(host) = replacement {
                url.set_host(Some(host))?;
            }
            parsed.push(url);
        }

        Ok(Self {
            routes: parsed,
            current_idx: AtomicUsize::new(0),
        })
    }
}
/// A single [`Url`] acts as a trivial route provider that always routes to itself.
impl RouteProvider for Url {
    /// Returns a clone of this URL; never fails.
    fn route(&self) -> Result<Url, AgentError> {
        Ok(self.clone())
    }

    /// Returns this URL as a one-element list, or an empty list when `n == 0`.
    ///
    /// A request for zero routes is answered with zero routes, which matches
    /// `RoundRobinRouteProvider::n_ordered_routes`.
    fn n_ordered_routes(&self, n: usize) -> Result<Vec<Url>, AgentError> {
        if n == 0 {
            return Ok(Vec::new());
        }
        Ok(vec![self.route()?])
    }
}
/// A [`RouteProvider`] that delegates to a boxed provider built by
/// `DynamicRouteProviderBuilder`, hiding which routing snapshot type was chosen.
#[derive(Debug)]
pub struct DynamicRouteProvider {
// The type-erased provider that every `RouteProvider` call is forwarded to.
inner: Box<dyn RouteProvider>,
}
impl DynamicRouteProvider {
    /// Builds a dynamic route provider from seed domains using the builder's default periods.
    ///
    /// Each seed domain is turned into a `Node`. `strategy` selects which routing snapshot
    /// the underlying `DynamicRouteProviderBuilder` is created with.
    ///
    /// # Errors
    ///
    /// Returns an error if any seed domain cannot be converted into a `Node`.
    pub async fn run_in_background(
        seed_domains: Vec<String>,
        client: Arc<dyn HttpService>,
        strategy: DynamicRoutingStrategy,
    ) -> Result<Self, AgentError> {
        let seed_nodes = seed_domains
            .into_iter()
            .map(Node::new)
            .collect::<Result<Vec<_>, _>>();
        // Both arms produce different concrete provider types; the annotation erases them.
        let inner: Box<dyn RouteProvider> = match strategy {
            DynamicRoutingStrategy::ByLatency => {
                let snapshot = LatencyRoutingSnapshot::new();
                let nodes = seed_nodes?;
                Box::new(
                    DynamicRouteProviderBuilder::new(snapshot, nodes, client)
                        .build()
                        .await,
                )
            }
            DynamicRoutingStrategy::RoundRobin => {
                let snapshot = RoundRobinRoutingSnapshot::new();
                let nodes = seed_nodes?;
                Box::new(
                    DynamicRouteProviderBuilder::new(snapshot, nodes, client)
                        .build()
                        .await,
                )
            }
        };
        Ok(Self { inner })
    }

    /// Same as [`run_in_background`](Self::run_in_background), but with explicit periods.
    ///
    /// `list_update_interval` is passed to the builder's `with_fetch_period` and
    /// `health_check_interval` to its `with_check_period`.
    ///
    /// # Errors
    ///
    /// Returns an error if any seed domain cannot be converted into a `Node`.
    pub async fn run_in_background_with_intervals(
        seed_domains: Vec<String>,
        client: Arc<dyn HttpService>,
        strategy: DynamicRoutingStrategy,
        list_update_interval: Duration,
        health_check_interval: Duration,
    ) -> Result<Self, AgentError> {
        let seed_nodes = seed_domains
            .into_iter()
            .map(Node::new)
            .collect::<Result<Vec<_>, _>>();
        // Both arms produce different concrete provider types; the annotation erases them.
        let inner: Box<dyn RouteProvider> = match strategy {
            DynamicRoutingStrategy::ByLatency => {
                let snapshot = LatencyRoutingSnapshot::new();
                let nodes = seed_nodes?;
                Box::new(
                    DynamicRouteProviderBuilder::new(snapshot, nodes, client)
                        .with_fetch_period(list_update_interval)
                        .with_check_period(health_check_interval)
                        .build()
                        .await,
                )
            }
            DynamicRoutingStrategy::RoundRobin => {
                let snapshot = RoundRobinRoutingSnapshot::new();
                let nodes = seed_nodes?;
                Box::new(
                    DynamicRouteProviderBuilder::new(snapshot, nodes, client)
                        .with_fetch_period(list_update_interval)
                        .with_check_period(health_check_interval)
                        .build()
                        .await,
                )
            }
        };
        Ok(Self { inner })
    }
}
/// Every call is forwarded unchanged to the wrapped provider.
impl RouteProvider for DynamicRouteProvider {
    /// Forwards to the wrapped provider's `route`.
    fn route(&self) -> Result<Url, AgentError> {
        let delegate = &*self.inner;
        delegate.route()
    }

    /// Forwards to the wrapped provider's `n_ordered_routes` with the same `n`.
    fn n_ordered_routes(&self, n: usize) -> Result<Vec<Url>, AgentError> {
        let delegate = &*self.inner;
        delegate.n_ordered_routes(n)
    }
}
/// Selects which routing snapshot a [`DynamicRouteProvider`] is built with.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum DynamicRoutingStrategy {
/// Build the provider with a `LatencyRoutingSnapshot`.
ByLatency,
/// Build the provider with a `RoundRobinRoutingSnapshot`.
RoundRobin,
}
/// A route provider that answers with a fixed URL until an asynchronously constructed
/// router of type `R` becomes available, and with that router afterwards.
#[derive(Debug)]
pub(crate) struct UrlUntilReady<R> {
// Fallback URL used while `router` is still empty.
url: Url,
// Starts empty; filled in once the future handed to `new` has resolved.
router: ArcSwapOption<R>,
}
impl<R: RouteProvider + 'static> UrlUntilReady<R> {
    /// Creates a provider that serves `url` until `fut` resolves to a router.
    ///
    /// `fut` is handed to `crate::util::spawn`. The spawned task keeps only a weak handle
    /// to the returned value, so it never keeps the provider alive by itself. If the
    /// provider has already been dropped when `fut` completes, the router is discarded.
    ///
    /// On non-wasm targets the future must be `Send`; on wasm targets it need not be.
    pub(crate) fn new<
        #[cfg(not(target_family = "wasm"))] F: Future<Output = R> + Send + 'static,
        #[cfg(target_family = "wasm")] F: Future<Output = R> + 'static,
    >(
        url: Url,
        fut: F,
    ) -> Arc<Self> {
        let provider = Arc::new(Self {
            url,
            router: ArcSwapOption::empty(),
        });
        let handle = Arc::downgrade(&provider);
        crate::util::spawn(async move {
            let resolved = fut.await;
            // Install the router only if somebody still holds the provider.
            if let Some(owner) = handle.upgrade() {
                owner.router.store(Some(Arc::new(resolved)));
            }
        });
        provider
    }
}
/// Delegates to the router once it has been installed; until then the fallback URL answers.
impl<R: RouteProvider> RouteProvider for UrlUntilReady<R> {
    /// Returns the router's ordered routes when the router is present, otherwise the
    /// fallback URL's answer for the same `n`.
    fn n_ordered_routes(&self, n: usize) -> Result<Vec<Url>, AgentError> {
        let current = self.router.load();
        match &*current {
            Some(router) => router.n_ordered_routes(n),
            None => self.url.n_ordered_routes(n),
        }
    }

    /// Returns the router's route when the router is present, otherwise the fallback URL.
    fn route(&self) -> Result<Url, AgentError> {
        let current = self.router.load();
        match &*current {
            Some(router) => router.route(),
            None => self.url.route(),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A provider without any URL must report an error instead of a route.
    #[test]
    fn test_empty_routes() {
        let provider = RoundRobinRouteProvider::new::<&str>(vec![])
            .expect("failed to create a route provider");
        let error = provider.route().unwrap_err();
        let expected = AgentError::RouteProviderError("No routing urls provided".to_string());
        assert_eq!(error, expected);
    }

    /// Successive `route` calls cycle through the URLs and wrap around.
    #[test]
    fn test_routes_rotation() {
        let provider = RoundRobinRouteProvider::new(vec!["https://url1.com", "https://url2.com"])
            .expect("failed to create a route provider");

        let mut expected_urls: Vec<Url> = Vec::new();
        for url_str in ["https://url1.com", "https://url2.com", "https://url1.com"].iter() {
            expected_urls.push(Url::parse(url_str).expect("Invalid URL"));
        }

        let mut urls: Vec<Url> = Vec::new();
        for _ in 0..3 {
            urls.push(provider.route().expect("failed to get next url"));
        }

        assert_eq!(expected_urls, urls);
    }

    /// `n_ordered_routes` continues from the current position, wraps around, and returns
    /// the full list in original order when asked for at least as many URLs as it holds.
    #[test]
    fn test_n_routes() {
        // An empty provider has nothing to return.
        let empty_provider = RoundRobinRouteProvider::new(Vec::<&str>::new())
            .expect("failed to create a route provider");
        let urls_iter = empty_provider
            .n_ordered_routes(1)
            .expect("failed to get urls");
        assert!(urls_iter.is_empty());

        let provider = RoundRobinRouteProvider::new(vec![
            "https://url1.com",
            "https://url2.com",
            "https://url3.com",
            "https://url4.com",
            "https://url5.com",
        ])
        .expect("failed to create a route provider");

        let parse_all = |raw: &[&str]| -> Vec<Url> {
            raw.iter()
                .map(|url_str| Url::parse(url_str).expect("invalid URL"))
                .collect()
        };
        let check = |n: usize, expected: &[&str]| {
            let urls: Vec<_> = provider.n_ordered_routes(n).expect("failed to get urls");
            assert_eq!(urls, parse_all(expected));
        };

        // First three URLs.
        check(
            3,
            &["https://url1.com", "https://url2.com", "https://url3.com"],
        );
        // Next three wrap around the end of the list.
        check(
            3,
            &["https://url4.com", "https://url5.com", "https://url1.com"],
        );
        // Continue where the previous call stopped.
        check(2, &["https://url2.com", "https://url3.com"]);
        // Asking for the whole list returns it in its original order.
        check(
            5,
            &[
                "https://url1.com",
                "https://url2.com",
                "https://url3.com",
                "https://url4.com",
                "https://url5.com",
            ],
        );
    }
}