kaspa_utils/
sim.rs

1//! Module with structs for supporting discrete event simulation in virtual time.
2//! Inspired by python's simpy library.
3//!
4//! Users should define the message type `T` required for the simulation, derive `Process<T>` with
5//! various simulation actor logic and plug the processes into a `Simulation<T>` instance.
6
7use std::collections::{BinaryHeap, HashMap, HashSet};
8
9/// Internal structure representing a scheduled simulator event
10struct Event<T> {
11    timestamp: u64,
12    dest: u64,
13    msg: Option<T>,
14}
15
16impl<T> Event<T> {
17    pub fn new(timestamp: u64, dest: u64, msg: Option<T>) -> Self {
18        Self { timestamp, dest, msg }
19    }
20}
21
22impl<T> PartialEq for Event<T> {
23    fn eq(&self, other: &Self) -> bool {
24        self.timestamp == other.timestamp && self.dest == other.dest
25    }
26}
27
28impl<T> Eq for Event<T> {}
29
30impl<T> PartialOrd for Event<T> {
31    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
32        Some(self.cmp(other))
33    }
34}
35
36impl<T> Ord for Event<T> {
37    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
38        // Reversing so that min timestamp is scheduled first
39        other.timestamp.cmp(&self.timestamp).then_with(|| other.dest.cmp(&self.dest))
40    }
41}
42
43/// Process resumption trigger
44pub enum Resumption<T> {
45    Initial,
46    Scheduled,
47    Message(T),
48}
49
50/// Process suspension reason
51pub enum Suspension {
52    Timeout(u64),
53    Idle,
54    Halt, // Halt the simulation
55}
56
57/// A simulation process
58pub trait Process<T> {
59    fn resume(&mut self, resumption: Resumption<T>, env: &mut Environment<T>) -> Suspension;
60}
61
62pub type BoxedProcess<T> = Box<dyn Process<T>>;
63
64/// The simulation environment
65#[derive(Default)]
66pub struct Environment<T> {
67    now: u64,
68    broadcast_delay: u64,
69    event_queue: BinaryHeap<Event<T>>,
70    process_ids: HashSet<u64>,
71}
72
73impl<T: Clone> Environment<T> {
74    pub fn new(delay: u64) -> Self {
75        Self::with_start_time(delay, 0)
76    }
77
78    pub fn with_start_time(delay: u64, start_time: u64) -> Self {
79        Self { now: start_time, broadcast_delay: delay, event_queue: BinaryHeap::new(), process_ids: HashSet::new() }
80    }
81
82    pub fn now(&self) -> u64 {
83        self.now
84    }
85
86    pub fn send(&mut self, delay: u64, dest: u64, msg: T) {
87        self.event_queue.push(Event::new(self.now + delay, dest, Some(msg)))
88    }
89
90    pub fn timeout(&mut self, timeout: u64, dest: u64) {
91        self.event_queue.push(Event::new(self.now + timeout, dest, None))
92    }
93
94    pub fn broadcast(&mut self, _sender: u64, msg: T) {
95        for &id in self.process_ids.iter() {
96            self.event_queue.push(Event::new(self.now + self.broadcast_delay, id, Some(msg.clone())));
97        }
98    }
99
100    fn next_event(&mut self) -> Event<T> {
101        let event = self.event_queue.pop().unwrap();
102        self.now = event.timestamp;
103        event
104    }
105}
106
107/// The simulation manager
108#[derive(Default)]
109pub struct Simulation<T> {
110    env: Environment<T>,
111    processes: HashMap<u64, BoxedProcess<T>>,
112}
113
114impl<T: Clone> Simulation<T> {
115    pub fn new(delay: u64) -> Self {
116        Self { env: Environment::new(delay), processes: HashMap::new() }
117    }
118
119    pub fn with_start_time(delay: u64, start_time: u64) -> Self {
120        Self { env: Environment::with_start_time(delay, start_time), processes: HashMap::new() }
121    }
122
123    pub fn register(&mut self, id: u64, process: BoxedProcess<T>) {
124        self.processes.insert(id, process);
125        self.env.process_ids.insert(id);
126    }
127
128    pub fn step(&mut self) -> bool {
129        let event = self.env.next_event();
130        let process = self.processes.get_mut(&event.dest).unwrap();
131        let op = if let Some(msg) = event.msg { Resumption::Message(msg) } else { Resumption::Scheduled };
132        match process.resume(op, &mut self.env) {
133            Suspension::Timeout(timeout) => {
134                self.env.timeout(timeout, event.dest);
135                true
136            }
137            Suspension::Idle => true,
138            Suspension::Halt => false,
139        }
140    }
141
142    pub fn run(&mut self, until: u64) {
143        for (&id, process) in self.processes.iter_mut() {
144            match process.resume(Resumption::Initial, &mut self.env) {
145                Suspension::Timeout(timeout) => self.env.timeout(timeout, id),
146                Suspension::Idle => {}
147                Suspension::Halt => panic!("not expecting halt on startup"),
148            }
149        }
150
151        while self.step() {
152            if self.env.now() > until {
153                break;
154            }
155        }
156        self.processes.clear();
157    }
158}