mmids_core/http_api/handlers/
list_workflows.rs

//! Contains the HTTP API handler that returns the list of actively running workflows.
2
3use crate::http_api::routing::RouteHandler;
4use crate::workflows::manager::{WorkflowManagerRequest, WorkflowManagerRequestOperation};
5use async_trait::async_trait;
6use hyper::header::HeaderValue;
7use hyper::{Body, Error, Request, Response, StatusCode};
8use serde::Serialize;
9use std::collections::HashMap;
10use std::time::Duration;
11use tokio::sync::mpsc::UnboundedSender;
12use tokio::sync::oneshot::channel;
13use tokio::time::timeout;
14use tracing::error;
15
16/// HTTP handler which provides a list of workflows that are actively running
17pub struct ListWorkflowsHandler {
18    manager: UnboundedSender<WorkflowManagerRequest>,
19}
20
21/// Defines what data the API will return for each running workflow
22#[derive(Serialize)]
23pub struct WorkflowListItemResponse {
24    name: String,
25}
26
27impl ListWorkflowsHandler {
28    pub fn new(manager: UnboundedSender<WorkflowManagerRequest>) -> Self {
29        ListWorkflowsHandler { manager }
30    }
31}
32
33#[async_trait]
34impl RouteHandler for ListWorkflowsHandler {
35    async fn execute(
36        &self,
37        _request: &mut Request<Body>,
38        _path_parameters: HashMap<String, String>,
39        request_id: String,
40    ) -> Result<Response<Body>, Error> {
41        let (response_sender, response_receiver) = channel();
42        let message = WorkflowManagerRequest {
43            request_id,
44            operation: WorkflowManagerRequestOperation::GetRunningWorkflows {
45                response_channel: response_sender,
46            },
47        };
48
49        match self.manager.send(message) {
50            Ok(_) => (),
51            Err(_) => {
52                error!("Workflow manager is no longer operational");
53                let mut response = Response::default();
54                *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
55
56                return Ok(response);
57            }
58        };
59
60        let response = match timeout(Duration::from_secs(10), response_receiver).await {
61            Ok(Ok(response)) => response,
62
63            Ok(Err(_)) => {
64                error!("Workflow manager is no longer operational");
65                let mut response = Response::default();
66                *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
67
68                return Ok(response);
69            }
70
71            Err(_) => {
72                error!("Get workflow request timed out");
73                let mut response = Response::default();
74                *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
75
76                return Ok(response);
77            }
78        };
79
80        let response = response
81            .into_iter()
82            .map(|x| WorkflowListItemResponse { name: x.name })
83            .collect::<Vec<_>>();
84        let json = match serde_json::to_string_pretty(&response) {
85            Ok(json) => json,
86            Err(error) => {
87                error!("Failed to serialize workflows to json: {:?}", error);
88                let mut response = Response::default();
89                *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
90
91                return Ok(response);
92            }
93        };
94
95        let mut response = Response::new(Body::from(json));
96        let headers = response.headers_mut();
97        headers.insert(
98            hyper::http::header::CONTENT_TYPE,
99            HeaderValue::from_static("application/json"),
100        );
101
102        Ok(response)
103    }
104}