// azure_functions/durable/orchestration_state.rs

1use crate::durable::{Action, EventType, HistoryEvent};
2use chrono::{DateTime, Utc};
3use serde::Serialize;
4use serde_json::{to_string, Value};
5use sha1::Sha1;
6use uuid::Uuid;
7
8#[derive(Debug, Serialize, Default)]
9#[serde(rename_all = "camelCase")]
10struct ExecutionResult {
11    is_done: bool,
12    actions: Vec<Vec<Action>>,
13    output: Option<Value>,
14    custom_status: Option<Value>,
15    error: Option<String>,
16}
17
18#[doc(hidden)]
19pub struct OrchestrationState {
20    pub(crate) history: Vec<HistoryEvent>,
21    result: ExecutionResult,
22    started_index: usize,
23    completed_index: Option<usize>,
24    guid_counter: u32,
25}
26
27impl OrchestrationState {
28    pub(crate) fn new(history: Vec<HistoryEvent>) -> Self {
29        let started_index = history
30            .iter()
31            .position(|event| event.event_type == EventType::OrchestratorStarted)
32            .expect("failed to find orchestrator started event");
33
34        let completed_index = history[started_index..]
35            .iter()
36            .position(|event| event.event_type == EventType::OrchestratorCompleted)
37            .map(|pos| pos + started_index);
38
39        OrchestrationState {
40            history,
41            result: ExecutionResult::default(),
42            started_index,
43            completed_index,
44            guid_counter: 0,
45        }
46    }
47
48    pub(crate) fn is_replaying(&self) -> bool {
49        match self.completed_index {
50            Some(i) => self.history.len() != (i + 1),
51            None => false,
52        }
53    }
54
55    pub(crate) fn current_time(&self) -> DateTime<Utc> {
56        self.history[self.started_index].timestamp
57    }
58
59    pub(crate) fn push_action(&mut self, action: Action) {
60        if self.result.actions.is_empty() {
61            self.result.actions.push(Vec::new());
62        }
63
64        self.result.actions.last_mut().unwrap().push(action);
65    }
66
67    pub(crate) fn set_output(&mut self, value: Value) {
68        self.result.output = Some(value);
69        self.result.is_done = true;
70    }
71
72    pub(crate) fn set_custom_status(&mut self, value: Value) {
73        self.result.custom_status = Some(value);
74    }
75
76    pub(crate) fn result(&self) -> String {
77        to_string(&self.result).unwrap()
78    }
79
80    pub(crate) fn find_start_event(
81        &mut self,
82        name: &str,
83        event_type: EventType,
84    ) -> Option<(usize, &mut HistoryEvent)> {
85        let index = self.history.iter().position(|event| {
86            !event.is_processed
87                && event.event_type == event_type
88                && event.name.as_ref().map(|n| n.as_ref()) == Some(name)
89        })?;
90
91        Some((index, &mut self.history[index]))
92    }
93
94    pub(crate) fn find_end_event(
95        &mut self,
96        start_index: usize,
97        completed_type: EventType,
98        failed_type: Option<EventType>,
99    ) -> Option<(usize, &mut HistoryEvent)> {
100        if start_index + 1 >= self.history.len() {
101            return None;
102        }
103
104        let id = self.history[start_index].event_id;
105
106        let index = self.history[start_index + 1..]
107            .iter()
108            .position(|event| {
109                !event.is_processed
110                    && (event.event_type == completed_type
111                        || (failed_type.is_some()
112                            && event.event_type == *failed_type.as_ref().unwrap()))
113                    && event.task_scheduled_id == Some(id)
114            })
115            .map(|p| p + start_index + 1)?;
116
117        Some((index, &mut self.history[index]))
118    }
119
120    pub(crate) fn find_timer_created(&mut self) -> Option<(usize, &mut HistoryEvent)> {
121        let index = self
122            .history
123            .iter()
124            .position(|event| !event.is_processed && event.event_type == EventType::TimerCreated)?;
125
126        Some((index, &mut self.history[index]))
127    }
128
129    pub(crate) fn find_timer_fired(
130        &mut self,
131        created_index: usize,
132    ) -> Option<(usize, &mut HistoryEvent)> {
133        if created_index + 1 >= self.history.len() {
134            return None;
135        }
136
137        let id = self.history[created_index].event_id;
138
139        let index = self.history[created_index + 1..]
140            .iter()
141            .position(|event| {
142                !event.is_processed
143                    && event.event_type == EventType::TimerFired
144                    && event.timer_id == Some(id)
145            })
146            .map(|p| p + created_index + 1)?;
147
148        Some((index, &mut self.history[index]))
149    }
150
151    pub(crate) fn find_event_raised(&mut self, name: &str) -> Option<(usize, &mut HistoryEvent)> {
152        let index = self.history.iter().position(|event| {
153            !event.is_processed
154                && event.event_type == EventType::EventRaised
155                && event.name.as_ref().map(|n| n.as_ref()) == Some(name)
156        })?;
157
158        Some((index, &mut self.history[index]))
159    }
160
161    pub(crate) fn update(&mut self, event_index: usize) {
162        // Check for end of history
163        if self.started_index + 1 >= self.history.len() || self.completed_index.is_none() {
164            return;
165        }
166
167        while self.completed_index.unwrap() < event_index {
168            let started_index = self.history[self.started_index + 1..]
169                .iter()
170                .position(|event| event.event_type == EventType::OrchestratorStarted)
171                .map(|pos| pos + self.started_index + 1);
172
173            if started_index.is_none() {
174                return;
175            }
176
177            self.started_index = started_index.unwrap();
178            self.completed_index = self.history[self.started_index..]
179                .iter()
180                .position(|event| event.event_type == EventType::OrchestratorCompleted)
181                .map(|pos| pos + self.started_index);
182
183            self.result.actions.push(Vec::new());
184
185            if self.completed_index.is_none() {
186                return;
187            }
188        }
189    }
190
191    pub(crate) fn new_guid(&mut self, instance_id: &str) -> uuid::Uuid {
192        const GUID_NAMESPACE: &str = "9e952958-5e33-4daf-827f-2fa12937b875";
193
194        let mut hasher = Sha1::new();
195        hasher.update(
196            format!(
197                "{}_{}_{}",
198                instance_id,
199                self.current_time().to_string(),
200                self.guid_counter
201            )
202            .as_bytes(),
203        );
204
205        self.guid_counter += 1;
206
207        Uuid::new_v5(
208            &Uuid::parse_str(GUID_NAMESPACE).expect("failed to parse namespace GUID"),
209            &hasher.digest().bytes(),
210        )
211    }
212}
213
#[cfg(test)]
mod tests {
    use super::*;
    use crate::durable::tests::create_event;
    use serde_json::json;

    /// Builds an `OrchestratorStarted` event.
    fn orchestrator_started() -> HistoryEvent {
        create_event(EventType::OrchestratorStarted, -1, None, None, None)
    }

    /// Builds an `OrchestratorCompleted` event.
    fn orchestrator_completed() -> HistoryEvent {
        create_event(EventType::OrchestratorCompleted, -1, None, None, None)
    }

    /// Builds a `TaskScheduled` event with event id 0 for the activity `name`.
    fn task_scheduled(name: &str) -> HistoryEvent {
        create_event(
            EventType::TaskScheduled,
            0,
            Some(name.to_string()),
            None,
            None,
        )
    }

    /// Builds a `TaskCompleted` event for the activity `name` that refers back to the
    /// scheduled event with id 0 and carries `result`.
    fn task_completed(name: &str, result: String) -> HistoryEvent {
        create_event(
            EventType::TaskCompleted,
            -1,
            Some(name.to_string()),
            Some(result),
            Some(0),
        )
    }

    /// Builds a `TaskFailed` event for the activity `name` that refers back to the
    /// scheduled event with id 0.
    fn task_failed(name: &str) -> HistoryEvent {
        create_event(
            EventType::TaskFailed,
            -1,
            Some(name.to_string()),
            None,
            Some(0),
        )
    }

    #[test]
    #[should_panic(expected = "failed to find orchestrator started event")]
    fn it_requires_an_orchestration_start_event() {
        OrchestrationState::new(Vec::new());
    }

    #[test]
    fn it_constructs() {
        let history = vec![orchestrator_started()];
        let timestamp = history[0].timestamp;

        let state = OrchestrationState::new(history);

        assert_eq!(state.current_time(), timestamp);
        assert!(!state.is_replaying());
    }

    #[test]
    fn it_pushes_an_action() {
        let mut state = OrchestrationState::new(vec![orchestrator_started()]);

        let action = Action::CallActivity {
            function_name: "test".to_string(),
            input: json!("hello"),
        };

        state.push_action(action.clone());

        assert_eq!(state.result.actions.len(), 1);
        assert_eq!(state.result.actions[0].len(), 1);
        assert_eq!(state.result.actions[0][0], action);
    }

    #[test]
    fn it_sets_done_with_output() {
        let mut state = OrchestrationState::new(vec![orchestrator_started()]);

        state.set_output(json!(42));

        assert!(state.result.is_done);
        assert_eq!(state.result.output.as_ref().unwrap(), &json!(42));
    }

    #[test]
    fn it_returns_a_json_result() {
        let mut state = OrchestrationState::new(vec![orchestrator_started()]);

        state.push_action(Action::CallActivity {
            function_name: "test".to_string(),
            input: json!("hello"),
        });

        state.set_output(json!("hello"));

        assert_eq!(
            state.result(),
            r#"{"isDone":true,"actions":[[{"actionType":"callActivity","functionName":"test","input":"hello"}]],"output":"hello","customStatus":null,"error":null}"#
        );
    }

    #[test]
    fn it_returns_none_if_scheduled_activity_is_not_in_history() {
        let mut state = OrchestrationState::new(vec![orchestrator_started()]);

        assert!(state
            .find_start_event("foo", EventType::TaskScheduled)
            .is_none());
    }

    #[test]
    fn it_returns_some_if_scheduled_activity_is_in_history() {
        let history = vec![orchestrator_started(), task_scheduled("foo")];

        let mut state = OrchestrationState::new(history);

        let (index, event) = state
            .find_start_event("foo", EventType::TaskScheduled)
            .expect("scheduled event should be found");

        assert_eq!(index, 1);
        assert_eq!(event.event_type, EventType::TaskScheduled);
    }

    #[test]
    fn it_returns_none_if_finished_activity_is_not_in_history() {
        let history = vec![orchestrator_started(), task_scheduled("foo")];

        let mut state = OrchestrationState::new(history);

        let (start_index, start_event) = state
            .find_start_event("foo", EventType::TaskScheduled)
            .expect("scheduled event should be found");

        assert_eq!(start_index, 1);
        assert_eq!(start_event.event_type, EventType::TaskScheduled);

        assert!(state
            .find_end_event(
                start_index,
                EventType::TaskCompleted,
                Some(EventType::TaskFailed)
            )
            .is_none());
    }

    #[test]
    fn it_returns_some_if_completed_activity_is_in_history() {
        let history = vec![
            orchestrator_started(),
            task_scheduled("foo"),
            task_completed("foo", json!("bar").to_string()),
        ];

        let mut state = OrchestrationState::new(history);

        let (start_index, start_event) = state
            .find_start_event("foo", EventType::TaskScheduled)
            .expect("scheduled event should be found");

        assert_eq!(start_index, 1);
        assert_eq!(start_event.event_type, EventType::TaskScheduled);

        let (end_index, end_event) = state
            .find_end_event(
                start_index,
                EventType::TaskCompleted,
                Some(EventType::TaskFailed),
            )
            .expect("completed event should be found");

        assert_eq!(end_index, 2);
        assert_eq!(end_event.event_type, EventType::TaskCompleted);
        assert_eq!(end_event.result, Some(json!("bar").to_string()));
    }

    #[test]
    fn it_returns_some_if_failed_activity_is_in_history() {
        let history = vec![
            orchestrator_started(),
            task_scheduled("foo"),
            task_failed("foo"),
        ];

        let mut state = OrchestrationState::new(history);

        let (start_index, start_event) = state
            .find_start_event("foo", EventType::TaskScheduled)
            .expect("scheduled event should be found");

        assert_eq!(start_index, 1);
        assert_eq!(start_event.event_type, EventType::TaskScheduled);

        let (end_index, end_event) = state
            .find_end_event(
                start_index,
                EventType::TaskCompleted,
                Some(EventType::TaskFailed),
            )
            .expect("failed event should be found");

        assert_eq!(end_index, 2);
        assert_eq!(end_event.event_type, EventType::TaskFailed);
    }

    #[test]
    fn it_does_not_update_state_if_there_is_no_completed_event() {
        let history = vec![
            orchestrator_started(),
            task_scheduled("foo"),
            task_failed("foo"),
        ];

        let mut state = OrchestrationState::new(history);
        assert!(!state.is_replaying());

        let current_time = state.current_time();

        state.update(2);

        assert_eq!(state.current_time(), current_time);
        assert!(!state.is_replaying());
    }

    #[test]
    fn it_does_not_update_state_if_index_is_less_than_end() {
        let history = vec![
            orchestrator_started(),
            task_scheduled("foo"),
            task_failed("foo"),
            orchestrator_completed(),
            orchestrator_started(),
        ];

        let mut state = OrchestrationState::new(history);
        assert!(state.is_replaying());

        let current_time = state.current_time();

        state.update(2);

        assert_eq!(state.current_time(), current_time);
        assert!(state.is_replaying());
    }

    #[test]
    fn it_updates_when_the_index_is_greater_with_end() {
        let history = vec![
            orchestrator_started(),
            task_scheduled("foo"),
            orchestrator_completed(),
            orchestrator_started(),
            task_failed("foo"),
            orchestrator_completed(),
            orchestrator_started(),
        ];

        let mut state = OrchestrationState::new(history);
        assert!(state.is_replaying());

        let current_time = state.current_time();

        state.update(4);

        assert_ne!(state.current_time(), current_time);
        assert!(state.is_replaying());
    }

    #[test]
    fn it_updates_when_the_index_is_greater() {
        let history = vec![
            orchestrator_started(),
            task_scheduled("foo"),
            orchestrator_completed(),
            orchestrator_started(),
            task_failed("foo"),
        ];

        let mut state = OrchestrationState::new(history);
        assert!(state.is_replaying());

        let current_time = state.current_time();

        state.update(4);

        assert_ne!(state.current_time(), current_time);
        assert!(!state.is_replaying());
    }
}