azure_functions/bindings/
durable_orchestration_client.rs

1use crate::http::Body;
2use crate::rpc::{typed_data::Data, TypedData};
3use azure_functions_durable::{
4    Client, OrchestrationData, OrchestrationRuntimeStatus, OrchestrationStatus, Result,
5};
6use chrono::{DateTime, Utc};
7use serde::Deserialize;
8use serde_json::{from_str, to_value, Value};
9
10/// Represents the Durable Functions orchestration client input binding.
11///
12/// The following binding attributes are supported:
13///
14/// | Name         | Description                                                                                                                                 |
15/// |--------------|---------------------------------------------------------------------------------------------------------------------------------------------|
16/// | `name`       | The name of the parameter being bound.                                                                                                      |
17/// | `task_hub`   | The name of the task hub to use.  Defaults to the value from host.json                                                                      |
18/// | `connection` | The name of an app setting that contains a storage account connection string. Defaults to the storage account for the function application. |
19///
20/// # Examples
21///
22/// Starting a new orchestration:
23///
24/// ```rust
25/// use azure_functions::{
26///     bindings::{DurableOrchestrationClient, HttpRequest, HttpResponse},
27///     func,
28/// };
29/// use serde_json::Value;
30///
31/// #[func]
32/// pub async fn start(_req: HttpRequest, client: DurableOrchestrationClient) -> HttpResponse {
33///     match client
34///         .start_new(
35///             "orchestration",
36///             None,
37///             Value::Null,
38///         )
39///         .await
40///     {
41///         Ok(data) => data.into(),
42///         Err(e) => format!("Failed to start orchestration: {}", e).into(),
43///     }
44/// }
45/// ```
46pub struct DurableOrchestrationClient {
47    client: Client,
48}
49
50impl DurableOrchestrationClient {
51    /// Gets the status of an orchestration instance.
52    pub async fn instance_status(
53        &self,
54        instance_id: &str,
55        show_history: bool,
56        show_history_output: bool,
57        show_input: bool,
58    ) -> Result<OrchestrationStatus> {
59        self.client
60            .instance_status(instance_id, show_history, show_history_output, show_input)
61            .await
62    }
63
64    /// Queries the status for instances in a given date range or with runtime status.
65    #[allow(clippy::too_many_arguments)]
66    pub async fn query_instances<I>(
67        &self,
68        created_time_from: Option<DateTime<Utc>>,
69        created_time_to: Option<DateTime<Utc>>,
70        runtime_statuses: Option<I>,
71        top: Option<u32>,
72        show_history: bool,
73        show_history_output: bool,
74        show_input: bool,
75    ) -> Result<Vec<OrchestrationStatus>>
76    where
77        I: Iterator<Item = OrchestrationRuntimeStatus>,
78    {
79        self.client
80            .query_instances(
81                created_time_from,
82                created_time_to,
83                runtime_statuses,
84                top,
85                show_history,
86                show_history_output,
87                show_input,
88            )
89            .await
90    }
91
92    /// Purges the history of the given orchestration instance.
93    pub async fn purge_history(&self, instance_id: &str) -> Result<()> {
94        self.client.purge_history(instance_id).await
95    }
96
97    /// Purges the history of orchestrations matching the given date range or runtime statuses.
98    pub async fn purge_history_by_query<I>(
99        &self,
100        created_time_from: Option<DateTime<Utc>>,
101        created_time_to: Option<DateTime<Utc>>,
102        runtime_statuses: Option<I>,
103    ) -> Result<u32>
104    where
105        I: Iterator<Item = OrchestrationRuntimeStatus>,
106    {
107        self.client
108            .purge_history_by_query(created_time_from, created_time_to, runtime_statuses)
109            .await
110    }
111
112    /// Raises an event for the given orchestration instance.
113    pub async fn raise_event<D>(
114        &self,
115        instance_id: &str,
116        event_name: &str,
117        event_data: D,
118    ) -> Result<()>
119    where
120        D: Into<Value>,
121    {
122        self.client
123            .raise_event(instance_id, event_name, event_data)
124            .await
125    }
126
127    /// Restores a failed orchestration instance into a running state by replaying the most recent failed operations.
128    pub async fn rewind(&self, instance_id: &str, reason: &str) -> Result<()> {
129        self.client.rewind(instance_id, reason).await
130    }
131
132    /// Starts a new orchestration by calling the given orchestration function.
133    pub async fn start_new<D>(
134        &self,
135        function_name: &str,
136        instance_id: Option<&str>,
137        input: D,
138    ) -> Result<OrchestrationData>
139    where
140        D: Into<Value>,
141    {
142        self.client
143            .start_new(function_name, instance_id, input)
144            .await
145    }
146
147    /// Terminates a running orchestration instance.
148    pub async fn terminate(&self, instance_id: &str, reason: &str) -> Result<()> {
149        self.client.terminate(instance_id, reason).await
150    }
151}
152
153#[doc(hidden)]
154impl From<TypedData> for DurableOrchestrationClient {
155    fn from(data: TypedData) -> Self {
156        #[derive(Debug, Clone, Deserialize)]
157        #[serde(rename_all = "camelCase")]
158        struct ManagementUrls {
159            #[serde(rename = "statusQueryGetUri")]
160            status_query_url: String,
161        }
162
163        #[derive(Deserialize)]
164        #[serde(rename_all = "camelCase")]
165        struct BindingData {
166            management_urls: ManagementUrls,
167        }
168
169        let data: BindingData = match &data.data {
170            Some(Data::String(s)) => {
171                from_str(s).expect("failed to parse durable orchestration client data")
172            }
173            _ => panic!("expected string data for durable orchestration client"),
174        };
175
176        DurableOrchestrationClient {
177            client: Client::new(&data.management_urls.status_query_url),
178        }
179    }
180}
181
182impl<'a> Into<Body<'a>> for OrchestrationData {
183    fn into(self) -> Body<'a> {
184        to_value(&self).unwrap().into()
185    }
186}
187
188#[cfg(test)]
189mod tests {
190    use super::*;
191
192    #[test]
193    fn it_converts_from_typed_data() {
194        let data = TypedData {
195            data: Some(Data::String(r#"{"taskHubName":"DurableFunctionsHub","creationUrls":{"createNewInstancePostUri":"http://localhost:8080/runtime/webhooks/durabletask/orchestrators/{functionName}[/{instanceId}]?code=foo","createAndWaitOnNewInstancePostUri":"http://localhost:8080/runtime/webhooks/durabletask/orchestrators/{functionName}[/{instanceId}]?timeout={timeoutInSeconds}&pollingInterval={intervalInSeconds}&code=foo"},"managementUrls":{"id":"INSTANCEID","statusQueryGetUri":"http://localhost:8080/runtime/webhooks/durabletask/instances/INSTANCEID?taskHub=DurableFunctionsHub&connection=Storage&code=foo","sendEventPostUri":"http://localhost:8080/runtime/webhooks/durabletask/instances/INSTANCEID/raiseEvent/{eventName}?taskHub=DurableFunctionsHub&connection=Storage&code=foo","terminatePostUri":"http://localhost:8080/runtime/webhooks/durabletask/instances/INSTANCEID/terminate?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=foo","rewindPostUri":"http://localhost:8080/runtime/webhooks/durabletask/instances/INSTANCEID/rewind?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=foo","purgeHistoryDeleteUri":"http://localhost:8080/runtime/webhooks/durabletask/instances/INSTANCEID?taskHub=DurableFunctionsHub&connection=Storage&code=foo"}}"#.to_owned())),
196        };
197
198        let client: DurableOrchestrationClient = data.into();
199        assert_eq!(client.client.task_hub(), "DurableFunctionsHub");
200    }
201}