chitchat 0.10.1

Cluster membership library using gossip with Scuttlebutt reconciliation.
Documentation
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use rand::distr::Bernoulli;
use rand::prelude::{Distribution, SmallRng};
use rand::{SeedableRng, rng};
use tokio::sync::RwLock;

use crate::ChitchatMessage;
use crate::transport::{Socket, Transport};

struct TransportWithDelay<D: Distribution<f32> + Send + Sync + 'static> {
    delay_secs: D,
    transport: Box<dyn Transport>,
}

pub trait DelayMillisDist: Distribution<f32> + Send + Sync + Clone + 'static {}

#[async_trait]
impl<D: DelayMillisDist> Transport for TransportWithDelay<D> {
    async fn open(&self, listen_addr: SocketAddr) -> anyhow::Result<Box<dyn Socket>> {
        let rng = SmallRng::from_rng(&mut rng());
        let socket = self.transport.open(listen_addr).await?;
        Ok(Box::new(SocketWithDelay {
            delay_secs: self.delay_secs.clone(),
            socket: Arc::new(RwLock::new(socket)),
            rng,
        }))
    }
}

struct SocketWithDelay<D: Distribution<f32> + Send + Sync + 'static> {
    delay_secs: D,
    socket: Arc<RwLock<Box<dyn Socket>>>,
    rng: SmallRng,
}

#[async_trait]
impl<D: DelayMillisDist> Socket for SocketWithDelay<D> {
    async fn send(&mut self, to: SocketAddr, message: ChitchatMessage) -> anyhow::Result<()> {
        let socket_clone = self.socket.clone();
        let delay_secs = self.delay_secs.sample(&mut self.rng);
        let delay = Duration::from_secs_f32(delay_secs);
        tokio::task::spawn(async move {
            tokio::time::sleep(delay).await;
            let _ = socket_clone.write().await.send(to, message).await;
        });
        Ok(())
    }

    async fn recv(&mut self) -> anyhow::Result<(SocketAddr, ChitchatMessage)> {
        self.socket.write().await.recv().await
    }
}

pub trait TransportExt {
    fn drop_message(self, drop_probability: f64) -> Box<dyn Transport>;
    fn delay<D: DelayMillisDist>(self, delay_proba: D) -> Box<dyn Transport>;
}

impl<T: Transport> TransportExt for T {
    fn drop_message(self, drop_probability: f64) -> Box<dyn Transport> {
        Box::new(TransportWithMessageDrop {
            drop_probability: Bernoulli::new(drop_probability).unwrap(),
            transport: Box::new(self),
        })
    }

    fn delay<D: DelayMillisDist>(self, delay_secs: D) -> Box<dyn Transport> {
        Box::new(TransportWithDelay {
            delay_secs,
            transport: Box::new(self),
        })
    }
}

struct TransportWithMessageDrop {
    drop_probability: Bernoulli,
    transport: Box<dyn Transport>,
}

#[async_trait]
impl Transport for TransportWithMessageDrop {
    async fn open(&self, listen_addr: SocketAddr) -> anyhow::Result<Box<dyn Socket>> {
        let rng = SmallRng::from_rng(&mut rng());
        let socket = self.transport.open(listen_addr).await?;
        Ok(Box::new(SocketWithMessageDrop {
            drop_probability: self.drop_probability,
            socket,
            rng,
        }))
    }
}

struct SocketWithMessageDrop {
    drop_probability: Bernoulli,
    socket: Box<dyn Socket>,
    rng: SmallRng,
}

#[async_trait]
impl Socket for SocketWithMessageDrop {
    async fn send(&mut self, to: SocketAddr, message: ChitchatMessage) -> anyhow::Result<()> {
        let should_drop = self.drop_probability.sample(&mut self.rng);
        if should_drop {
            return Ok(());
        }
        self.socket.send(to, message).await
    }

    async fn recv(&mut self) -> anyhow::Result<(SocketAddr, ChitchatMessage)> {
        self.socket.recv().await
    }
}