mmids_core/http_api/handlers/
start_workflow.rs

1//! Contains the handler that creates and updates workflows
2
3use crate::http_api::routing::RouteHandler;
4use crate::workflows::definitions::WorkflowDefinition;
5use crate::workflows::manager::{WorkflowManagerRequest, WorkflowManagerRequestOperation};
6use async_trait::async_trait;
7use bytes::Bytes;
8use hyper::http::HeaderValue;
9use hyper::{Body, Error, Request, Response, StatusCode};
10use serde::Serialize;
11use std::collections::HashMap;
12use tokio::sync::mpsc::UnboundedSender;
13use tracing::{error, warn};
14
/// Mime type identifying request bodies that contain a workflow definition.  This is also the
/// content type that is assumed when a request does not specify a `Content-Type` header.
const MMIDS_MIME_TYPE: &str = "application/vnd.mmids.workflow";
16
17/// Handles requests to start a workflow. Every workflow must have a name, and if a workflow is
18/// specified with a name that matches an already running workflow then the existing workflow
19/// will be updated to match the passed in workflow instead of a new workflow starting.
20///
21/// A successful result does not mean that the workflow has fully started, only that the workflow
22/// has been submitted to the workflow manager. Status updates of the workflow will need to be
23/// queried to know if it successfully became active.
24///
25/// The details of the workflow are expected in the passed in via the request body.  The format
26/// that the details come in are based on the `Content-Type` header of the request:
27///
28/// * `application/vnd.mmids.workflow` - Worklow definition that matches how workflows are defined in
29/// the mmids configuration files.
30///
31/// If no `Content-Type` is specified than `application/vnd.mmids.workflow` is assumed.
32pub struct StartWorkflowHandler {
33    manager: UnboundedSender<WorkflowManagerRequest>,
34}
35
36/// Response provided when an error is returned, such as an invalid workflow specified
37#[derive(Serialize)]
38pub struct ErrorResponse {
39    pub error: String,
40}
41
42impl StartWorkflowHandler {
43    pub fn new(manager: UnboundedSender<WorkflowManagerRequest>) -> Self {
44        StartWorkflowHandler { manager }
45    }
46}
47
48#[async_trait]
49impl RouteHandler for StartWorkflowHandler {
50    async fn execute(
51        &self,
52        request: &mut Request<Body>,
53        _path_parameters: HashMap<String, String>,
54        request_id: String,
55    ) -> Result<Response<Body>, Error> {
56        let body = hyper::body::to_bytes(request.body_mut()).await?;
57        let content_type = match request.headers().get(hyper::http::header::CONTENT_TYPE) {
58            Some(content_type) => content_type.to_str().unwrap_or(MMIDS_MIME_TYPE),
59            None => {
60                warn!("No content type specified, assuming '{}'", MMIDS_MIME_TYPE);
61                MMIDS_MIME_TYPE
62            }
63        };
64
65        let workflow = match content_type.to_lowercase().trim() {
66            MMIDS_MIME_TYPE => parse_mmids_mime_type(body)?,
67
68            x => {
69                warn!("Invalid content type specified: '{}'", x);
70                let error = ErrorResponse {
71                    error: format!("Invalid content type specified: {}", x),
72                };
73                return Ok(error.to_json_bad_request());
74            }
75        };
76
77        let workflow = match workflow {
78            Ok(workflow) => workflow,
79            Err(error) => {
80                return Ok(error.to_json_bad_request());
81            }
82        };
83
84        let result = self.manager.send(WorkflowManagerRequest {
85            request_id,
86            operation: WorkflowManagerRequestOperation::UpsertWorkflow {
87                definition: workflow,
88            },
89        });
90
91        match result {
92            Ok(_) => Ok(Response::default()),
93
94            Err(_) => {
95                error!("Workflow manager no longer exists");
96                let mut response = Response::default();
97                *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
98
99                Ok(response)
100            }
101        }
102    }
103}
104
105impl ErrorResponse {
106    fn to_json_bad_request(self) -> Response<Body> {
107        let json = match serde_json::to_string_pretty(&self) {
108            Ok(json) => json,
109            Err(error) => {
110                error!("Failed to serialize error response to json: {:?}", error);
111                let mut response = Response::default();
112                *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
113
114                return response;
115            }
116        };
117
118        let mut response = Response::new(Body::from(json));
119        *response.status_mut() = StatusCode::BAD_REQUEST;
120
121        let headers = response.headers_mut();
122        headers.insert(
123            hyper::http::header::CONTENT_TYPE,
124            HeaderValue::from_static("application/json"),
125        );
126
127        return response;
128    }
129}
130
131fn parse_mmids_mime_type(body: Bytes) -> Result<Result<WorkflowDefinition, ErrorResponse>, Error> {
132    let content = match String::from_utf8(body.to_vec()) {
133        Ok(content) => content,
134        Err(utf8_error) => {
135            return Ok(Err(ErrorResponse {
136                error: format!("Body was not valid utf8 content: {}", utf8_error),
137            }));
138        }
139    };
140
141    let mut config = match crate::config::parse(content.as_str()) {
142        Ok(config) => config,
143        Err(parse_error) => {
144            return Ok(Err(ErrorResponse {
145                error: format!("Failed to parse input: {:?}", parse_error),
146            }));
147        }
148    };
149
150    if config.workflows.len() > 1 {
151        return Ok(Err(ErrorResponse {
152            error: format!(
153                "Each request can only contain 1 workflow, {} were specified",
154                config.workflows.len()
155            ),
156        }));
157    }
158
159    if config.workflows.len() == 0 {
160        return Ok(Err(ErrorResponse {
161            error: "No workflows specified".to_string(),
162        }));
163    }
164
165    let workflow = config.workflows.drain().nth(0).unwrap().1;
166    Ok(Ok(workflow))
167}