use std::time::Duration;
use rand::Rng;
use tokio::sync::watch;
use triomphe::Arc;
use super::mirror::MirrorNetwork;
use super::Network;
use crate::NodeAddressBookQuery;
#[derive(Clone)]
pub(crate) struct ManagedNetwork(Arc<ManagedNetworkInner>);
impl ManagedNetwork {
const NETWORK_FIRST_UPDATE_DELAY: Duration = Duration::from_secs(10);
pub(crate) fn new(
primary: Network,
mirror: MirrorNetwork,
) -> Self {
Self(Arc::new(ManagedNetworkInner { primary, mirror }))
}
pub(crate) fn mainnet() -> Self {
Self::new(Network::mainnet(), MirrorNetwork::mainnet())
}
pub(crate) fn testnet() -> Self {
Self::new(Network::testnet(), MirrorNetwork::testnet())
}
pub(crate) fn previewnet() -> Self {
Self::new(Network::previewnet(), MirrorNetwork::previewnet())
}
}
impl std::ops::Deref for ManagedNetwork {
type Target = ManagedNetworkInner;
fn deref(&self) -> &Self::Target {
&self.0
}
}
pub(crate) struct ManagedNetworkInner {
pub(crate) primary: Network,
pub(crate) mirror: MirrorNetwork,
}
pub(crate) fn spawn_network_update(
network: ManagedNetwork,
initial_update_interval: Option<Duration>,
) -> watch::Sender<Option<Duration>> {
let (tx, rx) = watch::channel(initial_update_interval);
tokio::task::spawn(update_network(network, rx));
tx
}
async fn update_network(
network: ManagedNetwork,
mut update_interval_rx: watch::Receiver<Option<Duration>>,
) {
tokio::time::sleep(ManagedNetwork::NETWORK_FIRST_UPDATE_DELAY).await;
'outer: loop {
let start = tokio::time::Instant::now();
match NodeAddressBookQuery::new()
.execute_mirrornet(network.mirror.load().channel(), None)
.await
{
Ok(it) => network.primary.update_from_address_book(&it),
Err(e) => {
log::warn!("{e:?}");
}
}
let jitter = rand::thread_rng().gen_range(0..100);
'wait: loop {
let update_interval = match update_interval_rx.wait_for(Option::is_some).await {
Ok(it) => it.unwrap(),
Err(e) => {
log::debug!("client network update shutdown: {e}");
return;
}
};
tokio::select! {
_ = tokio::time::sleep_until(start + update_interval + Duration::from_millis(jitter)) => {
continue 'outer
}
_ = update_interval_rx.changed() => continue 'wait,
}
}
}
}