1use simrs::{Component, ComponentId, Executor, Fifo, Key, QueueId, Scheduler, Simulation, State};
2
3use std::cell::RefCell;
4use std::rc::Rc;
5use std::time::Duration;
6
/// Unit marker for an item handed from the producer to the consumer through the queue.
#[derive(Debug)]
struct Product;
9
10struct Producer {
11 outgoing: QueueId<Fifo<Product>>,
12 consumer: ComponentId<ConsumerEvent>,
13 produced_count: Key<usize>,
14 messages: Rc<RefCell<Vec<String>>>,
15}
16
17struct Consumer {
18 incoming: QueueId<Fifo<Product>>,
19 working_on: Key<Option<Product>>,
20 messages: Rc<RefCell<Vec<String>>>,
21}
22
/// The only event the producer handles: a request to produce the next item.
#[derive(Debug)]
struct ProducerEvent;
25
/// Events handled by the consumer.
#[derive(Debug)]
enum ConsumerEvent {
    /// An item may be waiting in the incoming queue.
    Received,
    /// Work on the current item has completed.
    Finished,
}
31
32impl Producer {
33 fn produce(&self) -> Product {
34 Product
35 }
36 fn interval(&self) -> Duration {
37 Duration::from_secs(1)
38 }
39 fn log(&self) {
40 self.messages.borrow_mut().push(String::from("Produced"));
41 }
42}
43
44impl Consumer {
45 fn interval(&self) -> Duration {
46 Duration::from_secs(1)
47 }
48 fn log(&self, _: Product) {
49 self.messages.borrow_mut().push(String::from("Consumed"));
50 }
51}
52
53impl Component for Producer {
54 type Event = ProducerEvent;
55
56 fn process_event(
57 &self,
58 self_id: ComponentId<ProducerEvent>,
59 _event: &ProducerEvent,
60 scheduler: &mut Scheduler,
61 state: &mut State,
62 ) {
63 let count = *state.get(self.produced_count).unwrap();
64 if count < 10 {
65 let _ = state.send(self.outgoing, self.produce());
66 self.log();
67 scheduler.schedule(self.interval(), self_id, ProducerEvent);
68 scheduler.schedule(Duration::default(), self.consumer, ConsumerEvent::Received);
69 *state.get_mut(self.produced_count).unwrap() = count + 1;
70 }
71 }
72}
73
74impl Component for Consumer {
75 type Event = ConsumerEvent;
76
77 fn process_event(
78 &self,
79 self_id: ComponentId<ConsumerEvent>,
80 event: &ConsumerEvent,
81 scheduler: &mut Scheduler,
82 state: &mut State,
83 ) {
84 let busy = state.get(self.working_on).unwrap().is_some();
85 match event {
86 ConsumerEvent::Received => {
87 if !busy {
88 if let Some(product) = state.recv(self.incoming) {
89 if let Some(w) = state.get_mut(self.working_on) {
90 *w = Some(product);
91 }
92 scheduler.schedule(self.interval(), self_id, ConsumerEvent::Finished);
93 }
94 }
95 }
96 ConsumerEvent::Finished => {
97 let product = state.get_mut(self.working_on).unwrap().take().unwrap();
98 self.log(product);
99 if state.len(self.incoming) > 0 {
100 scheduler.schedule(Duration::default(), self_id, ConsumerEvent::Received);
101 }
102 }
103 }
104 }
105}
106
/// Expected message log of the whole run, one entry per line.
///
/// `Produced` / `Consumed` entries come from the components; the remaining entries are the
/// simulation clock (formatted with `{:?}`) recorded by the executor's side effect.
const EXPECTED: &str = "Produced
0ns
0ns
Produced
1s
Consumed
1s
1s
1s
Produced
2s
Consumed
2s
2s
2s
Produced
3s
Consumed
3s
3s
3s
Produced
4s
Consumed
4s
4s
4s
Produced
5s
Consumed
5s
5s
5s
Produced
6s
Consumed
6s
6s
6s
Produced
7s
Consumed
7s
7s
7s
Produced
8s
Consumed
8s
8s
8s
Produced
9s
Consumed
9s
9s
9s
10s
Consumed
10s";
167
168fn main() {
169 let messages = Rc::new(RefCell::new(Vec::<String>::new()));
170 let mut simulation = Simulation::default();
171 let queue = simulation.add_queue(Fifo::default());
172 let working_on = simulation.state.insert::<Option<Product>>(None);
173 let consumer = simulation.add_component(Consumer {
174 incoming: queue,
175 working_on,
176 messages: messages.clone(),
177 });
178 let produced_count = simulation.state.insert(0_usize);
179 let producer = simulation.add_component(Producer {
180 outgoing: queue,
181 consumer,
182 produced_count,
183 messages: messages.clone(),
184 });
185 simulation.schedule(Duration::new(0, 0), producer, ProducerEvent);
186 {
189 let messages = messages.clone();
190 simulation.execute(Executor::unbound().side_effect(move |sim| {
191 messages
192 .borrow_mut()
193 .push(format!("{:?}", sim.scheduler.time()));
194 }));
195 }
196 assert_eq!(*messages.borrow(), EXPECTED.split('\n').collect::<Vec<_>>());
197}