azure_functions/bindings/
durable_orchestration_context.rs

1use crate::{
2    durable::{
3        Action, ActionFuture, EventType, HistoryEvent, JoinAll, OrchestrationFuture,
4        OrchestrationState, RetryOptions, SelectAll,
5    },
6    rpc::{typed_data::Data, TypedData},
7};
8use chrono::{DateTime, Utc};
9use serde::Deserialize;
10use serde_json::{from_str, Value};
11use std::{cell::RefCell, collections::HashMap, rc::Rc};
12
13/// Represents the Durable Functions orchestration context binding.
14///
15/// The following binding attributes are supported:
16///
17/// | Name            | Description                                                           |
18/// |-----------------|-----------------------------------------------------------------------|
19/// | `name`          | The name of the parameter being bound.                                |
20/// | `orchestration` | The name of the orchestration.  Defaults to the name of the function. |
21///
22/// # Examples
23///
24/// Calling multiple activities and waiting for them all to complete:
25///
26/// ```rust
27/// use azure_functions::{bindings::DurableOrchestrationContext, durable::OrchestrationOutput, func};
28/// use serde_json::Value;
29/// use log::error;
30///
31/// #[func]
32/// pub async fn run(context: DurableOrchestrationContext) -> OrchestrationOutput {
33///     let activities = vec![
34///         context.call_activity("say_hello", "Tokyo"),
35///         context.call_activity("say_hello", "London"),
36///         context.call_activity("say_hello", "Seattle"),
37///     ];
38///
39///     context.set_custom_status("Waiting for all activities to complete.");
40///
41///     let result: Value = context
42///         .join_all(activities)
43///         .await
44///         .into_iter()
45///         .filter_map(|r| {
46///             r.map(Some).unwrap_or_else(|e| {
47///                 error!("Activity failed: {}", e);
48///                 None
49///             })
50///         })
51///         .collect::<Vec<_>>()
52///         .into();
53///
54///     context.set_custom_status("All activities have completed.");
55///
56///     result.into()
57/// }
58/// ```
59pub struct DurableOrchestrationContext {
60    /// The orchestration instance identifier.
61    pub instance_id: String,
62    /// The parent orchestration instance identifier.
63    pub parent_instance_id: Option<String>,
64    /// The input value to the orchestration.
65    pub input: Value,
66    state: Rc<RefCell<OrchestrationState>>,
67}
68
69impl DurableOrchestrationContext {
70    #[doc(hidden)]
71    pub fn new(data: TypedData, _metadata: HashMap<String, TypedData>) -> Self {
72        #[derive(Deserialize)]
73        #[serde(rename_all = "camelCase")]
74        struct BindingData {
75            instance_id: String,
76            parent_instance_id: Option<String>,
77            input: Value,
78            history: Vec<HistoryEvent>,
79        }
80
81        match &data.data {
82            Some(Data::String(s)) => {
83                let data: BindingData =
84                    from_str(s).expect("failed to parse orchestration context data");
85
86                DurableOrchestrationContext {
87                    instance_id: data.instance_id,
88                    parent_instance_id: data.parent_instance_id,
89                    input: data.input,
90                    state: Rc::new(RefCell::new(OrchestrationState::new(data.history))),
91                }
92            }
93            _ => panic!("expected JSON data for orchestration context data"),
94        }
95    }
96
97    /// Gets a value indicating whether the orchestrator function is currently replaying itself.
98    pub fn is_replaying(&self) -> bool {
99        self.state.borrow().is_replaying()
100    }
101
102    /// Gets the current date/time in a way that is safe for use by orchestrator functions.
103    pub fn current_time(&self) -> DateTime<Utc> {
104        self.state.borrow().current_time()
105    }
106
107    /// Sets the custom status of the orchestration.
108    pub fn set_custom_status<S>(&self, status: S)
109    where
110        S: Into<Value>,
111    {
112        self.state.borrow_mut().set_custom_status(status.into());
113    }
114
115    /// Create a new deterministic GUID suitable for use with orchestrations.
116    pub fn new_guid(&self) -> uuid::Uuid {
117        let mut state = self.state.borrow_mut();
118        state.new_guid(&self.instance_id)
119    }
120
121    #[doc(hidden)]
122    pub fn state(&self) -> Rc<RefCell<OrchestrationState>> {
123        self.state.clone()
124    }
125
126    /// Creates a future which represents a collection of the outputs of the futures given.
127    ///
128    /// The returned future will drive execution for all of its underlying futures,
129    /// collecting the results into a destination `Vec<T>` in the same order as they
130    /// were provided.
131    #[must_use = "futures do nothing unless you `.await` or poll them"]
132    pub fn join_all<I>(&self, iter: I) -> JoinAll<I::Item>
133    where
134        I: IntoIterator,
135        I::Item: OrchestrationFuture,
136    {
137        JoinAll::new(self.state.clone(), iter)
138    }
139
140    /// Creates a new future which will select over a list of futures.
141    ///
142    /// The returned future will wait for any future within `iter` to be ready. Upon
143    /// completion the item resolved will be returned, along with the index of the
144    /// future that was ready and the list of all the remaining futures.
145    ///
146    /// # Panics
147    ///
148    /// This function will panic if the iterator specified contains no items.
149    pub fn select_all<I>(&self, iter: I) -> SelectAll<I::Item>
150    where
151        I: IntoIterator,
152        I::Item: OrchestrationFuture,
153    {
154        SelectAll::new(self.state.clone(), iter)
155    }
156
157    /// Schedules an activity function for execution.
158    #[must_use = "futures do nothing unless you `.await` or poll them"]
159    pub fn call_activity<D>(
160        &self,
161        activity_name: &str,
162        data: D,
163    ) -> ActionFuture<Result<Value, String>>
164    where
165        D: Into<Value>,
166    {
167        self.perform_call_action(
168            Action::CallActivity {
169                function_name: activity_name.to_string(),
170                input: data.into(),
171            },
172            activity_name,
173            EventType::TaskScheduled,
174            EventType::TaskCompleted,
175            Some(EventType::TaskFailed),
176        )
177    }
178
179    /// Schedules an activity function for execution with retry options.
180    #[must_use = "futures do nothing unless you `.await` or poll them"]
181    pub fn call_activity_with_retry<D>(
182        &self,
183        activity_name: &str,
184        data: D,
185        retry_options: RetryOptions,
186    ) -> ActionFuture<Result<Value, String>>
187    where
188        D: Into<Value>,
189    {
190        self.perform_call_action(
191            Action::CallActivityWithRetry {
192                function_name: activity_name.to_string(),
193                retry_options,
194                input: data.into(),
195            },
196            activity_name,
197            EventType::TaskScheduled,
198            EventType::TaskCompleted,
199            Some(EventType::TaskFailed),
200        )
201    }
202
203    /// Schedules an orchestration function for execution.
204    #[must_use = "futures do nothing unless you `.await` or poll them"]
205    pub fn call_sub_orchestrator<D>(
206        &self,
207        function_name: &str,
208        instance_id: Option<String>,
209        data: D,
210    ) -> ActionFuture<Result<Value, String>>
211    where
212        D: Into<Value>,
213    {
214        self.perform_call_action(
215            Action::CallSubOrchestrator {
216                function_name: function_name.to_string(),
217                instance_id,
218                input: data.into(),
219            },
220            function_name,
221            EventType::SubOrchestrationInstanceCreated,
222            EventType::SubOrchestrationInstanceCompleted,
223            Some(EventType::SubOrchestrationInstanceFailed),
224        )
225    }
226
227    /// Schedules an orchestration function for execution with retry.
228    #[must_use = "futures do nothing unless you `.await` or poll them"]
229    pub fn call_sub_orchestrator_with_retry<D>(
230        &self,
231        function_name: &str,
232        instance_id: Option<String>,
233        data: D,
234        retry_options: RetryOptions,
235    ) -> ActionFuture<Result<Value, String>>
236    where
237        D: Into<Value>,
238    {
239        self.perform_call_action(
240            Action::CallSubOrchestratorWithRetry {
241                function_name: function_name.to_string(),
242                retry_options,
243                instance_id,
244                input: data.into(),
245            },
246            function_name,
247            EventType::SubOrchestrationInstanceCreated,
248            EventType::SubOrchestrationInstanceCompleted,
249            Some(EventType::SubOrchestrationInstanceFailed),
250        )
251    }
252
253    /// Restarts the orchestration by clearing its history.
254    pub fn continue_as_new<D>(&self, input: D, preserve_unprocessed_events: bool)
255    where
256        D: Into<Value>,
257    {
258        let mut state = self.state.borrow_mut();
259
260        state.push_action(Action::ContinueAsNew {
261            input: input.into(),
262            preserve_unprocessed_events,
263        });
264    }
265
266    /// Creates a durable timer that expires at a specified time.
267    #[must_use = "futures do nothing unless you `.await` or poll them"]
268    pub fn create_timer(&self, fire_at: DateTime<Utc>) -> ActionFuture<()> {
269        let mut state = self.state.borrow_mut();
270
271        state.push_action(Action::CreateTimer {
272            fire_at,
273            canceled: false,
274        });
275
276        let mut result = None;
277        let mut event_index = None;
278
279        if let Some((idx, created)) = state.find_timer_created() {
280            created.is_processed = true;
281
282            if let Some((idx, fired)) = state.find_timer_fired(idx) {
283                fired.is_processed = true;
284                event_index = Some(idx);
285                result = Some(());
286            }
287        }
288
289        ActionFuture::new(result, self.state.clone(), event_index)
290    }
291
292    /// Wait for an external event of the given name.
293    pub fn wait_for_event(&self, name: &str) -> ActionFuture<Result<Value, String>> {
294        let mut state = self.state.borrow_mut();
295
296        state.push_action(Action::WaitForExternalEvent {
297            external_event_name: name.to_string(),
298        });
299
300        let mut input = None;
301        let mut event_index = None;
302
303        if let Some((idx, raised)) = state.find_event_raised(name) {
304            raised.is_processed = true;
305            // For some reason, the data comes through as stringified JSON, so parse it
306            input = Some(Ok(raised
307                .input
308                .as_ref()
309                .map(|v| {
310                    v.as_str()
311                        .map(|s| from_str(&s).unwrap_or_default())
312                        .unwrap_or_default()
313                })
314                .unwrap_or_default()));
315            event_index = Some(idx);
316        }
317
318        ActionFuture::new(input, self.state.clone(), event_index)
319    }
320
321    fn perform_call_action(
322        &self,
323        action: Action,
324        name: &str,
325        started_type: EventType,
326        completed_type: EventType,
327        failed_type: Option<EventType>,
328    ) -> ActionFuture<Result<Value, String>> {
329        let mut state = self.state.borrow_mut();
330
331        state.push_action(action);
332
333        let mut result: Option<Result<Value, String>> = None;
334        let mut event_index = None;
335
336        if let Some((idx, scheduled)) = state.find_start_event(name, started_type) {
337            scheduled.is_processed = true;
338
339            if let Some((idx, finished)) = state.find_end_event(idx, completed_type, failed_type) {
340                finished.is_processed = true;
341                event_index = Some(idx);
342
343                if finished.event_type == completed_type {
344                    result = Some(Ok(finished
345                        .result
346                        .as_ref()
347                        .map(|s| from_str(&s).unwrap_or_default())
348                        .unwrap_or_default()));
349                } else if let Some(failed_type) = failed_type {
350                    if finished.event_type == failed_type {
351                        result = Some(Err(finished.reason.clone().unwrap_or_default()));
352                    }
353                } else {
354                    panic!("event must be a completion or a failure");
355                }
356            }
357        }
358
359        ActionFuture::new(result, self.state.clone(), event_index)
360    }
361}
362
#[cfg(test)]
mod tests {
    use super::*;
    use crate::durable::{EventType, HistoryEvent};
    use crate::rpc::typed_data::Data;
    use chrono::DateTime;

    // Wraps a JSON document in the string payload the binding expects.
    fn json_payload(json: &str) -> TypedData {
        TypedData {
            data: Some(Data::String(json.to_owned())),
        }
    }

    // Parses an RFC 3339 timestamp and converts it to UTC.
    fn utc(timestamp: &str) -> DateTime<Utc> {
        DateTime::<Utc>::from(DateTime::parse_from_rfc3339(timestamp).unwrap())
    }

    // A payload with no data at all must be rejected.
    #[test]
    #[should_panic(expected = "expected JSON data for orchestration context data")]
    fn new_panics_if_no_data_provided() {
        let payload = TypedData { data: None };

        let _ = DurableOrchestrationContext::new(payload, HashMap::new());
    }

    // A JSON document lacking the required fields must fail to parse.
    #[test]
    #[should_panic(expected = "failed to parse orchestration context data")]
    fn new_panics_if_no_json_provided() {
        let payload = json_payload(r#"{ }"#);

        let _ = DurableOrchestrationContext::new(payload, HashMap::new());
    }

    // An empty history is rejected when the orchestration state is built.
    #[test]
    #[should_panic(expected = "failed to find orchestrator started event")]
    fn it_panics_if_missing_history() {
        let payload = json_payload(
            r#"{
                "instanceId":"49497890673e4a75ab380e7a956c607b",
                "isReplaying":false,
                "parentInstanceId":"1234123412341234123412341234",
                "input": [],
                "history": []
            }"#,
        );

        DurableOrchestrationContext::new(payload, HashMap::new());
    }

    // A well-formed payload yields a context exposing the identifiers, the input
    // and the deserialized history.
    #[test]
    fn new_constructs_an_orchestration_context_with_history() {
        let payload = json_payload(
            r#"{
                "history":[
                    {
                       "EventType":12,
                       "EventId":-1,
                       "IsPlayed":true,
                       "Timestamp":"2019-07-18T06:22:27.016757Z"
                    },
                    {
                        "OrchestrationInstance":{
                           "InstanceId":"49497890673e4a75ab380e7a956c607b",
                           "ExecutionId":"5d2025984bef476bbaacefaa499a4f5f"
                        },
                        "EventType":0,
                        "ParentInstance":null,
                        "Name":"HelloWorld",
                        "Version":"",
                        "Input":"{}",
                        "Tags":null,
                        "EventId":-1,
                        "IsPlayed":false,
                       "Timestamp":"2019-07-18T06:22:26.626966Z"
                    }
                ],
                "instanceId":"49497890673e4a75ab380e7a956c607b",
                "isReplaying":false,
                "parentInstanceId":null,
                "input": []
            }"#,
        );

        let expected_history = vec![
            HistoryEvent {
                event_type: EventType::OrchestratorStarted,
                event_id: -1,
                is_played: true,
                timestamp: utc("2019-07-18T06:22:27.016757Z"),
                is_processed: false,
                name: None,
                input: None,
                result: None,
                task_scheduled_id: None,
                instance_id: None,
                reason: None,
                details: None,
                fire_at: None,
                timer_id: None,
            },
            HistoryEvent {
                event_type: EventType::ExecutionStarted,
                event_id: -1,
                is_played: false,
                timestamp: utc("2019-07-18T06:22:26.626966Z"),
                is_processed: false,
                name: Some(String::from("HelloWorld")),
                input: Some("{}".into()),
                result: None,
                task_scheduled_id: None,
                instance_id: None,
                reason: None,
                details: None,
                fire_at: None,
                timer_id: None,
            },
        ];

        let context = DurableOrchestrationContext::new(payload, HashMap::new());

        assert_eq!(context.instance_id, "49497890673e4a75ab380e7a956c607b");
        assert!(context.parent_instance_id.is_none());
        assert!(!context.is_replaying());
        assert_eq!(context.input, serde_json::Value::Array(Vec::new()));
        assert_eq!(context.state.borrow().history, expected_history);
    }
}