1use crate::endpoint::Endpoint;
2use crate::error::ClientError;
3use crate::Result;
4use chrono::{DateTime, Utc};
5use futures::TryStreamExt;
6use hyper::{self, Body, Request, StatusCode};
7use serde::{Deserialize, Serialize};
8use serde_json::{from_slice, to_string, Value};
9use std::fmt::{Display, Formatter};
10use url::Url;
11
/// Runtime status reported for an orchestration instance.
///
/// With the derived `Deserialize`, each variant is read from a JSON string
/// equal to its name (for example `"Completed"`).
#[derive(Debug, Clone, Deserialize, PartialEq)]
pub enum OrchestrationRuntimeStatus {
    /// JSON value: `"Running"`.
    Running,
    /// JSON value: `"Pending"`.
    Pending,
    /// JSON value: `"Failed"`.
    Failed,
    /// JSON value: `"Canceled"`.
    Canceled,
    /// JSON value: `"Terminated"`.
    Terminated,
    /// JSON value: `"Completed"`.
    Completed,
}
28
29impl Display for OrchestrationRuntimeStatus {
30 fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
31 write!(f, "{:?}", self)
32 }
33}
34
/// One entry of an orchestration's history, as carried in
/// `OrchestrationStatus::history_events`.
///
/// Field names are mapped to PascalCase JSON keys (`event_type` is read from
/// `"EventType"`, and so on). Every `Option` field is `None` when its key is
/// missing from the JSON object.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct OrchestrationHistoryEvent {
    /// Kind of event, read from `EventType` (e.g. `"ExecutionStarted"`).
    pub event_type: String,
    /// Status attached to the event, read from `OrchestrationStatus`.
    pub orchestration_status: Option<OrchestrationRuntimeStatus>,
    /// Function name attached to the event, read from `FunctionName`.
    pub function_name: Option<String>,
    /// Arbitrary JSON payload read from `Result`.
    pub result: Option<Value>,
    /// UTC timestamp read from `ScheduledTime`.
    pub scheduled_time: Option<DateTime<Utc>>,
    /// UTC timestamp read from `Timestamp`; this key is required.
    pub timestamp: DateTime<Utc>,
}
52
/// Status document of an orchestration instance, as returned by
/// `Client::instance_status` and `Client::query_instances`.
///
/// Field names are mapped to camelCase JSON keys (`runtime_status` is read
/// from `"runtimeStatus"`, and so on). JSON keys that have no matching field
/// are ignored during deserialization.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OrchestrationStatus {
    /// Runtime status, read from `runtimeStatus`; this key is required.
    pub runtime_status: OrchestrationRuntimeStatus,
    /// Arbitrary JSON read from `input`; `None` when missing or `null`.
    pub input: Option<Value>,
    /// Arbitrary JSON read from `customStatus`; `None` when missing or `null`.
    pub custom_status: Option<Value>,
    /// Arbitrary JSON read from `output`; `None` when missing or `null`.
    pub output: Option<Value>,
    /// UTC timestamp read from `createdTime`; this key is required.
    pub created_time: DateTime<Utc>,
    /// History entries read from `historyEvents`, when present.
    pub history_events: Option<Vec<OrchestrationHistoryEvent>>,
}
70
/// Data describing a newly started orchestration, as returned by
/// `Client::start_new`.
///
/// Field names are mapped to camelCase JSON keys, except `instance_id`, which
/// is explicitly mapped to `"id"`.
// NOTE(review): the URI fields look like per-instance management endpoints
// (status query, raise event, terminate, purge, rewind) — confirm against the
// service documentation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OrchestrationData {
    /// Instance identifier, stored under the JSON key `id`.
    #[serde(rename = "id")]
    pub instance_id: String,
    /// Stored under the JSON key `statusQueryGetUri`.
    pub status_query_get_uri: String,
    /// Stored under the JSON key `sendEventPostUri`.
    pub send_event_post_uri: String,
    /// Stored under the JSON key `terminatePostUri`.
    pub terminate_post_uri: String,
    /// Stored under the JSON key `purgeHistoryDeleteUri`.
    pub purge_history_delete_uri: String,
    /// Stored under the JSON key `rewindPostUri`; optional.
    pub rewind_post_uri: Option<String>,
}
89
/// HTTP client for managing orchestration instances.
///
/// Request URLs are derived from the wrapped `Endpoint`; requests are sent
/// with a hyper client built on `HttpConnector`.
pub struct Client {
    /// Source of the request URLs for every operation.
    endpoint: Endpoint,
    /// Underlying hyper client used to send requests.
    client: hyper::Client<hyper::client::HttpConnector>,
}
95
96impl Client {
97 pub fn new(status_query_url: &str) -> Self {
99 Self {
100 endpoint: Endpoint::new(
101 Url::parse(status_query_url).expect("expected a valid query URL"),
102 ),
103 client: hyper::Client::builder().build_http(),
104 }
105 }
106
107 pub fn task_hub(&self) -> &str {
109 self.endpoint.task_hub()
110 }
111
112 pub async fn instance_status(
114 &self,
115 instance_id: &str,
116 show_history: bool,
117 show_history_output: bool,
118 show_input: bool,
119 ) -> Result<OrchestrationStatus> {
120 let mut url = self.endpoint.status_query_url(Some(instance_id));
121
122 url.query_pairs_mut()
123 .append_pair("showHistory", if show_history { "true" } else { "false " })
124 .append_pair(
125 "showHistoryOutput",
126 if show_history_output {
127 "true"
128 } else {
129 "false "
130 },
131 )
132 .append_pair("showInput", if show_input { "true" } else { "false " });
133
134 let req = Request::builder()
135 .method("GET")
136 .uri(url.into_string())
137 .header("Content-Type", "application/json")
138 .body(Body::empty())
139 .unwrap();
140
141 match self.client.request(req).await {
142 Ok(res) => match res.status() {
143 StatusCode::OK | StatusCode::ACCEPTED => {
144 let body = res.into_body().try_concat().await;
145 body.map(|b| {
146 from_slice(&b).map_err(|e| {
147 ClientError::Message(format!(
148 "failed to deserialize orchestration status: {}",
149 e
150 ))
151 })
152 })
153 .unwrap_or_else(|e| {
154 Err(ClientError::Message(format!(
155 "failed to read response: {}",
156 e
157 )))
158 })
159 }
160 StatusCode::BAD_REQUEST => Err(ClientError::InstanceFailedOrTerminated),
161 StatusCode::NOT_FOUND => Err(ClientError::InstanceNotFound),
162 StatusCode::INTERNAL_SERVER_ERROR => Err(ClientError::InternalServerError),
163 _ => unreachable!("unexpected response from server"),
164 },
165 Err(e) => Err(ClientError::Message(format!(
166 "failed to send request: {}",
167 e
168 ))),
169 }
170 }
171
172 #[allow(clippy::too_many_arguments)]
174 pub async fn query_instances<I>(
175 &self,
176 created_time_from: Option<DateTime<Utc>>,
177 created_time_to: Option<DateTime<Utc>>,
178 runtime_statuses: Option<I>,
179 top: Option<u32>,
180 show_history: bool,
181 show_history_output: bool,
182 show_input: bool,
183 ) -> Result<Vec<OrchestrationStatus>>
184 where
185 I: Iterator<Item = OrchestrationRuntimeStatus>,
186 {
187 let mut url = self.endpoint.status_query_url(None);
188
189 {
190 let mut query = url.query_pairs_mut();
191
192 created_time_from.map(|t| query.append_pair("createdTimeFrom", &t.to_rfc3339()));
193 created_time_to.map(|t| query.append_pair("createdTimeTo", &t.to_rfc3339()));
194 runtime_statuses.map(|s| {
195 query.append_pair(
196 "runtimeStatus",
197 &s.map(|s| s.to_string()).collect::<Vec<_>>().join(","),
198 )
199 });
200
201 top.map(|t| query.append_pair("top", &t.to_string()));
202
203 query
204 .append_pair("showHistory", if show_history { "true" } else { "false " })
205 .append_pair(
206 "showHistoryOutput",
207 if show_history_output {
208 "true"
209 } else {
210 "false "
211 },
212 )
213 .append_pair("showInput", if show_input { "true" } else { "false " });
214 }
215
216 let req = Request::builder()
217 .method("GET")
218 .uri(url.into_string())
219 .header("Content-Type", "application/json")
220 .body(Body::empty())
221 .unwrap();
222
223 match self.client.request(req).await {
224 Ok(res) => match res.status() {
225 StatusCode::OK | StatusCode::ACCEPTED => {
226 let body = res.into_body().try_concat().await;
227 body.map(|b| {
228 from_slice(&b).map_err(|e| {
229 ClientError::Message(format!(
230 "failed to deserialize orchestration status: {}",
231 e
232 ))
233 })
234 })
235 .unwrap_or_else(|e| {
236 Err(ClientError::Message(format!(
237 "failed to read response: {}",
238 e
239 )))
240 })
241 }
242 StatusCode::BAD_REQUEST => Err(ClientError::InstanceFailedOrTerminated),
243 StatusCode::NOT_FOUND => Err(ClientError::InstanceNotFound),
244 StatusCode::INTERNAL_SERVER_ERROR => Err(ClientError::InternalServerError),
245 _ => unreachable!("unexpected response from server"),
246 },
247 Err(e) => Err(ClientError::Message(format!(
248 "failed to send request: {}",
249 e
250 ))),
251 }
252 }
253
254 pub async fn purge_history(&self, instance_id: &str) -> Result<()> {
256 let req = Request::builder()
257 .method("DELETE")
258 .uri(
259 self.endpoint
260 .purge_history_url(Some(instance_id))
261 .into_string(),
262 )
263 .header("Content-Type", "application/json")
264 .body(Body::empty())
265 .unwrap();
266
267 match self.client.request(req).await {
268 Ok(res) => match res.status() {
269 StatusCode::OK => Ok(()),
270 StatusCode::NOT_FOUND => Err(ClientError::InstanceNotFound),
271 _ => unreachable!("unexpected response from server"),
272 },
273 Err(e) => Err(ClientError::Message(format!(
274 "failed to send request: {}",
275 e
276 ))),
277 }
278 }
279
280 pub async fn purge_history_by_query<I>(
282 &self,
283 created_time_from: Option<DateTime<Utc>>,
284 created_time_to: Option<DateTime<Utc>>,
285 runtime_statuses: Option<I>,
286 ) -> Result<u32>
287 where
288 I: Iterator<Item = OrchestrationRuntimeStatus>,
289 {
290 let mut url = self.endpoint.purge_history_url(None);
291
292 {
293 let mut query = url.query_pairs_mut();
294
295 created_time_from.map(|t| query.append_pair("createdTimeFrom", &t.to_rfc3339()));
296 created_time_to.map(|t| query.append_pair("createdTimeTo", &t.to_rfc3339()));
297 runtime_statuses.map(|s| {
298 query.append_pair(
299 "runtimeStatus",
300 &s.map(|s| s.to_string()).collect::<Vec<_>>().join(","),
301 )
302 });
303 }
304
305 let req = Request::builder()
306 .method("DELETE")
307 .uri(url.into_string())
308 .header("Content-Type", "application/json")
309 .body(Body::empty())
310 .unwrap();
311
312 #[derive(Debug, Clone, Deserialize)]
313 #[serde(rename_all = "camelCase")]
314 struct PurgeHistoryResult {
315 instances_deleted: u32,
316 }
317
318 match self.client.request(req).await {
319 Ok(res) => match res.status() {
320 StatusCode::OK => {
321 let body = res.into_body().try_concat().await;
322 let result: PurgeHistoryResult = body
323 .map(|b| {
324 from_slice(&b).map_err(|e| {
325 ClientError::Message(format!(
326 "failed to deserialize orchestration status: {}",
327 e
328 ))
329 })
330 })
331 .unwrap_or_else(|e| {
332 Err(ClientError::Message(format!(
333 "failed to read response: {}",
334 e
335 )))
336 })?;
337
338 Ok(result.instances_deleted)
339 }
340 StatusCode::NOT_FOUND => Err(ClientError::InstanceNotFound),
341 _ => unreachable!("unexpected response from server"),
342 },
343 Err(e) => Err(ClientError::Message(format!(
344 "failed to send request: {}",
345 e
346 ))),
347 }
348 }
349
350 pub async fn raise_event<D>(
352 &self,
353 instance_id: &str,
354 event_name: &str,
355 event_data: D,
356 ) -> Result<()>
357 where
358 D: Into<Value>,
359 {
360 let req = Request::builder()
361 .method("POST")
362 .uri(
363 self.endpoint
364 .raise_event_url(instance_id, event_name)
365 .into_string(),
366 )
367 .header("Content-Type", "application/json")
368 .body(Body::from(to_string(&event_data.into()).unwrap()))
369 .unwrap();
370
371 match self.client.request(req).await {
372 Ok(res) => match res.status() {
373 StatusCode::ACCEPTED => Ok(()),
374 StatusCode::NOT_FOUND => Err(ClientError::InstanceNotFound),
375 StatusCode::BAD_REQUEST => Err(ClientError::BadRequest),
376 StatusCode::GONE => Err(ClientError::InstanceCompletedOrFailed),
377 _ => unreachable!("unexpected response from server"),
378 },
379 Err(e) => Err(ClientError::Message(format!(
380 "failed to send request: {}",
381 e
382 ))),
383 }
384 }
385
386 pub async fn rewind(&self, instance_id: &str, reason: &str) -> Result<()> {
388 let req = Request::builder()
389 .method("POST")
390 .uri(self.endpoint.rewind_url(instance_id, reason).into_string())
391 .header("Content-Type", "application/json")
392 .body(Body::empty())
393 .unwrap();
394
395 match self.client.request(req).await {
396 Ok(res) => match res.status() {
397 StatusCode::ACCEPTED => Ok(()),
398 StatusCode::NOT_FOUND => Err(ClientError::InstanceNotFound),
399 StatusCode::BAD_REQUEST => Err(ClientError::BadRequest),
400 StatusCode::GONE => Err(ClientError::InstanceCompletedOrFailed),
401 _ => unreachable!("unexpected response from server"),
402 },
403 Err(e) => Err(ClientError::Message(format!(
404 "failed to send request: {}",
405 e
406 ))),
407 }
408 }
409
410 pub async fn start_new<D>(
412 &self,
413 function_name: &str,
414 instance_id: Option<&str>,
415 input: D,
416 ) -> Result<OrchestrationData>
417 where
418 D: Into<Value>,
419 {
420 let req = Request::builder()
421 .method("POST")
422 .uri(
423 self.endpoint
424 .create_new_instance_url(function_name, instance_id)
425 .into_string(),
426 )
427 .header("Content-Type", "application/json")
428 .body(Body::from(input.into().to_string()))
429 .unwrap();
430
431 match self.client.request(req).await {
432 Ok(res) => match res.status() {
433 StatusCode::ACCEPTED => {
434 let body = res.into_body().try_concat().await;
435 body.map(|b| {
436 from_slice(&b).map_err(|e| {
437 ClientError::Message(format!(
438 "failed to deserialize orchestration data: {}",
439 e
440 ))
441 })
442 })
443 .unwrap_or_else(|e| {
444 Err(ClientError::Message(format!(
445 "failed to read response: {}",
446 e
447 )))
448 })
449 }
450 StatusCode::BAD_REQUEST => Err(ClientError::BadCreateRequest),
451 _ => unreachable!("unexpected response from server"),
452 },
453 Err(e) => Err(ClientError::Message(format!(
454 "failed to send request: {}",
455 e
456 ))),
457 }
458 }
459
460 pub async fn terminate(&self, instance_id: &str, reason: &str) -> Result<()> {
462 let req = Request::builder()
463 .method("POST")
464 .uri(
465 self.endpoint
466 .terminate_url(instance_id, reason)
467 .into_string(),
468 )
469 .header("Content-Type", "application/json")
470 .body(Body::empty())
471 .unwrap();
472
473 match self.client.request(req).await {
474 Ok(res) => match res.status() {
475 StatusCode::ACCEPTED => Ok(()),
476 StatusCode::NOT_FOUND => Err(ClientError::InstanceNotFound),
477 StatusCode::GONE => Err(ClientError::InstanceCompletedOrFailed),
478 _ => unreachable!("unexpected response from server"),
479 },
480 Err(e) => Err(ClientError::Message(format!(
481 "failed to send request: {}",
482 e
483 ))),
484 }
485 }
486}
487
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::offset::TimeZone;
    use serde_json::from_str;

    /// History events deserialize from PascalCase keys, and keys that are
    /// missing from the JSON leave the optional fields as `None`.
    #[test]
    fn test_instance_history() {
        let h1 = r#"{"EventType": "ExecutionStarted",
            "FunctionName": "E1_HelloSequence",
            "Timestamp": "2018-02-28T05:18:49Z"
        }"#;

        let compare_dt = Utc.ymd(2018, 2, 28).and_hms(5, 18, 49);

        let h1_obj: OrchestrationHistoryEvent = from_str(h1).unwrap();
        assert_eq!(h1_obj.event_type, "ExecutionStarted");
        assert_eq!(h1_obj.timestamp, compare_dt);
        assert_eq!(h1_obj.function_name.as_deref(), Some("E1_HelloSequence"));
        assert!(h1_obj.orchestration_status.is_none());

        let h2 = r#"{
            "EventType": "ExecutionCompleted",
            "OrchestrationStatus": "Completed",
            "Result": [
                "Hello Tokyo!",
                "Hello Seattle!",
                "Hello London!"
            ],
            "Timestamp": "2018-02-28T05:18:54.3660895Z"
        }"#;

        let h2_obj: OrchestrationHistoryEvent = from_str(h2).unwrap();
        assert_eq!(
            h2_obj.orchestration_status,
            Some(OrchestrationRuntimeStatus::Completed)
        );
    }

    /// A full status document deserializes, including nested history events
    /// and an object-valued custom status.
    #[test]
    fn test_instance_status() {
        let example = r#"{
            "createdTime": "2018-02-28T05:18:49Z",
            "historyEvents": [
                {
                    "EventType": "ExecutionStarted",
                    "FunctionName": "E1_HelloSequence",
                    "Timestamp": "2018-02-28T05:18:49.3452372Z"
                },
                {
                    "EventType": "TaskCompleted",
                    "FunctionName": "E1_SayHello",
                    "Result": "Hello Tokyo!",
                    "ScheduledTime": "2018-02-28T05:18:51.3939873Z",
                    "Timestamp": "2018-02-28T05:18:52.2895622Z"
                },
                {
                    "EventType": "TaskCompleted",
                    "FunctionName": "E1_SayHello",
                    "Result": "Hello Seattle!",
                    "ScheduledTime": "2018-02-28T05:18:52.8755705Z",
                    "Timestamp": "2018-02-28T05:18:53.1765771Z"
                },
                {
                    "EventType": "TaskCompleted",
                    "FunctionName": "E1_SayHello",
                    "Result": "Hello London!",
                    "ScheduledTime": "2018-02-28T05:18:53.5170791Z",
                    "Timestamp": "2018-02-28T05:18:53.891081Z"
                },
                {
                    "EventType": "ExecutionCompleted",
                    "OrchestrationStatus": "Completed",
                    "Result": [
                        "Hello Tokyo!",
                        "Hello Seattle!",
                        "Hello London!"
                    ],
                    "Timestamp": "2018-02-28T05:18:54.3660895Z"
                }
            ],
            "input": null,
            "customStatus": { "nextActions": ["A", "B", "C"], "foo": 2 },
            "lastUpdatedTime": "2018-02-28T05:18:54Z",
            "output": [
                "Hello Tokyo!",
                "Hello Seattle!",
                "Hello London!"
            ],
            "runtimeStatus": "Completed"
        }"#;

        let instance_status: OrchestrationStatus = from_str(example).unwrap();
        assert_eq!(
            instance_status.runtime_status,
            OrchestrationRuntimeStatus::Completed
        );

        let history = instance_status
            .history_events
            .expect("history events should be present");
        assert_eq!(history.len(), 5);

        let custom_status = instance_status
            .custom_status
            .expect("custom status should be present");
        assert!(custom_status.is_object());
    }
}
589}