1use std::collections::HashMap;
2use std::f64::INFINITY;
3
4use serde::{Deserialize, Serialize};
5
6use super::model_trait::{DevsModel, Reportable, ReportableModel, SerializableModel};
7use super::{ModelMessage, ModelRecord};
8use crate::simulator::Services;
9use crate::utils::errors::SimulationError;
10
11use sim_derive::SerializableModel;
12
13#[cfg(feature = "simx")]
14use simx::event_rules;
15
/// A gateway model that counts arrivals per message content and, once a
/// content value has been received as many times as there are input flow
/// paths, emits one message carrying that content on every output flow path.
///
/// Field names are serialized in camelCase.
#[derive(Debug, Clone, Serialize, Deserialize, SerializableModel)]
#[serde(rename_all = "camelCase")]
pub struct ParallelGateway {
    /// Names of the ports on which incoming messages are accepted.
    ports_in: PortsIn,
    /// Names of the ports on which completed jobs are emitted.
    ports_out: PortsOut,
    /// When `true`, arrivals and departures are appended to `state.records`.
    /// Falls back to `bool::default()` (`false`) when absent from the input.
    #[serde(default)]
    store_records: bool,
    /// Mutable simulation state; falls back to `State::default()` when absent
    /// from the input.
    #[serde(default)]
    state: State,
}
30
/// Input-side port configuration of the gateway.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct PortsIn {
    /// Port names recognized as flow paths. The length of this list is also
    /// the arrival count a message content must reach to be considered
    /// complete.
    flow_paths: Vec<String>,
}
36
/// Classification of the port an incoming message arrived on.
#[derive(Debug, Clone, Serialize, Deserialize)]
enum ArrivalPort {
    /// The port name is one of the configured input flow paths.
    FlowPath,
    /// The port name matches no configured input flow path.
    Unknown,
}
42
/// Output-side port configuration of the gateway.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct PortsOut {
    /// Port names that each receive one message per completed job.
    flow_paths: Vec<String>,
}
48
/// Mutable runtime state of the gateway.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct State {
    /// Time remaining until the next internal event. Set to `0.0` on arrival
    /// and after a departure, and to `INFINITY` when the model passivates.
    until_next_event: f64,
    /// Number of arrivals seen so far, keyed by message content.
    collections: HashMap<String, usize>,
    /// Arrival/departure log; only appended to when record storage is enabled.
    records: Vec<ModelRecord>,
}
56
57impl Default for State {
58 fn default() -> Self {
59 Self {
60 until_next_event: INFINITY,
61 collections: HashMap::new(),
62 records: Vec::new(),
63 }
64 }
65}
66
67#[cfg_attr(feature = "simx", event_rules)]
68impl ParallelGateway {
69 pub fn new(
70 flow_paths_in: Vec<String>,
71 flow_paths_out: Vec<String>,
72 store_records: bool,
73 ) -> Self {
74 Self {
75 ports_in: PortsIn {
76 flow_paths: flow_paths_in,
77 },
78 ports_out: PortsOut {
79 flow_paths: flow_paths_out,
80 },
81 store_records,
82 state: State::default(),
83 }
84 }
85
86 fn arrival_port(&self, message_port: &str) -> ArrivalPort {
87 if self.ports_in.flow_paths.contains(&message_port.to_string()) {
88 ArrivalPort::FlowPath
89 } else {
90 ArrivalPort::Unknown
91 }
92 }
93
94 fn full_collection(&self) -> Option<(&String, &usize)> {
95 self.state
96 .collections
97 .iter()
98 .find(|(_, count)| **count == self.ports_in.flow_paths.len())
99 }
100
101 fn increment_collection(&mut self, incoming_message: &ModelMessage, services: &mut Services) {
102 *self
103 .state
104 .collections
105 .entry(incoming_message.content.clone())
106 .or_insert(0) += 1;
107 self.record(
108 services.global_time(),
109 String::from("Arrival"),
110 format![
111 "{} on {}",
112 incoming_message.content.clone(),
113 incoming_message.port_name.clone()
114 ],
115 );
116 self.state.until_next_event = 0.0;
117 }
118
119 fn send_job(&mut self, services: &mut Services) -> Result<Vec<ModelMessage>, SimulationError> {
120 self.state.until_next_event = 0.0;
121 let completed_collection = self
122 .full_collection()
123 .ok_or(SimulationError::InvalidModelState)?
124 .0
125 .to_string();
126 self.state.collections.remove(&completed_collection);
127 Ok(self
128 .ports_out
129 .flow_paths
130 .clone()
131 .iter()
132 .fold(Vec::new(), |mut messages, flow_path| {
133 self.record(
134 services.global_time(),
135 String::from("Departure"),
136 format!["{} on {}", completed_collection.clone(), flow_path.clone()],
137 );
138 messages.push(ModelMessage {
139 port_name: flow_path.clone(),
140 content: completed_collection.clone(),
141 });
142 messages
143 }))
144 }
145
146 fn passivate(&mut self) -> Vec<ModelMessage> {
147 self.state.until_next_event = INFINITY;
148 Vec::new()
149 }
150
151 fn record(&mut self, time: f64, action: String, subject: String) {
152 if self.store_records {
153 self.state.records.push(ModelRecord {
154 time,
155 action,
156 subject,
157 });
158 }
159 }
160}
161
162#[cfg_attr(feature = "simx", event_rules)]
163impl DevsModel for ParallelGateway {
164 fn events_ext(
165 &mut self,
166 incoming_message: &ModelMessage,
167 services: &mut Services,
168 ) -> Result<(), SimulationError> {
169 match self.arrival_port(&incoming_message.port_name) {
170 ArrivalPort::FlowPath => Ok(self.increment_collection(incoming_message, services)),
171 ArrivalPort::Unknown => Err(SimulationError::InvalidMessage),
172 }
173 }
174
175 fn events_int(
176 &mut self,
177 services: &mut Services,
178 ) -> Result<Vec<ModelMessage>, SimulationError> {
179 match self.full_collection() {
180 Some(_) => self.send_job(services),
181 None => Ok(self.passivate()),
182 }
183 }
184
185 fn time_advance(&mut self, time_delta: f64) {
186 self.state.until_next_event -= time_delta;
187 }
188
189 fn until_next_event(&self) -> f64 {
190 self.state.until_next_event
191 }
192}
193
194impl Reportable for ParallelGateway {
195 fn status(&self) -> String {
196 String::from("Active")
197 }
198
199 fn records(&self) -> &Vec<ModelRecord> {
200 &self.state.records
201 }
202}
203
// Empty impl: `ParallelGateway` supplies no items of its own for
// `ReportableModel`. NOTE(review): presumably the trait only combines other
// traits or has default items — confirm against its definition.
impl ReportableModel for ParallelGateway {}