mmids_core/http_api/handlers/
get_workflow_details.rs1use crate::http_api::routing::RouteHandler;
4use crate::workflows::manager::{WorkflowManagerRequest, WorkflowManagerRequestOperation};
5use crate::workflows::steps::StepStatus;
6use crate::workflows::{WorkflowState, WorkflowStatus, WorkflowStepState};
7use async_trait::async_trait;
8use hyper::http::HeaderValue;
9use hyper::{Body, Error, Request, Response, StatusCode};
10use serde::Serialize;
11use std::collections::HashMap;
12use std::time::Duration;
13use tokio::sync::mpsc::UnboundedSender;
14use tokio::sync::oneshot::channel;
15use tokio::time::timeout;
16use tracing::error;
17
18pub struct GetWorkflowDetailsHandler {
22 manager: UnboundedSender<WorkflowManagerRequest>,
23}
24
25#[derive(Serialize)]
27pub struct WorkflowStateResponse {
28 status: String,
29 active_steps: Vec<WorkflowStepStateResponse>,
30 pending_steps: Vec<WorkflowStepStateResponse>,
31}
32
33#[derive(Serialize)]
35pub struct WorkflowStepStateResponse {
36 step_id: String,
37 step_type: String,
38 parameters: HashMap<String, Option<String>>,
39 status: String,
40}
41
42impl GetWorkflowDetailsHandler {
43 pub fn new(manager: UnboundedSender<WorkflowManagerRequest>) -> Self {
44 GetWorkflowDetailsHandler { manager }
45 }
46}
47
48#[async_trait]
49impl RouteHandler for GetWorkflowDetailsHandler {
50 async fn execute(
51 &self,
52 _request: &mut Request<Body>,
53 path_parameters: HashMap<String, String>,
54 request_id: String,
55 ) -> Result<Response<Body>, Error> {
56 let workflow_name = match path_parameters.get("workflow") {
57 Some(value) => value.to_string(),
58 None => {
59 error!("Get workflow endpoint called without a 'workflow' path parameter");
60 let mut response = Response::default();
61 *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
62
63 return Ok(response);
64 }
65 };
66
67 let (sender, receiver) = channel();
68 let _ = self.manager.send(WorkflowManagerRequest {
69 request_id,
70 operation: WorkflowManagerRequestOperation::GetWorkflowDetails {
71 name: workflow_name,
72 response_channel: sender,
73 },
74 });
75
76 let details = match timeout(Duration::from_secs(1), receiver).await {
77 Ok(Ok(details)) => details,
78 Ok(Err(_)) => {
79 error!("Receiver was dropped prior to sending a response");
80 let mut response = Response::default();
81 *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
82
83 return Ok(response);
84 }
85
86 Err(_) => {
87 error!("Request timed out");
88 let mut response = Response::default();
89 *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
90
91 return Ok(response);
92 }
93 };
94
95 let response = if let Some(details) = details {
96 let details = WorkflowStateResponse::from(details);
97 let json = match serde_json::to_string_pretty(&details) {
98 Ok(json) => json,
99 Err(e) => {
100 error!("Could not serialize workflow details response: {:?}", e);
101 let mut response = Response::default();
102 *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
103
104 return Ok(response);
105 }
106 };
107
108 let mut response = Response::new(Body::from(json));
109 let headers = response.headers_mut();
110 headers.insert(
111 hyper::http::header::CONTENT_TYPE,
112 HeaderValue::from_static("application/json"),
113 );
114
115 response
116 } else {
117 let mut response = Response::new(Body::from("Workflow not found"));
118 *response.status_mut() = StatusCode::NOT_FOUND;
119
120 response
121 };
122
123 Ok(response)
124 }
125}
126
127impl From<WorkflowState> for WorkflowStateResponse {
128 fn from(workflow: WorkflowState) -> Self {
129 WorkflowStateResponse {
130 status: match workflow.status {
131 WorkflowStatus::Running => "Running".to_string(),
132 WorkflowStatus::Error {
133 failed_step_id,
134 message,
135 } => format!("Step id {} failed: {}", failed_step_id, message),
136 },
137
138 active_steps: workflow
139 .active_steps
140 .into_iter()
141 .map(|x| WorkflowStepStateResponse::from(x))
142 .collect(),
143
144 pending_steps: workflow
145 .pending_steps
146 .into_iter()
147 .map(|x| WorkflowStepStateResponse::from(x))
148 .collect(),
149 }
150 }
151}
152
153impl From<WorkflowStepState> for WorkflowStepStateResponse {
154 fn from(step_state: WorkflowStepState) -> Self {
155 WorkflowStepStateResponse {
156 step_id: step_state.definition.get_id().to_string(),
157 step_type: step_state.definition.step_type.0,
158 parameters: step_state.definition.parameters,
159 status: match step_state.status {
160 StepStatus::Created => "Created".to_string(),
161 StepStatus::Active => "Active".to_string(),
162 StepStatus::Error { message } => format!("Error: {}", message),
163 StepStatus::Shutdown => "Shut Down".to_string(),
164 },
165 }
166 }
167}