odem-rs 0.3.0

Object-based Discrete-Event Modelling in Rust using async/await
Documentation
//! This crate contains a simulation model of a ferry system with multiple
//! harbors, ferries, and cars. The simulation is built using the odem-rs
//! library, incorporating asynchronous processes and random variable
//! distributions.

use std::{collections::VecDeque, fmt, pin::pin};

use rand_distr::{Distribution, Exp, Normal};

use tracing::debug;

use odem_rs::{
	prelude::*,
	sync::{
		channel::shared::{Receiver, Sender},
		error::RecvError,
	},
};

/// Configuration structure for the ferry simulation model.
#[derive(Config, Default)]
#[time(Time<f64>)]
struct CarFerry {
	/// A stream of independent random number generators (RNGs)
	rng_stream: RngStream,
	/// Statistics for cargo lengths.
	ferry_cargo_len: RandomVariable<usize>,
	/// Statistics for load times.
	ferry_load_time: RandomVariable<Time<f64>>,
	/// Statistics for individual wait times of the cars.
	car_wait_time: RandomVariable<Time<f64>>,
}

impl fmt::Debug for CarFerry {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		f.debug_struct("CarFerry")
			.field("ferry_cargo_len", &self.ferry_cargo_len)
			.field("ferry_load_time", &self.ferry_load_time.display(minute))
			.field("car_wait_time", &self.car_wait_time.display(minute))
			.finish()
	}
}

// Constants for simulation parameters.
const FERRY_COUNT: usize = 1;
const HARBOR_COUNT: usize = 2;
const FERRY_CAPACITY: usize = 5;

/// Structure representing a car in the simulation.
///
/// Contains information about the arrival time and loading duration of the car.
#[derive(Debug)]
struct Car {
	arrival_time: Time<f64>,
	load_duration: Time<f64>,
}

/// Structure representing a harbor pier.
///
/// It includes a landing site for cars and an asynchronous `actions` method
/// that simulates the arrival of cars to the pier.
struct Pier {
	/// Channel to send cars to the ferry.
	landing_site: Sender<Car>,
}

impl Behavior<CarFerry> for Pier {
	type Output = ();

	async fn actions(&self, sim: &Sim<CarFerry>) -> Self::Output {
		let mut rng = sim.global().rng_stream.rng();
		let mut cars = 0;

		// Arrival and loading time distributions.
		let arrival_delay = Exp::new(0.1).unwrap();
		let loading_delay = Normal::new(0.5, 0.2).unwrap();

		loop {
			// Wait for the next car to arrive.
			sim.advance(minute::new(arrival_delay.sample(&mut rng)))
				.await;

			cars += 1;
			debug!(%cars, "cars generated");

			// Create a new car with arrival and load times.
			let car = Car {
				arrival_time: sim.now(),
				load_duration: minute::new(
					loading_delay
						.sample_iter(&mut rng)
						.find(|&val| val >= 0.0)
						.unwrap(),
				),
			};
			self.landing_site
				.try_send(car)
				.expect("No ferries in the simulation");
		}
	}
}

/// Structure representing a ferry in the simulation.
struct Ferry {
	/// Receivers from piers to accept arriving cars.
	piers: Vec<Receiver<Car>>,
}

impl Behavior<CarFerry> for Ferry {
	type Output = ();

	/// Asynchronous method simulating ferry operations, including loading,
	/// unloading, and traveling between piers.
	async fn actions(&self, sim: &Sim<CarFerry>) -> Self::Output {
		let mut cargo = Vec::<Car>::with_capacity(FERRY_CAPACITY);

		loop {
			for pier in self.piers.iter() {
				debug!("unloading {} cars", cargo.len());

				// Unload the cars at the current pier.
				for car in cargo.drain(..) {
					sim.advance(car.load_duration).await;
				}

				let begin_loading = sim.now();

				// Load cars until capacity is reached or timeout occurs.
				for _ in 0..cargo.capacity() {
					match sim
						.fork(pier.recv())
						.or(async {
							sim.advance(minute::new(5.0)).await;
							Err(RecvError)
						})
						.await
					{
						// A car arrived before timeout.
						Ok(car) => {
							sim.global()
								.car_wait_time
								.tabulate(sim.now() - car.arrival_time);
							sim.advance(car.load_duration).await;
							cargo.push(car);
						}
						// Timeout occurred; depart to the next harbor.
						Err(_) => break,
					}
				}

				// Record ferry loading statistics.
				sim.global()
					.ferry_load_time
					.tabulate(sim.now() - begin_loading);
				sim.global().ferry_cargo_len.tabulate(cargo.len());

				// Travel to the next harbor.
				sim.advance(minute::new(10.0)).await;
			}
		}
	}
}

/// Asynchronous function representing the main ferry simulation.
///
/// It creates and activates harbors, ferries, and their corresponding
/// agents. The simulation duration is specified, and the simulation is
/// advanced until the specified duration is reached.
async fn ferry(sim: &Sim<CarFerry>, harbors: usize, ferries: usize, duration: Time<f64>) {
	let mut ports = Vec::with_capacity(harbors);

	let harbor_pool = pin!(Pool::with_capacity(harbors).with(Agent::new));
	let ferry_pool = pin!(Pool::with_capacity(ferries).with(Agent::new));

	// Create all the harbors (piers).
	for _ in 0..harbors {
		let (sx, rx) = channel(VecDeque::new());
		sim.activate(harbor_pool.alloc(Pier { landing_site: sx }));
		ports.push(rx);
	}

	// Create all the ferries.
	for id in 0..ferries {
		sim.activate(
			ferry_pool.alloc(Ferry {
				piers: ports
					.iter()
					.skip(id)
					.chain(ports.iter().take(id))
					.cloned()
					.collect(),
			}),
		);
	}

	// Run the simulation for the specified duration.
	sim.advance(duration).await;

	// Handle any cars that weren't picked up by a ferry.
	for port in ports {
		while let Ok(car) = port.try_recv() {
			sim.global()
				.car_wait_time
				.tabulate(sim.now() - car.arrival_time);
		}
	}
}

/// Main function for running the ferry simulation.
///
/// Initializes the simulator, runs the ferry simulation using the `ferry`
/// function, and displays relevant statistics at the end.
#[cfg(not(test))]
fn main() {
	use tracing::Level;
	tracing_subscriber::fmt()
		.with_max_level(Level::DEBUG)
		.with_target(false)
		.with_timer(model_time!("[{time:#3}]"))
		.init();

	let config = simulation(async |sim| {
		ferry(sim, HARBOR_COUNT, FERRY_COUNT, hour::new(2.0)).await;
	})
	.unwrap();

	println!("Ferry cargo len: {:#.3?}", config.ferry_cargo_len);
	println!(
		"Ferry load time: {:#.3?}",
		config.ferry_load_time.display(minute)
	);
	println!(
		"Car wait time: {:#.3?}",
		config.car_wait_time.display(minute)
	);
}

#[cfg(test)]
criterion::criterion_main!(bench::benches);

#[cfg(test)]
mod bench {
	use core::time::Duration;
	use criterion::{AxisScale, BenchmarkId, Criterion, PlotConfiguration, criterion_group};

	use super::*;

	const RANGE: u32 = 10;
	const STEP: f64 = 1000.0;

	fn ferry_bench(c: &mut Criterion) {
		let mut group = c.benchmark_group("Ferry");

		// set up the benchmark parameters
		group
			.confidence_level(0.99)
			.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic))
			.measurement_time(Duration::from_secs(5));

		// vary in the length of the simulation run
		for sim_duration in (0..RANGE).map(|c| f64::from(1 << c) * STEP) {
			// benchmark the Rust implementation
			group.bench_function(BenchmarkId::new("ferry", sim_duration), |b| {
				b.iter(|| {
					simulation(async |sim| {
						ferry(sim, HARBOR_COUNT, FERRY_COUNT, minute::new(sim_duration)).await;
					})
					.ok();
				})
			});
		}

		group.finish();
	}

	criterion_group!(benches, ferry_bench);
}