azure_functions_durable/
client.rs

1use crate::endpoint::Endpoint;
2use crate::error::ClientError;
3use crate::Result;
4use chrono::{DateTime, Utc};
5use futures::TryStreamExt;
6use hyper::{self, Body, Request, StatusCode};
7use serde::{Deserialize, Serialize};
8use serde_json::{from_slice, to_string, Value};
9use std::fmt::{Display, Formatter};
10use url::Url;
11
12/// Represents the runtime status of an orchestration.
13#[derive(Debug, Clone, Deserialize, PartialEq)]
14pub enum OrchestrationRuntimeStatus {
15    /// The orchestration is running.
16    Running,
17    /// The orchestration is pending.
18    Pending,
19    /// The orchestration has failed.
20    Failed,
21    /// The orchestration was canceled.
22    Canceled,
23    /// The orchestration was terminated.
24    Terminated,
25    /// The orchestration completed successfully.
26    Completed,
27}
28
29impl Display for OrchestrationRuntimeStatus {
30    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
31        write!(f, "{:?}", self)
32    }
33}
34
35/// Represents an orchestration history event.
36#[derive(Debug, Clone, Deserialize)]
37#[serde(rename_all = "PascalCase")]
38pub struct OrchestrationHistoryEvent {
39    /// The event type.
40    pub event_type: String,
41    /// The orchestration status for the event.
42    pub orchestration_status: Option<OrchestrationRuntimeStatus>,
43    /// The function name for the event.
44    pub function_name: Option<String>,
45    /// The result (output) for the event.
46    pub result: Option<Value>,
47    /// The scheduled time for the event.
48    pub scheduled_time: Option<DateTime<Utc>>,
49    /// The timestamp for the event.
50    pub timestamp: DateTime<Utc>,
51}
52
53/// Represents an orchestration's status.
54#[derive(Debug, Clone, Deserialize)]
55#[serde(rename_all = "camelCase")]
56pub struct OrchestrationStatus {
57    /// The runtime status of the orchestration.
58    pub runtime_status: OrchestrationRuntimeStatus,
59    /// The input of the orchestration.
60    pub input: Option<Value>,
61    /// The custom status of the orchestration.
62    pub custom_status: Option<Value>,
63    /// The output of the orchestration.
64    pub output: Option<Value>,
65    /// The created time of the orchestration.
66    pub created_time: DateTime<Utc>,
67    /// The event history of the orchestration.
68    pub history_events: Option<Vec<OrchestrationHistoryEvent>>,
69}
70
71/// Represents new orchestration data.
72#[derive(Debug, Clone, Serialize, Deserialize)]
73#[serde(rename_all = "camelCase")]
74pub struct OrchestrationData {
75    /// The orchestration instance id.
76    #[serde(rename = "id")]
77    pub instance_id: String,
78    /// The instance status query URI (GET).
79    pub status_query_get_uri: String,
80    /// The send event URI (POST).
81    pub send_event_post_uri: String,
82    /// The terminate instance URI (POST).
83    pub terminate_post_uri: String,
84    /// The purge history URI (DELETE).
85    pub purge_history_delete_uri: String,
86    /// The rewind URI (POST).
87    pub rewind_post_uri: Option<String>,
88}
89
/// Represents the Durable Functions HTTP client.
///
/// Every operation builds a request URL from `endpoint` and sends it with the
/// shared `hyper` client.
pub struct Client {
    // Builds the per-operation URLs (status query, purge, raise event, ...).
    endpoint: Endpoint,
    // HTTP client used to send all requests.
    client: hyper::Client<hyper::client::HttpConnector>,
}
95
96impl Client {
97    /// Creates a new client from the given status query URL.
98    pub fn new(status_query_url: &str) -> Self {
99        Self {
100            endpoint: Endpoint::new(
101                Url::parse(status_query_url).expect("expected a valid query URL"),
102            ),
103            client: hyper::Client::builder().build_http(),
104        }
105    }
106
107    /// Gets the task hub associated with the client.
108    pub fn task_hub(&self) -> &str {
109        self.endpoint.task_hub()
110    }
111
112    /// Gets the status of an orchestration instance.
113    pub async fn instance_status(
114        &self,
115        instance_id: &str,
116        show_history: bool,
117        show_history_output: bool,
118        show_input: bool,
119    ) -> Result<OrchestrationStatus> {
120        let mut url = self.endpoint.status_query_url(Some(instance_id));
121
122        url.query_pairs_mut()
123            .append_pair("showHistory", if show_history { "true" } else { "false " })
124            .append_pair(
125                "showHistoryOutput",
126                if show_history_output {
127                    "true"
128                } else {
129                    "false "
130                },
131            )
132            .append_pair("showInput", if show_input { "true" } else { "false " });
133
134        let req = Request::builder()
135            .method("GET")
136            .uri(url.into_string())
137            .header("Content-Type", "application/json")
138            .body(Body::empty())
139            .unwrap();
140
141        match self.client.request(req).await {
142            Ok(res) => match res.status() {
143                StatusCode::OK | StatusCode::ACCEPTED => {
144                    let body = res.into_body().try_concat().await;
145                    body.map(|b| {
146                        from_slice(&b).map_err(|e| {
147                            ClientError::Message(format!(
148                                "failed to deserialize orchestration status: {}",
149                                e
150                            ))
151                        })
152                    })
153                    .unwrap_or_else(|e| {
154                        Err(ClientError::Message(format!(
155                            "failed to read response: {}",
156                            e
157                        )))
158                    })
159                }
160                StatusCode::BAD_REQUEST => Err(ClientError::InstanceFailedOrTerminated),
161                StatusCode::NOT_FOUND => Err(ClientError::InstanceNotFound),
162                StatusCode::INTERNAL_SERVER_ERROR => Err(ClientError::InternalServerError),
163                _ => unreachable!("unexpected response from server"),
164            },
165            Err(e) => Err(ClientError::Message(format!(
166                "failed to send request: {}",
167                e
168            ))),
169        }
170    }
171
172    /// Queries the status for instances in a given date range or with runtime status.
173    #[allow(clippy::too_many_arguments)]
174    pub async fn query_instances<I>(
175        &self,
176        created_time_from: Option<DateTime<Utc>>,
177        created_time_to: Option<DateTime<Utc>>,
178        runtime_statuses: Option<I>,
179        top: Option<u32>,
180        show_history: bool,
181        show_history_output: bool,
182        show_input: bool,
183    ) -> Result<Vec<OrchestrationStatus>>
184    where
185        I: Iterator<Item = OrchestrationRuntimeStatus>,
186    {
187        let mut url = self.endpoint.status_query_url(None);
188
189        {
190            let mut query = url.query_pairs_mut();
191
192            created_time_from.map(|t| query.append_pair("createdTimeFrom", &t.to_rfc3339()));
193            created_time_to.map(|t| query.append_pair("createdTimeTo", &t.to_rfc3339()));
194            runtime_statuses.map(|s| {
195                query.append_pair(
196                    "runtimeStatus",
197                    &s.map(|s| s.to_string()).collect::<Vec<_>>().join(","),
198                )
199            });
200
201            top.map(|t| query.append_pair("top", &t.to_string()));
202
203            query
204                .append_pair("showHistory", if show_history { "true" } else { "false " })
205                .append_pair(
206                    "showHistoryOutput",
207                    if show_history_output {
208                        "true"
209                    } else {
210                        "false "
211                    },
212                )
213                .append_pair("showInput", if show_input { "true" } else { "false " });
214        }
215
216        let req = Request::builder()
217            .method("GET")
218            .uri(url.into_string())
219            .header("Content-Type", "application/json")
220            .body(Body::empty())
221            .unwrap();
222
223        match self.client.request(req).await {
224            Ok(res) => match res.status() {
225                StatusCode::OK | StatusCode::ACCEPTED => {
226                    let body = res.into_body().try_concat().await;
227                    body.map(|b| {
228                        from_slice(&b).map_err(|e| {
229                            ClientError::Message(format!(
230                                "failed to deserialize orchestration status: {}",
231                                e
232                            ))
233                        })
234                    })
235                    .unwrap_or_else(|e| {
236                        Err(ClientError::Message(format!(
237                            "failed to read response: {}",
238                            e
239                        )))
240                    })
241                }
242                StatusCode::BAD_REQUEST => Err(ClientError::InstanceFailedOrTerminated),
243                StatusCode::NOT_FOUND => Err(ClientError::InstanceNotFound),
244                StatusCode::INTERNAL_SERVER_ERROR => Err(ClientError::InternalServerError),
245                _ => unreachable!("unexpected response from server"),
246            },
247            Err(e) => Err(ClientError::Message(format!(
248                "failed to send request: {}",
249                e
250            ))),
251        }
252    }
253
254    /// Purges the history of the given orchestration instance.
255    pub async fn purge_history(&self, instance_id: &str) -> Result<()> {
256        let req = Request::builder()
257            .method("DELETE")
258            .uri(
259                self.endpoint
260                    .purge_history_url(Some(instance_id))
261                    .into_string(),
262            )
263            .header("Content-Type", "application/json")
264            .body(Body::empty())
265            .unwrap();
266
267        match self.client.request(req).await {
268            Ok(res) => match res.status() {
269                StatusCode::OK => Ok(()),
270                StatusCode::NOT_FOUND => Err(ClientError::InstanceNotFound),
271                _ => unreachable!("unexpected response from server"),
272            },
273            Err(e) => Err(ClientError::Message(format!(
274                "failed to send request: {}",
275                e
276            ))),
277        }
278    }
279
280    /// Purges the history of orchestrations matching the given date range or runtime statuses.
281    pub async fn purge_history_by_query<I>(
282        &self,
283        created_time_from: Option<DateTime<Utc>>,
284        created_time_to: Option<DateTime<Utc>>,
285        runtime_statuses: Option<I>,
286    ) -> Result<u32>
287    where
288        I: Iterator<Item = OrchestrationRuntimeStatus>,
289    {
290        let mut url = self.endpoint.purge_history_url(None);
291
292        {
293            let mut query = url.query_pairs_mut();
294
295            created_time_from.map(|t| query.append_pair("createdTimeFrom", &t.to_rfc3339()));
296            created_time_to.map(|t| query.append_pair("createdTimeTo", &t.to_rfc3339()));
297            runtime_statuses.map(|s| {
298                query.append_pair(
299                    "runtimeStatus",
300                    &s.map(|s| s.to_string()).collect::<Vec<_>>().join(","),
301                )
302            });
303        }
304
305        let req = Request::builder()
306            .method("DELETE")
307            .uri(url.into_string())
308            .header("Content-Type", "application/json")
309            .body(Body::empty())
310            .unwrap();
311
312        #[derive(Debug, Clone, Deserialize)]
313        #[serde(rename_all = "camelCase")]
314        struct PurgeHistoryResult {
315            instances_deleted: u32,
316        }
317
318        match self.client.request(req).await {
319            Ok(res) => match res.status() {
320                StatusCode::OK => {
321                    let body = res.into_body().try_concat().await;
322                    let result: PurgeHistoryResult = body
323                        .map(|b| {
324                            from_slice(&b).map_err(|e| {
325                                ClientError::Message(format!(
326                                    "failed to deserialize orchestration status: {}",
327                                    e
328                                ))
329                            })
330                        })
331                        .unwrap_or_else(|e| {
332                            Err(ClientError::Message(format!(
333                                "failed to read response: {}",
334                                e
335                            )))
336                        })?;
337
338                    Ok(result.instances_deleted)
339                }
340                StatusCode::NOT_FOUND => Err(ClientError::InstanceNotFound),
341                _ => unreachable!("unexpected response from server"),
342            },
343            Err(e) => Err(ClientError::Message(format!(
344                "failed to send request: {}",
345                e
346            ))),
347        }
348    }
349
350    /// Raises an event for the given orchestration instance.
351    pub async fn raise_event<D>(
352        &self,
353        instance_id: &str,
354        event_name: &str,
355        event_data: D,
356    ) -> Result<()>
357    where
358        D: Into<Value>,
359    {
360        let req = Request::builder()
361            .method("POST")
362            .uri(
363                self.endpoint
364                    .raise_event_url(instance_id, event_name)
365                    .into_string(),
366            )
367            .header("Content-Type", "application/json")
368            .body(Body::from(to_string(&event_data.into()).unwrap()))
369            .unwrap();
370
371        match self.client.request(req).await {
372            Ok(res) => match res.status() {
373                StatusCode::ACCEPTED => Ok(()),
374                StatusCode::NOT_FOUND => Err(ClientError::InstanceNotFound),
375                StatusCode::BAD_REQUEST => Err(ClientError::BadRequest),
376                StatusCode::GONE => Err(ClientError::InstanceCompletedOrFailed),
377                _ => unreachable!("unexpected response from server"),
378            },
379            Err(e) => Err(ClientError::Message(format!(
380                "failed to send request: {}",
381                e
382            ))),
383        }
384    }
385
386    /// Restores a failed orchestration instance into a running state by replaying the most recent failed operations.
387    pub async fn rewind(&self, instance_id: &str, reason: &str) -> Result<()> {
388        let req = Request::builder()
389            .method("POST")
390            .uri(self.endpoint.rewind_url(instance_id, reason).into_string())
391            .header("Content-Type", "application/json")
392            .body(Body::empty())
393            .unwrap();
394
395        match self.client.request(req).await {
396            Ok(res) => match res.status() {
397                StatusCode::ACCEPTED => Ok(()),
398                StatusCode::NOT_FOUND => Err(ClientError::InstanceNotFound),
399                StatusCode::BAD_REQUEST => Err(ClientError::BadRequest),
400                StatusCode::GONE => Err(ClientError::InstanceCompletedOrFailed),
401                _ => unreachable!("unexpected response from server"),
402            },
403            Err(e) => Err(ClientError::Message(format!(
404                "failed to send request: {}",
405                e
406            ))),
407        }
408    }
409
410    /// Starts a new orchestration by calling the given orchestration function.
411    pub async fn start_new<D>(
412        &self,
413        function_name: &str,
414        instance_id: Option<&str>,
415        input: D,
416    ) -> Result<OrchestrationData>
417    where
418        D: Into<Value>,
419    {
420        let req = Request::builder()
421            .method("POST")
422            .uri(
423                self.endpoint
424                    .create_new_instance_url(function_name, instance_id)
425                    .into_string(),
426            )
427            .header("Content-Type", "application/json")
428            .body(Body::from(input.into().to_string()))
429            .unwrap();
430
431        match self.client.request(req).await {
432            Ok(res) => match res.status() {
433                StatusCode::ACCEPTED => {
434                    let body = res.into_body().try_concat().await;
435                    body.map(|b| {
436                        from_slice(&b).map_err(|e| {
437                            ClientError::Message(format!(
438                                "failed to deserialize orchestration data: {}",
439                                e
440                            ))
441                        })
442                    })
443                    .unwrap_or_else(|e| {
444                        Err(ClientError::Message(format!(
445                            "failed to read response: {}",
446                            e
447                        )))
448                    })
449                }
450                StatusCode::BAD_REQUEST => Err(ClientError::BadCreateRequest),
451                _ => unreachable!("unexpected response from server"),
452            },
453            Err(e) => Err(ClientError::Message(format!(
454                "failed to send request: {}",
455                e
456            ))),
457        }
458    }
459
460    /// Terminates a running orchestration instance.
461    pub async fn terminate(&self, instance_id: &str, reason: &str) -> Result<()> {
462        let req = Request::builder()
463            .method("POST")
464            .uri(
465                self.endpoint
466                    .terminate_url(instance_id, reason)
467                    .into_string(),
468            )
469            .header("Content-Type", "application/json")
470            .body(Body::empty())
471            .unwrap();
472
473        match self.client.request(req).await {
474            Ok(res) => match res.status() {
475                StatusCode::ACCEPTED => Ok(()),
476                StatusCode::NOT_FOUND => Err(ClientError::InstanceNotFound),
477                StatusCode::GONE => Err(ClientError::InstanceCompletedOrFailed),
478                _ => unreachable!("unexpected response from server"),
479            },
480            Err(e) => Err(ClientError::Message(format!(
481                "failed to send request: {}",
482                e
483            ))),
484        }
485    }
486}
487
488#[cfg(test)]
489mod tests {
490    use super::*;
491    use chrono::offset::TimeZone;
492    use serde_json::from_str;
493
494    #[test]
495    fn test_instance_history() {
496        let h1: String = r#"{"EventType": "ExecutionStarted",
497          "FunctionName": "E1_HelloSequence",
498          "Timestamp": "2018-02-28T05:18:49Z"
499        }"#
500        .to_owned();
501
502        let compare_dt = Utc.ymd(2018, 2, 28).and_hms(5, 18, 49);
503
504        let h1_obj: OrchestrationHistoryEvent = from_str(&h1).unwrap();
505        assert_eq!(h1_obj.event_type, "ExecutionStarted");
506        assert_eq!(h1_obj.timestamp, compare_dt);
507
508        let h2: String = r#"{
509          "EventType": "ExecutionCompleted",
510          "OrchestrationStatus": "Completed",
511          "Result": [
512              "Hello Tokyo!",
513              "Hello Seattle!",
514              "Hello London!"
515          ],
516          "Timestamp": "2018-02-28T05:18:54.3660895Z"
517        }"#
518        .to_owned();
519
520        let h2_obj: OrchestrationHistoryEvent = from_str(&h2).unwrap();
521        assert_eq!(h2_obj.orchestration_status.is_some(), true);
522        assert_eq!(
523            h2_obj.orchestration_status.unwrap(),
524            OrchestrationRuntimeStatus::Completed
525        );
526    }
527
528    #[test]
529    fn test_instance_status() {
530        let example: String = r#"{
531            "createdTime": "2018-02-28T05:18:49Z",
532            "historyEvents": [
533            {
534                "EventType": "ExecutionStarted",
535                "FunctionName": "E1_HelloSequence",
536                "Timestamp": "2018-02-28T05:18:49.3452372Z"
537            },
538            {
539                "EventType": "TaskCompleted",
540                "FunctionName": "E1_SayHello",
541                "Result": "Hello Tokyo!",
542                "ScheduledTime": "2018-02-28T05:18:51.3939873Z",
543                "Timestamp": "2018-02-28T05:18:52.2895622Z"
544            },
545            {
546                "EventType": "TaskCompleted",
547                "FunctionName": "E1_SayHello",
548                "Result": "Hello Seattle!",
549                "ScheduledTime": "2018-02-28T05:18:52.8755705Z",
550                "Timestamp": "2018-02-28T05:18:53.1765771Z"
551            },
552            {
553                "EventType": "TaskCompleted",
554                "FunctionName": "E1_SayHello",
555                "Result": "Hello London!",
556                "ScheduledTime": "2018-02-28T05:18:53.5170791Z",
557                "Timestamp": "2018-02-28T05:18:53.891081Z"
558            },
559            {
560                "EventType": "ExecutionCompleted",
561                "OrchestrationStatus": "Completed",
562                "Result": [
563                    "Hello Tokyo!",
564                    "Hello Seattle!",
565                    "Hello London!"
566                ],
567                "Timestamp": "2018-02-28T05:18:54.3660895Z"
568            }
569        ],
570        "input": null,
571        "customStatus": { "nextActions": ["A", "B", "C"], "foo": 2 },
572        "lastUpdatedTime": "2018-02-28T05:18:54Z",
573        "output": [
574            "Hello Tokyo!",
575            "Hello Seattle!",
576            "Hello London!"
577        ],
578        "runtimeStatus": "Completed"
579        }"#
580        .to_owned();
581
582        let instance_status: OrchestrationStatus = from_str(&example).unwrap();
583        assert_eq!(instance_status.history_events.is_some(), true);
584        assert_eq!(instance_status.history_events.unwrap().len(), 5);
585
586        assert_eq!(instance_status.custom_status.is_some(), true);
587        assert_eq!(instance_status.custom_status.unwrap().is_object(), true);
588    }
589}