dscale 0.4.1

A fast & deterministic simulation framework for benchmarking and testing distributed systems
Documentation
mod bandwidth;
mod latency;

use std::sync::Arc;

pub use bandwidth::BandwidthConfig;
pub(crate) use bandwidth::BandwidthQueue;
pub(crate) use latency::LatencyQueue;
use log::debug;

use crate::GLOBAL_POOL;
use crate::MessagePtr;
use crate::Rank;
use crate::actors::SimulationActor;
use crate::destination::Destination;
use crate::event::Event;
use crate::global::clock;
use crate::jiffy::Jiffies;
use crate::random::Randomizer;
use crate::random::Seed;
use crate::step::Step;
use crate::step::TimedStep;
use crate::topology::Topology;

pub(crate) struct NetworkActor {
    bandwidth_queue: BandwidthQueue,
    topology: Arc<Topology>,
}

impl NetworkActor {
    fn submit_single_message(
        &mut self,
        message: MessagePtr,
        source: Rank,
        destination: Destination,
    ) {
        let targets = match destination {
            Destination::BroadcastWithinPool(pool_name) => self.topology.list_pool(pool_name),
            Destination::Target(rank) => &[rank],
        };

        debug!("Submitting steps P{source} -> P{targets:?}");
        let base_time = clock::global_now() + Jiffies(1);
        for &target in targets {
            let timed_step = TimedStep {
                invocation_time: base_time,
                step: Step::NetworkStep {
                    source: source,
                    target: target,
                    message: message.clone(),
                },
            };
            self.bandwidth_queue.push(timed_step);
        }
    }
}

impl NetworkActor {
    pub(crate) fn new(
        seed: Seed,
        bandwidth_type: BandwidthConfig,
        topology: Arc<Topology>,
    ) -> Self {
        Self {
            bandwidth_queue: BandwidthQueue::new(
                bandwidth_type,
                topology.list_pool(GLOBAL_POOL).len(),
                LatencyQueue::new(Randomizer::new(seed), topology.clone()),
            ),
            topology,
        }
    }
}

impl SimulationActor for NetworkActor {
    fn next_step(&mut self) -> Step {
        self.bandwidth_queue
            .pop()
            .expect("Should not be empty")
            .step
    }

    fn peek_next_step(&self) -> Option<Jiffies> {
        self.bandwidth_queue.peek_closest()
    }

    fn submit(&mut self, event: Event) {
        match event {
            Event::NetworkEvent {
                source,
                destination,
                message,
            } => {
                self.submit_single_message(message, source, destination);
            }
            _ => unreachable!(),
        }
    }
}