use async_trait::async_trait;
use candid::Principal;
use futures_util::FutureExt;
use std::{fmt::Debug, sync::Arc, time::Duration};
use stop_token::StopToken;
use url::Url;
#[allow(unused)]
use crate::agent::route_provider::dynamic_routing::health_check::HEALTH_MANAGER_ACTOR;
use crate::agent::{
route_provider::dynamic_routing::{
dynamic_route_provider::DynamicRouteProviderError,
messages::FetchedNodes,
node::Node,
snapshot::routing_snapshot::RoutingSnapshot,
type_aliases::{AtomicSwap, SenderWatch},
},
Agent, HttpService,
};
#[allow(unused)]
const NODES_FETCH_ACTOR: &str = "NodesFetchActor";
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
pub trait Fetch: Sync + Send + Debug {
async fn fetch(&self, url: Url) -> Result<Vec<Node>, DynamicRouteProviderError>;
}
#[derive(Debug)]
pub struct NodesFetcher {
http_client: Arc<dyn HttpService>,
subnet_id: Principal,
root_key: Option<Vec<u8>>,
}
impl NodesFetcher {
pub fn new(
http_client: Arc<dyn HttpService>,
subnet_id: Principal,
root_key: Option<Vec<u8>>,
) -> Self {
Self {
http_client,
subnet_id,
root_key,
}
}
}
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
impl Fetch for NodesFetcher {
async fn fetch(&self, url: Url) -> Result<Vec<Node>, DynamicRouteProviderError> {
let agent = Agent::builder()
.with_url(url)
.with_arc_http_middleware(self.http_client.clone())
.build()
.map_err(|err| {
DynamicRouteProviderError::NodesFetchError(format!(
"Failed to build the agent: {err}"
))
})?;
if let Some(key) = self.root_key.clone() {
agent.set_root_key(key);
}
let api_bns = agent
.fetch_api_boundary_nodes_by_subnet_id(self.subnet_id)
.await
.map_err(|err| {
DynamicRouteProviderError::NodesFetchError(format!(
"Failed to fetch API nodes: {err}"
))
})?;
let nodes = api_bns
.into_iter()
.filter_map(|api_node| api_node.try_into().ok())
.collect();
return Ok(nodes);
}
}
pub(super) struct NodesFetchActor<S> {
fetcher: Arc<dyn Fetch>,
period: Duration,
fetch_retry_interval: Duration,
fetch_sender: SenderWatch<FetchedNodes>,
routing_snapshot: AtomicSwap<S>,
token: StopToken,
}
impl<S> NodesFetchActor<S>
where
S: RoutingSnapshot,
{
pub fn new(
fetcher: Arc<dyn Fetch>,
period: Duration,
retry_interval: Duration,
fetch_sender: SenderWatch<FetchedNodes>,
snapshot: AtomicSwap<S>,
token: StopToken,
) -> Self {
Self {
fetcher,
period,
fetch_retry_interval: retry_interval,
fetch_sender,
routing_snapshot: snapshot,
token,
}
}
pub async fn run(self) {
loop {
loop {
let snapshot = self.routing_snapshot.load();
if let Some(node) = snapshot.next_node() {
let fetch_result = futures_util::select! {
result = self.fetcher.fetch((&node).into()).fuse() => result,
_ = self.token.clone().fuse() => {
log!(warn, "{NODES_FETCH_ACTOR}: was gracefully cancelled");
return;
}
};
match fetch_result {
Ok(nodes) => {
let msg = Some(FetchedNodes { nodes });
match self.fetch_sender.send(msg) {
Ok(()) => break, Err(_err) => {
log!(error, "{NODES_FETCH_ACTOR}: failed to send results to {HEALTH_MANAGER_ACTOR}: {_err:?}");
}
}
}
Err(_err) => {
log!(
error,
"{NODES_FETCH_ACTOR}: failed to fetch nodes: {_err:?}"
);
}
};
} else {
log!(error, "{NODES_FETCH_ACTOR}: no nodes in the snapshot");
break;
};
log!(
warn,
"Retrying to fetch the nodes in {:?}",
self.fetch_retry_interval
);
futures_util::select! {
_ = crate::util::sleep(self.fetch_retry_interval).fuse() => {}
_ = self.token.clone().fuse() => {
log!(warn, "{NODES_FETCH_ACTOR}: was gracefully cancelled");
return;
}
}
}
futures_util::select! {
_ = crate::util::sleep(self.period).fuse() => {
continue;
}
_ = self.token.clone().fuse() => {
log!(warn, "{NODES_FETCH_ACTOR}: was gracefully cancelled");
break;
}
}
}
}
}