use std::{collections::VecDeque, pin::pin, rc::Rc};
use rand_distr::{Distribution, Exp, Normal, Poisson, num_traits::Float};
use tracing::debug;
use odem_rs::{
prelude::*,
sync::{
channel::shared::{Receiver, Sender},
facility::Facility,
},
util::random::DefaultRng,
util::random_variable::Utilized,
};
/// Global state of the grocery-store model.
///
/// Processes reach it through `sim.global()` to read the parameters, obtain a
/// random number generator and record statistics.
#[derive(Config, Default)]
struct GroceryStore {
/// Model-time field, tagged with `#[time]` for the `Config` derive.
/// NOTE(review): presumably the simulation clock backing `sim.now()` — confirm.
#[time]
time: Time<f64>,
/// Source of the random number generators obtained via `rng_stream.rng()`.
rng_stream: RngStream,
/// Tunable inputs of the model (capacity, shared counter, distributions).
parameters: Parameters,
/// Observations recorded while the model runs.
statistics: Statistics,
}
/// Inputs of the grocery-store model.
struct Parameters {
/// Capacity handed to every `CashRegister`; customers wait until the belt's
/// item counter is below it before placing another item.
conveyor_belt_capacity: u32,
/// Number of customers currently being served; incremented when a customer
/// starts and decremented when it finishes. `checkout` waits for it to reach zero.
active_customers: Control<u32>,
/// Delay between two consecutive customer arrivals; samples are used as seconds.
customer_arrival_distribution: Exp<f64>,
/// Number of items in a new customer's basket (sample is rounded up and cast to `usize`).
item_amount_distribution: Poisson<f32>,
/// Time to move one item (placing, scanning, collecting); drawn through
/// `sample_positive` and used as seconds.
move_item_distribution: Normal<f64>,
/// Price of a scanned item; every sample is tabulated into the register's revenue.
price_distribution: Exp<f64>,
/// Duration of the billing step; negative samples are redrawn, the result is used as seconds.
billing_distribution: Normal<f64>,
}
impl Default for Parameters {
    /// Builds the baseline parameter set of the model.
    ///
    /// The exponential distributions are constructed from rates written as
    /// reciprocals (`1 / 30` for arrivals, `1 / 5` for prices); the normal
    /// distributions are given as `(mean, standard deviation)`.
    fn default() -> Self {
        // No customer is in the store when the model starts.
        let active_customers = Control::new(0);

        // All constructor arguments are valid constants, so `unwrap` cannot fail here.
        let customer_arrival_distribution = Exp::new(1.0 / 30.0).unwrap();
        let item_amount_distribution = Poisson::new(3.0).unwrap();
        let move_item_distribution = Normal::new(3.0, 2.0).unwrap();
        let price_distribution = Exp::new(1.0 / 5.0).unwrap();
        let billing_distribution = Normal::new(10.0, 5.0).unwrap();

        Self {
            conveyor_belt_capacity: 20,
            active_customers,
            customer_arrival_distribution,
            item_amount_distribution,
            move_item_distribution,
            price_distribution,
            billing_distribution,
        }
    }
}
/// Observations recorded during a simulation run.
///
/// The two arrays hold one entry per cash register and are indexed by
/// `CashRegister::id`.
#[derive(Default)]
struct Statistics {
/// Prices of the items scanned at each register.
revenue: [RandomVariable; CASH_REGISTER],
/// Per-register record fed with `Utilized(now, true)` when the cashier starts
/// waiting for the belt and `Utilized(now, false)` once an entry has arrived.
cashier_idle_time: [RandomVariable<Utilized<Time<f64>>>; CASH_REGISTER],
/// Time between a customer's arrival and the moment it has finished collecting its items.
customer_processing_time: RandomVariable<Time<f64>>,
}
impl Statistics {
    /// Writes a report of all collected statistics to standard output.
    ///
    /// The per-register records are first joined into store-wide totals; the
    /// report then lists the customer processing time, the total and
    /// per-checkout revenue, and the total and per-cashier idle time.
    #[cfg(not(test))]
    fn print(&self) {
        // Merge the per-register records into store-wide aggregates.
        let mut revenue_all = RandomVariable::new();
        let mut idle_all = RandomVariable::new();
        for (register_revenue, register_idle) in
            self.revenue.iter().zip(&self.cashier_idle_time)
        {
            revenue_all = revenue_all.join(register_revenue.clone());
            idle_all = idle_all.join(register_idle.clone());
        }

        println!(
            "Customer processing time: {:#.2?}",
            self.customer_processing_time.display(second)
        );

        // Registers are numbered from 1 in the report.
        println!("Revenue: {:.2} €", revenue_all.sum());
        for (number, register_revenue) in (1usize..).zip(&self.revenue) {
            println!("\tCheckout [{}] = {:.2} €", number, register_revenue.sum());
        }

        println!("Cashier idle time: {:#}", idle_all.sum().display());
        for (number, register_idle) in (1usize..).zip(&self.cashier_idle_time) {
            println!("\tCashier [{}] = {:#}", number, register_idle.sum().display());
        }
    }
}
/// Draws from `dist` until a sample with a positive sign is obtained and returns it.
///
/// Samples are taken one at a time from `rng`; every sample whose sign bit is
/// set is discarded. The acceptance test is `Float::is_sign_positive`, so
/// `+0.0` is accepted as well.
pub fn sample_positive<T>(dist: impl Distribution<T>, rng: &mut DefaultRng) -> T
where
    T: Float,
{
    loop {
        let candidate = dist.sample(&mut *rng);
        if candidate.is_sign_positive() {
            return candidate;
        }
    }
}
/// State of one checkout lane, shared (via `Rc`) between its cashier and the
/// customers that queue at it.
struct CashRegister {
/// Index of this register; used to index the per-register statistics arrays.
id: usize,
/// Customers currently at this register; used as tie-breaker when a new
/// customer picks a lane.
customer_count: Control<u32>,
/// Seized by a customer for the whole time it places its basket on the belt.
conveyor_belt: Facility,
/// Sending end of the belt channel: customers send `Some(item)` per item and
/// `None` after their last item.
conveyor_belt_head: Sender<Option<Item>>,
/// Items currently on the belt; incremented by customers, decremented by the cashier.
conveyor_belt_count: Control<u32>,
/// Customers wait until `conveyor_belt_count` is below this value before placing an item.
conveyor_belt_capacity: u32,
/// Seized by a customer for the whole time it collects its scanned items.
item_storage: Facility,
/// Receiving end of the storage channel from which customers take scanned items.
item_storage_tail: Receiver<Option<Item>>,
/// Scanned items not yet collected; incremented by the cashier, decremented by customers.
item_storage_count: Control<u32>,
/// Set to `false` by the cashier when it reads a `None` marker from the belt
/// and to `true` after the billing delay; customers wait on it before leaving.
bill_paid: Control<bool>,
}
impl CashRegister {
    /// Creates the shared state of checkout lane `id`.
    ///
    /// `conveyor_belt_head` is the sending end of the belt channel,
    /// `item_storage_tail` the receiving end of the storage channel and
    /// `conveyor_belt_capacity` the belt's item limit. All counters start at
    /// zero and `bill_paid` starts as `false`.
    fn new(
        id: usize,
        conveyor_belt_head: Sender<Option<Item>>,
        item_storage_tail: Receiver<Option<Item>>,
        conveyor_belt_capacity: u32,
    ) -> CashRegister {
        let customer_count = Control::new(0);
        let conveyor_belt = Facility::new();
        let conveyor_belt_count = Control::new(0);
        let item_storage = Facility::new();
        let item_storage_count = Control::new(0);
        let bill_paid = Control::new(false);

        Self {
            id,
            customer_count,
            conveyor_belt,
            conveyor_belt_head,
            conveyor_belt_count,
            conveyor_belt_capacity,
            item_storage,
            item_storage_tail,
            item_storage_count,
            bill_paid,
        }
    }
}
/// The cashier process of one checkout lane: takes entries off the belt,
/// scans them and forwards them to the item storage.
struct Cashier {
/// Receiving end of the belt channel (customers hold the sending end).
conveyor_belt_tail: Receiver<Option<Item>>,
/// Sending end of the storage channel (customers hold the receiving end).
item_storage_head: Sender<Option<Item>>,
/// Shared state of the lane this cashier works at.
checkout: Rc<CashRegister>,
}
impl Cashier {
    /// Creates a cashier working at `checkout` that reads from
    /// `conveyor_belt_tail` and writes to `item_storage_head`.
    fn new(
        conveyor_belt_tail: Receiver<Option<Item>>,
        item_storage_head: Sender<Option<Item>>,
        checkout: Rc<CashRegister>,
    ) -> Self {
        Self {
            conveyor_belt_tail,
            item_storage_head,
            checkout,
        }
    }

    /// Scans belt entries until the end-of-basket marker (`None`) is read.
    ///
    /// For every item the cashier spends one move delay taking it off the
    /// belt, records a sampled price as revenue, spends a second move delay
    /// and puts the item into the storage. On `None` the marker is forwarded
    /// to the storage, `bill_paid` is reset to `false` and the method returns.
    /// It also returns when `recv()` yields an error.
    async fn scan_items_from_belt(&self, sim: &Sim<GroceryStore>, rng: &mut DefaultRng) {
        let parameters = &sim.global().parameters;
        let statistics = &sim.global().statistics;
        let register = &*self.checkout;
        let idle_record = &statistics.cashier_idle_time[register.id];
        let revenue_record = &statistics.revenue[register.id];

        // The cashier starts out waiting for the belt.
        idle_record.tabulate(Utilized(sim.now(), true));
        while let Ok(entry) = self.conveyor_belt_tail.recv().await {
            idle_record.tabulate(Utilized(sim.now(), false));

            let Some(item) = entry else {
                // End of the current basket: pass the marker on and open a new bill.
                self.item_storage_head.try_send(None).unwrap();
                register.bill_paid.set(false);
                break;
            };

            // Take the item off the belt.
            let pick_up = second::new(sample_positive(&parameters.move_item_distribution, rng));
            sim.advance(pick_up).await;
            register.conveyor_belt_count.update(|count| count - 1);

            // Scan it.
            let price = rng.sample(&parameters.price_distribution);
            revenue_record.tabulate(price);

            // Put it into the storage area.
            let put_down = second::new(sample_positive(&parameters.move_item_distribution, rng));
            sim.advance(put_down).await;
            self.item_storage_head.try_send(Some(item)).unwrap();
            register.item_storage_count.update(|count| count + 1);

            // Back to waiting for the next belt entry.
            idle_record.tabulate(Utilized(sim.now(), true));
        }
    }
}
impl Behavior<GroceryStore> for Cashier {
type Output = ();
async fn actions(&self, sim: &Sim<GroceryStore>) -> Self::Output {
let mut rng = sim.global().rng_stream.rng();
debug!("Cashier opened checkout!");
loop {
self.scan_items_from_belt(sim, &mut rng).await;
until!(self.checkout.item_storage_count == 0).await;
let billing_duration = sim
.global()
.parameters
.billing_distribution
.sample_iter(&mut rng)
.find_map(|v| (v >= 0.0).then_some(second::new(v)))
.unwrap();
sim.advance(billing_duration).await;
self.checkout.bill_paid.set(true);
}
}
}
/// A single grocery item. It carries no data; only the number of items matters.
#[derive(Debug, Clone, Copy, Default)]
struct Item;
/// A customer process: places its basket on the belt of one checkout lane and
/// collects the scanned items afterwards.
struct Customer {
/// Items the customer wants to buy.
basket: Vec<Item>,
/// Shared state of the lane the customer queues at.
checkout: Rc<CashRegister>,
}
impl Customer {
    /// Creates a customer with `number_of_items` items in the basket who
    /// queues at `checkout`.
    fn new(number_of_items: usize, checkout: Rc<CashRegister>) -> Self {
        let basket = vec![Item; number_of_items];
        Self { basket, checkout }
    }

    /// Puts the whole basket onto the conveyor belt.
    ///
    /// The belt facility is held for the entire duration. Before each item the
    /// customer waits until the belt counter is below the capacity, then
    /// spends one move delay. After the last item a `None` marker is sent.
    async fn place_items_on_belt(&self, sim: &Sim<GroceryStore>, rng: &mut DefaultRng) {
        let move_item_distribution = sim.global().parameters.move_item_distribution;
        let CashRegister {
            conveyor_belt,
            conveyor_belt_count,
            conveyor_belt_capacity,
            conveyor_belt_head,
            ..
        } = &*self.checkout;

        let belt_access = conveyor_belt.seize().await;
        for &item in &self.basket {
            until!(conveyor_belt_count < conveyor_belt_capacity).await;
            let placing_time = second::new(sample_positive(move_item_distribution, rng));
            sim.advance(placing_time).await;
            conveyor_belt_head.try_send(Some(item)).unwrap();
            conveyor_belt_count.update(|count| count + 1);
        }
        // Tell the cashier that this basket is complete.
        conveyor_belt_head.try_send(None).unwrap();
        belt_access.release();
    }

    /// Collects the scanned items from the item storage.
    ///
    /// The storage facility is held for the entire duration. Each received
    /// item costs one move delay. When the `None` marker arrives, the customer
    /// waits for `bill_paid` and stops. The loop also ends when `recv()`
    /// yields an error.
    async fn collect_items_from_storage(&self, sim: &Sim<GroceryStore>, rng: &mut DefaultRng) {
        let move_item_distribution = sim.global().parameters.move_item_distribution;
        let CashRegister {
            item_storage,
            item_storage_tail,
            item_storage_count,
            bill_paid,
            ..
        } = &*self.checkout;

        let storage_access = item_storage.seize().await;
        while let Ok(entry) = item_storage_tail.recv().await {
            if entry.is_none() {
                // All items collected; wait for the payment to go through.
                until!(bill_paid).await;
                break;
            }
            let packing_time = second::new(sample_positive(move_item_distribution, rng));
            sim.advance(packing_time).await;
            item_storage_count.update(|count| count - 1);
        }
        storage_access.release();
    }
}
impl Behavior<GroceryStore> for Customer {
    type Output = ();

    /// Life cycle of a customer.
    ///
    /// Registers itself at the lane and in the store-wide counter, places its
    /// items on the belt, collects them from the storage, unregisters again
    /// and records the time that has passed since its arrival.
    async fn actions(&self, sim: &Sim<GroceryStore>) -> Self::Output {
        let active_customers = &sim.global().parameters.active_customers;
        let processing_time = &sim.global().statistics.customer_processing_time;
        let register = &*self.checkout;
        let mut rng = sim.global().rng_stream.rng();

        let arrival_time = sim.now();
        debug!(
            "Arrived after {:#} with {} items",
            arrival_time.display(),
            self.basket.len()
        );

        // Check in.
        register.customer_count.update(|count| count + 1);
        active_customers.update(|total| total + 1);

        self.place_items_on_belt(sim, &mut rng).await;
        self.collect_items_from_storage(sim, &mut rng).await;

        // Check out.
        register.customer_count.update(|count| count - 1);
        active_customers.update(|total| total - 1);

        let time_spent = sim.now() - arrival_time;
        processing_time.tabulate(time_spent);
    }
}
/// Runs the grocery-store scenario and returns the model time at which it ended.
///
/// One cashier is activated per cash register. Customers are then generated
/// for `duration` of model time, each joining the lane with the fewest items
/// on its belt (ties broken by the number of customers at the lane). After
/// the arrival phase the function waits until no customer is active.
async fn checkout(sim: &Sim<GroceryStore>, duration: Time<f64>) -> Time<f64> {
    let active_customers = &sim.global().parameters.active_customers;
    let belt_capacity = sim.global().parameters.conveyor_belt_capacity;

    // Open all lanes: one shared register state and one cashier each.
    let mut checkouts = Vec::with_capacity(CASH_REGISTER);
    let cashiers = pin!(Pool::fixed::<CASH_REGISTER>());
    for id in 0..CASH_REGISTER {
        let (belt_tx, belt_rx) = channel(VecDeque::new());
        let (storage_tx, storage_rx) = channel(VecDeque::new());
        let register = Rc::new(CashRegister::new(id, belt_tx, storage_rx, belt_capacity));
        let cashier = Cashier::new(belt_rx, storage_tx, Rc::clone(&register));
        sim.activate(cashiers.alloc(Agent::new(cashier)));
        checkouts.push(register);
    }

    // Generate customers until `duration` has elapsed; whichever of the two
    // futures finishes first ends the arrival phase.
    let customer_pool = pin!(Pool::dynamic());
    sim.fork(sim.advance(duration))
        .or(async {
            let arrival_distribution = &sim.global().parameters.customer_arrival_distribution;
            let basket_distribution = &sim.global().parameters.item_amount_distribution;
            let mut rng = sim.global().rng_stream.rng();
            loop {
                let arrival_delay = second::new(arrival_distribution.sample(&mut rng));
                sim.advance(arrival_delay).await;

                let item_count = basket_distribution.sample(&mut rng).ceil() as usize;
                let lane = checkouts
                    .iter()
                    .min_by(|a, b| {
                        let by_belt = a
                            .conveyor_belt_count
                            .get()
                            .cmp(&b.conveyor_belt_count.get());
                        by_belt.then_with(|| a.customer_count.get().cmp(&b.customer_count.get()))
                    })
                    .unwrap();
                let customer = Customer::new(item_count, Rc::clone(lane));
                sim.activate(customer_pool.alloc(Agent::new(customer)));
            }
        })
        .await;

    // Let the customers that are still in the store finish.
    until!(active_customers == 0).await;
    sim.now()
}
/// Number of cash registers in the store; one cashier is started per register
/// and the per-register statistics arrays have this length.
const CASH_REGISTER: usize = 2;
/// Runs the scenario for half an hour of model time with debug-level tracing
/// enabled, then prints the simulation end time and the collected statistics.
#[cfg(not(test))]
fn main() {
    // Log at debug level, prefixing every line with the current model time.
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .with_target(false)
        .with_timer(model_time!("[{time:#3}]"))
        .init();

    let horizon = hour::new(0.5);
    let simulator = Simulator::new(GroceryStore::default());
    let finished_at = simulator
        .run(async |sim| checkout(sim, horizon).await)
        .unwrap();

    println!("Simulation duration: {:#}", finished_at.display());
    simulator.inner().statistics.print();
}
// In test builds the regular `main` is compiled out and criterion's generated
// entry point runs the `bench::benches` group instead.
#[cfg(test)]
criterion::criterion_main!(bench::benches);
/// Criterion benchmarks of the `checkout` scenario for growing simulation durations.
#[cfg(test)]
mod bench {
    use core::time::Duration;
    use criterion::{
        AxisScale, BatchSize, BenchmarkId, Criterion, PlotConfiguration, criterion_group,
    };

    use super::*;

    /// Number of benchmarked durations; the exponents `0..RANGE` are used.
    const RANGE: u32 = 10;
    /// Base duration; the i-th benchmark simulates `2^i * STEP` seconds.
    const STEP: f64 = 1000.0;

    /// Benchmarks `checkout` for simulation durations that double from
    /// `STEP` up to `2^(RANGE - 1) * STEP` seconds of model time.
    fn checkout_bench(c: &mut Criterion) {
        let mut group = c.benchmark_group("Checkout");
        group.confidence_level(0.99);
        group.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic));
        group.measurement_time(Duration::from_secs(30));

        let durations = (0..RANGE).map(|exponent| f64::from(1 << exponent) * STEP);
        for sim_duration in durations {
            let id = BenchmarkId::new("checkout", sim_duration);
            group.bench_function(id, |b| {
                // A fresh simulator is built for every measured run.
                b.iter_batched(
                    Simulator::default,
                    |sim| sim.run(async |sim| checkout(sim, second::new(sim_duration)).await),
                    BatchSize::SmallInput,
                )
            });
        }

        group.finish();
    }

    criterion_group!(benches, checkout_bench);
}