1use crate::durable::{Action, EventType, HistoryEvent};
2use chrono::{DateTime, Utc};
3use serde::Serialize;
4use serde_json::{to_string, Value};
5use sha1::Sha1;
6use uuid::Uuid;
7
8#[derive(Debug, Serialize, Default)]
9#[serde(rename_all = "camelCase")]
10struct ExecutionResult {
11 is_done: bool,
12 actions: Vec<Vec<Action>>,
13 output: Option<Value>,
14 custom_status: Option<Value>,
15 error: Option<String>,
16}
17
18#[doc(hidden)]
19pub struct OrchestrationState {
20 pub(crate) history: Vec<HistoryEvent>,
21 result: ExecutionResult,
22 started_index: usize,
23 completed_index: Option<usize>,
24 guid_counter: u32,
25}
26
27impl OrchestrationState {
28 pub(crate) fn new(history: Vec<HistoryEvent>) -> Self {
29 let started_index = history
30 .iter()
31 .position(|event| event.event_type == EventType::OrchestratorStarted)
32 .expect("failed to find orchestrator started event");
33
34 let completed_index = history[started_index..]
35 .iter()
36 .position(|event| event.event_type == EventType::OrchestratorCompleted)
37 .map(|pos| pos + started_index);
38
39 OrchestrationState {
40 history,
41 result: ExecutionResult::default(),
42 started_index,
43 completed_index,
44 guid_counter: 0,
45 }
46 }
47
48 pub(crate) fn is_replaying(&self) -> bool {
49 match self.completed_index {
50 Some(i) => self.history.len() != (i + 1),
51 None => false,
52 }
53 }
54
55 pub(crate) fn current_time(&self) -> DateTime<Utc> {
56 self.history[self.started_index].timestamp
57 }
58
59 pub(crate) fn push_action(&mut self, action: Action) {
60 if self.result.actions.is_empty() {
61 self.result.actions.push(Vec::new());
62 }
63
64 self.result.actions.last_mut().unwrap().push(action);
65 }
66
67 pub(crate) fn set_output(&mut self, value: Value) {
68 self.result.output = Some(value);
69 self.result.is_done = true;
70 }
71
72 pub(crate) fn set_custom_status(&mut self, value: Value) {
73 self.result.custom_status = Some(value);
74 }
75
76 pub(crate) fn result(&self) -> String {
77 to_string(&self.result).unwrap()
78 }
79
80 pub(crate) fn find_start_event(
81 &mut self,
82 name: &str,
83 event_type: EventType,
84 ) -> Option<(usize, &mut HistoryEvent)> {
85 let index = self.history.iter().position(|event| {
86 !event.is_processed
87 && event.event_type == event_type
88 && event.name.as_ref().map(|n| n.as_ref()) == Some(name)
89 })?;
90
91 Some((index, &mut self.history[index]))
92 }
93
94 pub(crate) fn find_end_event(
95 &mut self,
96 start_index: usize,
97 completed_type: EventType,
98 failed_type: Option<EventType>,
99 ) -> Option<(usize, &mut HistoryEvent)> {
100 if start_index + 1 >= self.history.len() {
101 return None;
102 }
103
104 let id = self.history[start_index].event_id;
105
106 let index = self.history[start_index + 1..]
107 .iter()
108 .position(|event| {
109 !event.is_processed
110 && (event.event_type == completed_type
111 || (failed_type.is_some()
112 && event.event_type == *failed_type.as_ref().unwrap()))
113 && event.task_scheduled_id == Some(id)
114 })
115 .map(|p| p + start_index + 1)?;
116
117 Some((index, &mut self.history[index]))
118 }
119
120 pub(crate) fn find_timer_created(&mut self) -> Option<(usize, &mut HistoryEvent)> {
121 let index = self
122 .history
123 .iter()
124 .position(|event| !event.is_processed && event.event_type == EventType::TimerCreated)?;
125
126 Some((index, &mut self.history[index]))
127 }
128
129 pub(crate) fn find_timer_fired(
130 &mut self,
131 created_index: usize,
132 ) -> Option<(usize, &mut HistoryEvent)> {
133 if created_index + 1 >= self.history.len() {
134 return None;
135 }
136
137 let id = self.history[created_index].event_id;
138
139 let index = self.history[created_index + 1..]
140 .iter()
141 .position(|event| {
142 !event.is_processed
143 && event.event_type == EventType::TimerFired
144 && event.timer_id == Some(id)
145 })
146 .map(|p| p + created_index + 1)?;
147
148 Some((index, &mut self.history[index]))
149 }
150
151 pub(crate) fn find_event_raised(&mut self, name: &str) -> Option<(usize, &mut HistoryEvent)> {
152 let index = self.history.iter().position(|event| {
153 !event.is_processed
154 && event.event_type == EventType::EventRaised
155 && event.name.as_ref().map(|n| n.as_ref()) == Some(name)
156 })?;
157
158 Some((index, &mut self.history[index]))
159 }
160
161 pub(crate) fn update(&mut self, event_index: usize) {
162 if self.started_index + 1 >= self.history.len() || self.completed_index.is_none() {
164 return;
165 }
166
167 while self.completed_index.unwrap() < event_index {
168 let started_index = self.history[self.started_index + 1..]
169 .iter()
170 .position(|event| event.event_type == EventType::OrchestratorStarted)
171 .map(|pos| pos + self.started_index + 1);
172
173 if started_index.is_none() {
174 return;
175 }
176
177 self.started_index = started_index.unwrap();
178 self.completed_index = self.history[self.started_index..]
179 .iter()
180 .position(|event| event.event_type == EventType::OrchestratorCompleted)
181 .map(|pos| pos + self.started_index);
182
183 self.result.actions.push(Vec::new());
184
185 if self.completed_index.is_none() {
186 return;
187 }
188 }
189 }
190
191 pub(crate) fn new_guid(&mut self, instance_id: &str) -> uuid::Uuid {
192 const GUID_NAMESPACE: &str = "9e952958-5e33-4daf-827f-2fa12937b875";
193
194 let mut hasher = Sha1::new();
195 hasher.update(
196 format!(
197 "{}_{}_{}",
198 instance_id,
199 self.current_time().to_string(),
200 self.guid_counter
201 )
202 .as_bytes(),
203 );
204
205 self.guid_counter += 1;
206
207 Uuid::new_v5(
208 &Uuid::parse_str(GUID_NAMESPACE).expect("failed to parse namespace GUID"),
209 &hasher.digest().bytes(),
210 )
211 }
212}
213
214#[cfg(test)]
215mod tests {
216 use super::*;
217 use crate::durable::tests::create_event;
218 use serde_json::json;
219
220 #[test]
221 #[should_panic(expected = "failed to find orchestrator started event")]
222 fn it_requires_an_orchestration_start_event() {
223 OrchestrationState::new(Vec::new());
224 }
225
226 #[test]
227 fn it_constructs() {
228 let history = vec![create_event(
229 EventType::OrchestratorStarted,
230 -1,
231 None,
232 None,
233 None,
234 )];
235
236 let timestamp = history[0].timestamp;
237
238 let state = OrchestrationState::new(history);
239
240 assert_eq!(state.current_time(), timestamp);
241 assert_eq!(state.is_replaying(), false);
242 }
243
244 #[test]
245 fn it_pushes_an_action() {
246 let history = vec![create_event(
247 EventType::OrchestratorStarted,
248 -1,
249 None,
250 None,
251 None,
252 )];
253
254 let mut state = OrchestrationState::new(history);
255
256 let action = Action::CallActivity {
257 function_name: "test".to_string(),
258 input: json!("hello"),
259 };
260
261 state.push_action(action.clone());
262
263 assert_eq!(state.result.actions.len(), 1);
264 assert_eq!(state.result.actions[0].len(), 1);
265 assert_eq!(state.result.actions[0][0], action);
266 }
267
268 #[test]
269 fn it_sets_done_with_output() {
270 let history = vec![create_event(
271 EventType::OrchestratorStarted,
272 -1,
273 None,
274 None,
275 None,
276 )];
277
278 let mut state = OrchestrationState::new(history);
279
280 state.set_output(json!(42));
281
282 assert!(state.result.is_done);
283 assert_eq!(state.result.output.as_ref().unwrap(), &json!(42));
284 }
285
286 #[test]
287 fn it_returns_a_json_result() {
288 let history = vec![create_event(
289 EventType::OrchestratorStarted,
290 -1,
291 None,
292 None,
293 None,
294 )];
295
296 let mut state = OrchestrationState::new(history);
297
298 state.push_action(Action::CallActivity {
299 function_name: "test".to_string(),
300 input: json!("hello"),
301 });
302
303 state.set_output(json!("hello"));
304
305 assert_eq!(
306 state.result(),
307 r#"{"isDone":true,"actions":[[{"actionType":"callActivity","functionName":"test","input":"hello"}]],"output":"hello","customStatus":null,"error":null}"#
308 );
309 }
310
311 #[test]
312 fn it_returns_none_if_scheduled_activity_is_not_in_history() {
313 let history = vec![create_event(
314 EventType::OrchestratorStarted,
315 -1,
316 None,
317 None,
318 None,
319 )];
320
321 let mut state = OrchestrationState::new(history);
322
323 assert_eq!(
324 state.find_start_event("foo", EventType::TaskScheduled),
325 None
326 );
327 }
328
329 #[test]
330 fn it_returns_some_if_scheduled_activity_is_in_history() {
331 let history = vec![
332 create_event(EventType::OrchestratorStarted, -1, None, None, None),
333 create_event(
334 EventType::TaskScheduled,
335 0,
336 Some("foo".to_string()),
337 None,
338 None,
339 ),
340 ];
341
342 let mut state = OrchestrationState::new(history);
343
344 match state.find_start_event("foo", EventType::TaskScheduled) {
345 Some((idx, entry)) => {
346 assert_eq!(idx, 1);
347 assert_eq!(entry.event_type, EventType::TaskScheduled);
348 }
349 None => assert!(false),
350 }
351 }
352
353 #[test]
354 fn it_returns_none_if_finished_activity_is_not_in_history() {
355 let history = vec![
356 create_event(EventType::OrchestratorStarted, -1, None, None, None),
357 create_event(
358 EventType::TaskScheduled,
359 0,
360 Some("foo".to_string()),
361 None,
362 None,
363 ),
364 ];
365
366 let mut state = OrchestrationState::new(history);
367
368 match state.find_start_event("foo", EventType::TaskScheduled) {
369 Some((idx, entry)) => {
370 assert_eq!(idx, 1);
371 assert_eq!(entry.event_type, EventType::TaskScheduled);
372 assert_eq!(
373 state.find_end_event(
374 idx,
375 EventType::TaskCompleted,
376 Some(EventType::TaskFailed)
377 ),
378 None
379 );
380 }
381 None => assert!(false),
382 }
383 }
384
385 #[test]
386 fn it_returns_some_if_completed_activity_is_in_history() {
387 let history = vec![
388 create_event(EventType::OrchestratorStarted, -1, None, None, None),
389 create_event(
390 EventType::TaskScheduled,
391 0,
392 Some("foo".to_string()),
393 None,
394 None,
395 ),
396 create_event(
397 EventType::TaskCompleted,
398 -1,
399 Some("foo".to_string()),
400 Some(json!("bar").to_string()),
401 Some(0),
402 ),
403 ];
404
405 let mut state = OrchestrationState::new(history);
406
407 match state.find_start_event("foo", EventType::TaskScheduled) {
408 Some((idx, entry)) => {
409 assert_eq!(idx, 1);
410 assert_eq!(entry.event_type, EventType::TaskScheduled);
411 match state.find_end_event(
412 idx,
413 EventType::TaskCompleted,
414 Some(EventType::TaskFailed),
415 ) {
416 Some((idx, entry)) => {
417 assert_eq!(idx, 2);
418 assert_eq!(entry.event_type, EventType::TaskCompleted);
419 assert_eq!(entry.result, Some(json!("bar").to_string()));
420 }
421 None => assert!(false),
422 }
423 }
424 None => assert!(false),
425 }
426 }
427
428 #[test]
429 fn it_returns_some_if_failed_activity_is_in_history() {
430 let history = vec![
431 create_event(EventType::OrchestratorStarted, -1, None, None, None),
432 create_event(
433 EventType::TaskScheduled,
434 0,
435 Some("foo".to_string()),
436 None,
437 None,
438 ),
439 create_event(
440 EventType::TaskFailed,
441 -1,
442 Some("foo".to_string()),
443 None,
444 Some(0),
445 ),
446 ];
447
448 let mut state = OrchestrationState::new(history);
449
450 match state.find_start_event("foo", EventType::TaskScheduled) {
451 Some((idx, entry)) => {
452 assert_eq!(idx, 1);
453 assert_eq!(entry.event_type, EventType::TaskScheduled);
454 match state.find_end_event(
455 idx,
456 EventType::TaskCompleted,
457 Some(EventType::TaskFailed),
458 ) {
459 Some((idx, entry)) => {
460 assert_eq!(idx, 2);
461 assert_eq!(entry.event_type, EventType::TaskFailed);
462 }
463 None => assert!(false),
464 }
465 }
466 None => assert!(false),
467 }
468 }
469
470 #[test]
471 fn it_does_not_update_state_if_there_is_no_completed_event() {
472 let history = vec![
473 create_event(EventType::OrchestratorStarted, -1, None, None, None),
474 create_event(
475 EventType::TaskScheduled,
476 0,
477 Some("foo".to_string()),
478 None,
479 None,
480 ),
481 create_event(
482 EventType::TaskFailed,
483 -1,
484 Some("foo".to_string()),
485 None,
486 Some(0),
487 ),
488 ];
489
490 let mut state = OrchestrationState::new(history);
491 assert!(!state.is_replaying());
492
493 let current_time = state.current_time();
494
495 state.update(2);
496
497 assert_eq!(state.current_time(), current_time);
498 assert!(!state.is_replaying());
499 }
500
501 #[test]
502 fn it_does_not_update_state_if_index_is_less_than_end() {
503 let history = vec![
504 create_event(EventType::OrchestratorStarted, -1, None, None, None),
505 create_event(
506 EventType::TaskScheduled,
507 0,
508 Some("foo".to_string()),
509 None,
510 None,
511 ),
512 create_event(
513 EventType::TaskFailed,
514 -1,
515 Some("foo".to_string()),
516 None,
517 Some(0),
518 ),
519 create_event(EventType::OrchestratorCompleted, -1, None, None, None),
520 create_event(EventType::OrchestratorStarted, -1, None, None, None),
521 ];
522
523 let mut state = OrchestrationState::new(history);
524 assert!(state.is_replaying());
525
526 let current_time = state.current_time();
527
528 state.update(2);
529
530 assert_eq!(state.current_time(), current_time);
531 assert!(state.is_replaying());
532 }
533
534 #[test]
535 fn it_updates_when_the_index_is_greater_with_end() {
536 let history = vec![
537 create_event(EventType::OrchestratorStarted, -1, None, None, None),
538 create_event(
539 EventType::TaskScheduled,
540 0,
541 Some("foo".to_string()),
542 None,
543 None,
544 ),
545 create_event(EventType::OrchestratorCompleted, -1, None, None, None),
546 create_event(EventType::OrchestratorStarted, -1, None, None, None),
547 create_event(
548 EventType::TaskFailed,
549 -1,
550 Some("foo".to_string()),
551 None,
552 Some(0),
553 ),
554 create_event(EventType::OrchestratorCompleted, -1, None, None, None),
555 create_event(EventType::OrchestratorStarted, -1, None, None, None),
556 ];
557
558 let mut state = OrchestrationState::new(history);
559 assert!(state.is_replaying());
560
561 let current_time = state.current_time();
562
563 state.update(4);
564
565 assert_ne!(state.current_time(), current_time);
566 assert!(state.is_replaying());
567 }
568
569 #[test]
570 fn it_updates_when_the_index_is_greater() {
571 let history = vec![
572 create_event(EventType::OrchestratorStarted, -1, None, None, None),
573 create_event(
574 EventType::TaskScheduled,
575 0,
576 Some("foo".to_string()),
577 None,
578 None,
579 ),
580 create_event(EventType::OrchestratorCompleted, -1, None, None, None),
581 create_event(EventType::OrchestratorStarted, -1, None, None, None),
582 create_event(
583 EventType::TaskFailed,
584 -1,
585 Some("foo".to_string()),
586 None,
587 Some(0),
588 ),
589 ];
590
591 let mut state = OrchestrationState::new(history);
592 assert!(state.is_replaying());
593
594 let current_time = state.current_time();
595
596 state.update(4);
597
598 assert_ne!(state.current_time(), current_time);
599 assert!(!state.is_replaying());
600 }
601}