//! # IoT Jobs Data
//!
//! ## Programming Devices to Work with Jobs
//!
//! The examples in this section use MQTT to illustrate how a device works with
//! the AWS IoT Jobs service. Alternatively, you could use the corresponding API
//! or CLI commands. For these examples, we assume a device called MyThing
//! subscribes to the following MQTT topics:
//!
//! - `$aws/things/{MyThing}/jobs/notify` (or
//!   `$aws/things/{MyThing}/jobs/notify-next`)
//! - `$aws/things/{MyThing}/jobs/get/accepted`
//! - `$aws/things/{MyThing}/jobs/get/rejected`
//! - `$aws/things/{MyThing}/jobs/{jobId}/get/accepted`
//! - `$aws/things/{MyThing}/jobs/{jobId}/get/rejected`
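//!
//! As a minimal sketch, the topic names above could be assembled from a thing
//! name and a job ID as follows. The helper `subscription_topics` is
//! hypothetical and uses `std` strings for brevity; it is not part of this
//! crate, which formats topics into fixed-capacity `heapless` strings.
//!
//! ```rust
//! /// Hypothetical helper: the five topics from the list above, using
//! /// `notify-next` for the first one.
//! fn subscription_topics(thing_name: &str, job_id: &str) -> [String; 5] {
//!     // Every reserved Jobs topic shares this per-thing prefix.
//!     let base = format!("$aws/things/{}/jobs", thing_name);
//!     [
//!         format!("{}/notify-next", base),
//!         format!("{}/get/accepted", base),
//!         format!("{}/get/rejected", base),
//!         format!("{}/{}/get/accepted", base, job_id),
//!         format!("{}/{}/get/rejected", base, job_id),
//!     ]
//! }
//! ```
//!
//! Calling `subscription_topics("MyThing", "job-1")` yields, for example,
//! `$aws/things/MyThing/jobs/notify-next` as the first entry.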
//!
//! If you are using Code-signing for AWS IoT, your device code must verify the
//! signature of your code file. The signature is in the job document in the
//! `codesign` property.
//!
//! ## Workflow
//! 1. When a device first comes online, it should subscribe to the device's
//!    notify-next topic.
//! 2. Call the DescribeJobExecution MQTT API with jobId $next to get the next
//!    job, its job document, and other details, including any state saved in
//!    statusDetails. If the job document has a code file signature, you must
//!    verify the signature before proceeding with processing the job request.
//! 3. Call the UpdateJobExecution MQTT API to update the job status. Or, to
//!    combine this and the previous step in one call, the device can call
//!    StartNextPendingJobExecution.
//! 4. (Optional) You can add a step timer by setting a value for
//!    stepTimeoutInMinutes when you call either UpdateJobExecution or
//!    StartNextPendingJobExecution.
//! 5. Perform the actions specified by the job document using the
//!    UpdateJobExecution MQTT API to report on the progress of the job.
//! 6. Continue to monitor the job execution by calling the DescribeJobExecution
//!    MQTT API with this jobId. If the job execution is canceled or deleted
//!    while the device is running the job, the device should be capable of
//!    recovering to a valid state.
//! 7. Call the UpdateJobExecution MQTT API when finished with the job to update
//!    the job status and report success or failure.
//! 8. Because this job's execution status has been changed to a terminal state,
//!    the next job available for execution (if any) changes. The device is
//!    notified that the next pending job execution has changed. At this point,
//!    the device should continue as described in step 2.
//!
//! If the device remains online, it continues to receive notifications of the
//! next pending job execution, including its job execution data, whenever it
//! completes a job or a new pending job execution is added. When this occurs,
//! the device continues as described in step 2.
//!
//! If the device is unable to execute the job, it should call the
//! UpdateJobExecution MQTT API to update the job status to REJECTED.
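//!
//! The workflow above can be sketched as a small state machine. This is only
//! an illustration under simplifying assumptions: the `Phase` and `Event`
//! types and the `advance` function are hypothetical names, not part of this
//! crate, and step timers, progress updates, and cancellation are left out.
//!
//! ```rust
//! #[derive(Debug, Clone, Copy, PartialEq, Eq)]
//! enum Phase {
//!     // Subscribed to notify-next and waiting for a job (step 1).
//!     Idle,
//!     // Job described and, if signed, verified (step 2).
//!     Described,
//!     // Status reported as IN_PROGRESS; the job is running (steps 3 to 6).
//!     Running,
//! }
//!
//! #[derive(Debug, Clone, Copy, PartialEq, Eq)]
//! enum Event {
//!     // A notification announced a pending job execution.
//!     NextJobAvailable,
//!     // The device decided whether it is able to execute the job.
//!     Checked { executable: bool },
//!     // The job's work ended, successfully or not (step 7).
//!     Finished { success: bool },
//! }
//!
//! /// Returns the next phase and, if one should be sent, the status to
//! /// report through UpdateJobExecution.
//! fn advance(phase: Phase, event: Event) -> (Phase, Option<&'static str>) {
//!     match (phase, event) {
//!         (Phase::Idle, Event::NextJobAvailable) => (Phase::Described, None),
//!         (Phase::Described, Event::Checked { executable: true }) => {
//!             (Phase::Running, Some("IN_PROGRESS"))
//!         }
//!         (Phase::Described, Event::Checked { executable: false }) => {
//!             (Phase::Idle, Some("REJECTED"))
//!         }
//!         (Phase::Running, Event::Finished { success: true }) => {
//!             (Phase::Idle, Some("SUCCEEDED"))
//!         }
//!         (Phase::Running, Event::Finished { success: false }) => {
//!             (Phase::Idle, Some("FAILED"))
//!         }
//!         // Any other combination is ignored in this sketch.
//!         (phase, _) => (phase, None),
//!     }
//! }
//! ```
//!
//! After a terminal status is reported, the sketch returns to `Phase::Idle`,
//! which corresponds to step 8: the device waits for the next notification.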
//!
//!
//! ## Jobs Notifications
//! The AWS IoT Jobs service publishes MQTT messages to reserved topics when
//! jobs are pending or when the first job execution in the list changes.
//! Devices can keep track of pending jobs by subscribing to these topics.
//!
//! Job notifications are published to MQTT topics as JSON payloads. There are
//! two kinds of notifications:
//!
//! A ListNotification contains a list of no more than 10 pending job
//! executions. The job executions in this list have status values of either
//! IN_PROGRESS or QUEUED. They are sorted by status (IN_PROGRESS job executions
//! before QUEUED job executions) and then by the times when they were queued.
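//!
//! The ordering described above can be illustrated with a short sketch. The
//! service performs this sorting, not the device; the `Execution` type, its
//! `queued_at` field (assumed here to be a numeric timestamp), and the
//! `list_notification` function are hypothetical and only model the rule.
//!
//! ```rust
//! // Declaration order gives IN_PROGRESS a lower sort key than QUEUED.
//! #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
//! enum Status {
//!     InProgress,
//!     Queued,
//! }
//!
//! #[derive(Debug, Clone, PartialEq, Eq)]
//! struct Execution {
//!     job_id: &'static str,
//!     status: Status,
//!     queued_at: u64,
//! }
//!
//! /// Sorts by status, then by queue time, and keeps at most 10 entries.
//! fn list_notification(mut executions: Vec<Execution>) -> Vec<Execution> {
//!     executions.sort_by_key(|e| (e.status, e.queued_at));
//!     executions.truncate(10);
//!     executions
//! }
//! ```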
//!
//! A ListNotification is published whenever one of the following criteria is
//! met:
//!
//! - A new job execution is queued or changes to a non-terminal status
//!   (IN_PROGRESS or QUEUED).
//! - An old job execution changes to a terminal status (FAILED, SUCCEEDED,
//!   CANCELED, TIMED_OUT, REJECTED, or REMOVED).
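//!
//! The split between terminal and non-terminal statuses could be expressed as
//! below. The `Status` enum and `is_terminal` function are a hypothetical,
//! self-contained sketch; they are not the `JobStatus` type from this crate.
//!
//! ```rust
//! #[derive(Debug, Clone, Copy, PartialEq, Eq)]
//! enum Status {
//!     Queued,
//!     InProgress,
//!     Failed,
//!     Succeeded,
//!     Canceled,
//!     TimedOut,
//!     Rejected,
//!     Removed,
//! }
//!
//! /// Only QUEUED and IN_PROGRESS are non-terminal; everything else ends
//! /// the job execution.
//! fn is_terminal(status: Status) -> bool {
//!     !matches!(status, Status::Queued | Status::InProgress)
//! }
//! ```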
//!
//! A NextNotification contains summary information about the one job execution
//! that is next in the queue.
//!
//! A NextNotification is published whenever the first job execution in the list
//! changes, which happens in the following cases:
//!
//! - A new job execution is added to the list as QUEUED, and it is the first
//!   one in the list.
//! - The status of an existing job execution that was not the first one in the
//!   list changes from QUEUED to IN_PROGRESS and becomes the first one in the
//!   list. (This happens when there are no other IN_PROGRESS job executions in
//!   the list or when the job execution whose status changes from QUEUED to
//!   IN_PROGRESS was queued earlier than any other IN_PROGRESS job execution
//!   in the list.)
//! - The status of the job execution that is first in the list changes to a
//!   terminal status, and the job execution is removed from the list.
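//!
//! In each of these cases, a different job execution ends up at the head of
//! the pending list. A minimal sketch of that check, assuming the list is
//! modeled as a slice of job IDs (the `next_changed` function is hypothetical
//! and not part of this crate):
//!
//! ```rust
//! /// Returns true when the first pending job execution differs between
//! /// two snapshots of the list, including when either list is empty.
//! fn next_changed(before: &[&str], after: &[&str]) -> bool {
//!     before.first() != after.first()
//! }
//! ```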
pub mod data_types;
pub mod describe;
pub mod get_pending;
pub mod start_next;
pub mod subscribe;
pub mod unsubscribe;
pub mod update;

use core::fmt::Write;

use self::{
    data_types::JobStatus, describe::Describe, get_pending::GetPending, start_next::StartNext,
    subscribe::Subscribe, unsubscribe::Unsubscribe, update::Update,
};
pub use subscribe::Topic;

/// Maximum length of a thing name. See
/// <https://docs.aws.amazon.com/iot/latest/apireference/API_DescribeThing.html>.
pub const MAX_THING_NAME_LEN: usize = 128;
pub const MAX_CLIENT_TOKEN_LEN: usize = MAX_THING_NAME_LEN + 10;
pub const MAX_JOB_ID_LEN: usize = 64;
pub const MAX_STREAM_ID_LEN: usize = MAX_JOB_ID_LEN;
pub const MAX_PENDING_JOBS: usize = 1;
pub const MAX_RUNNING_JOBS: usize = 1;

pub type StatusDetails = heapless::FnvIndexMap<heapless::String<15>, heapless::String<11>, 4>;

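/// Errors that can occur while preparing or sending a Jobs request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobError {
    /// The data did not fit into its fixed-capacity buffer.
    Overflow,
    /// Encoding failed.
    Encoding,
    /// The underlying MQTT client reported an error.
    Mqtt(mqttrust::MqttError),
}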

impl From<mqttrust::MqttError> for JobError {
    fn from(e: mqttrust::MqttError) -> Self {
        Self::Mqtt(e)
    }
}

#[derive(Debug, Clone, PartialEq)]
enum JobTopic<'a> {
    // Outgoing Topics
    GetNext,
    GetPending,
    StartNext,
    Get(&'a str),
    Update(&'a str),

    // Incoming Topics
    Notify,
    NotifyNext,
    GetAccepted,
    GetRejected,
    StartNextAccepted,
    StartNextRejected,
    DescribeAccepted(&'a str),
    DescribeRejected(&'a str),
    UpdateAccepted(&'a str),
    UpdateRejected(&'a str),
}

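impl<'a> JobTopic<'a> {
    /// Writes the full topic path for `client_id` into a string of capacity
    /// `L`, returning `JobError::Overflow` if the path does not fit.
    pub fn format<const L: usize>(&self, client_id: &str) -> Result<heapless::String<L>, JobError> {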
        let mut topic_path = heapless::String::new();
        match self {
            Self::GetNext => {
                topic_path.write_fmt(format_args!("$aws/things/{}/jobs/$next/get", client_id))
            }
            Self::GetPending => {
                topic_path.write_fmt(format_args!("$aws/things/{}/jobs/get", client_id))
            }
            Self::StartNext => {
                topic_path.write_fmt(format_args!("$aws/things/{}/jobs/start-next", client_id))
            }
            Self::Get(job_id) => topic_path.write_fmt(format_args!(
                "$aws/things/{}/jobs/{}/get",
                client_id, job_id
            )),
            Self::Update(job_id) => topic_path.write_fmt(format_args!(
                "$aws/things/{}/jobs/{}/update",
                client_id, job_id
            )),

            Self::Notify => {
                topic_path.write_fmt(format_args!("$aws/things/{}/jobs/notify", client_id))
            }
            Self::NotifyNext => {
                topic_path.write_fmt(format_args!("$aws/things/{}/jobs/notify-next", client_id))
            }
            Self::GetAccepted => {
                topic_path.write_fmt(format_args!("$aws/things/{}/jobs/get/accepted", client_id))
            }
            Self::GetRejected => {
                topic_path.write_fmt(format_args!("$aws/things/{}/jobs/get/rejected", client_id))
            }
            Self::StartNextAccepted => topic_path.write_fmt(format_args!(
                "$aws/things/{}/jobs/start-next/accepted",
                client_id
            )),
            Self::StartNextRejected => topic_path.write_fmt(format_args!(
                "$aws/things/{}/jobs/start-next/rejected",
                client_id
            )),
            Self::DescribeAccepted(job_id) => topic_path.write_fmt(format_args!(
                "$aws/things/{}/jobs/{}/get/accepted",
                client_id, job_id
            )),
            Self::DescribeRejected(job_id) => topic_path.write_fmt(format_args!(
                "$aws/things/{}/jobs/{}/get/rejected",
                client_id, job_id
            )),
            Self::UpdateAccepted(job_id) => topic_path.write_fmt(format_args!(
                "$aws/things/{}/jobs/{}/update/accepted",
                client_id, job_id
            )),
            Self::UpdateRejected(job_id) => topic_path.write_fmt(format_args!(
                "$aws/things/{}/jobs/{}/update/rejected",
                client_id, job_id
            )),
        }
        .map_err(|_| JobError::Overflow)?;

        Ok(topic_path)
    }
}

/// Entry point for constructing Jobs requests and topic subscriptions.
pub struct Jobs;

impl Jobs {
    pub fn get_pending<'a>() -> GetPending<'a> {
        GetPending::new()
    }

    pub fn start_next<'a>() -> StartNext<'a> {
        StartNext::new()
    }

    pub fn describe<'a>() -> Describe<'a> {
        Describe::new()
    }

    pub fn update(job_id: &str, status: JobStatus) -> Update {
        Update::new(job_id, status)
    }

    pub fn subscribe<'a, const N: usize>() -> Subscribe<'a, N> {
        Subscribe::new()
    }

    pub fn unsubscribe<'a, const N: usize>() -> Unsubscribe<'a, N> {
        Unsubscribe::new()
    }
}