azure_functions/bindings/
durable_orchestration_client.rs1use crate::http::Body;
2use crate::rpc::{typed_data::Data, TypedData};
3use azure_functions_durable::{
4 Client, OrchestrationData, OrchestrationRuntimeStatus, OrchestrationStatus, Result,
5};
6use chrono::{DateTime, Utc};
7use serde::Deserialize;
8use serde_json::{from_str, to_value, Value};
9
10pub struct DurableOrchestrationClient {
47 client: Client,
48}
49
50impl DurableOrchestrationClient {
51 pub async fn instance_status(
53 &self,
54 instance_id: &str,
55 show_history: bool,
56 show_history_output: bool,
57 show_input: bool,
58 ) -> Result<OrchestrationStatus> {
59 self.client
60 .instance_status(instance_id, show_history, show_history_output, show_input)
61 .await
62 }
63
64 #[allow(clippy::too_many_arguments)]
66 pub async fn query_instances<I>(
67 &self,
68 created_time_from: Option<DateTime<Utc>>,
69 created_time_to: Option<DateTime<Utc>>,
70 runtime_statuses: Option<I>,
71 top: Option<u32>,
72 show_history: bool,
73 show_history_output: bool,
74 show_input: bool,
75 ) -> Result<Vec<OrchestrationStatus>>
76 where
77 I: Iterator<Item = OrchestrationRuntimeStatus>,
78 {
79 self.client
80 .query_instances(
81 created_time_from,
82 created_time_to,
83 runtime_statuses,
84 top,
85 show_history,
86 show_history_output,
87 show_input,
88 )
89 .await
90 }
91
92 pub async fn purge_history(&self, instance_id: &str) -> Result<()> {
94 self.client.purge_history(instance_id).await
95 }
96
97 pub async fn purge_history_by_query<I>(
99 &self,
100 created_time_from: Option<DateTime<Utc>>,
101 created_time_to: Option<DateTime<Utc>>,
102 runtime_statuses: Option<I>,
103 ) -> Result<u32>
104 where
105 I: Iterator<Item = OrchestrationRuntimeStatus>,
106 {
107 self.client
108 .purge_history_by_query(created_time_from, created_time_to, runtime_statuses)
109 .await
110 }
111
112 pub async fn raise_event<D>(
114 &self,
115 instance_id: &str,
116 event_name: &str,
117 event_data: D,
118 ) -> Result<()>
119 where
120 D: Into<Value>,
121 {
122 self.client
123 .raise_event(instance_id, event_name, event_data)
124 .await
125 }
126
127 pub async fn rewind(&self, instance_id: &str, reason: &str) -> Result<()> {
129 self.client.rewind(instance_id, reason).await
130 }
131
132 pub async fn start_new<D>(
134 &self,
135 function_name: &str,
136 instance_id: Option<&str>,
137 input: D,
138 ) -> Result<OrchestrationData>
139 where
140 D: Into<Value>,
141 {
142 self.client
143 .start_new(function_name, instance_id, input)
144 .await
145 }
146
147 pub async fn terminate(&self, instance_id: &str, reason: &str) -> Result<()> {
149 self.client.terminate(instance_id, reason).await
150 }
151}
152
153#[doc(hidden)]
154impl From<TypedData> for DurableOrchestrationClient {
155 fn from(data: TypedData) -> Self {
156 #[derive(Debug, Clone, Deserialize)]
157 #[serde(rename_all = "camelCase")]
158 struct ManagementUrls {
159 #[serde(rename = "statusQueryGetUri")]
160 status_query_url: String,
161 }
162
163 #[derive(Deserialize)]
164 #[serde(rename_all = "camelCase")]
165 struct BindingData {
166 management_urls: ManagementUrls,
167 }
168
169 let data: BindingData = match &data.data {
170 Some(Data::String(s)) => {
171 from_str(s).expect("failed to parse durable orchestration client data")
172 }
173 _ => panic!("expected string data for durable orchestration client"),
174 };
175
176 DurableOrchestrationClient {
177 client: Client::new(&data.management_urls.status_query_url),
178 }
179 }
180}
181
182impl<'a> Into<Body<'a>> for OrchestrationData {
183 fn into(self) -> Body<'a> {
184 to_value(&self).unwrap().into()
185 }
186}
187
188#[cfg(test)]
189mod tests {
190 use super::*;
191
192 #[test]
193 fn it_converts_from_typed_data() {
194 let data = TypedData {
195 data: Some(Data::String(r#"{"taskHubName":"DurableFunctionsHub","creationUrls":{"createNewInstancePostUri":"http://localhost:8080/runtime/webhooks/durabletask/orchestrators/{functionName}[/{instanceId}]?code=foo","createAndWaitOnNewInstancePostUri":"http://localhost:8080/runtime/webhooks/durabletask/orchestrators/{functionName}[/{instanceId}]?timeout={timeoutInSeconds}&pollingInterval={intervalInSeconds}&code=foo"},"managementUrls":{"id":"INSTANCEID","statusQueryGetUri":"http://localhost:8080/runtime/webhooks/durabletask/instances/INSTANCEID?taskHub=DurableFunctionsHub&connection=Storage&code=foo","sendEventPostUri":"http://localhost:8080/runtime/webhooks/durabletask/instances/INSTANCEID/raiseEvent/{eventName}?taskHub=DurableFunctionsHub&connection=Storage&code=foo","terminatePostUri":"http://localhost:8080/runtime/webhooks/durabletask/instances/INSTANCEID/terminate?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=foo","rewindPostUri":"http://localhost:8080/runtime/webhooks/durabletask/instances/INSTANCEID/rewind?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=foo","purgeHistoryDeleteUri":"http://localhost:8080/runtime/webhooks/durabletask/instances/INSTANCEID?taskHub=DurableFunctionsHub&connection=Storage&code=foo"}}"#.to_owned())),
196 };
197
198 let client: DurableOrchestrationClient = data.into();
199 assert_eq!(client.client.task_hub(), "DurableFunctionsHub");
200 }
201}