use std::net::SocketAddr;
use std::ops::ControlFlow;
use namada_sdk::control_flow::time;
use namada_sdk::time::{DateTimeUtc, Utc};
use tokio::sync::mpsc::UnboundedReceiver;
use crate::tendermint_rpc::client::CompatMode;
use crate::tendermint_rpc::{Client, HttpClient};
const DEFAULT_BROADCAST_TIMEOUT: u64 = 180;
const BROADCASTER_TIMEOUT_ENV_VAR: &str = "NAMADA_BROADCASTER_TIMEOUT_SECS";
pub struct Broadcaster {
client: HttpClient,
receiver: UnboundedReceiver<Vec<u8>>,
}
impl Broadcaster {
pub fn new(url: SocketAddr, receiver: UnboundedReceiver<Vec<u8>>) -> Self {
Self {
client: HttpClient::builder(
format!("http://{}", url).as_str().try_into().unwrap(),
)
.compat_mode(CompatMode::V0_37)
.timeout(std::time::Duration::from_secs(30))
.build()
.unwrap(),
receiver,
}
}
async fn run_loop(&mut self, genesis_time: DateTimeUtc) {
#[allow(clippy::disallowed_methods)]
let now = Utc::now();
if let Ok(sleep_time) =
genesis_time.0.signed_duration_since(now).to_std()
{
if !sleep_time.is_zero() {
tokio::time::sleep(sleep_time).await;
}
}
let result = time::Sleep {
strategy: time::ExponentialBackoff {
base: 2,
as_duration: time::Duration::from_secs,
},
}
.run(|| async {
let timeout =
if let Ok(value) = std::env::var(BROADCASTER_TIMEOUT_ENV_VAR) {
value.parse::<u64>().unwrap_or(DEFAULT_BROADCAST_TIMEOUT)
} else {
DEFAULT_BROADCAST_TIMEOUT
};
let now = {
#[allow(clippy::disallowed_methods)]
time::Instant::now()
};
let status_result = time::Sleep {
strategy: time::Constant(time::Duration::from_secs(1)),
}
.timeout(
#[allow(clippy::arithmetic_side_effects)]
{
now + time::Duration::from_secs(timeout)
},
|| async {
match self.client.status().await {
Ok(status) => ControlFlow::Break(status),
Err(_) => ControlFlow::Continue(()),
}
},
)
.await;
let status = match status_result {
Ok(status) => status,
Err(_) => return ControlFlow::Break(Err(())),
};
if status.sync_info.catching_up {
ControlFlow::Continue(())
} else {
ControlFlow::Break(Ok(()))
}
})
.await;
if let Err(()) = result {
tracing::error!("Broadcaster failed to connect to CometBFT node");
return;
} else {
tracing::info!("Broadcaster successfully started.");
}
loop {
if let Some(msg) = self.receiver.recv().await {
let _ = self.client.broadcast_tx_sync(msg).await;
}
}
}
pub async fn run(
&mut self,
abort_recv: tokio::sync::oneshot::Receiver<()>,
genesis_time: DateTimeUtc,
) {
tracing::info!("Starting broadcaster.");
tokio::select! {
_ = self.run_loop(genesis_time) => {
tracing::error!("Broadcaster unexpectedly shut down.");
tracing::info!("Shutting down broadcaster...");
},
resp_sender = abort_recv => {
match resp_sender {
Ok(_) => {
tracing::info!("Shutting down broadcaster...");
},
Err(err) => {
tracing::error!("The broadcaster abort sender has unexpectedly dropped: {}", err);
tracing::info!("Shutting down broadcaster...");
}
}
}
}
}
}