use mqttrust::{Mqtt, QoS};
use serde::Serialize;
use crate::jobs::{
data_types::JobStatus, JobTopic, MAX_CLIENT_TOKEN_LEN, MAX_JOB_ID_LEN, MAX_THING_NAME_LEN,
};
use super::{JobError, StatusDetailsOwned};
#[derive(Debug, PartialEq, Serialize)]
pub struct UpdateJobExecutionRequest<'a> {
#[serde(rename = "executionNumber")]
#[serde(skip_serializing_if = "Option::is_none")]
pub execution_number: Option<i64>,
#[serde(rename = "expectedVersion")]
#[serde(skip_serializing_if = "Option::is_none")]
pub expected_version: Option<i64>,
#[serde(rename = "includeJobDocument")]
#[serde(skip_serializing_if = "Option::is_none")]
pub include_job_document: Option<bool>,
#[serde(rename = "includeJobExecutionState")]
#[serde(skip_serializing_if = "Option::is_none")]
pub include_job_execution_state: Option<bool>,
#[serde(rename = "status")]
pub status: JobStatus,
#[serde(rename = "statusDetails")]
#[serde(skip_serializing_if = "Option::is_none")]
pub status_details: Option<&'a StatusDetailsOwned>,
#[serde(rename = "stepTimeoutInMinutes")]
#[serde(skip_serializing_if = "Option::is_none")]
pub step_timeout_in_minutes: Option<i64>,
#[serde(rename = "clientToken")]
#[serde(skip_serializing_if = "Option::is_none")]
pub client_token: Option<&'a str>,
}
pub struct Update<'a> {
job_id: &'a str,
status: JobStatus,
client_token: Option<&'a str>,
status_details: Option<&'a StatusDetailsOwned>,
include_job_document: bool,
execution_number: Option<i64>,
include_job_execution_state: bool,
expected_version: Option<i64>,
step_timeout_in_minutes: Option<i64>,
}
impl<'a> Update<'a> {
pub fn new(job_id: &'a str, status: JobStatus) -> Self {
assert!(job_id.len() < MAX_JOB_ID_LEN);
Self {
job_id,
status,
status_details: None,
include_job_document: false,
execution_number: None,
include_job_execution_state: false,
expected_version: None,
client_token: None,
step_timeout_in_minutes: None,
}
}
pub fn client_token(self, client_token: &'a str) -> Self {
assert!(client_token.len() < MAX_CLIENT_TOKEN_LEN);
Self {
client_token: Some(client_token),
..self
}
}
pub fn status_details(self, status_details: &'a StatusDetailsOwned) -> Self {
Self {
status_details: Some(status_details),
..self
}
}
pub fn include_job_document(self) -> Self {
Self {
include_job_document: true,
..self
}
}
pub fn include_job_execution_state(self) -> Self {
Self {
include_job_execution_state: true,
..self
}
}
pub fn execution_number(self, execution_number: i64) -> Self {
Self {
execution_number: Some(execution_number),
..self
}
}
pub fn expected_version(self, expected_version: i64) -> Self {
Self {
expected_version: Some(expected_version),
..self
}
}
pub fn step_timeout_in_minutes(self, step_timeout_in_minutes: i64) -> Self {
Self {
step_timeout_in_minutes: Some(step_timeout_in_minutes),
..self
}
}
pub fn topic_payload(
self,
client_id: &str,
) -> Result<
(
heapless::String<{ MAX_THING_NAME_LEN + MAX_JOB_ID_LEN + 25 }>,
heapless::Vec<u8, 512>,
),
JobError,
> {
let payload = serde_json_core::to_vec(&UpdateJobExecutionRequest {
execution_number: self.execution_number,
include_job_document: self.include_job_document.then(|| true),
expected_version: self.expected_version,
include_job_execution_state: self.include_job_execution_state.then(|| true),
status: self.status,
status_details: self.status_details,
step_timeout_in_minutes: self.step_timeout_in_minutes,
client_token: self.client_token,
})
.map_err(|_| JobError::Encoding)?;
Ok((JobTopic::Update(self.job_id).format(client_id)?, payload))
}
pub fn send<M: Mqtt>(self, mqtt: &M, qos: QoS) -> Result<(), JobError> {
let (topic, payload) = self.topic_payload(mqtt.client_id())?;
mqtt.publish(topic.as_str(), &payload, qos)?;
Ok(())
}
}
#[cfg(test)]
mod test {
use super::*;
use serde_json_core::to_string;
#[test]
fn serialize_requests() {
let req = UpdateJobExecutionRequest {
client_token: Some("test_client:token_update"),
step_timeout_in_minutes: Some(50),
execution_number: Some(5),
expected_version: Some(2),
include_job_document: Some(true),
include_job_execution_state: Some(true),
status_details: None,
status: JobStatus::Failed,
};
assert_eq!(
&to_string::<_, 512>(&req).unwrap(),
r#"{"executionNumber":5,"expectedVersion":2,"includeJobDocument":true,"includeJobExecutionState":true,"status":"FAILED","stepTimeoutInMinutes":50,"clientToken":"test_client:token_update"}"#
);
}
#[test]
fn topic_payload() {
let (topic, payload) = Update::new("test_job_id", JobStatus::Failed)
.client_token("test_client:token_update")
.step_timeout_in_minutes(50)
.execution_number(5)
.expected_version(2)
.include_job_document()
.include_job_execution_state()
.topic_payload("test_client")
.unwrap();
assert_eq!(payload, br#"{"executionNumber":5,"expectedVersion":2,"includeJobDocument":true,"includeJobExecutionState":true,"status":"FAILED","stepTimeoutInMinutes":50,"clientToken":"test_client:token_update"}"#);
assert_eq!(
topic.as_str(),
"$aws/things/test_client/jobs/test_job_id/update"
);
}
}