wfrs_engine/
lib.rs

1use crate::state::WorkflowState;
2use async_recursion::async_recursion;
3use state::State;
4use wfrs_model::{Flow, Task, WorkflowDefinition};
5use wfrs_validator::ExclusiveGateway;
6pub mod state;
7
8pub struct Runtime<'a> {
9    pub entity_id: String,
10    pub definition: &'a WorkflowDefinition,
11    pub instance: WorkflowState,
12}
13
14impl<'a> Runtime<'a> {
15    pub fn new(definition: &'a WorkflowDefinition, entity_id: String) -> Self {
16        let instance = WorkflowState::new(definition.root_start_event());
17        Self {
18            entity_id,
19            definition,
20            instance,
21        }
22    }
23
24    pub fn with_state(mut self, state: State) -> Self {
25        self.instance = WorkflowState::from_state(state);
26        self
27    }
28
29    pub async fn replace(&self, state: State) {
30        self.instance.replace(state).await;
31    }
32
33    pub async fn complete(&self, task_id: i32) -> Result<(), String> {
34        let pending_task_idx = async {
35            let state = self.instance.state().await;
36            state
37                .inner
38                .pending_tasks
39                .iter()
40                .position(|pidx| pidx == &task_id)
41        }
42        .await;
43        if let Some(pending_task_idx) = pending_task_idx {
44            if let Some(usertask) = self.fetch_pending_task(pending_task_idx).await {
45                match &usertask.def {
46                    wfrs_model::TaskDef::UserTask(ev) => {
47                        self.visit_outgoing(&ev.outgoing).await;
48                        self.run().await;
49                        Ok(())
50                    }
51                    _ => Err(format!("task with id {pending_task_idx} is not a usertask")),
52                }
53            } else {
54                Err(format!("no pending usertask with id {task_id}"))
55            }
56        } else {
57            Err(format!("task with id {task_id} not found"))
58        }
59    }
60
61    async fn fetch_pending_task(&self, idx: usize) -> Option<&Task> {
62        self.definition
63            .tasks
64            .get(self.instance.pending_task_by_index(idx).await as usize)
65    }
66
67    async fn fetch_current_task(&self) -> Option<&Task> {
68        if let Some(current_task) = self.instance.pop_current_task().await {
69            self.definition.tasks.get(current_task as usize)
70        } else {
71            None
72        }
73    }
74
75    async fn fetch_current_flow(&self) -> Option<&Flow> {
76        if let Some(current_flow) = self.instance.pop_current_flow().await {
77            self.definition.flows.get(current_flow as usize)
78        } else {
79            None
80        }
81    }
82
83    async fn fetch_future_task(&self) -> Option<&Task> {
84        if let Some(future_task) = self.instance.pop_maybe_future_task().await {
85            self.definition.tasks.get(future_task as usize)
86        } else {
87            None
88        }
89    }
90
91    async fn fetch_future_flow(&self) -> Option<&Flow> {
92        if let Some(future_flow) = self.instance.pop_maybe_future_flow().await {
93            self.definition.flows.get(future_flow as usize)
94        } else {
95            None
96        }
97    }
98
99    async fn visit_outgoing(&self, outgoing: &[i32]) {
100        for outgoing in outgoing {
101            self.instance.push_current_flow(*outgoing).await;
102        }
103    }
104
105    async fn visit_future_outgoing(&self, outgoing: &[i32]) {
106        for outgoing in outgoing {
107            self.instance.push_maybe_future_flow(*outgoing).await;
108        }
109    }
110
111    #[async_recursion]
112    pub async fn run(&self) {
113        if let Some(current_task) = self.fetch_current_task().await {
114            match &current_task.def {
115                wfrs_model::TaskDef::StartEvent(ev) => {
116                    self.visit_outgoing(&ev.outgoing).await;
117                    self.run().await;
118                }
119                wfrs_model::TaskDef::UserTask(_) => {
120                    self.instance.push_pending_task(current_task.id).await;
121                    self.instance.push_visited_task(current_task.id).await;
122                }
123                wfrs_model::TaskDef::ExclusiveGateway(ev) => {
124                    let out = async {
125                        let state = self.instance.state().await;
126                        ExclusiveGateway(ev).evaluate(self.definition, &state.inner.variables)
127                    }
128                    .await;
129                    self.visit_outgoing(&out).await;
130                    self.run().await;
131                }
132                wfrs_model::TaskDef::EndEvent(_) => {
133                    self.instance.set_completed().await;
134                }
135            }
136        }
137
138        if let Some(current_flow) = self.fetch_current_flow().await {
139            self.instance.push_visited_flow(current_flow.id).await;
140            self.instance
141                .push_current_task(current_flow.target_ref)
142                .await;
143            self.instance
144                .push_visited_task(current_flow.source_ref)
145                .await;
146            self.run().await;
147        }
148    }
149
150    pub async fn set_default_active_task(&self) {
151        let mut state = self.instance.mut_state().await;
152        if let Some(idx) = state.inner.pending_tasks.first() {
153            state.inner.active = *idx;
154        } else {
155            state.inner.active = -1;
156        }
157    }
158
159    fn is_usertask(&self, task_id: i32) -> bool {
160        self.definition
161            .tasks
162            .get(task_id as usize)
163            .map(|ev| match ev.def {
164                wfrs_model::TaskDef::UserTask(_) => true,
165                _ => false,
166            })
167            .unwrap_or(false)
168    }
169
170    #[async_recursion]
171    async fn sim_run(&self) {
172        if let Some(future_task) = self.fetch_future_task().await {
173            match &future_task.def {
174                wfrs_model::TaskDef::StartEvent(ev) => {
175                    self.visit_future_outgoing(&ev.outgoing).await;
176                    self.sim_run().await;
177                }
178                wfrs_model::TaskDef::UserTask(ev) => {
179                    self.visit_future_outgoing(&ev.outgoing).await;
180                    self.sim_run().await;
181                }
182                wfrs_model::TaskDef::ExclusiveGateway(ev) => {
183                    let out = async {
184                        let state = self.instance.state().await;
185                        ExclusiveGateway(ev).evaluate(self.definition, &state.inner.variables)
186                    }
187                    .await;
188                    self.visit_future_outgoing(&out).await;
189                    self.sim_run().await;
190                }
191                wfrs_model::TaskDef::EndEvent(_) => {}
192            }
193        }
194
195        if let Some(future_flow) = self.fetch_future_flow().await {
196            self.instance
197                .push_maybe_future_task(future_flow.target_ref)
198                .await;
199            if self.is_usertask(future_flow.source_ref) {
200                self.instance
201                    .push_maybe_visited_task(future_flow.source_ref)
202                    .await;
203            }
204            self.sim_run().await;
205        }
206    }
207
208    pub async fn simulate(&self) {
209        self.instance
210            .clear_future(self.definition.root_start_event())
211            .await;
212        self.sim_run().await;
213    }
214
215    pub async fn navigate_to(&self, task_id: i32) {
216        if let Some(task) = self.definition.tasks.get(task_id as usize) {
217            let visited = self.instance.has_visited(task_id).await
218                && self.instance.has_maybe_visited(task_id).await;
219            if task.is_user_task() && visited {
220                self.instance.set_usertask(task_id).await;
221                self.simulate().await;
222            } else {
223                let next_user_task = self.get_previous_user_task(task_id).await;
224                if let Some(task_id) = next_user_task {
225                    let visited = self.instance.has_visited(task_id).await;
226                    if visited {
227                        self.instance.set_usertask(task_id).await;
228                        self.simulate().await;
229                    }
230                }
231            }
232        }
233    }
234
235    pub async fn get_previous_user_task(&self, task_id: i32) -> Option<i32> {
236        let mut task_id = task_id;
237        let mut result = None;
238        let state = self.instance.state().await;
239        loop {
240            let pos = state
241                .inner
242                .visited_tasks
243                .iter()
244                .position(|t| t == &task_id)
245                .and_then(|pos| {
246                    if pos > 0 {
247                        state.inner.visited_tasks.get(pos - 1)
248                    } else {
249                        None
250                    }
251                });
252            if let Some(pos) = pos {
253                if let Some(task) = self.definition.tasks.get(*pos as usize) {
254                    if task.is_user_task() {
255                        result = Some(*pos);
256                        break;
257                    } else {
258                        task_id = *pos;
259                    }
260                }
261            } else {
262                break;
263            }
264        }
265        result
266    }
267}