mmids_core/http_api/handlers/
list_workflows.rs1use crate::http_api::routing::RouteHandler;
4use crate::workflows::manager::{WorkflowManagerRequest, WorkflowManagerRequestOperation};
5use async_trait::async_trait;
6use hyper::header::HeaderValue;
7use hyper::{Body, Error, Request, Response, StatusCode};
8use serde::Serialize;
9use std::collections::HashMap;
10use std::time::Duration;
11use tokio::sync::mpsc::UnboundedSender;
12use tokio::sync::oneshot::channel;
13use tokio::time::timeout;
14use tracing::error;
15
16pub struct ListWorkflowsHandler {
18 manager: UnboundedSender<WorkflowManagerRequest>,
19}
20
21#[derive(Serialize)]
23pub struct WorkflowListItemResponse {
24 name: String,
25}
26
27impl ListWorkflowsHandler {
28 pub fn new(manager: UnboundedSender<WorkflowManagerRequest>) -> Self {
29 ListWorkflowsHandler { manager }
30 }
31}
32
33#[async_trait]
34impl RouteHandler for ListWorkflowsHandler {
35 async fn execute(
36 &self,
37 _request: &mut Request<Body>,
38 _path_parameters: HashMap<String, String>,
39 request_id: String,
40 ) -> Result<Response<Body>, Error> {
41 let (response_sender, response_receiver) = channel();
42 let message = WorkflowManagerRequest {
43 request_id,
44 operation: WorkflowManagerRequestOperation::GetRunningWorkflows {
45 response_channel: response_sender,
46 },
47 };
48
49 match self.manager.send(message) {
50 Ok(_) => (),
51 Err(_) => {
52 error!("Workflow manager is no longer operational");
53 let mut response = Response::default();
54 *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
55
56 return Ok(response);
57 }
58 };
59
60 let response = match timeout(Duration::from_secs(10), response_receiver).await {
61 Ok(Ok(response)) => response,
62
63 Ok(Err(_)) => {
64 error!("Workflow manager is no longer operational");
65 let mut response = Response::default();
66 *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
67
68 return Ok(response);
69 }
70
71 Err(_) => {
72 error!("Get workflow request timed out");
73 let mut response = Response::default();
74 *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
75
76 return Ok(response);
77 }
78 };
79
80 let response = response
81 .into_iter()
82 .map(|x| WorkflowListItemResponse { name: x.name })
83 .collect::<Vec<_>>();
84 let json = match serde_json::to_string_pretty(&response) {
85 Ok(json) => json,
86 Err(error) => {
87 error!("Failed to serialize workflows to json: {:?}", error);
88 let mut response = Response::default();
89 *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
90
91 return Ok(response);
92 }
93 };
94
95 let mut response = Response::new(Body::from(json));
96 let headers = response.headers_mut();
97 headers.insert(
98 hyper::http::header::CONTENT_TYPE,
99 HeaderValue::from_static("application/json"),
100 );
101
102 Ok(response)
103 }
104}