1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252
//! # Iot Jobs Data
//!
//! ## Programming Devices to Work with Jobs
//!
//! The examples in this section use MQTT to illustrate how a device works with
//! the AWS IoT Jobs service. Alternatively, you could use the corresponding API
//! or CLI commands. For these examples, we assume a device called MyThing
//! subscribes to the following MQTT topics:
//!
//! $aws/things/{MyThing}/jobs/notify (or
//! $aws/things/{MyThing}/jobs/notify-next)
//!
//! $aws/things/{MyThing}/jobs/get/accepted
//!
//! $aws/things/{MyThing}/jobs/get/rejected
//!
//! $aws/things/{MyThing}/jobs/{jobId}/get/accepted
//!
//! $aws/things/{MyThing}/jobs/{jobId}/get/rejected
//!
//! If you are using Code-signing for AWS IoT your device code must verify the
//! signature of your code file. The signature is in the job document in the
//! codesign property.
//!
//! ## Workflow:
//! 1. When a device first comes online, it should subscribe to the device's
//! notify-next topic.
//! 2. Call the DescribeJobExecution MQTT API with jobId $next to get the next
//! job, its job document, and other details, including any state saved in
//! statusDetails. If the job document has a code file signature, you must
//! verify the signature before proceeding with processing the job request.
//! 3. Call the UpdateJobExecution MQTT API to update the job status. Or, to
//! combine this and the previous step in one call, the device can call
//! StartNextPendingJobExecution.
//! 4. (Optional) You can add a step timer by setting a value for
//! stepTimeoutInMinutes when you call either UpdateJobExecution or
//! StartNextPendingJobExecution.
//! 5. Perform the actions specified by the job document using the
//! UpdateJobExecution MQTT API to report on the progress of the job.
//! 6. Continue to monitor the job execution by calling the DescribeJobExecution
//! MQTT API with this jobId. If the job execution is canceled or deleted
//! while the device is running the job, the device should be capable of
//! recovering to a valid state.
//! 7. Call the UpdateJobExecution MQTT API when finished with the job to update
//! the job status and report success or failure.
//! 8. Because this job's execution status has been changed to a terminal state,
//! the next job available for execution (if any) changes. The device is
//! notified that the next pending job execution has changed. At this point,
//! the device should continue as described in step 2.
//!
//! If the device remains online, it continues to receive a notifications of the
//! next pending job execution, including its job execution data, when it
//! completes a job or a new pending job execution is added. When this occurs,
//! the device continues as described in step 2.
//!
//! If the device is unable to execute the job, it should call the
//! UpdateJobExecution MQTT API to update the job status to REJECTED.
//!
//!
//! ## Jobs Notifications
//! The AWS IoT Jobs service publishes MQTT messages to reserved topics when
//! jobs are pending or when the first job execution in the list changes.
//! Devices can keep track of pending jobs by subscribing to these topics.
//!
//! Job notifications are published to MQTT topics as JSON payloads. There are
//! two kinds of notifications:
//!
//! A ListNotification contains a list of no more than 10 pending job
//! executions. The job executions in this list have status values of either
//! IN_PROGRESS or QUEUED. They are sorted by status (IN_PROGRESS job executions
//! before QUEUED job executions) and then by the times when they were queued.
//!
//! A ListNotification is published whenever one of the following criteria is
//! met.
//!
//! A new job execution is queued or changes to a non-terminal status
//! (IN_PROGRESS or QUEUED).
//!
//! An old job execution changes to a terminal status (FAILED, SUCCEEDED,
//! CANCELED, TIMED_OUT, REJECTED, or REMOVED).
//!
//! A NextNotification contains summary information about the one job execution
//! that is next in the queue.
//!
//! A NextNotification is published whenever the first job execution in the list
//! changes.
//!
//! A new job execution is added to the list as QUEUED, and it is the first one
//! in the list.
//!
//! The status of an existing job execution that was not the first one in the
//! list changes from QUEUED to IN_PROGRESS and becomes the first one in the
//! list. (This happens when there are no other IN_PROGRESS job executions in
//! the list or when the job execution whose status changes from QUEUED to
//! IN_PROGRESS was queued earlier than any other IN_PROGRESS job execution in
//! the list.)
//!
//! The status of the job execution that is first in the list changes to a
//! terminal status and is removed from the list.
pub mod data_types;
pub mod describe;
pub mod get_pending;
pub mod start_next;
pub mod subscribe;
pub mod unsubscribe;
pub mod update;
use core::fmt::Write;
use self::{
data_types::JobStatus, describe::Describe, get_pending::GetPending, start_next::StartNext,
subscribe::Subscribe, unsubscribe::Unsubscribe, update::Update,
};
pub use subscribe::Topic;
/// https://docs.aws.amazon.com/iot/latest/apireference/API_DescribeThing.html
pub const MAX_THING_NAME_LEN: usize = 128;
pub const MAX_CLIENT_TOKEN_LEN: usize = MAX_THING_NAME_LEN + 10;
pub const MAX_JOB_ID_LEN: usize = 64;
pub const MAX_STREAM_ID_LEN: usize = MAX_JOB_ID_LEN;
pub const MAX_PENDING_JOBS: usize = 1;
pub const MAX_RUNNING_JOBS: usize = 1;
pub type StatusDetails = heapless::FnvIndexMap<heapless::String<15>, heapless::String<11>, 4>;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobError {
Overflow,
Encoding,
Mqtt(mqttrust::MqttError),
}
impl From<mqttrust::MqttError> for JobError {
fn from(e: mqttrust::MqttError) -> Self {
Self::Mqtt(e)
}
}
#[derive(Debug, Clone, PartialEq)]
enum JobTopic<'a> {
// Outgoing Topics
GetNext,
GetPending,
StartNext,
Get(&'a str),
Update(&'a str),
// Incoming Topics
Notify,
NotifyNext,
GetAccepted,
GetRejected,
StartNextAccepted,
StartNextRejected,
DescribeAccepted(&'a str),
DescribeRejected(&'a str),
UpdateAccepted(&'a str),
UpdateRejected(&'a str),
}
impl<'a> JobTopic<'a> {
pub fn format<const L: usize>(&self, client_id: &str) -> Result<heapless::String<L>, JobError> {
let mut topic_path = heapless::String::new();
match self {
Self::GetNext => {
topic_path.write_fmt(format_args!("$aws/things/{}/jobs/$next/get", client_id))
}
Self::GetPending => {
topic_path.write_fmt(format_args!("$aws/things/{}/jobs/get", client_id))
}
Self::StartNext => {
topic_path.write_fmt(format_args!("$aws/things/{}/jobs/start-next", client_id))
}
Self::Get(job_id) => topic_path.write_fmt(format_args!(
"$aws/things/{}/jobs/{}/get",
client_id, job_id
)),
Self::Update(job_id) => topic_path.write_fmt(format_args!(
"$aws/things/{}/jobs/{}/update",
client_id, job_id
)),
Self::Notify => {
topic_path.write_fmt(format_args!("$aws/things/{}/jobs/notify", client_id))
}
Self::NotifyNext => {
topic_path.write_fmt(format_args!("$aws/things/{}/jobs/notify-next", client_id))
}
Self::GetAccepted => {
topic_path.write_fmt(format_args!("$aws/things/{}/jobs/get/accepted", client_id))
}
Self::GetRejected => {
topic_path.write_fmt(format_args!("$aws/things/{}/jobs/get/rejected", client_id))
}
Self::StartNextAccepted => topic_path.write_fmt(format_args!(
"$aws/things/{}/jobs/start-next/accepted",
client_id
)),
Self::StartNextRejected => topic_path.write_fmt(format_args!(
"$aws/things/{}/jobs/start-next/rejected",
client_id
)),
Self::DescribeAccepted(job_id) => topic_path.write_fmt(format_args!(
"$aws/things/{}/jobs/{}/get/accepted",
client_id, job_id
)),
Self::DescribeRejected(job_id) => topic_path.write_fmt(format_args!(
"$aws/things/{}/jobs/{}/get/rejected",
client_id, job_id
)),
Self::UpdateAccepted(job_id) => topic_path.write_fmt(format_args!(
"$aws/things/{}/jobs/{}/update/accepted",
client_id, job_id
)),
Self::UpdateRejected(job_id) => topic_path.write_fmt(format_args!(
"$aws/things/{}/jobs/{}/update/rejected",
client_id, job_id
)),
}
.map_err(|_| JobError::Overflow)?;
Ok(topic_path)
}
}
pub struct Jobs;
impl Jobs {
pub fn get_pending<'a>() -> GetPending<'a> {
GetPending::new()
}
pub fn start_next<'a>() -> StartNext<'a> {
StartNext::new()
}
pub fn describe<'a>() -> Describe<'a> {
Describe::new()
}
pub fn update(job_id: &str, status: JobStatus) -> Update {
Update::new(job_id, status)
}
pub fn subscribe<'a, const N: usize>() -> Subscribe<'a, N> {
Subscribe::new()
}
pub fn unsubscribe<'a, const N: usize>() -> Unsubscribe<'a, N> {
Unsubscribe::new()
}
}