mmids_core/http_api/handlers/
get_workflow_details.rs

1//! Contains the handler for getting details about a running workflow
2
3use crate::http_api::routing::RouteHandler;
4use crate::workflows::manager::{WorkflowManagerRequest, WorkflowManagerRequestOperation};
5use crate::workflows::steps::StepStatus;
6use crate::workflows::{WorkflowState, WorkflowStatus, WorkflowStepState};
7use async_trait::async_trait;
8use hyper::http::HeaderValue;
9use hyper::{Body, Error, Request, Response, StatusCode};
10use serde::Serialize;
11use std::collections::HashMap;
12use std::time::Duration;
13use tokio::sync::mpsc::UnboundedSender;
14use tokio::sync::oneshot::channel;
15use tokio::time::timeout;
16use tracing::error;
17
18/// Handles HTTP requests to get details for a specific workflow.  It requires a single path
19/// parameter with the name `workflow` containing the name of the workflow to query for.  Response
20/// will always be returned in json format.
21pub struct GetWorkflowDetailsHandler {
22    manager: UnboundedSender<WorkflowManagerRequest>,
23}
24
25/// The API's response for the state of the requested workflow
26#[derive(Serialize)]
27pub struct WorkflowStateResponse {
28    status: String,
29    active_steps: Vec<WorkflowStepStateResponse>,
30    pending_steps: Vec<WorkflowStepStateResponse>,
31}
32
33/// API's response for the details of an individual workflow step
34#[derive(Serialize)]
35pub struct WorkflowStepStateResponse {
36    step_id: String,
37    step_type: String,
38    parameters: HashMap<String, Option<String>>,
39    status: String,
40}
41
42impl GetWorkflowDetailsHandler {
43    pub fn new(manager: UnboundedSender<WorkflowManagerRequest>) -> Self {
44        GetWorkflowDetailsHandler { manager }
45    }
46}
47
48#[async_trait]
49impl RouteHandler for GetWorkflowDetailsHandler {
50    async fn execute(
51        &self,
52        _request: &mut Request<Body>,
53        path_parameters: HashMap<String, String>,
54        request_id: String,
55    ) -> Result<Response<Body>, Error> {
56        let workflow_name = match path_parameters.get("workflow") {
57            Some(value) => value.to_string(),
58            None => {
59                error!("Get workflow endpoint called without a 'workflow' path parameter");
60                let mut response = Response::default();
61                *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
62
63                return Ok(response);
64            }
65        };
66
67        let (sender, receiver) = channel();
68        let _ = self.manager.send(WorkflowManagerRequest {
69            request_id,
70            operation: WorkflowManagerRequestOperation::GetWorkflowDetails {
71                name: workflow_name,
72                response_channel: sender,
73            },
74        });
75
76        let details = match timeout(Duration::from_secs(1), receiver).await {
77            Ok(Ok(details)) => details,
78            Ok(Err(_)) => {
79                error!("Receiver was dropped prior to sending a response");
80                let mut response = Response::default();
81                *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
82
83                return Ok(response);
84            }
85
86            Err(_) => {
87                error!("Request timed out");
88                let mut response = Response::default();
89                *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
90
91                return Ok(response);
92            }
93        };
94
95        let response = if let Some(details) = details {
96            let details = WorkflowStateResponse::from(details);
97            let json = match serde_json::to_string_pretty(&details) {
98                Ok(json) => json,
99                Err(e) => {
100                    error!("Could not serialize workflow details response: {:?}", e);
101                    let mut response = Response::default();
102                    *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
103
104                    return Ok(response);
105                }
106            };
107
108            let mut response = Response::new(Body::from(json));
109            let headers = response.headers_mut();
110            headers.insert(
111                hyper::http::header::CONTENT_TYPE,
112                HeaderValue::from_static("application/json"),
113            );
114
115            response
116        } else {
117            let mut response = Response::new(Body::from("Workflow not found"));
118            *response.status_mut() = StatusCode::NOT_FOUND;
119
120            response
121        };
122
123        Ok(response)
124    }
125}
126
127impl From<WorkflowState> for WorkflowStateResponse {
128    fn from(workflow: WorkflowState) -> Self {
129        WorkflowStateResponse {
130            status: match workflow.status {
131                WorkflowStatus::Running => "Running".to_string(),
132                WorkflowStatus::Error {
133                    failed_step_id,
134                    message,
135                } => format!("Step id {} failed: {}", failed_step_id, message),
136            },
137
138            active_steps: workflow
139                .active_steps
140                .into_iter()
141                .map(|x| WorkflowStepStateResponse::from(x))
142                .collect(),
143
144            pending_steps: workflow
145                .pending_steps
146                .into_iter()
147                .map(|x| WorkflowStepStateResponse::from(x))
148                .collect(),
149        }
150    }
151}
152
153impl From<WorkflowStepState> for WorkflowStepStateResponse {
154    fn from(step_state: WorkflowStepState) -> Self {
155        WorkflowStepStateResponse {
156            step_id: step_state.definition.get_id().to_string(),
157            step_type: step_state.definition.step_type.0,
158            parameters: step_state.definition.parameters,
159            status: match step_state.status {
160                StepStatus::Created => "Created".to_string(),
161                StepStatus::Active => "Active".to_string(),
162                StepStatus::Error { message } => format!("Error: {}", message),
163                StepStatus::Shutdown => "Shut Down".to_string(),
164            },
165        }
166    }
167}