mmids_core/http_api/handlers/
start_workflow.rs1use crate::http_api::routing::RouteHandler;
4use crate::workflows::definitions::WorkflowDefinition;
5use crate::workflows::manager::{WorkflowManagerRequest, WorkflowManagerRequestOperation};
6use async_trait::async_trait;
7use bytes::Bytes;
8use hyper::http::HeaderValue;
9use hyper::{Body, Error, Request, Response, StatusCode};
10use serde::Serialize;
11use std::collections::HashMap;
12use tokio::sync::mpsc::UnboundedSender;
13use tracing::{error, warn};
14
/// Media type identifying a request body written in the mmids configuration format (the format
/// handed to `crate::config::parse`). It is also the type assumed when a request does not
/// specify a content type.
const MMIDS_MIME_TYPE: &str = "application/vnd.mmids.workflow";
16
17pub struct StartWorkflowHandler {
33 manager: UnboundedSender<WorkflowManagerRequest>,
34}
35
36#[derive(Serialize)]
38pub struct ErrorResponse {
39 pub error: String,
40}
41
42impl StartWorkflowHandler {
43 pub fn new(manager: UnboundedSender<WorkflowManagerRequest>) -> Self {
44 StartWorkflowHandler { manager }
45 }
46}
47
48#[async_trait]
49impl RouteHandler for StartWorkflowHandler {
50 async fn execute(
51 &self,
52 request: &mut Request<Body>,
53 _path_parameters: HashMap<String, String>,
54 request_id: String,
55 ) -> Result<Response<Body>, Error> {
56 let body = hyper::body::to_bytes(request.body_mut()).await?;
57 let content_type = match request.headers().get(hyper::http::header::CONTENT_TYPE) {
58 Some(content_type) => content_type.to_str().unwrap_or(MMIDS_MIME_TYPE),
59 None => {
60 warn!("No content type specified, assuming '{}'", MMIDS_MIME_TYPE);
61 MMIDS_MIME_TYPE
62 }
63 };
64
65 let workflow = match content_type.to_lowercase().trim() {
66 MMIDS_MIME_TYPE => parse_mmids_mime_type(body)?,
67
68 x => {
69 warn!("Invalid content type specified: '{}'", x);
70 let error = ErrorResponse {
71 error: format!("Invalid content type specified: {}", x),
72 };
73 return Ok(error.to_json_bad_request());
74 }
75 };
76
77 let workflow = match workflow {
78 Ok(workflow) => workflow,
79 Err(error) => {
80 return Ok(error.to_json_bad_request());
81 }
82 };
83
84 let result = self.manager.send(WorkflowManagerRequest {
85 request_id,
86 operation: WorkflowManagerRequestOperation::UpsertWorkflow {
87 definition: workflow,
88 },
89 });
90
91 match result {
92 Ok(_) => Ok(Response::default()),
93
94 Err(_) => {
95 error!("Workflow manager no longer exists");
96 let mut response = Response::default();
97 *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
98
99 Ok(response)
100 }
101 }
102 }
103}
104
105impl ErrorResponse {
106 fn to_json_bad_request(self) -> Response<Body> {
107 let json = match serde_json::to_string_pretty(&self) {
108 Ok(json) => json,
109 Err(error) => {
110 error!("Failed to serialize error response to json: {:?}", error);
111 let mut response = Response::default();
112 *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
113
114 return response;
115 }
116 };
117
118 let mut response = Response::new(Body::from(json));
119 *response.status_mut() = StatusCode::BAD_REQUEST;
120
121 let headers = response.headers_mut();
122 headers.insert(
123 hyper::http::header::CONTENT_TYPE,
124 HeaderValue::from_static("application/json"),
125 );
126
127 return response;
128 }
129}
130
131fn parse_mmids_mime_type(body: Bytes) -> Result<Result<WorkflowDefinition, ErrorResponse>, Error> {
132 let content = match String::from_utf8(body.to_vec()) {
133 Ok(content) => content,
134 Err(utf8_error) => {
135 return Ok(Err(ErrorResponse {
136 error: format!("Body was not valid utf8 content: {}", utf8_error),
137 }));
138 }
139 };
140
141 let mut config = match crate::config::parse(content.as_str()) {
142 Ok(config) => config,
143 Err(parse_error) => {
144 return Ok(Err(ErrorResponse {
145 error: format!("Failed to parse input: {:?}", parse_error),
146 }));
147 }
148 };
149
150 if config.workflows.len() > 1 {
151 return Ok(Err(ErrorResponse {
152 error: format!(
153 "Each request can only contain 1 workflow, {} were specified",
154 config.workflows.len()
155 ),
156 }));
157 }
158
159 if config.workflows.len() == 0 {
160 return Ok(Err(ErrorResponse {
161 error: "No workflows specified".to_string(),
162 }));
163 }
164
165 let workflow = config.workflows.drain().nth(0).unwrap().1;
166 Ok(Ok(workflow))
167}