sim/models/
parallel_gateway.rs

1use std::collections::HashMap;
2use std::f64::INFINITY;
3
4use serde::{Deserialize, Serialize};
5
6use super::model_trait::{DevsModel, Reportable, ReportableModel, SerializableModel};
7use super::{ModelMessage, ModelRecord};
8use crate::simulator::Services;
9use crate::utils::errors::SimulationError;
10
11use sim_derive::SerializableModel;
12
13#[cfg(feature = "simx")]
14use simx::event_rules;
15
16/// The parallel gateway splits a job across multiple processing paths. The
17/// job is duplicated across every one of the processing paths. In addition
18/// to splitting the process, a second parallel gateway can be used to join
19/// the split paths. The parallel gateway is a BPMN concept.
20#[derive(Debug, Clone, Serialize, Deserialize, SerializableModel)]
21#[serde(rename_all = "camelCase")]
22pub struct ParallelGateway {
23    ports_in: PortsIn,
24    ports_out: PortsOut,
25    #[serde(default)]
26    store_records: bool,
27    #[serde(default)]
28    state: State,
29}
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
32#[serde(rename_all = "camelCase")]
33struct PortsIn {
34    flow_paths: Vec<String>,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
38enum ArrivalPort {
39    FlowPath,
40    Unknown,
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
44#[serde(rename_all = "camelCase")]
45struct PortsOut {
46    flow_paths: Vec<String>,
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
50#[serde(rename_all = "camelCase")]
51struct State {
52    until_next_event: f64,
53    collections: HashMap<String, usize>,
54    records: Vec<ModelRecord>,
55}
56
57impl Default for State {
58    fn default() -> Self {
59        Self {
60            until_next_event: INFINITY,
61            collections: HashMap::new(),
62            records: Vec::new(),
63        }
64    }
65}
66
67#[cfg_attr(feature = "simx", event_rules)]
68impl ParallelGateway {
69    pub fn new(
70        flow_paths_in: Vec<String>,
71        flow_paths_out: Vec<String>,
72        store_records: bool,
73    ) -> Self {
74        Self {
75            ports_in: PortsIn {
76                flow_paths: flow_paths_in,
77            },
78            ports_out: PortsOut {
79                flow_paths: flow_paths_out,
80            },
81            store_records,
82            state: State::default(),
83        }
84    }
85
86    fn arrival_port(&self, message_port: &str) -> ArrivalPort {
87        if self.ports_in.flow_paths.contains(&message_port.to_string()) {
88            ArrivalPort::FlowPath
89        } else {
90            ArrivalPort::Unknown
91        }
92    }
93
94    fn full_collection(&self) -> Option<(&String, &usize)> {
95        self.state
96            .collections
97            .iter()
98            .find(|(_, count)| **count == self.ports_in.flow_paths.len())
99    }
100
101    fn increment_collection(&mut self, incoming_message: &ModelMessage, services: &mut Services) {
102        *self
103            .state
104            .collections
105            .entry(incoming_message.content.clone())
106            .or_insert(0) += 1;
107        self.record(
108            services.global_time(),
109            String::from("Arrival"),
110            format![
111                "{} on {}",
112                incoming_message.content.clone(),
113                incoming_message.port_name.clone()
114            ],
115        );
116        self.state.until_next_event = 0.0;
117    }
118
119    fn send_job(&mut self, services: &mut Services) -> Result<Vec<ModelMessage>, SimulationError> {
120        self.state.until_next_event = 0.0;
121        let completed_collection = self
122            .full_collection()
123            .ok_or(SimulationError::InvalidModelState)?
124            .0
125            .to_string();
126        self.state.collections.remove(&completed_collection);
127        Ok(self
128            .ports_out
129            .flow_paths
130            .clone()
131            .iter()
132            .fold(Vec::new(), |mut messages, flow_path| {
133                self.record(
134                    services.global_time(),
135                    String::from("Departure"),
136                    format!["{} on {}", completed_collection.clone(), flow_path.clone()],
137                );
138                messages.push(ModelMessage {
139                    port_name: flow_path.clone(),
140                    content: completed_collection.clone(),
141                });
142                messages
143            }))
144    }
145
146    fn passivate(&mut self) -> Vec<ModelMessage> {
147        self.state.until_next_event = INFINITY;
148        Vec::new()
149    }
150
151    fn record(&mut self, time: f64, action: String, subject: String) {
152        if self.store_records {
153            self.state.records.push(ModelRecord {
154                time,
155                action,
156                subject,
157            });
158        }
159    }
160}
161
162#[cfg_attr(feature = "simx", event_rules)]
163impl DevsModel for ParallelGateway {
164    fn events_ext(
165        &mut self,
166        incoming_message: &ModelMessage,
167        services: &mut Services,
168    ) -> Result<(), SimulationError> {
169        match self.arrival_port(&incoming_message.port_name) {
170            ArrivalPort::FlowPath => Ok(self.increment_collection(incoming_message, services)),
171            ArrivalPort::Unknown => Err(SimulationError::InvalidMessage),
172        }
173    }
174
175    fn events_int(
176        &mut self,
177        services: &mut Services,
178    ) -> Result<Vec<ModelMessage>, SimulationError> {
179        match self.full_collection() {
180            Some(_) => self.send_job(services),
181            None => Ok(self.passivate()),
182        }
183    }
184
185    fn time_advance(&mut self, time_delta: f64) {
186        self.state.until_next_event -= time_delta;
187    }
188
189    fn until_next_event(&self) -> f64 {
190        self.state.until_next_event
191    }
192}
193
194impl Reportable for ParallelGateway {
195    fn status(&self) -> String {
196        String::from("Active")
197    }
198
199    fn records(&self) -> &Vec<ModelRecord> {
200        &self.state.records
201    }
202}
203
204impl ReportableModel for ParallelGateway {}