use std::{
future::Future,
time::{Duration, Instant},
};
use futures::{future, SinkExt, StreamExt};
use log::*;
use multiaddr::Multiaddr;
use tari_shutdown::ShutdownSignal;
use tokio::{
io::{AsyncRead, AsyncWrite, AsyncWriteExt},
sync::watch,
time,
};
use tokio_util::codec::{Framed, LinesCodec, LinesCodecError};
use crate::{connection_manager::wire_mode::WireMode, transports::Transport};
/// Maximum line length handed to `LinesCodec::new_with_max_length` for the liveness framing in this file.
const MAX_LINE_LENGTH: usize = 50;
/// Log target used by the log statements in this file.
const LOG_TARGET: &str = "comms::connection_manager::self_liveness";
/// Echo session used for liveness checks: lines read from the socket are forwarded straight back to it.
pub struct SelfLivenessSession<TSocket> {
    framed: Framed<TSocket, LinesCodec>,
}

impl<TSocket> SelfLivenessSession<TSocket>
where TSocket: AsyncRead + AsyncWrite + Unpin
{
    /// Wraps `socket` in a line-delimited framing whose lines are capped at `MAX_LINE_LENGTH`.
    pub fn new(socket: TSocket) -> Self {
        let codec = LinesCodec::new_with_max_length(MAX_LINE_LENGTH);
        let framed = Framed::new(socket, codec);
        Self { framed }
    }

    /// Consumes the session and returns a future that forwards the read half of the framed socket into its
    /// write half. The future's output is the result of that forwarding.
    pub fn run(self) -> impl Future<Output = Result<(), LinesCodecError>> {
        let (writer, reader) = self.framed.split();
        reader.forward(writer)
    }
}
/// Status of the self liveness check, published on the `watch` channel returned by `SelfLivenessCheck::spawn`.
#[derive(Debug, Clone, Copy)]
pub enum SelfLivenessStatus {
/// NOTE(review): never published by the code in this file; presumably set by callers when the check is not
/// started - confirm against the call sites.
Disabled,
/// Initial value of the channel; also sent at the start of every dial attempt.
Checking,
/// Published when dialing fails, the wire-mode byte cannot be written, the connection is closed, or a
/// ping-pong round trip returns an error.
Unreachable,
/// Published after a successful ping-pong; carries the time measured from just before the ping was sent until
/// a line was received back.
Live(Duration),
}
/// Background task state for periodically dialing this node's own addresses and ping-ponging over the
/// resulting connection, reporting the outcome through a `watch` channel.
pub struct SelfLivenessCheck<TTransport> {
// Transport used to dial the addresses.
transport: TTransport,
// Candidate addresses; one is dialed at a time and a failed dial moves on to the next (wrapping around).
addresses: Vec<Multiaddr>,
// Duration passed to `time::sleep` between pings and between connection attempts.
interval: Duration,
// Sending half of the status channel whose receiver is handed out by `spawn`.
tx_watch: watch::Sender<SelfLivenessStatus>,
// Raced against the check loop in `run_until_shutdown`; the task ends when it resolves.
shutdown_signal: ShutdownSignal,
}
impl<TTransport> SelfLivenessCheck<TTransport>
where
TTransport: Transport + Send + Sync + 'static,
TTransport::Output: AsyncRead + AsyncWrite + Unpin + Send,
{
/// Spawns the liveness check as a tokio task and returns the receiving half of its status channel.
///
/// The channel starts out holding `SelfLivenessStatus::Checking`. The spawned task runs
/// `run_until_shutdown`, so it stops once `shutdown_signal` resolves.
pub fn spawn(
    transport: TTransport,
    addresses: Vec<Multiaddr>,
    interval: Duration,
    shutdown_signal: ShutdownSignal,
) -> watch::Receiver<SelfLivenessStatus> {
    let (tx_watch, status_rx) = watch::channel(SelfLivenessStatus::Checking);
    tokio::spawn(
        Self {
            transport,
            addresses,
            interval,
            tx_watch,
            shutdown_signal,
        }
        .run_until_shutdown(),
    );
    status_rx
}
/// Drives `run` until it returns or the shutdown signal resolves, whichever happens first.
pub async fn run_until_shutdown(self) {
    // The signal must be cloned out before `run` takes ownership of `self`.
    let signal = self.shutdown_signal.clone();
    let check_loop = self.run();
    tokio::pin!(check_loop);
    // Whichever side completes first ends this task; the losing future is dropped with the result.
    let _ = future::select(check_loop, signal).await;
}
/// Runs the liveness check loop. Returns immediately (after logging a warning) when there are no addresses to
/// check; otherwise it loops until the future is dropped.
///
/// Each cycle:
/// 1. publishes `Checking` and dials the current address,
/// 2. writes the liveness wire-mode byte to the new connection,
/// 3. ping-pongs over the connection every `interval`, publishing `Live(latency)` after each round trip.
///
/// Any failure publishes `Unreachable`, and every cycle ends by sleeping for `interval` before the next dial so
/// that a persistently failing step can never turn into a busy loop. A failed dial additionally advances to the
/// next address, wrapping around at the end of the list.
pub async fn run(mut self) {
    if self.addresses.is_empty() {
        warn!(target: LOG_TARGET, "🔌️ No addresses to check");
        return;
    }
    info!(
        target: LOG_TARGET,
        "🔌️ Starting liveness self-check with interval {:.2?}", self.interval
    );
    let mut current_address_idx = 0;
    loop {
        let timer = Instant::now();
        // `send_replace` stores the value even when no receiver is currently alive, matching how every other
        // status in this loop is published.
        self.tx_watch.send_replace(SelfLivenessStatus::Checking);
        let address = self
            .addresses
            .get(current_address_idx)
            .expect("index is always reduced modulo the length of a non-empty address list")
            .clone();
        match self.transport.dial(&address).await {
            Ok(mut socket) => {
                debug!(
                    target: LOG_TARGET,
                    "🔌 self liveness dial ({}) took {:.2?}",
                    address,
                    timer.elapsed()
                );
                // `write_all` (rather than `write`) guarantees the byte was actually written: a bare `write`
                // may succeed having written zero bytes. The result is bound first so the borrow of `socket`
                // has ended before it is moved into the framed transport.
                let wire_mode_written = socket.write_all(&[WireMode::Liveness.as_byte()]).await;
                match wire_mode_written {
                    Ok(()) => {
                        let mut framed = Framed::new(socket, LinesCodec::new_with_max_length(MAX_LINE_LENGTH));
                        loop {
                            match self.ping_pong(&mut framed).await {
                                Ok(Some(latency)) => {
                                    debug!(target: LOG_TARGET, "⚡️️ self liveness check latency {latency:.2?}");
                                    self.tx_watch.send_replace(SelfLivenessStatus::Live(latency));
                                },
                                Ok(None) => {
                                    info!(target: LOG_TARGET, "🔌️ self liveness connection closed");
                                    self.tx_watch.send_replace(SelfLivenessStatus::Unreachable);
                                    break;
                                },
                                Err(err) => {
                                    warn!(target: LOG_TARGET, "🔌️ self liveness ping pong failed: {err}");
                                    self.tx_watch.send_replace(SelfLivenessStatus::Unreachable);
                                    break;
                                },
                            }
                            time::sleep(self.interval).await;
                        }
                    },
                    Err(err) => {
                        // Fall through to the sleep below instead of `continue`-ing, which would re-dial
                        // immediately and spin while the write keeps failing.
                        warn!(target: LOG_TARGET, "🔌️ self liveness failed to write byte: {err}");
                        self.tx_watch.send_replace(SelfLivenessStatus::Unreachable);
                    },
                }
            },
            Err(err) => {
                current_address_idx = (current_address_idx + 1) % self.addresses.len();
                self.tx_watch.send_replace(SelfLivenessStatus::Unreachable);
                warn!(
                    target: LOG_TARGET,
                    "🔌️ Failed to dial own public address {address} for self check: {err}"
                );
            },
        }
        // Back off before the next dial attempt, whatever the outcome of this cycle was.
        time::sleep(self.interval).await;
    }
}
/// Sends a single `pingpong` line over `framed` and waits for the next line to come back.
///
/// Returns `Ok(Some(elapsed))` with the time measured from just before the send until a line was received,
/// `Ok(None)` if the stream ended before a line arrived, or the codec error from sending/receiving.
///
/// NOTE(review): no timeout is applied here, so a peer that never answers would keep this future pending
/// unless the transport enforces its own timeout - confirm.
async fn ping_pong(
    &mut self,
    framed: &mut Framed<TTransport::Output, LinesCodec>,
) -> Result<Option<Duration>, LinesCodecError> {
    let started = Instant::now();
    framed.send("pingpong".to_string()).await?;
    // `transpose` turns the stream item `Option<Result<_, _>>` inside out so `?` can surface a codec error.
    let reply = framed.next().await.transpose()?;
    Ok(reply.map(|line| {
        trace!(target: LOG_TARGET, "Received: {line}");
        started.elapsed()
    }))
}
}
#[cfg(test)]
mod test {
    use tokio_stream::StreamExt;

    use super::*;
    use crate::memsocket::MemorySocket;

    /// Every line sent to a running `SelfLivenessSession` is echoed back unchanged, and the session finishes
    /// cleanly once the client side is dropped.
    #[tokio::test]
    async fn echos() {
        const NUM_MESSAGES: usize = 10;

        let (session_socket, client_socket) = MemorySocket::new_pair();
        let session_task = tokio::spawn(SelfLivenessSession::new(session_socket).run());

        let mut client = Framed::new(client_socket, LinesCodec::new());
        for _ in 0..NUM_MESSAGES {
            client.send("ECHO".to_string()).await.unwrap();
        }

        // Collecting consumes (and afterwards drops) the client, which lets the session run to completion.
        let echoed = client.take(NUM_MESSAGES).collect::<Vec<_>>().await;
        assert_eq!(echoed.len(), NUM_MESSAGES);
        assert!(echoed.iter().all(|line| line.as_ref().unwrap() == "ECHO"));

        let joined = time::timeout(Duration::from_secs(1), session_task).await;
        joined.unwrap().unwrap().unwrap();
    }
}