1use crate::state::WorkflowState;
2use async_recursion::async_recursion;
3use state::State;
4use wfrs_model::{Flow, Task, WorkflowDefinition};
5use wfrs_validator::ExclusiveGateway;
6pub mod state;
7
8pub struct Runtime<'a> {
9 pub entity_id: String,
10 pub definition: &'a WorkflowDefinition,
11 pub instance: WorkflowState,
12}
13
14impl<'a> Runtime<'a> {
15 pub fn new(definition: &'a WorkflowDefinition, entity_id: String) -> Self {
16 let instance = WorkflowState::new(definition.root_start_event());
17 Self {
18 entity_id,
19 definition,
20 instance,
21 }
22 }
23
24 pub fn with_state(mut self, state: State) -> Self {
25 self.instance = WorkflowState::from_state(state);
26 self
27 }
28
29 pub async fn replace(&self, state: State) {
30 self.instance.replace(state).await;
31 }
32
33 pub async fn complete(&self, task_id: i32) -> Result<(), String> {
34 let pending_task_idx = async {
35 let state = self.instance.state().await;
36 state
37 .inner
38 .pending_tasks
39 .iter()
40 .position(|pidx| pidx == &task_id)
41 }
42 .await;
43 if let Some(pending_task_idx) = pending_task_idx {
44 if let Some(usertask) = self.fetch_pending_task(pending_task_idx).await {
45 match &usertask.def {
46 wfrs_model::TaskDef::UserTask(ev) => {
47 self.visit_outgoing(&ev.outgoing).await;
48 self.run().await;
49 Ok(())
50 }
51 _ => Err(format!("task with id {pending_task_idx} is not a usertask")),
52 }
53 } else {
54 Err(format!("no pending usertask with id {task_id}"))
55 }
56 } else {
57 Err(format!("task with id {task_id} not found"))
58 }
59 }
60
61 async fn fetch_pending_task(&self, idx: usize) -> Option<&Task> {
62 self.definition
63 .tasks
64 .get(self.instance.pending_task_by_index(idx).await as usize)
65 }
66
67 async fn fetch_current_task(&self) -> Option<&Task> {
68 if let Some(current_task) = self.instance.pop_current_task().await {
69 self.definition.tasks.get(current_task as usize)
70 } else {
71 None
72 }
73 }
74
75 async fn fetch_current_flow(&self) -> Option<&Flow> {
76 if let Some(current_flow) = self.instance.pop_current_flow().await {
77 self.definition.flows.get(current_flow as usize)
78 } else {
79 None
80 }
81 }
82
83 async fn fetch_future_task(&self) -> Option<&Task> {
84 if let Some(future_task) = self.instance.pop_maybe_future_task().await {
85 self.definition.tasks.get(future_task as usize)
86 } else {
87 None
88 }
89 }
90
91 async fn fetch_future_flow(&self) -> Option<&Flow> {
92 if let Some(future_flow) = self.instance.pop_maybe_future_flow().await {
93 self.definition.flows.get(future_flow as usize)
94 } else {
95 None
96 }
97 }
98
99 async fn visit_outgoing(&self, outgoing: &[i32]) {
100 for outgoing in outgoing {
101 self.instance.push_current_flow(*outgoing).await;
102 }
103 }
104
105 async fn visit_future_outgoing(&self, outgoing: &[i32]) {
106 for outgoing in outgoing {
107 self.instance.push_maybe_future_flow(*outgoing).await;
108 }
109 }
110
111 #[async_recursion]
112 pub async fn run(&self) {
113 if let Some(current_task) = self.fetch_current_task().await {
114 match ¤t_task.def {
115 wfrs_model::TaskDef::StartEvent(ev) => {
116 self.visit_outgoing(&ev.outgoing).await;
117 self.run().await;
118 }
119 wfrs_model::TaskDef::UserTask(_) => {
120 self.instance.push_pending_task(current_task.id).await;
121 self.instance.push_visited_task(current_task.id).await;
122 }
123 wfrs_model::TaskDef::ExclusiveGateway(ev) => {
124 let out = async {
125 let state = self.instance.state().await;
126 ExclusiveGateway(ev).evaluate(self.definition, &state.inner.variables)
127 }
128 .await;
129 self.visit_outgoing(&out).await;
130 self.run().await;
131 }
132 wfrs_model::TaskDef::EndEvent(_) => {
133 self.instance.set_completed().await;
134 }
135 }
136 }
137
138 if let Some(current_flow) = self.fetch_current_flow().await {
139 self.instance.push_visited_flow(current_flow.id).await;
140 self.instance
141 .push_current_task(current_flow.target_ref)
142 .await;
143 self.instance
144 .push_visited_task(current_flow.source_ref)
145 .await;
146 self.run().await;
147 }
148 }
149
150 pub async fn set_default_active_task(&self) {
151 let mut state = self.instance.mut_state().await;
152 if let Some(idx) = state.inner.pending_tasks.first() {
153 state.inner.active = *idx;
154 } else {
155 state.inner.active = -1;
156 }
157 }
158
159 fn is_usertask(&self, task_id: i32) -> bool {
160 self.definition
161 .tasks
162 .get(task_id as usize)
163 .map(|ev| match ev.def {
164 wfrs_model::TaskDef::UserTask(_) => true,
165 _ => false,
166 })
167 .unwrap_or(false)
168 }
169
170 #[async_recursion]
171 async fn sim_run(&self) {
172 if let Some(future_task) = self.fetch_future_task().await {
173 match &future_task.def {
174 wfrs_model::TaskDef::StartEvent(ev) => {
175 self.visit_future_outgoing(&ev.outgoing).await;
176 self.sim_run().await;
177 }
178 wfrs_model::TaskDef::UserTask(ev) => {
179 self.visit_future_outgoing(&ev.outgoing).await;
180 self.sim_run().await;
181 }
182 wfrs_model::TaskDef::ExclusiveGateway(ev) => {
183 let out = async {
184 let state = self.instance.state().await;
185 ExclusiveGateway(ev).evaluate(self.definition, &state.inner.variables)
186 }
187 .await;
188 self.visit_future_outgoing(&out).await;
189 self.sim_run().await;
190 }
191 wfrs_model::TaskDef::EndEvent(_) => {}
192 }
193 }
194
195 if let Some(future_flow) = self.fetch_future_flow().await {
196 self.instance
197 .push_maybe_future_task(future_flow.target_ref)
198 .await;
199 if self.is_usertask(future_flow.source_ref) {
200 self.instance
201 .push_maybe_visited_task(future_flow.source_ref)
202 .await;
203 }
204 self.sim_run().await;
205 }
206 }
207
208 pub async fn simulate(&self) {
209 self.instance
210 .clear_future(self.definition.root_start_event())
211 .await;
212 self.sim_run().await;
213 }
214
215 pub async fn navigate_to(&self, task_id: i32) {
216 if let Some(task) = self.definition.tasks.get(task_id as usize) {
217 let visited = self.instance.has_visited(task_id).await
218 && self.instance.has_maybe_visited(task_id).await;
219 if task.is_user_task() && visited {
220 self.instance.set_usertask(task_id).await;
221 self.simulate().await;
222 } else {
223 let next_user_task = self.get_previous_user_task(task_id).await;
224 if let Some(task_id) = next_user_task {
225 let visited = self.instance.has_visited(task_id).await;
226 if visited {
227 self.instance.set_usertask(task_id).await;
228 self.simulate().await;
229 }
230 }
231 }
232 }
233 }
234
235 pub async fn get_previous_user_task(&self, task_id: i32) -> Option<i32> {
236 let mut task_id = task_id;
237 let mut result = None;
238 let state = self.instance.state().await;
239 loop {
240 let pos = state
241 .inner
242 .visited_tasks
243 .iter()
244 .position(|t| t == &task_id)
245 .and_then(|pos| {
246 if pos > 0 {
247 state.inner.visited_tasks.get(pos - 1)
248 } else {
249 None
250 }
251 });
252 if let Some(pos) = pos {
253 if let Some(task) = self.definition.tasks.get(*pos as usize) {
254 if task.is_user_task() {
255 result = Some(*pos);
256 break;
257 } else {
258 task_id = *pos;
259 }
260 }
261 } else {
262 break;
263 }
264 }
265 result
266 }
267}