dscale 0.6.0

A fast & deterministic simulation framework for benchmarking and testing distributed systems
Documentation
mod bandwidth;
mod latency;

use std::sync::Arc;

pub use bandwidth::BandwidthConfig;

use crate::GLOBAL_POOL;
use crate::Jiffies;
use crate::Pid;
use crate::events::Event;
use crate::events::MessageState;
use crate::events::PidEvent;
use crate::events::TimedEvent;
use crate::network::bandwidth::Bandwidth;
use crate::network::latency::Latency;
use crate::random::Randomizer;
use crate::random::Seed;
use crate::topology::Topology;

pub(crate) enum NetworkPoll {
    Await(TimedEvent),
    Ready(PidEvent),
}

pub(crate) struct Network {
    bandwidth: Bandwidth,
    latency: Latency,
}

impl Network {
    pub(crate) fn new(
        seed: Seed,
        bandwidth_type: BandwidthConfig,
        topology: Arc<Topology>,
    ) -> Self {
        Self {
            latency: Latency::new(Randomizer::new(seed), topology.clone()),
            bandwidth: Bandwidth::new(bandwidth_type, topology.list_pool(GLOBAL_POOL).len()),
        }
    }
}

impl Network {
    pub(crate) fn poll(
        &mut self,
        invocation_time: Jiffies,
        pid: Pid,
        event: PidEvent,
    ) -> NetworkPoll {
        match event {
            PidEvent::Message {
                message,
                state,
                source,
            } => match state {
                MessageState::Init => {
                    let message_size = message.virtual_size();
                    let (state, extra_latency) =
                        match self.bandwidth.try_send(message_size, pid, invocation_time) {
                            None => {
                                // If message passed send bandwidth constraint
                                // we can schedule it in AwaitLatency state immediately.
                                (
                                    MessageState::AwaitLatency,
                                    self.latency.random_latency(source, pid),
                                )
                            }
                            Some(extra_bandwidth_latency) => {
                                (MessageState::AwaitSendBandwidth, extra_bandwidth_latency)
                            }
                        };
                    NetworkPoll::Await(TimedEvent {
                        invocation_time: invocation_time + extra_latency,
                        event: Event::Pid {
                            pid,
                            event: PidEvent::Message {
                                source,
                                message,
                                state,
                            },
                        },
                    })
                }
                MessageState::AwaitSendBandwidth => {
                    let latency = self.latency.random_latency(source, pid);
                    NetworkPoll::Await(TimedEvent {
                        invocation_time: invocation_time + latency,
                        event: Event::Pid {
                            pid,
                            event: PidEvent::Message {
                                source,
                                message,
                                state: MessageState::AwaitLatency,
                            },
                        },
                    })
                }
                MessageState::AwaitLatency => {
                    let message_size = message.virtual_size();
                    match self.bandwidth.try_recv(message_size, pid, invocation_time) {
                        None => NetworkPoll::Ready(PidEvent::Message {
                            message,
                            state,
                            source,
                        }),
                        Some(additional_latency) => NetworkPoll::Await(TimedEvent {
                            invocation_time: invocation_time + additional_latency,
                            event: Event::Pid {
                                pid,
                                event: PidEvent::Message {
                                    source,
                                    message,
                                    state: MessageState::AwaitRecvBandwidth,
                                },
                            },
                        }),
                    }
                }
                MessageState::AwaitRecvBandwidth => NetworkPoll::Ready(PidEvent::Message {
                    message,
                    state,
                    source,
                }),
            },
            _ => unreachable!(),
        }
    }
}