use crate::runtime::BALTER_OUT;
use axum::extract::ws::WebSocket;
use futures_util::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, HashSet},
net::SocketAddr,
sync::{Arc, RwLock},
time::Duration,
};
use thiserror::Error;
use time::OffsetDateTime;
use tokio_tungstenite::{connect_async, tungstenite::Message};
use tracing::{debug, error, instrument, trace};
use url::Url;
/// Gossip state shared between the gossip tasks and the websocket handler.
///
/// Uses a `std::sync::RwLock`, so guards must be dropped before any `.await`.
pub(crate) type SharedGossipData = Arc<RwLock<GossipData>>;
/// Mutable gossip state for this node (wrap with [`GossipData::shared`] to share it).
#[derive(Debug)]
pub(crate) struct GossipData {
/// Seed peers still to be contacted; `gossip_task` removes a seed once gossip with it succeeds.
seed_list: HashSet<SocketAddr>,
/// Latest known [`PeerInfo`] per node, keyed by address; includes this node once `addr` is known.
cluster_info: HashMap<SocketAddr, PeerInfo>,
/// This node's own address, learned from the first gossip message (`None` until then).
addr: Option<SocketAddr>,
/// Port substituted into the address peers report for this node.
port: u16,
}
impl GossipData {
pub fn new(seed_list: &[SocketAddr], port: u16) -> Self {
let seed_list = HashSet::from_iter(seed_list.iter().copied());
Self {
seed_list,
cluster_info: HashMap::new(),
addr: None,
port,
}
}
pub(crate) fn shared(self) -> SharedGossipData {
Arc::new(RwLock::new(self))
}
fn select_gossip_peer(&self) -> Option<SocketAddr> {
let (oldest_peer, _) = self
.cluster_info
.iter()
.filter(|(&addr, _)| addr != self.addr.unwrap())
.filter(|(_, peer)| peer.state != PeerState::Unreachable)
.min_by_key(|(_, peer)| peer.last_timestamp_utc)?;
Some(*oldest_peer)
}
fn select_peer_for_work(&self) -> Option<SocketAddr> {
let (free_peer, _) = self
.cluster_info
.iter()
.filter(|(&addr, _)| {
if let Some(self_addr) = self.addr {
addr != self_addr
} else {
true
}
})
.filter(|(_, peer)| peer.state == PeerState::Free)
.min_by_key(|(_, peer)| peer.last_timestamp_utc)?;
Some(*free_peer)
}
}
/// Gossiped status of a single node.
///
/// Sent with bincode inside `GossipMessage`, so the field order is part of the wire format.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub(crate) struct PeerInfo {
/// Counter bumped by the owning node on each gossip exchange; the higher version wins on merge.
version: u64,
/// When the owning node last refreshed this entry; used to pick the least-recently-updated peer.
last_timestamp_utc: OffsetDateTime,
/// Availability used when choosing peers for gossip or for work.
state: PeerState,
}
/// Availability of a node as seen through gossip.
///
/// Serialized with bincode (variant index), so do not reorder variants.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) enum PeerState {
/// Eligible to be handed work by `select_peer_for_work`; the state a node starts in.
Free,
/// Set on this node while it requests help from a peer.
Busy,
/// Skipped when selecting a gossip peer.
Unreachable,
/// NOTE(review): not assigned anywhere in this file.
Unknown,
}
/// Errors raised while exchanging gossip or forwarding work to peers.
#[derive(Error, Debug)]
pub(crate) enum GossipError {
/// A lock on the shared gossip state was poisoned.
#[error("Poisoned lock error.")]
PoisonError,
/// Building a peer URL failed.
#[error("Url parsing error: {0}")]
UrlError(#[from] url::ParseError),
/// The outbound (tokio-tungstenite) websocket failed.
#[error("Websocket error: {0}")]
WsError(#[from] tokio_tungstenite::tungstenite::Error),
/// Encoding or decoding a gossip message with bincode failed.
#[error("Error with serialization: {0}")]
BincodeError(#[from] Box<bincode::ErrorKind>),
/// The websocket ended before the expected gossip message arrived.
#[error("Empty message received during gossip when one is expected")]
NoMessage,
/// A gossip frame was not a binary frame.
#[error("Message received is not binary")]
NonBinary,
/// The address reported for this node disagrees with the recorded one (expected, actual).
#[error("Message received has an invalid addr: Expected {0}, Actual {1}")]
InvalidAddr(SocketAddr, SocketAddr),
/// The inbound axum websocket failed.
#[error("Axum websocket error: {0}")]
AxumError(#[from] axum::Error),
/// An HTTP request to a peer's `/run` endpoint failed.
#[error("Reqwest error: {0}")]
ReqwestError(#[from] reqwest::Error),
}
pub(crate) async fn gossip_task(shared_data: SharedGossipData) -> Result<(), GossipError> {
let mut interval = tokio::time::interval(Duration::from_millis(5000));
tokio::spawn(work_gossip_task(shared_data.clone()));
loop {
interval.tick().await;
let peer = {
let gossip_data = shared_data.read().map_err(|_| GossipError::PoisonError)?;
if let Some(peer) = gossip_data.seed_list.iter().last() {
*peer
} else {
break;
}
};
match gossip_with_peer(&shared_data, peer).await {
Ok(_) => {
debug!("Successfully gossiped with {peer}.");
trace!("New State: {:?}", shared_data.read().unwrap());
{
let mut data = shared_data.write().map_err(|_| GossipError::PoisonError)?;
data.seed_list.remove(&peer);
}
}
Err(err) => {
error!("Error gossiping: {err:?}");
}
}
}
debug!("Seed list complete.");
loop {
interval.tick().await;
let peer: Option<SocketAddr> = shared_data
.read()
.map_err(|_| GossipError::PoisonError)?
.select_gossip_peer();
if let Some(peer) = peer {
match gossip_with_peer(&shared_data, peer).await {
Ok(_) => {
debug!("Successfully gossiped with {peer}.");
trace!("New State: {:?}", shared_data.read().unwrap());
}
Err(err) => {
error!("Error gossiping: {err:?}");
}
}
} else {
debug!("No peers available to gossip with");
}
}
}
async fn work_gossip_task(shared_data: SharedGossipData) -> Result<(), GossipError> {
let (_, ref rx) = *BALTER_OUT;
loop {
match rx.recv().await {
Ok(config) => {
debug!("Requesting help from a peer for scenario {}", config.name);
let addr = {
let mut data = shared_data.write().map_err(|_| GossipError::PoisonError)?;
if let Some(self_addr) = data.addr {
if let Some(val) = data.cluster_info.get_mut(&self_addr) {
val.state = PeerState::Busy;
}
}
data.select_peer_for_work()
};
let addr = if let Some(addr) = addr {
addr
} else {
error!("No peers available for work.");
continue;
};
let url = Url::parse(&format!("http://{}/run", addr))?;
let client = reqwest::Client::new();
let res = client.post(url).json(&config).send().await?;
if !res.status().is_success() {
error!(
"Peer error: status_code={}, text={}",
res.status(),
res.text().await?
);
}
}
Err(err) => {
error!("Task for requesting help from peers has errored: {err:?}");
break;
}
}
}
Ok(())
}
/// Payload exchanged (bincode-encoded, binary frame) over the gossip websocket.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct GossipMessage {
/// The receiver's address as observed by the sender; the receiver swaps in its own port.
addr: SocketAddr,
/// The sender's full view of the cluster.
cluster_info: HashMap<SocketAddr, PeerInfo>,
}
/// Performs one outbound gossip round-trip with the node at `addr`.
///
/// Connects to `ws://{addr}/info-ws` and sends our cluster view, tagged with `addr` so the
/// peer learns its own address. It then reads one binary reply and merges it into
/// `shared_data`.
///
/// # Errors
///
/// Websocket, (de)serialization, poisoned-lock, missing or non-binary reply, and
/// address-mismatch errors from `sync_cluster_info`.
#[instrument(skip(shared_data))]
async fn gossip_with_peer(
    shared_data: &SharedGossipData,
    addr: SocketAddr,
) -> Result<(), GossipError> {
    let endpoint = Url::parse(&format!("ws://{}/info-ws", addr))?;
    let (mut stream, _response) = connect_async(endpoint).await?;
    trace!("Successfully negotiated stream");

    // The read guard is a temporary, released before the next `.await`.
    let snapshot = shared_data
        .read()
        .map_err(|_| GossipError::PoisonError)?
        .cluster_info
        .clone();
    let outgoing = GossipMessage {
        addr,
        cluster_info: snapshot,
    };
    let payload = bincode::serialize(&outgoing)?;
    stream.send(Message::Binary(payload)).await?;

    let frame = match stream.next().await {
        Some(frame) => frame?,
        None => return Err(GossipError::NoMessage),
    };
    let reply: GossipMessage = if let Message::Binary(bytes) = frame {
        bincode::deserialize(&bytes)?
    } else {
        return Err(GossipError::NonBinary);
    };
    sync_cluster_info(shared_data, &reply)
}
/// Handles one inbound gossip exchange on an accepted axum websocket.
///
/// Reads one binary `GossipMessage` and merges it into `shared_data`. It then replies with
/// the merged cluster view, tagged with `peer_addr` so the caller learns its own address.
///
/// # Errors
///
/// Websocket, (de)serialization, poisoned-lock, missing or non-binary message, and
/// address-mismatch errors from `sync_cluster_info`.
pub(crate) async fn receive_gossip(
    mut socket: WebSocket,
    shared_data: &SharedGossipData,
    peer_addr: SocketAddr,
) -> Result<(), GossipError> {
    let frame = match socket.recv().await {
        Some(frame) => frame?,
        None => return Err(GossipError::NoMessage),
    };
    let incoming: GossipMessage = if let axum::extract::ws::Message::Binary(bytes) = frame {
        bincode::deserialize(&bytes)?
    } else {
        return Err(GossipError::NonBinary);
    };

    sync_cluster_info(shared_data, &incoming)?;

    // Snapshot after merging; the read guard is a temporary dropped before `.await`.
    let merged = shared_data
        .read()
        .map_err(|_| GossipError::PoisonError)?
        .cluster_info
        .clone();
    let reply = GossipMessage {
        addr: peer_addr,
        cluster_info: merged,
    };
    let payload = bincode::serialize(&reply)?;
    socket
        .send(axum::extract::ws::Message::Binary(payload))
        .await?;
    Ok(())
}
/// Merges a received [`GossipMessage`] into the shared state and refreshes this node's entry.
///
/// Steps:
/// 1. `message.addr` is this node's address as seen by the sender, with its port replaced
///    by `GossipData::port`. The first message fixes `GossipData::addr`; later messages
///    must report the same address.
/// 2. Each remote entry is adopted if it is unknown locally, or if its `version` is
///    strictly higher than the local one.
/// 3. This node's entry gets its version bumped and timestamp refreshed. If the entry is
///    missing, it is created as `Free` with version 1.
///
/// # Errors
///
/// * [`GossipError::PoisonError`] if the lock is poisoned.
/// * [`GossipError::InvalidAddr`] if the reported address disagrees with the recorded one.
///   Nothing is merged in that case.
fn sync_cluster_info(
    shared_data: &SharedGossipData,
    message: &GossipMessage,
) -> Result<(), GossipError> {
    let mut data = shared_data.write().map_err(|_| GossipError::PoisonError)?;

    let mut observed = message.addr;
    observed.set_port(data.port);
    let known = data.addr;
    let self_addr = match known {
        Some(addr) if addr != observed => {
            return Err(GossipError::InvalidAddr(addr, observed));
        }
        Some(addr) => addr,
        None => {
            data.addr = Some(observed);
            observed
        }
    };

    // Single lookup per entry: replace only on a strictly newer version.
    for (addr, remote) in &message.cluster_info {
        data.cluster_info
            .entry(*addr)
            .and_modify(|local| {
                if local.version < remote.version {
                    *local = remote.clone();
                }
            })
            .or_insert_with(|| remote.clone());
    }

    let now = OffsetDateTime::now_utc();
    data.cluster_info
        .entry(self_addr)
        .and_modify(|info| {
            info.version += 1;
            info.last_timestamp_utc = now;
        })
        .or_insert_with(|| PeerInfo {
            version: 1,
            last_timestamp_utc: now,
            state: PeerState::Free,
        });
    Ok(())
}