#![cfg_attr(feature = "io_new", allow(dead_code))]
#[cfg(any(target_os = "nanosplus", target_os = "nanox"))]
use ledger_secure_sdk_sys::buttons::{ButtonEvent, ButtonsState, get_button_event};
use ledger_secure_sdk_sys::seph as sys_seph;
use ledger_secure_sdk_sys::*;
use crate::io_callbacks::nbgl_register_callbacks;
use crate::seph;
#[cfg(any(
target_os = "nanox",
target_os = "stax",
target_os = "flex",
target_os = "apex_p"
))]
use crate::seph::ItcUxEvent;
use core::convert::{Infallible, TryFrom};
use core::ops::{Index, IndexMut};
// Foreign declaration of the `G_ux_params` global (type `bolos_ux_params_t`).
// It is only declared for the `nanox`, `stax`, `flex` and `apex_p` targets; in
// this file it is filled in by `Comm::process_event` before calling `os_ux`
// when a BLE pairing ITC event is received.
#[cfg(any(
target_os = "nanox",
target_os = "stax",
target_os = "flex",
target_os = "apex_p"
))]
unsafe extern "C" {
/// UX parameter block handed to `os_ux`. Every access is `unsafe` because
/// this is a mutable foreign static.
pub unsafe static mut G_ux_params: bolos_ux_params_t;
}
/// 16-bit status words that can be turned into a [`Reply`] and sent as the
/// two trailing bytes of an APDU response (see `Comm::reply`).
#[derive(Copy, Clone)]
#[repr(u16)]
pub enum StatusWords {
/// Success; sent by `Comm::reply_ok`.
Ok = 0x9000,
/// Status word `0x6982`.
NothingReceived = 0x6982,
/// Sent by `Comm::check_event` when the class byte differs from `expected_cla`.
BadCla = 0x6e00,
/// Sent by `handle_bolos_apdu` for an unknown instruction byte.
BadIns = 0x6e01,
/// Status word `0x6e02`.
BadP1P2 = 0x6e02,
/// Sent when the received length or the declared data length is inconsistent.
BadLen = 0x6e03,
/// Status word `0x6985`.
UserCancelled = 0x6985,
/// Status word `0x6d00`.
Unknown = 0x6d00,
/// Status word `0xe000`.
Panic = 0xe000,
/// Sent by `Comm::decode_event` when a PIN is set but not validated.
DeviceLocked = 0x5515,
}
/// Error categories reported by syscalls.
///
/// The discriminants start at 2 and are contiguous; `Unspecified` is the
/// catch-all used for every numeric code that has no dedicated variant.
#[derive(Debug)]
#[repr(u8)]
pub enum SyscallError {
    InvalidParameter = 2,
    Overflow = 3,
    Security = 4,
    InvalidCrc = 5,
    InvalidChecksum = 6,
    InvalidCounter = 7,
    NotSupported = 8,
    InvalidState = 9,
    Timeout = 10,
    InvalidPkiCertificate = 11,
    Unspecified = 12,
}

impl From<u32> for SyscallError {
    /// Maps a raw numeric error code onto its variant.
    ///
    /// Codes `2..=11` map to the variant with the same discriminant; any
    /// other value (including 0, 1 and 12) yields [`SyscallError::Unspecified`].
    fn from(code: u32) -> Self {
        match code {
            2 => Self::InvalidParameter,
            3 => Self::Overflow,
            4 => Self::Security,
            5 => Self::InvalidCrc,
            6 => Self::InvalidChecksum,
            7 => Self::InvalidCounter,
            8 => Self::NotSupported,
            9 => Self::InvalidState,
            10 => Self::Timeout,
            11 => Self::InvalidPkiCertificate,
            _ => Self::Unspecified,
        }
    }
}
/// Reasons for which loading a PKI certificate can fail.
///
/// The variants keep their implicit discriminants (0, 1, 2, ...); the numeric
/// codes they are decoded from are listed in the `From<u32>` implementation.
#[repr(u32)]
pub enum PkiLoadCertificateError {
    InvalidStructureType,
    IncorrectCertificateVersion,
    IncorrectCertificateValidity,
    IncorrectCertificateValidityIndex,
    UnknownSignerKeyId,
    UnknownSignatureAlgorithm,
    UnknownPublicKeyId,
    UnknownPublicKeyUsage,
    IncorrectEllipticCurveId,
    IncorrectSignatureAlgorithmAssociatedToPublicKey,
    UnknownTargetDevice,
    UnknownCertificateTag,
    FailedToHashData,
    ExpectedKeyUsageDoesNotMatchCertificateKeyUsage,
    FailedToVerifySignature,
    TrustedNameBufferTooSmall,
}

impl From<u32> for PkiLoadCertificateError {
    /// Decodes a raw error code into its variant.
    ///
    /// # Panics
    ///
    /// Panics with "Unknown PKI Load Certificate Error" when `code` is not one
    /// of the values listed below.
    fn from(code: u32) -> Self {
        // Arms are listed in ascending numeric order.
        match code {
            0x3301 => Self::FailedToHashData,
            0x4118 => Self::TrustedNameBufferTooSmall,
            0x422D => Self::UnknownCertificateTag,
            0x422E => Self::ExpectedKeyUsageDoesNotMatchCertificateKeyUsage,
            0x422F => Self::InvalidStructureType,
            0x4230 => Self::IncorrectCertificateVersion,
            0x4231 => Self::IncorrectCertificateValidity,
            0x4232 => Self::IncorrectCertificateValidityIndex,
            0x4233 => Self::UnknownSignerKeyId,
            0x4234 => Self::UnknownSignatureAlgorithm,
            0x4235 => Self::UnknownPublicKeyId,
            0x4236 => Self::UnknownPublicKeyUsage,
            0x4237 => Self::IncorrectEllipticCurveId,
            0x4238 => Self::IncorrectSignatureAlgorithmAssociatedToPublicKey,
            0x4239 => Self::UnknownTargetDevice,
            0x5720 => Self::FailedToVerifySignature,
            _ => panic!("Unknown PKI Load Certificate Error"),
        }
    }
}
impl From<PkiLoadCertificateError> for SyscallError {
fn from(_e: PkiLoadCertificateError) -> SyscallError {
SyscallError::InvalidPkiCertificate
}
}
/// A 16-bit status word to be sent back as the last two bytes of a response.
#[derive(Debug)]
#[repr(transparent)]
pub struct Reply(pub u16);

impl From<StatusWords> for Reply {
    /// Uses the numeric value of the status word as is.
    fn from(sw: StatusWords) -> Self {
        Self(sw as u16)
    }
}

impl From<SyscallError> for Reply {
    /// Encodes a syscall error as `0x6800 + <error discriminant>`.
    fn from(exc: SyscallError) -> Self {
        let offset = exc as u16;
        Self(0x6800 + offset)
    }
}

impl From<Infallible> for Reply {
    /// Exists so that instruction types whose conversion cannot fail satisfy
    /// the `Reply: From<Error>` bound; yields `0x9000`.
    fn from(_value: Infallible) -> Self {
        Self(0x9000)
    }
}
/// Events returned to the application by the `Comm` event methods
/// (`next_event`, `check_event`, `process_event`, `decode_event`).
///
/// `T` is the application's instruction type, built from an [`ApduHeader`].
#[derive(Eq, PartialEq)]
pub enum Event<T> {
/// An APDU was received and its header was successfully converted into `T`.
Command(T),
/// A button event (only on `nanosplus` and `nanox`).
#[cfg(any(target_os = "nanosplus", target_os = "nanox"))]
Button(ButtonEvent),
/// A screen touch event (only on `stax`, `flex` and `apex_p`).
#[cfg(any(target_os = "stax", target_os = "flex", target_os = "apex_p"))]
TouchEvent,
/// A ticker event.
Ticker,
}
/// Communication state: I/O buffers plus the bookkeeping needed to receive
/// APDUs / events and to send responses.
pub struct Comm {
/// Copy of `io_buffer[1..273]` taken by `decode_event` when an APDU packet
/// is received (i.e. the packet without its leading packet-type byte).
pub apdu_buffer: [u8; 272],
/// Length of the received APDU: `rx_length - 1` (set in `decode_event`).
pub rx: usize,
/// When non-zero, `apdu_send` transmits this many bytes of `apdu_buffer`
/// instead of `io_buffer`, then resets it to 0.
pub tx: usize,
/// Set when an APDU has been received and not yet consumed by `check_event`.
pub event_pending: bool,
/// Button state tracker passed to `get_button_event` (only on `nanosplus`
/// and `nanox`).
#[cfg(any(target_os = "nanosplus", target_os = "nanox"))]
buttons: ButtonsState,
/// If set, `check_event` rejects APDUs whose class byte differs with `BadCla`.
pub expected_cla: Option<u8>,
/// Packet type of the last received APDU; reused as the type for `io_tx`.
pub apdu_type: u8,
/// Raw I/O buffer. On reception byte 0 is the packet type; responses are
/// built in place starting at index 0.
pub io_buffer: [u8; 273],
/// Number of bytes received in `io_buffer`, packet-type byte included.
pub rx_length: usize,
/// Number of response bytes currently queued in `io_buffer`.
pub tx_length: usize,
/// When `true`, the next `apdu_send` skips its pre-send `io_rx` poll; the
/// flag is cleared by every `apdu_send`.
#[allow(dead_code)]
skip_rx_on_send: bool,
}
impl Default for Comm {
fn default() -> Self {
Self::new()
}
}
/// The four header bytes of an APDU, in wire order.
///
/// `#[repr(C)]` with four `u8` fields, so the struct can be read directly
/// from four consecutive bytes of a buffer (see `Comm::get_apdu_metadata`).
#[derive(Clone, Copy)]
#[repr(C)]
pub struct ApduHeader {
/// Class byte.
pub cla: u8,
/// Instruction byte.
pub ins: u8,
/// First parameter byte.
pub p1: u8,
/// Second parameter byte.
pub p2: u8,
}
impl Comm {
/// Creates a `Comm` with zeroed buffers, all lengths at 0, no pending
/// event, no expected class and the packet type set to `PacketTypeNone`.
pub fn new() -> Self {
    Self {
        // Buffers.
        io_buffer: [0; 273],
        apdu_buffer: [0; 272],
        // Length bookkeeping.
        rx_length: 0,
        tx_length: 0,
        rx: 0,
        tx: 0,
        // State flags.
        event_pending: false,
        skip_rx_on_send: false,
        expected_cla: None,
        apdu_type: seph::PacketTypes::PacketTypeNone as u8,
        #[cfg(any(target_os = "nanosplus", target_os = "nanox"))]
        buttons: ButtonsState::new(),
    }
}
/// Registers this instance as the one used by the default NBGL callbacks.
///
/// Stores a raw pointer to `self` in `CURRENT_COMM` and installs the three
/// `default_nbgl_*` functions through `nbgl_register_callbacks`. The pointer
/// is not tied to any lifetime, so `self` has to stay alive (and at the same
/// address) for as long as those callbacks may run.
pub(crate) fn nbgl_register_comm(&mut self) {
    let this: *mut Comm = self;
    unsafe {
        CURRENT_COMM = this;
    }
    nbgl_register_callbacks(
        default_nbgl_next_event_ahead,
        default_nbgl_fetch_apdu_header,
        default_nbgl_reply_status,
    );
}
pub fn set_expected_cla(mut self, cla: u8) -> Self {
self.expected_cla = Some(cla);
self
}
/// Transmits the pending response and resets the length counters.
///
/// On `stax`, `flex`, `apex_p` or with the `nano_nbgl` feature, and unless
/// `skip_rx_on_send` is set, one non-blocking `io_rx` is performed first and
/// a ticker event found there is forwarded to `ux_process_ticker_event`;
/// anything else read by that poll is dropped.
///
/// If `tx` is non-zero, `tx` bytes of `apdu_buffer` are sent and `tx` is
/// cleared; otherwise `tx_length` bytes of `io_buffer` are sent.
fn apdu_send(&mut self) {
    #[cfg(any(
        target_os = "stax",
        target_os = "flex",
        target_os = "apex_p",
        feature = "nano_nbgl"
    ))]
    if !self.skip_rx_on_send {
        let mut scratch = [0u8; 273];
        if sys_seph::io_rx(&mut scratch, false) > 0 {
            let kind = seph::PacketTypes::from(scratch[0]);
            let event = seph::Events::from(scratch[1]);
            let is_ticker = matches!(
                (kind, event),
                (seph::PacketTypes::PacketTypeSeph, seph::Events::TickerEvent)
            );
            if is_ticker {
                unsafe {
                    ux_process_ticker_event();
                }
            }
        }
    }
    // The skip request only applies to a single send.
    self.skip_rx_on_send = false;

    if self.tx == 0 {
        sys_seph::io_tx(self.apdu_type, &self.io_buffer, self.tx_length);
    } else {
        sys_seph::io_tx(self.apdu_type, &self.apdu_buffer, self.tx);
        self.tx = 0;
    }

    self.rx_length = 0;
    self.tx_length = 0;
}
/// Blocks until an event is available and returns it.
///
/// Resets `rx_length`, then repeatedly reads into `io_buffer` with `io_rx`
/// and hands every non-empty read to `decode_event`, looping until that
/// yields an event.
pub fn next_event<T>(&mut self) -> Event<T>
where
    T: TryFrom<ApduHeader>,
    Reply: From<<T as TryFrom<ApduHeader>>::Error>,
{
    self.rx_length = 0;
    loop {
        let received = sys_seph::io_rx(&mut self.io_buffer, true);
        if received <= 0 {
            continue;
        }
        if let Some(event) = self.decode_event(received) {
            return event;
        }
    }
}
/// Performs a single `io_rx` and reports whether it produced a command APDU.
///
/// Returns `false` when nothing was read or when what was read did not
/// decode to `Event::Command` (see `detect_apdu`).
pub fn next_event_ahead<T>(&mut self) -> bool
where
    T: TryFrom<ApduHeader>,
    Reply: From<<T as TryFrom<ApduHeader>>::Error>,
{
    let received = sys_seph::io_rx(&mut self.io_buffer, true);
    received > 0 && self.detect_apdu::<T>(received)
}
/// Consumes the pending APDU, if any, and turns it into a command event.
///
/// Returns `None` when no APDU is pending. Otherwise the pending flag is
/// cleared and the APDU is validated in this order, each failure sending a
/// reply and returning `None`:
///
/// 1. fewer than 5 received bytes -> `BadLen`;
/// 2. inconsistent data length (`get_data` fails) -> that status word;
/// 3. class `0xB0` -> handled internally by `handle_bolos_apdu`;
/// 4. class different from `expected_cla` (when set) -> `BadCla`;
/// 5. header rejected by `T::try_from` -> the conversion error as reply.
pub fn check_event<T>(&mut self) -> Option<Event<T>>
where
    T: TryFrom<ApduHeader>,
    Reply: From<<T as TryFrom<ApduHeader>>::Error>,
{
    if !self.event_pending {
        return None;
    }
    self.event_pending = false;

    if self.rx_length < 5 {
        self.reply(StatusWords::BadLen);
        return None;
    }

    if let Err(sw) = self.get_data() {
        self.reply(sw);
        return None;
    }

    let class_byte = self.io_buffer[1];

    if class_byte == 0xB0 {
        // The reply sent by the handler must not poll for input first.
        self.skip_rx_on_send = true;
        let (ins, p1, p2) = (self.io_buffer[2], self.io_buffer[3], self.io_buffer[4]);
        handle_bolos_apdu(self, ins, p1, p2);
        return None;
    }

    if matches!(self.expected_cla, Some(expected) if class_byte != expected) {
        self.reply(StatusWords::BadCla);
        return None;
    }

    let parsed = T::try_from(*self.get_apdu_metadata());
    match parsed {
        Ok(ins) => Some(Event::Command(ins)),
        Err(sw) => {
            self.reply(sw);
            None
        }
    }
}
/// Processes one SEPH event and returns the application-level event it maps
/// to, if any.
///
/// `seph_buffer` holds the event: byte 0 is the event tag, bytes 1-2 the
/// big-endian payload length, and the payload starts at byte 3. `length` is
/// the number of bytes of `seph_buffer` that were actually received.
///
/// Returns `Some(Event::Button)`, `Some(Event::TouchEvent)` or
/// `Some(Event::Ticker)` for the corresponding events (subject to the target
/// `cfg`s below) and `None` for everything else. If `length` is smaller than
/// the declared payload length plus the 3 header bytes, `BadLen` is replied
/// and `None` is returned.
pub fn process_event<T>(&mut self, mut seph_buffer: [u8; 272], length: i32) -> Option<Event<T>>
where
T: TryFrom<ApduHeader>,
Reply: From<<T as TryFrom<ApduHeader>>::Error>,
{
let tag = seph_buffer[0];
let _len: usize = u16::from_be_bytes([seph_buffer[1], seph_buffer[2]]) as usize;
// Reject events whose declared payload does not fit in what was received.
if (length as usize) < _len + 3 {
self.reply(StatusWords::BadLen);
return None;
}
match seph::Events::from(tag) {
// Button events: optionally forwarded to the NBGL UX, then turned into a
// `ButtonEvent` through the button state tracker.
#[cfg(any(target_os = "nanosplus", target_os = "nanox"))]
seph::Events::ButtonPushEvent => {
#[cfg(feature = "nano_nbgl")]
unsafe {
ux_process_button_event(seph_buffer.as_mut_ptr());
}
let button_info = seph_buffer[3] >> 1;
if let Some(btn_evt) = get_button_event(&mut self.buttons, button_info) {
return Some(Event::Button(btn_evt));
}
}
// Touch events are forwarded to the UX and always reported.
#[cfg(any(target_os = "stax", target_os = "flex", target_os = "apex_p"))]
seph::Events::ScreenTouchEvent => unsafe {
ux_process_finger_event(seph_buffer.as_mut_ptr());
return Some(Event::TouchEvent);
},
// Ticker events are forwarded to the UX where applicable and always reported.
seph::Events::TickerEvent => {
#[cfg(any(
target_os = "stax",
target_os = "flex",
target_os = "apex_p",
feature = "nano_nbgl"
))]
unsafe {
ux_process_ticker_event();
}
return Some(Event::Ticker);
}
// ITC events never produce an application event; on the targets listed
// below they drive BLE pairing UX and screen redisplay.
seph::Events::ItcEvent => {
#[cfg(any(
target_os = "nanox",
target_os = "stax",
target_os = "flex",
target_os = "apex_p"
))]
match ItcUxEvent::from(seph_buffer[3]) {
seph::ItcUxEvent::AskBlePairing => unsafe {
G_ux_params.ux_id = BOLOS_UX_ASYNCHMODAL_PAIRING_REQUEST;
G_ux_params.len = 20;
G_ux_params.u.pairing_request.type_ = seph_buffer[4];
// NOTE(review): `_len - 2` is not checked against `_len < 2` nor against
// the capacity of `pairing_info` — confirm the sender guarantees both.
G_ux_params.u.pairing_request.pairing_info_len = (_len - 2) as u32;
// Copy the pairing info bytes (starting at offset 5) and NUL-terminate them.
for i in 0..G_ux_params.u.pairing_request.pairing_info_len as usize {
G_ux_params.u.pairing_request.pairing_info[i as usize] =
seph_buffer[5 + i] as core::ffi::c_char;
}
G_ux_params.u.pairing_request.pairing_info
[G_ux_params.u.pairing_request.pairing_info_len as usize] = 0;
os_ux(&raw mut G_ux_params as *mut bolos_ux_params_t);
},
seph::ItcUxEvent::BlePairingStatus => unsafe {
G_ux_params.ux_id = BOLOS_UX_ASYNCHMODAL_PAIRING_STATUS;
G_ux_params.len = 0;
G_ux_params.u.pairing_status.pairing_ok = seph_buffer[4];
os_ux(&raw mut G_ux_params as *mut bolos_ux_params_t);
},
seph::ItcUxEvent::Redisplay => {
#[cfg(any(
target_os = "stax",
target_os = "flex",
target_os = "apex_p",
feature = "nano_nbgl"
))]
unsafe {
nbgl_objAllowDrawing(true);
nbgl_screenRedraw();
nbgl_refresh();
}
}
_ => return None,
}
return None;
}
// Any other event goes to the default UX handler: the NBGL one where
// available, otherwise a `UxEvent::Event` request on the Nano targets
// without `nano_nbgl`.
_ => {
#[cfg(any(
target_os = "stax",
target_os = "flex",
target_os = "apex_p",
feature = "nano_nbgl"
))]
unsafe {
ux_process_default_event();
}
#[cfg(any(target_os = "nanox", target_os = "nanosplus"))]
if !cfg!(feature = "nano_nbgl") {
crate::uxapp::UxEvent::Event.request();
}
}
}
None
}
/// Decodes the packet currently held in `io_buffer`.
///
/// `length` is the number of bytes received, packet-type byte included.
///
/// * SEPH / SE-event packets are copied (without the type byte) and handed
///   to `process_event`.
/// * APDU packets (raw, USB HID, WebUSB, BLE) are refused with
///   `DeviceLocked` when a PIN is set but not validated; otherwise the APDU
///   is recorded as pending and immediately run through `check_event`.
/// * Any other packet type is ignored.
pub fn decode_event<T>(&mut self, length: i32) -> Option<Event<T>>
where
    T: TryFrom<ApduHeader>,
    Reply: From<<T as TryFrom<ApduHeader>>::Error>,
{
    let packet_type = self.io_buffer[0];
    match seph::PacketTypes::from(packet_type) {
        seph::PacketTypes::PacketTypeSeph | seph::PacketTypes::PacketTypeSeEvent => {
            // Work on a copy of everything after the packet-type byte.
            let mut seph_buffer = [0u8; 272];
            seph_buffer.copy_from_slice(&self.io_buffer[1..]);
            self.process_event(seph_buffer, length - 1)
        }
        seph::PacketTypes::PacketTypeRawApdu
        | seph::PacketTypes::PacketTypeUsbHidApdu
        | seph::PacketTypes::PacketTypeUsbWebusbApdu
        | seph::PacketTypes::PacketTypeBleApdu => {
            let device_locked = unsafe {
                os_perso_is_pin_set() == BOLOS_TRUE.try_into().unwrap()
                    && os_global_pin_is_validated() != BOLOS_TRUE.try_into().unwrap()
            };
            if device_locked {
                self.reply(StatusWords::DeviceLocked);
                return None;
            }

            self.apdu_buffer.copy_from_slice(&self.io_buffer[1..]);
            self.apdu_type = packet_type;
            self.rx_length = length as usize;
            self.rx = self.rx_length - 1;
            self.event_pending = true;
            self.check_event()
        }
        _ => None,
    }
}
/// Decodes the packet in `io_buffer` and reports whether it is a command.
///
/// `decode_event` consumes the pending APDU, so when a command is found the
/// reception state (`rx_length`, `rx`, `event_pending`) is restored, leaving
/// the APDU pending for a later `check_event`.
fn detect_apdu<T>(&mut self, length: i32) -> bool
where
    T: TryFrom<ApduHeader>,
    Reply: From<<T as TryFrom<ApduHeader>>::Error>,
{
    let is_command = matches!(
        self.decode_event::<T>(length),
        Some(Event::Command(_))
    );
    if is_command {
        self.rx_length = length as usize;
        self.rx = self.rx_length - 1;
        self.event_pending = true;
    }
    is_command
}
/// Blocks until a command APDU is received and returns its instruction,
/// discarding every other kind of event in the meantime.
pub fn next_command<T>(&mut self) -> T
where
    T: TryFrom<ApduHeader>,
    Reply: From<<T as TryFrom<ApduHeader>>::Error>,
{
    loop {
        match self.next_event() {
            Event::Command(ins) => return ins,
            _ => continue,
        }
    }
}
pub fn reply<T: Into<Reply>>(&mut self, reply: T) {
let sw = reply.into().0;
self.io_buffer[self.tx_length] = (sw >> 8) as u8;
self.io_buffer[self.tx_length + 1] = sw as u8;
self.tx_length += 2;
self.apdu_send();
}
pub fn swap_reply<T: Into<Reply>>(&mut self, reply: T) {
self.reply(reply);
}
pub fn reply_ok(&mut self) {
self.reply(StatusWords::Ok);
}
pub fn swap_reply_ok(&mut self) {
self.reply_ok();
}
/// Returns the APDU header (CLA, INS, P1, P2) as a view over
/// `io_buffer[1..5]`, i.e. the four bytes following the packet-type byte.
///
/// The bytes are reinterpreted in place, so the result reflects whatever is
/// currently in the buffer; no check is made that an APDU was received.
pub fn get_apdu_metadata(&self) -> &ApduHeader {
    // Derive the pointer from the whole 4-byte window rather than from a
    // reference to the single byte at index 1: a pointer obtained from
    // `&io_buffer[1]` is only valid for that one byte, while the header read
    // spans four.
    let header_bytes: &[u8] = &self.io_buffer[1..5];
    // SAFETY: `header_bytes` is exactly 4 initialized bytes that live as long
    // as `&self`; `ApduHeader` is `#[repr(C)]` with four `u8` fields, hence
    // size 4, alignment 1 and no invalid bit patterns.
    unsafe { &*header_bytes.as_ptr().cast::<ApduHeader>() }
}
/// Returns the command data of the APDU stored in `apdu_buffer`.
///
/// `rx` is taken as the APDU length and byte 4 as the first length byte:
///
/// * `rx == 4`, or a zero length byte with `rx == 5`: no data, `Ok(&[])`;
/// * a zero length byte with `rx == 6`: `Err(BadLen)`;
/// * a zero length byte otherwise: the length is the big-endian `u16` in
///   bytes 5-6 and the data starts at offset 7;
/// * a non-zero length byte: it is the length, data starts at offset 5.
///
/// # Errors
///
/// `StatusWords::BadLen` when the resulting length is zero or the data would
/// extend past `rx`.
pub fn get_data(&self) -> Result<&[u8], StatusWords> {
    if self.rx == 4 {
        return Ok(&[]);
    }

    let (len, offset) = match (self.apdu_buffer[4] as usize, self.rx) {
        (0, 5) => return Ok(&[]),
        (0, 6) => return Err(StatusWords::BadLen),
        (0, _) => {
            let extended = u16::from_be_bytes([self.apdu_buffer[5], self.apdu_buffer[6]]);
            (extended as usize, 7)
        }
        (short, _) => (short, 5),
    };

    if len == 0 || len + offset > self.rx {
        Err(StatusWords::BadLen)
    } else {
        Ok(&self.apdu_buffer[offset..offset + len])
    }
}
/// Returns `io_buffer[start..end]`.
///
/// # Panics
///
/// Panics if `start > end` or `end` exceeds the buffer size.
pub fn get(&self, start: usize, end: usize) -> &[u8] {
    let window = start..end;
    &self.io_buffer[window]
}
/// Appends `m` to the queued response and advances `tx_length`.
///
/// # Panics
///
/// Panics if `m` does not fit in the remaining space of `io_buffer`.
pub fn append(&mut self, m: &[u8]) {
    let start = self.tx_length;
    let end = start + m.len();
    self.io_buffer[start..end].copy_from_slice(m);
    self.tx_length = end;
}
}
// Raw pointer to the `Comm` registered through `Comm::nbgl_register_comm`;
// null until then. It is dereferenced by the `default_nbgl_*` callbacks below,
// which panic while it is still null.
static mut CURRENT_COMM: *mut Comm = core::ptr::null_mut();
/// NBGL callback: polls the registered `Comm` once and reports whether a
/// command APDU is now pending.
///
/// # Panics
///
/// Panics with "No Comm instance registered" if no `Comm` was registered.
fn default_nbgl_next_event_ahead() -> bool {
    unsafe {
        assert!(!CURRENT_COMM.is_null(), "No Comm instance registered");
        let comm = &mut *CURRENT_COMM;
        comm.next_event_ahead::<ApduHeader>()
    }
}
/// NBGL callback: returns a copy of the header of the pending APDU, or
/// `None` when no APDU is pending or fewer than 5 bytes were received.
///
/// # Panics
///
/// Panics with "No Comm instance registered" if no `Comm` was registered.
fn default_nbgl_fetch_apdu_header() -> Option<ApduHeader> {
    unsafe {
        assert!(!CURRENT_COMM.is_null(), "No Comm instance registered");
        let comm = &mut *CURRENT_COMM;
        let has_header = comm.event_pending && comm.rx_length >= 5;
        has_header.then(|| *comm.get_apdu_metadata())
    }
}
/// NBGL callback: sends `reply` through the registered `Comm`.
///
/// # Panics
///
/// Panics with "No Comm instance registered" if no `Comm` was registered.
fn default_nbgl_reply_status(reply: Reply) {
    unsafe {
        assert!(!CURRENT_COMM.is_null(), "No Comm instance registered");
        let comm = &mut *CURRENT_COMM;
        comm.reply(reply);
    }
}
// Instruction bytes recognised by `handle_bolos_apdu`, which `check_event`
// invokes for APDUs whose class byte is 0xB0.

// Replies with the current application's name, version and the OS flags.
pub(crate) const BOLOS_INS_GET_VERSION: u8 = 0x01;
// Replies OK, then exits the application.
pub(crate) const BOLOS_INS_QUIT: u8 = 0xa7;
// Loads the PKI certificate carried by the APDU.
pub(crate) const BOLOS_INS_SET_PKI_CERT: u8 = 0x06;
// Forwarded to the testing helper; only with the `stack_usage` feature.
#[cfg(feature = "stack_usage")]
pub(crate) const BOLOS_INS_STACK_CONSUMPTION: u8 = 0x57;
/// Handles an APDU whose class byte is `0xB0` and always sends a reply.
///
/// * `BOLOS_INS_GET_VERSION`: replies with
///   `0x01 | name_len | name | version_len | version | 0x01 | os_flags`
///   followed by the OK status word.
/// * `BOLOS_INS_QUIT`: replies OK, then exits the application.
/// * `BOLOS_INS_SET_PKI_CERT`: loads the certificate found in `io_buffer`
///   (key usage at index 3, length at index 5, data from index 6) and replies
///   OK, or `SyscallError::InvalidPkiCertificate` on any failure.
/// * `BOLOS_INS_STACK_CONSUMPTION` (feature `stack_usage`): delegated to the
///   testing helper.
/// * anything else: replies `StatusWords::BadIns`.
///
/// `p1` and `p2` are only used with the `stack_usage` feature.
fn handle_bolos_apdu(com: &mut Comm, ins: u8, p1: u8, p2: u8) {
    // Silences the unused-parameter warning when `stack_usage` is disabled.
    let _ = (p1, p2);
    match ins {
        BOLOS_INS_GET_VERSION => {
            unsafe {
                com.tx_length = 0;
                com.io_buffer[com.tx_length] = 0x01;
                com.tx_length += 1;

                // Application name: the length byte goes at `tx_length`, the
                // value right after it.
                let len = os_registry_get_current_app_tag(
                    BOLOS_TAG_APPNAME,
                    &mut com.io_buffer[com.tx_length + 1] as *mut u8,
                    (273 - com.tx_length - 2) as u32,
                );
                com.io_buffer[com.tx_length] = len as u8;
                com.tx_length += 1 + (len as usize);

                // Application version, same length-prefixed layout.
                let len = os_registry_get_current_app_tag(
                    BOLOS_TAG_APPVERSION,
                    &mut com.io_buffer[com.tx_length + 1] as *mut u8,
                    (273 - com.tx_length - 2) as u32,
                );
                com.io_buffer[com.tx_length] = len as u8;
                com.tx_length += 1 + (len as usize);

                // One-byte length followed by the OS flags.
                com.io_buffer[com.tx_length] = 1;
                com.tx_length += 1;
                com.io_buffer[com.tx_length] = os_flags() as u8;
                com.tx_length += 1;
            }
            com.reply_ok();
        }
        BOLOS_INS_QUIT => {
            com.reply_ok();
            crate::exit_app(0);
        }
        BOLOS_INS_SET_PKI_CERT => unsafe {
            // The syscall receives the key through a `*mut` pointer, so the
            // local must be mutable and the pointer taken with `&raw mut`:
            // writing through a pointer cast from `&T` is undefined behaviour.
            let mut public_key = cx_ecfp_384_public_key_t::default();
            let err = os_pki_load_certificate(
                com.io_buffer[3],
                com.io_buffer[6..].as_mut_ptr(),
                com.io_buffer[5] as usize,
                core::ptr::null_mut(),
                core::ptr::null_mut(),
                &raw mut public_key,
            );
            if err != 0 {
                // Every failure is reported with the same status word, so the
                // raw code is not decoded: decoding would panic on a code
                // missing from `PkiLoadCertificateError`.
                com.reply(SyscallError::InvalidPkiCertificate);
            } else {
                com.reply_ok();
            }
        },
        #[cfg(feature = "stack_usage")]
        BOLOS_INS_STACK_CONSUMPTION => {
            crate::testing::handle_stack_consumption_apdu(p1, p2, com);
        }
        _ => {
            com.reply(StatusWords::BadIns);
        }
    }
}
impl Index<usize> for Comm {
    type Output = u8;

    /// Read access to `io_buffer[idx]`; panics when `idx` is out of bounds.
    fn index(&self, idx: usize) -> &u8 {
        let buffer = &self.io_buffer;
        &buffer[idx]
    }
}
impl IndexMut<usize> for Comm {
    /// Write access to `io_buffer[idx]`; panics when `idx` is out of bounds.
    ///
    /// As a side effect `tx_length` is raised to `idx` when it is smaller.
    // NOTE(review): `tx_length` becomes `idx`, not `idx + 1`, so the byte at
    // `idx` itself is not counted — confirm this is what callers expect.
    fn index_mut(&mut self, idx: usize) -> &mut Self::Output {
        if idx > self.tx_length {
            self.tx_length = idx;
        }
        &mut self.io_buffer[idx]
    }
}
// Unit tests; they rely on the crate's own test harness (`testmacro::test_item`
// and the `assert_eq_err` macro, aliased to `test` and `assert_eq` below).
#[cfg(test)]
mod test {
use super::*;
use crate::assert_eq_err as assert_eq;
use crate::testing::TestType;
use testmacro::test_item as test;
// A freshly created `Comm` has a zeroed `io_buffer`, so every header field
// read through `get_apdu_metadata` must be 0.
#[test]
fn apdu_metadata() {
let c = Comm::new();
let m = c.get_apdu_metadata();
assert_eq!(m.cla, 0);
assert_eq!(m.ins, 0);
assert_eq!(m.p1, 0);
assert_eq!(m.p2, 0);
}
}