pub(crate) use accessor::{TopologyAccessor, TopologyReadPermit};
use futures::StreamExt;
use nym_sphinx::addressing::nodes::NodeIdentity;
use nym_topology::NymTopologyError;
use std::time::Duration;
use tracing::*;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::sleep;
#[cfg(target_arch = "wasm32")]
use wasmtimer::tokio::sleep;
mod accessor;
pub mod nym_api_provider;
pub use nym_api_provider::{Config as NymApiTopologyProviderConfig, NymApiTopologyProvider};
pub use nym_topology::provider_trait::{ToTopologyMetadata, TopologyProvider};
/// Number of consecutive refresh failures tolerated before the refresher gives up
/// on the stale cached topology and pushes `None` (invalidating it) instead.
const MAX_FAILURE_COUNT: usize = 10;
/// Configuration for a [`TopologyRefresher`], controlling how often the network
/// topology is re-fetched from its provider.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TopologyRefresherConfig {
    /// Interval between consecutive topology refresh attempts.
    pub refresh_rate: Duration,
}

impl TopologyRefresherConfig {
    /// Creates a new configuration with the provided refresh interval.
    pub fn new(refresh_rate: Duration) -> Self {
        TopologyRefresherConfig { refresh_rate }
    }
}
/// Long-running component that keeps the shared [`TopologyAccessor`] up to date
/// with topology data pulled from the configured [`TopologyProvider`].
pub struct TopologyRefresher {
// source of fresh topology data; swappable at runtime via `change_topology_provider`
topology_provider: Box<dyn TopologyProvider + Send + Sync>,
// shared handle through which refreshed topologies are published to the rest of the client
topology_accessor: TopologyAccessor,
// how often `run` attempts a refresh
refresh_rate: Duration,
// number of refresh attempts that have failed in a row; compared against MAX_FAILURE_COUNT
consecutive_failure_count: usize,
}
impl TopologyRefresher {
    /// Constructs a refresher that periodically pulls a fresh topology from
    /// `topology_provider` and publishes it through `topology_accessor`.
    pub fn new(
        cfg: TopologyRefresherConfig,
        topology_accessor: TopologyAccessor,
        topology_provider: Box<dyn TopologyProvider + Send + Sync>,
    ) -> Self {
        Self {
            topology_provider,
            topology_accessor,
            refresh_rate: cfg.refresh_rate,
            consecutive_failure_count: 0,
        }
    }

    /// Swaps out the underlying source of topology data.
    pub fn change_topology_provider(&mut self, provider: Box<dyn TopologyProvider + Send + Sync>) {
        self.topology_provider = provider;
    }

    /// Performs a single refresh attempt.
    ///
    /// On success the fetched topology is published and the failure counter is
    /// reset. On failure the stale topology is kept for up to
    /// `MAX_FAILURE_COUNT` consecutive attempts; beyond that, `None` is
    /// published, invalidating the cached topology.
    pub async fn try_refresh(&mut self) {
        trace!("Refreshing the topology");

        // if somebody took manual control of the topology, don't fight them —
        // block until they release it
        if self.topology_accessor.controlled_manually() {
            info!("topology is being controlled manually - we're going to wait until the control is released...");
            self.topology_accessor
                .wait_for_released_manual_control()
                .await;
        }

        let fetched = self.topology_provider.get_new_topology().await;
        match &fetched {
            Some(_) => self.consecutive_failure_count = 0,
            None => {
                warn!("failed to obtain new network topology");
                // tolerate a bounded run of failures by holding on to whatever
                // topology we already have
                if self.consecutive_failure_count < MAX_FAILURE_COUNT {
                    warn!("we're going to keep on using the old topology for this iteration");
                    self.consecutive_failure_count += 1;
                    return;
                }
            }
        }

        // reaching this point with `fetched == None` means we exceeded the
        // failure budget: publishing `None` wipes the cached topology
        self.topology_accessor.update_global_topology(fetched).await;
    }

    /// Checks that the currently cached topology can be used for routing.
    pub async fn ensure_topology_is_routable(&self) -> Result<(), NymTopologyError> {
        self.topology_accessor.ensure_is_routable().await
    }

    /// Checks that the given `egress` node is present and routable in the
    /// currently cached topology.
    pub async fn ensure_contains_routable_egress(
        &self,
        egress: NodeIdentity,
    ) -> Result<(), NymTopologyError> {
        let provider = match self.topology_accessor.current_route_provider().await {
            Some(provider) => provider,
            None => return Err(NymTopologyError::EmptyNetworkTopology),
        };
        // only the lookup's success matters here; the resolved node is discarded
        let _ = provider.egress_by_identity(egress)?;
        Ok(())
    }

    /// Repeatedly refreshes the topology until `gateway` becomes routable,
    /// giving up with a timeout error after `timeout_duration`.
    pub async fn wait_for_gateway(
        &mut self,
        gateway: NodeIdentity,
        timeout_duration: Duration,
    ) -> Result<(), NymTopologyError> {
        info!(
            "going to wait for at most {timeout_duration:?} for gateway '{gateway}' to come online"
        );

        let timeout_fut = sleep(timeout_duration);
        tokio::pin!(timeout_fut);

        loop {
            tokio::select! {
                _ = &mut timeout_fut => {
                    return Err(NymTopologyError::TimedOutWaitingForGateway {
                        identity_key: gateway.to_base58_string()
                    })
                }
                _ = self.try_refresh() => {
                    if self.ensure_contains_routable_egress(gateway).await.is_ok() {
                        return Ok(())
                    }
                    info!("gateway '{gateway}' is still not online...");
                    sleep(self.refresh_rate).await
                }
            }
        }
    }

    /// Main loop: refreshes the topology once per `refresh_rate` until the
    /// underlying interval stream terminates.
    pub async fn run(&mut self) {
        debug!("Started TopologyRefresher with graceful shutdown support");

        #[cfg(not(target_arch = "wasm32"))]
        let mut ticker =
            tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(self.refresh_rate));

        #[cfg(target_arch = "wasm32")]
        let mut ticker =
            gloo_timers::future::IntervalStream::new(self.refresh_rate.as_millis() as u32);

        // a tokio interval yields its first tick immediately; swallow it so the
        // first refresh happens a full period from now, like on the wasm stream
        #[cfg(not(target_arch = "wasm32"))]
        ticker.next().await;

        loop {
            if ticker.next().await.is_none() {
                break;
            }
            self.try_refresh().await;
        }

        error!("topology refresher interval has been exhausted!")
    }
}