azure-functions 0.11.0

Azure Functions for Rust
Documentation
use crate::http::Body;
use crate::rpc::{typed_data::Data, TypedData};
use azure_functions_durable::{
    Client, OrchestrationData, OrchestrationRuntimeStatus, OrchestrationStatus, Result,
};
use chrono::{DateTime, Utc};
use serde::Deserialize;
use serde_json::{from_str, to_value, Value};

/// Represents the Durable Functions orchestration client input binding.
///
/// The following binding attributes are supported:
///
/// | Name         | Description                                                                                                                                 |
/// |--------------|---------------------------------------------------------------------------------------------------------------------------------------------|
/// | `name`       | The name of the parameter being bound.                                                                                                      |
/// | `task_hub`   | The name of the task hub to use.  Defaults to the value from host.json                                                                      |
/// | `connection` | The name of an app setting that contains a storage account connection string. Defaults to the storage account for the function application. |
///
/// # Examples
///
/// Starting a new orchestration:
///
/// ```rust
/// use azure_functions::{
///     bindings::{DurableOrchestrationClient, HttpRequest, HttpResponse},
///     func,
/// };
/// use serde_json::Value;
///
/// #[func]
/// pub async fn start(_req: HttpRequest, client: DurableOrchestrationClient) -> HttpResponse {
///     match client
///         .start_new(
///             "orchestration",
///             None,
///             Value::Null,
///         )
///         .await
///     {
///         Ok(data) => data.into(),
///         Err(e) => format!("Failed to start orchestration: {}", e).into(),
///     }
/// }
/// ```
pub struct DurableOrchestrationClient {
    client: Client,
}

impl DurableOrchestrationClient {
    /// Gets the status of an orchestration instance.
    pub async fn instance_status(
        &self,
        instance_id: &str,
        show_history: bool,
        show_history_output: bool,
        show_input: bool,
    ) -> Result<OrchestrationStatus> {
        self.client
            .instance_status(instance_id, show_history, show_history_output, show_input)
            .await
    }

    /// Queries the status for instances in a given date range or with runtime status.
    #[allow(clippy::too_many_arguments)]
    pub async fn query_instances<I>(
        &self,
        created_time_from: Option<DateTime<Utc>>,
        created_time_to: Option<DateTime<Utc>>,
        runtime_statuses: Option<I>,
        top: Option<u32>,
        show_history: bool,
        show_history_output: bool,
        show_input: bool,
    ) -> Result<Vec<OrchestrationStatus>>
    where
        I: Iterator<Item = OrchestrationRuntimeStatus>,
    {
        self.client
            .query_instances(
                created_time_from,
                created_time_to,
                runtime_statuses,
                top,
                show_history,
                show_history_output,
                show_input,
            )
            .await
    }

    /// Purges the history of the given orchestration instance.
    pub async fn purge_history(&self, instance_id: &str) -> Result<()> {
        self.client.purge_history(instance_id).await
    }

    /// Purges the history of orchestrations matching the given date range or runtime statuses.
    pub async fn purge_history_by_query<I>(
        &self,
        created_time_from: Option<DateTime<Utc>>,
        created_time_to: Option<DateTime<Utc>>,
        runtime_statuses: Option<I>,
    ) -> Result<u32>
    where
        I: Iterator<Item = OrchestrationRuntimeStatus>,
    {
        self.client
            .purge_history_by_query(created_time_from, created_time_to, runtime_statuses)
            .await
    }

    /// Raises an event for the given orchestration instance.
    pub async fn raise_event<D>(
        &self,
        instance_id: &str,
        event_name: &str,
        event_data: D,
    ) -> Result<()>
    where
        D: Into<Value>,
    {
        self.client
            .raise_event(instance_id, event_name, event_data)
            .await
    }

    /// Restores a failed orchestration instance into a running state by replaying the most recent failed operations.
    pub async fn rewind(&self, instance_id: &str, reason: &str) -> Result<()> {
        self.client.rewind(instance_id, reason).await
    }

    /// Starts a new orchestration by calling the given orchestration function.
    pub async fn start_new<D>(
        &self,
        function_name: &str,
        instance_id: Option<&str>,
        input: D,
    ) -> Result<OrchestrationData>
    where
        D: Into<Value>,
    {
        self.client
            .start_new(function_name, instance_id, input)
            .await
    }

    /// Terminates a running orchestration instance.
    pub async fn terminate(&self, instance_id: &str, reason: &str) -> Result<()> {
        self.client.terminate(instance_id, reason).await
    }
}

#[doc(hidden)]
impl From<TypedData> for DurableOrchestrationClient {
    fn from(data: TypedData) -> Self {
        #[derive(Debug, Clone, Deserialize)]
        #[serde(rename_all = "camelCase")]
        struct ManagementUrls {
            #[serde(rename = "statusQueryGetUri")]
            status_query_url: String,
        }

        #[derive(Deserialize)]
        #[serde(rename_all = "camelCase")]
        struct BindingData {
            management_urls: ManagementUrls,
        }

        let data: BindingData = match &data.data {
            Some(Data::String(s)) => {
                from_str(s).expect("failed to parse durable orchestration client data")
            }
            _ => panic!("expected string data for durable orchestration client"),
        };

        DurableOrchestrationClient {
            client: Client::new(&data.management_urls.status_query_url),
        }
    }
}

impl<'a> Into<Body<'a>> for OrchestrationData {
    fn into(self) -> Body<'a> {
        to_value(&self).unwrap().into()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn it_converts_from_typed_data() {
        let data = TypedData {
            data: Some(Data::String(r#"{"taskHubName":"DurableFunctionsHub","creationUrls":{"createNewInstancePostUri":"http://localhost:8080/runtime/webhooks/durabletask/orchestrators/{functionName}[/{instanceId}]?code=foo","createAndWaitOnNewInstancePostUri":"http://localhost:8080/runtime/webhooks/durabletask/orchestrators/{functionName}[/{instanceId}]?timeout={timeoutInSeconds}&pollingInterval={intervalInSeconds}&code=foo"},"managementUrls":{"id":"INSTANCEID","statusQueryGetUri":"http://localhost:8080/runtime/webhooks/durabletask/instances/INSTANCEID?taskHub=DurableFunctionsHub&connection=Storage&code=foo","sendEventPostUri":"http://localhost:8080/runtime/webhooks/durabletask/instances/INSTANCEID/raiseEvent/{eventName}?taskHub=DurableFunctionsHub&connection=Storage&code=foo","terminatePostUri":"http://localhost:8080/runtime/webhooks/durabletask/instances/INSTANCEID/terminate?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=foo","rewindPostUri":"http://localhost:8080/runtime/webhooks/durabletask/instances/INSTANCEID/rewind?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=foo","purgeHistoryDeleteUri":"http://localhost:8080/runtime/webhooks/durabletask/instances/INSTANCEID?taskHub=DurableFunctionsHub&connection=Storage&code=foo"}}"#.to_owned())),
        };

        let client: DurableOrchestrationClient = data.into();
        assert_eq!(client.client.task_hub(), "DurableFunctionsHub");
    }
}