use nym_sphinx::addressing::clients::Recipient;
use nym_topology::{NymRouteProvider, NymTopology, NymTopologyError, NymTopologyMetadata};
use nym_validator_client::models::KeyRotationId;
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::{Notify, RwLock, RwLockReadGuard};
#[derive(Debug)]
pub struct TopologyAccessorInner {
controlled_manually: AtomicBool,
released_manual_control: Notify,
topology: RwLock<NymRouteProvider>,
}
impl TopologyAccessorInner {
fn new(initial: NymRouteProvider) -> Self {
TopologyAccessorInner {
controlled_manually: AtomicBool::new(false),
released_manual_control: Notify::new(),
topology: RwLock::new(initial),
}
}
async fn update(&self, new: Option<NymTopology>) {
let mut guard = self.topology.write().await;
match new {
Some(updated) => {
guard.update(updated);
}
None => guard.clear_topology(),
}
}
}
pub struct TopologyReadPermit<'a> {
permit: RwLockReadGuard<'a, NymRouteProvider>,
}
impl Deref for TopologyReadPermit<'_> {
type Target = NymRouteProvider;
fn deref(&self) -> &Self::Target {
&self.permit
}
}
impl<'a> TopologyReadPermit<'a> {
pub(crate) fn try_get_valid_topology_ref(
&'a self,
ack_recipient: &Recipient,
packet_recipient: Option<&Recipient>,
) -> Result<&'a NymRouteProvider, NymTopologyError> {
let route_provider = self.permit.deref();
let topology = &route_provider.topology;
topology.ensure_not_empty()?;
topology.ensure_minimally_routable()?;
let _ = route_provider.egress_by_identity(ack_recipient.gateway())?;
if let Some(recipient) = packet_recipient {
let _ = route_provider.egress_by_identity(recipient.gateway())?;
}
Ok(route_provider)
}
}
impl<'a> From<RwLockReadGuard<'a, NymRouteProvider>> for TopologyReadPermit<'a> {
fn from(permit: RwLockReadGuard<'a, NymRouteProvider>) -> Self {
TopologyReadPermit { permit }
}
}
#[derive(Clone, Debug)]
pub struct TopologyAccessor {
inner: Arc<TopologyAccessorInner>,
}
impl TopologyAccessor {
pub fn new(ignore_egress_epoch_roles: bool) -> Self {
TopologyAccessor {
inner: Arc::new(TopologyAccessorInner::new(NymRouteProvider::new_empty(
ignore_egress_epoch_roles,
))),
}
}
pub fn controlled_manually(&self) -> bool {
self.inner.controlled_manually.load(Ordering::SeqCst)
}
pub async fn get_read_permit(&self) -> TopologyReadPermit<'_> {
self.inner.topology.read().await.into()
}
pub(crate) async fn update_global_topology(&self, new_topology: Option<NymTopology>) {
self.inner.update(new_topology).await;
}
pub(crate) async fn wait_for_released_manual_control(&self) {
self.inner.released_manual_control.notified().await
}
#[deprecated(note = "use .current_route_provider instead")]
pub async fn current_topology(&self) -> Option<NymTopology> {
self.current_route_provider()
.await
.as_ref()
.map(|p| p.topology.clone())
}
pub async fn current_route_provider(&self) -> Option<RwLockReadGuard<'_, NymRouteProvider>> {
let provider = self.inner.topology.read().await;
if provider.topology.is_empty() {
None
} else {
Some(provider)
}
}
pub async fn current_mixnet_epoch_id(&self) -> Option<u32> {
let route_provider = self.current_route_provider().await?;
Some(route_provider.absolute_epoch_id())
}
pub async fn current_key_rotation_id(&self) -> Option<KeyRotationId> {
let route_provider = self.current_route_provider().await?;
Some(route_provider.current_key_rotation())
}
pub async fn current_metadata(&self) -> Option<NymTopologyMetadata> {
let route_provider = self.current_route_provider().await?;
Some(route_provider.metadata())
}
pub async fn manually_change_topology(&self, new_topology: NymTopology) {
self.inner.controlled_manually.store(true, Ordering::SeqCst);
self.inner.update(Some(new_topology)).await;
}
pub fn release_manual_control(&self) {
self.inner
.controlled_manually
.store(false, Ordering::SeqCst);
self.inner.released_manual_control.notify_waiters();
}
pub async fn ensure_is_routable(&self) -> Result<(), NymTopologyError> {
self.inner
.topology
.read()
.await
.topology
.ensure_minimally_routable()
}
}