1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
// Copyright 2025 NXP
//
// SPDX-License-Identifier: BSD-3-Clause
#![allow(
clippy::doc_markdown,
reason = "Some comments do not include any meaningful identifiers that would need to be enclosed in backticks."
)]
use core::panic;
use std::sync::Mutex;
use pyo3::{Py, prelude::*, types::PyType};
use crate::{
CommunicationError, KeyProvisioningResponse, McuBoot,
bindings::NOT_OPENED_ERROR,
mboot::{ResultComm, ResultStatus},
protocols::{ProtocolOpen, protocol_impl::ProtocolImpl, uart::UARTProtocol},
tags::{
command::{KeyProvOperation, KeyProvUserKeyType, TrustProvOperation},
property::PropertyTagDiscriminants,
status::StatusCode,
},
};
use pyo3_stub_gen::derive::*;
/// Python-facing wrapper around [`McuBoot`], exported to Python under the name `McuBoot`.
///
/// The connection is created by `open` and dropped by `close`; until `open` is called no
/// interface is stored.
#[gen_stub_pyclass]
#[pyclass(name = "McuBoot")]
struct McuBootPython {
// Device identifier handed to the UART protocol when the connection is opened.
identifier: String,
// Python can (and frequently does) pass objects between threads, therefore each class needs to
// implement Sync; with serialport, that can only be achieved with a mutex.
// It could also be done by making this Linux-only: TTYPort implements Sync, unlike COMPort.
// `None` while the device is not opened.
interface: Option<Mutex<McuBoot<ProtocolImpl>>>,
// Status code recorded by the wrapped commands; readable from Python as `status_code`.
#[pyo3(get)]
status_code: StatusCode,
}
// TODO: implement Python exceptions for errors
#[gen_stub_pymethods]
#[pymethods]
impl McuBootPython {
#[new]
fn py_new(identifier: String) -> Self {
McuBootPython {
identifier,
interface: None,
status_code: StatusCode::Success,
}
}
/// Currently stored status code converted to an integer.
#[getter]
fn get_status_code_int(&self) -> usize {
    let status = self.status_code;
    status as usize
}
/// Currently stored status code rendered as text.
#[getter]
fn get_status_code_str(&self) -> String {
    let status = self.status_code;
    status.to_string()
}
/// Connect to the device.
///
/// Opens the UART given by the identifier passed to the constructor and stores the
/// resulting interface. Panics if the device cannot be opened.
fn open(&mut self) {
    let port = UARTProtocol::open(&self.identifier).expect("device could not be opened");
    let session = McuBoot::new(port.into());
    // Any previously stored interface is replaced (and thereby dropped).
    self.interface = Some(Mutex::new(session));
}
/// Disconnect from the device.
///
/// Drops the stored interface, if any; calling this on a closed handle does nothing.
fn close(&mut self) {
    drop(self.interface.take());
}
#[pyo3(name = "__exit__", signature = (_exception_type = None, _exception_value = None, _traceback = None))]
fn exit(
&mut self,
_exception_type: Option<Py<PyType>>,
_exception_value: Option<Py<PyAny>>,
_traceback: Option<Py<PyAny>>,
) {
self.close();
}
#[pyo3(name = "__enter__")]
fn enter(mut slf: PyRefMut<Self>) -> PyRefMut<Self> {
slf.open();
slf
}
/// Read the value of the given property.
///
/// :param property: Property TAG (see `PropertyTag` Enum)
/// :param index: External memory ID or internal memory region index (depends on property type), defaults to 0
/// :return: list of integers representing the property; None if the device reported an unexpected status
#[pyo3(signature = (property, index = None))]
fn get_property(&mut self, property: PropertyTagDiscriminants, index: Option<u32>) -> Option<Vec<u32>> {
    let outcome = self.get_mut_interface().get_property(property, index.unwrap_or(0));
    // On failure `process_result` has already recorded the status; just propagate `None`.
    let response = self.process_result(outcome)?;
    self.status_code = response.status;
    Some(response.response_words.to_vec())
}
/// Set value of specified property.
///
/// :param property: Property TAG (see `PropertyTag` enum)
/// :param value: The value of selected property
/// :return: False in case of any problem; True otherwise
fn set_property(&mut self, property: PropertyTagDiscriminants, value: u32) -> bool {
    let res = self.get_mut_interface().set_property(property, value);
    // Report the outcome like every other status-only command: the status code is
    // stored in both cases and the boolean tells the caller whether it succeeded.
    self.process_status_res(res)
}
/// Erase the whole flash memory without recovering the flash security section.
///
/// :param mem_id: Memory ID, defaults to 0
/// :return: True on success; False if the device reported an unexpected status
#[pyo3(signature = (mem_id = None))]
fn flash_erase_all(&mut self, mem_id: Option<u32>) -> bool {
    let outcome = self.get_mut_interface().flash_erase_all(mem_id.unwrap_or(0));
    self.process_status_res(outcome)
}
/// Erase the given range of flash.
///
/// :param address: Start address
/// :param length: Count of bytes
/// :param mem_id: Memory ID, defaults to 0
/// :return: True on success; False if the device reported an unexpected status
#[pyo3(signature = (address, length, mem_id = None))]
fn flash_erase_region(&mut self, address: u32, length: u32, mem_id: Option<u32>) -> bool {
    let target_memory = mem_id.unwrap_or(0);
    let outcome = self
        .get_mut_interface()
        .flash_erase_region(address, length, target_memory);
    self.process_status_res(outcome)
}
/// Write bytes into MCU memory.
///
/// :param address: Start address
/// :param data: List of bytes
/// :param `mem_id`: Memory ID, use `0` for internal memory, defaults to 0
/// :return: True on success; False if the device reported an unexpected status
#[pyo3(signature = (address, data, mem_id = None))]
fn write_memory(&mut self, address: u32, data: Vec<u8>, mem_id: Option<u32>) -> bool {
    let target_memory = mem_id.unwrap_or(0);
    let outcome = self
        .get_mut_interface()
        .write_memory(address, target_memory, &data);
    self.process_status_res(outcome)
}
/// Read bytes from MCU memory.
///
/// :param address: Start address
/// :param length: Count of bytes
/// :param `mem_id`: Memory ID, defaults to 0
/// :return: Data read from the memory; None in case of a failure
#[pyo3(signature = (address, length, mem_id = None))]
fn read_memory(&mut self, address: u32, length: u32, mem_id: Option<u32>) -> Option<Vec<u8>> {
    let outcome = self
        .get_mut_interface()
        .read_memory(address, length, mem_id.unwrap_or(0));
    // Only a failure updates `status_code` here; a successful read leaves it untouched.
    self.process_result(outcome)
        .map(|response| response.bytes.to_vec())
}
/// Program fuse.
///
/// :param address: Start address
/// :param data: List of bytes
/// :param mem_id: Memory ID, defaults to 0
/// :return: True on success; False if the device reported an unexpected status
#[pyo3(signature = (address, data, mem_id = None))]
fn fuse_program(&mut self, address: u32, data: Vec<u8>, mem_id: Option<u32>) -> bool {
    let target_memory = mem_id.unwrap_or(0);
    let outcome = self
        .get_mut_interface()
        .fuse_program(address, target_memory, &data);
    self.process_status_res(outcome)
}
/// Read fuse.
///
/// :param address: Start address
/// :param length: Count of bytes
/// :param mem_id: Memory ID, defaults to 0
/// :return: Data read from the fuse; None in case of a failure
#[pyo3(signature = (address, length, mem_id = None))]
fn fuse_read(&mut self, address: u32, length: u32, mem_id: Option<u32>) -> Option<Vec<u8>> {
    let outcome = self
        .get_mut_interface()
        .fuse_read(address, length, mem_id.unwrap_or(0));
    // Only a failure updates `status_code` here; a successful read leaves it untouched.
    self.process_result(outcome)
        .map(|response| response.bytes.to_vec())
}
/// Start executing a program at the given address with the given stack pointer.
///
/// :param address: Jump address (must be word aligned)
/// :param argument: Function arguments address
/// :param sp: Stack pointer address
/// :return: True on success; False if the device reported an unexpected status
#[pyo3(signature = (address, argument, sp))]
fn execute(&mut self, address: u32, argument: u32, sp: u32) -> bool {
    let outcome = self.get_mut_interface().execute(address, argument, sp);
    self.process_status_res(outcome)
}
/// Call a function located at the given address.
///
/// :param address: Call address (must be word aligned)
/// :param argument: Function arguments address
/// :return: True on success; False if the device reported an unexpected status
#[pyo3(signature = (address, argument))]
fn call(&mut self, address: u32, argument: u32) -> bool {
    let outcome = self.get_mut_interface().call(address, argument);
    self.process_status_res(outcome)
}
/// Reset the MCU.
///
/// :return: True on success; False if the device reported an unexpected status
fn reset(&mut self) -> bool {
    let outcome = self.get_mut_interface().reset();
    self.process_status_res(outcome)
}
/// Fill a memory region with a repeated pattern.
///
/// :param start_address: Start address (must be word aligned)
/// :param byte_count: Number of bytes to fill (must be word aligned)
/// :param pattern: 32-bit pattern to fill with
/// :return: True on success; False if the device reported an unexpected status
fn fill_memory(&mut self, start_address: u32, byte_count: u32, pattern: u32) -> bool {
    let outcome = self
        .get_mut_interface()
        .fill_memory(start_address, byte_count, pattern);
    self.process_status_res(outcome)
}
/// Erase the whole flash and recover the security section.
///
/// :return: True on success; False if the device reported an unexpected status
fn flash_erase_all_unsecure(&mut self) -> bool {
    let outcome = self.get_mut_interface().flash_erase_all_unsecure();
    self.process_status_res(outcome)
}
/// Configure external memory.
///
/// :param memory_id: Memory ID to configure
/// :param address: Address containing configuration data
/// :return: True on success; False if the device reported an unexpected status
fn configure_memory(&mut self, memory_id: u32, address: u32) -> bool {
    let outcome = self.get_mut_interface().configure_memory(memory_id, address);
    self.process_status_res(outcome)
}
/// Send a Secure Binary (SB) file to the device for processing.
///
/// :param data: SB file data as list of bytes
/// :return: True on success; False if the device reported an unexpected status
fn receive_sb_file(&mut self, data: Vec<u8>) -> bool {
    let outcome = self.get_mut_interface().receive_sb_file(&data);
    self.process_status_res(outcome)
}
/// Run a trust provisioning operation.
///
/// :param operation: The trust provisioning operation to execute
/// :return: Tuple of (success: bool, response_data: list of integers); (False, []) in case of failure
fn trust_provisioning(&mut self, operation: &TrustProvOperation) -> (bool, Vec<u32>) {
    let outcome = self.get_mut_interface().trust_provisioning(operation);
    // A failed command has its status recorded by `process_result`; report it as (False, []).
    let Some((status, response_words)) = self.process_result(outcome) else {
        return (false, Vec::new());
    };
    self.status_code = status;
    (true, response_words.to_vec())
}
/// Load raw image data directly to the device.
///
/// :param data: Raw image data to be loaded as list of bytes
/// :return: True on success; False if the device reported an unexpected status
fn load_image(&mut self, data: Vec<u8>) -> bool {
    let outcome = self.get_mut_interface().load_image(&data);
    self.process_status_res(outcome)
}
/// Read from the MCU flash program-once region (eFuse/OTP).
///
/// :param index: Start index of the eFuse/OTP region
/// :param count: Number of bytes to read (must be 4)
/// :return: The read value as 32-bit integer; None in case of failure
fn flash_read_once(&mut self, index: u32, count: u32) -> Option<u32> {
    let outcome = self.get_mut_interface().flash_read_once(index, count);
    // Only a failure updates `status_code`; a successful read leaves it untouched.
    self.process_result(outcome)
}
/// Write into the MCU program-once region (eFuse/OTP).
///
/// :param index: Start index of the eFuse/OTP region
/// :param count: Number of bytes to write (must be 4)
/// :param data: 32-bit value to write
/// :param verify: If true, reads back and verifies the written value, defaults to False
/// :return: True on success; False if the device reported an unexpected status
#[pyo3(signature = (index, count, data, verify = None))]
fn flash_program_once(&mut self, index: u32, count: u32, data: u32, verify: Option<bool>) -> bool {
    let read_back = verify.unwrap_or(false);
    let outcome = self
        .get_mut_interface()
        .flash_program_once(index, count, data, read_back);
    self.process_status_res(outcome)
}
/// Key provisioning: Enroll Command (start PUF).
///
/// :return: True on success; False in case of any problem
fn kp_enroll(&mut self) -> bool {
    let request = KeyProvOperation::Enroll;
    let outcome = self.get_mut_interface().key_provisioning(&request);
    let (succeeded, _) = self.process_keyprov_result(outcome);
    succeeded
}
/// Key provisioning: Generate Intrinsic Key.
///
/// :param key_type: Type of the key
/// :param key_size: Size of the key
/// :return: True on success; False in case of any problem
fn kp_set_intrinsic_key(&mut self, key_type: KeyProvUserKeyType, key_size: u32) -> bool {
    let request = KeyProvOperation::SetKey { key_type, key_size };
    let outcome = self.get_mut_interface().key_provisioning(&request);
    let (succeeded, _) = self.process_keyprov_result(outcome);
    succeeded
}
/// Key provisioning: Write the key to a nonvolatile memory.
///
/// :param memory_id: The memory ID, defaults to 0
/// :return: True on success; False in case of any problem
#[pyo3(signature = (memory_id = None))]
fn kp_write_nonvolatile(&mut self, memory_id: Option<u32>) -> bool {
    let memory_id = memory_id.unwrap_or(0);
    let request = KeyProvOperation::WriteKeyNonvolatile { memory_id };
    let outcome = self.get_mut_interface().key_provisioning(&request);
    let (succeeded, _) = self.process_keyprov_result(outcome);
    succeeded
}
/// Key provisioning: Load the key from a nonvolatile memory to bootloader.
///
/// :param memory_id: The memory ID, defaults to 0
/// :return: True on success; False in case of any problem
#[pyo3(signature = (memory_id = None))]
fn kp_read_nonvolatile(&mut self, memory_id: Option<u32>) -> bool {
    let memory_id = memory_id.unwrap_or(0);
    let request = KeyProvOperation::ReadKeyNonvolatile { memory_id };
    let outcome = self.get_mut_interface().key_provisioning(&request);
    let (succeeded, _) = self.process_keyprov_result(outcome);
    succeeded
}
/// Key provisioning: Send the user key specified by <key_type> to bootloader.
///
/// :param key_type: type of the user key, see enumeration for details
/// :param key_data: binary content of the user key
/// :return: True on success; False in case of any problem
fn kp_set_user_key(&mut self, key_type: KeyProvUserKeyType, key_data: Vec<u8>) -> bool {
    let key_data = key_data.into();
    let request = KeyProvOperation::SetUserKey { key_type, key_data };
    let outcome = self.get_mut_interface().key_provisioning(&request);
    let (succeeded, _) = self.process_keyprov_result(outcome);
    succeeded
}
/// Key provisioning: Write key data into key store area.
///
/// :param keystore_data: key store binary content to be written to processor
/// :return: result of the operation; True means success
fn kp_write_key_store(&mut self, keystore_data: Vec<u8>) -> bool {
    let keystore_data = keystore_data.into();
    let request = KeyProvOperation::WriteKeyStore { keystore_data };
    let outcome = self.get_mut_interface().key_provisioning(&request);
    let (succeeded, _) = self.process_keyprov_result(outcome);
    succeeded
}
/// Key provisioning: Read key data from key store area.
///
/// :return: Key store data as bytes; None in case of failure
fn kp_read_key_store(&mut self) -> Option<Vec<u8>> {
    // The request is built with an empty file name and hexdump output disabled.
    let request = KeyProvOperation::ReadKeyStore {
        file: String::new(),
        use_hexdump: false,
    };
    let outcome = self.get_mut_interface().key_provisioning(&request);
    let (_, response) = self.process_keyprov_result(outcome);
    // Only a key-store response carries data; everything else maps to `None`.
    if let Some(KeyProvisioningResponse::KeyStore { bytes, .. }) = response {
        Some(bytes.to_vec())
    } else {
        None
    }
}
}
impl McuBootPython {
/// Borrow the opened bootloader interface mutably.
///
/// Panics with `NOT_OPENED_ERROR` when the device has not been opened, and also when
/// the mutex is poisoned.
fn get_mut_interface(&mut self) -> &mut McuBoot<ProtocolImpl> {
    let guarded = self.interface.as_mut().expect(NOT_OPENED_ERROR);
    // `&mut self` already proves exclusive access, so the mutex is bypassed, not locked.
    guarded.get_mut().unwrap()
}
fn process_keyprov_result(
&mut self,
packet: ResultComm<KeyProvisioningResponse>,
) -> (bool, Option<KeyProvisioningResponse>) {
match packet {
Ok(res @ KeyProvisioningResponse::KeyStore { status, .. }) => {
self.status_code = status;
(true, Some(res))
}
Ok(KeyProvisioningResponse::Status(status)) => {
self.status_code = status;
(true, None)
}
Err(_) => (false, None),
}
}
/// Unwrap a command result, turning an unexpected device status into `None`.
///
/// On `UnexpectedStatus` the reported status is stored in `status_code`. Any other
/// communication error panics with the error's display text. A successful result is
/// passed through without touching `status_code`.
fn process_result<T>(&mut self, packet: ResultComm<T>) -> Option<T> {
    let failure = match packet {
        Ok(value) => return Some(value),
        Err(failure) => failure,
    };
    match failure {
        CommunicationError::UnexpectedStatus(status, _) => {
            self.status_code = status;
            None
        }
        other => panic!("{}", other),
    }
}
/// Handle the result of a status-only command.
///
/// Stores the returned status and yields `true` on success; yields `false` when
/// `process_result` reported a failure (which has then already stored the status).
fn process_status_res(&mut self, packet: ResultStatus) -> bool {
    let Some(status) = self.process_result(packet) else {
        return false;
    };
    self.status_code = status;
    true
}
}
/// Add the `McuBoot` Python class to the given module.
pub fn register(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_class::<McuBootPython>()
}