netsim 0.3.0

Run tests in network-isolated threads. Intercept and meddle with their packets.
Documentation
use crate::priv_prelude::*;

/// `Sink`/`Stream` adapter which randomly drops items.
///
/// Can be created via [`SinkStreamExt::with_loss`](crate::SinkStreamExt::with_loss).
#[pin_project]
pub struct Loss<S> {
    #[pin]
    stream: S,
    jitter: Jitter,
}

struct Jitter {
    loss_rate: f64,
    jitter_period: Duration,
    currently_dropping: bool,
    prev_switch_instant: Instant,
    next_switch_instant: Instant,
}

impl Jitter {
    pub fn new(loss_rate: f64, jitter_period: Duration) -> Jitter {
        assert!(0.0 <= loss_rate);
        assert!(loss_rate <= 1.0);
        let now = Instant::now();
        let mut jitter = Jitter {
            loss_rate,
            jitter_period,
            currently_dropping: false,
            prev_switch_instant: now,
            next_switch_instant: now,
        };
        jitter.reset(now);
        jitter
    }

    pub fn reset(&mut self, switch_instant: Instant) {
        self.prev_switch_instant = switch_instant;
        self.currently_dropping = rand::thread_rng().gen::<f64>() < self.loss_rate;
        self.set_next_switch_instant();
    }

    pub fn advance(&mut self) {
        let now = Instant::now();
        if self.next_switch_instant + (self.jitter_period * 10) < now {
            self.reset(now);
            return;
        }
        while self.next_switch_instant < now {
            self.prev_switch_instant = self.next_switch_instant;
            self.currently_dropping = !self.currently_dropping;
            self.set_next_switch_instant();
        }
    }

    pub fn currently_dropping(&self) -> bool {
        self.currently_dropping
    }

    fn set_next_switch_instant(&mut self) {
        let delay = if self.currently_dropping {
            self.jitter_period.mul_f64(self.loss_rate)
        } else {
            self.jitter_period.mul_f64(1.0 - self.loss_rate)
        };
        self.next_switch_instant = {
            self.prev_switch_instant + adapter::expovariate_duration(delay, &mut rand::thread_rng())
        };
    }
}

impl<S> Loss<S> {
    /// Creates a new [`Loss`]. See the documentation for
    /// [`SinkStreamExt::with_loss`](crate::SinkStreamExt::with_loss).
    pub fn new(stream: S, loss_rate: f64, jitter_period: Duration) -> Loss<S> {
        Loss {
            stream,
            jitter: Jitter::new(loss_rate, jitter_period),
        }
    }
}

impl<S> Stream for Loss<S>
where
    S: Stream,
{
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Option<S::Item>> {
        let mut this = self.project();
        this.jitter.advance();
        loop {
            match this.stream.as_mut().poll_next(cx) {
                Poll::Ready(Some(value)) => {
                    if this.jitter.currently_dropping() {
                        continue;
                    }
                    break Poll::Ready(Some(value));
                },
                Poll::Ready(None) => break Poll::Ready(None),
                Poll::Pending => break Poll::Pending,
            }
        }
    }
}

impl<S, T> Sink<T> for Loss<S>
where
    S: Stream,
    S: Sink<T>,
{
    type Error = <S as Sink<T>>::Error;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Result<(), Self::Error>> {
        let this = self.project();
        this.stream.poll_ready(cx)
    }

    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        let this = self.project();
        this.jitter.advance();
        if this.jitter.currently_dropping() {
            return Ok(());
        }
        this.stream.start_send(item)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = self.project();
        this.stream.poll_flush(cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = self.project();
        this.stream.poll_close(cx)
    }
}

impl<S> FusedStream for Loss<S>
where
    S: FusedStream,
{
    fn is_terminated(&self) -> bool {
        self.stream.is_terminated()
    }
}