use crate::{
cbor::value::Value,
common::{AsCborValue, CborOrdering},
iana,
iana::EnumI64,
util::{to_cbor_array, ValueTryAs},
Algorithm, CoseError, Label, Result,
};
use alloc::{collections::BTreeSet, vec, vec::Vec};
#[cfg(test)]
mod tests;
/// Key type identifier, expressed as a `RegisteredLabel` over the IANA `KeyType` registry values.
pub type KeyType = crate::RegisteredLabel<iana::KeyType>;

impl Default for KeyType {
    /// Default to the assigned IANA `Reserved` key type.
    fn default() -> Self {
        Self::Assigned(iana::KeyType::Reserved)
    }
}
/// Key operation identifier, expressed as a `RegisteredLabel` over the IANA `KeyOperation`
/// registry values.
pub type KeyOperation = crate::RegisteredLabel<iana::KeyOperation>;
/// A collection of keys: a newtype wrapper around `Vec<CoseKey>`.
///
/// The wrapped vector is public, so callers can construct or inspect the set directly.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct CoseKeySet(pub Vec<CoseKey>);
// Marker impl: opts `CoseKeySet` in to the crate's `CborSerializable` trait without overriding
// anything here.
impl crate::CborSerializable for CoseKeySet {}
impl AsCborValue for CoseKeySet {
    /// Build a key set from `value`, converting every element with `CoseKey::from_cbor_value`
    /// (via `try_as_array_then_convert`). Any conversion error is propagated.
    fn from_cbor_value(value: Value) -> Result<Self> {
        let keys = value.try_as_array_then_convert(CoseKey::from_cbor_value)?;
        Ok(Self(keys))
    }

    /// Convert the wrapped keys into a single CBOR value using `to_cbor_array`.
    fn to_cbor_value(self) -> Result<Value> {
        let Self(keys) = self;
        to_cbor_array(keys)
    }
}
/// Structure representing a cryptographic key.
///
/// When encoded (see the `AsCborValue` implementation for this type) `kty` is always written,
/// the other named fields are written only when they are non-empty / `Some`, and the `params`
/// entries follow.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct CoseKey {
/// Key type. Decoding fails if this is still the `Reserved` default after all map entries
/// have been processed.
pub kty: KeyType,
/// Key identifier; an empty vector means "absent" and is not encoded.
pub key_id: Vec<u8>,
/// Optional algorithm associated with the key.
pub alg: Option<Algorithm>,
/// Set of key operations; an empty set is not encoded.
pub key_ops: BTreeSet<KeyOperation>,
/// Base IV; an empty vector means "absent" and is not encoded.
pub base_iv: Vec<u8>,
/// Any additional `(label, value)` pairs. Encoding fails with `CoseError::DuplicateMapKey`
/// if the same label appears more than once in this list.
pub params: Vec<(Label, Value)>,
}
/// Leading byte of a SEC1 octet string holding a compressed point whose Y "sign" flag is `false`.
const SEC1_COMPRESSED_SIGN_0: u8 = 0x02;
/// Leading byte of a SEC1 octet string holding a compressed point whose Y "sign" flag is `true`.
const SEC1_COMPRESSED_SIGN_1: u8 = 0x03;
/// Leading byte of a SEC1 octet string holding an uncompressed point (X followed by Y).
const SEC1_UNCOMPRESSED: u8 = 0x04;
/// Reasons why a key could not be rendered as a SEC1 octet string.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ToSec1OctetStringError {
    /// The key's type is not EC2.
    NotEcKey,
    /// The X or Y coordinate parameter is absent.
    MissingCoordinate,
    /// A coordinate parameter holds a value of an unsupported type.
    InvalidCoordinateType,
    /// X and Y are both byte strings but differ in length.
    UnequalCoordinateLength,
}

impl core::error::Error for ToSec1OctetStringError {}

impl core::fmt::Display for ToSec1OctetStringError {
    /// Write the fixed, human-readable description of the error.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let description = match self {
            Self::NotEcKey => "not an EC key",
            Self::MissingCoordinate => "missing coordinate",
            Self::InvalidCoordinateType => "invalid coordinate type",
            Self::UnequalCoordinateLength => "unequal coordinate lengths",
        };
        f.write_str(description)
    }
}
/// Error returned when a byte slice cannot be interpreted as a SEC1 octet string.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ParseSec1OctetStringError;

impl core::error::Error for ParseSec1OctetStringError {}

impl core::fmt::Display for ParseSec1OctetStringError {
    /// Write the error's name; there is no further detail to report.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_str("ParseSec1OctetStringError")
    }
}
/// Selector for one of the three ML-DSA variants.
///
/// Each variant converts to (and can be recovered from) the matching `iana::Algorithm` value
/// `ML_DSA_44`, `ML_DSA_65` or `ML_DSA_87`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum MlDsaVariant {
/// Corresponds to `iana::Algorithm::ML_DSA_44`.
MlDsa44,
/// Corresponds to `iana::Algorithm::ML_DSA_65`.
MlDsa65,
/// Corresponds to `iana::Algorithm::ML_DSA_87`.
MlDsa87,
}
impl From<MlDsaVariant> for iana::Algorithm {
    /// Map an ML-DSA variant onto its `iana::Algorithm` value.
    fn from(variant: MlDsaVariant) -> Self {
        match variant {
            MlDsaVariant::MlDsa44 => Self::ML_DSA_44,
            MlDsaVariant::MlDsa65 => Self::ML_DSA_65,
            MlDsaVariant::MlDsa87 => Self::ML_DSA_87,
        }
    }
}
impl core::convert::TryFrom<iana::Algorithm> for MlDsaVariant {
type Error = CoseError;
fn try_from(alg: iana::Algorithm) -> Result<MlDsaVariant, CoseError> {
match alg {
iana::Algorithm::ML_DSA_44 => Ok(MlDsaVariant::MlDsa44),
iana::Algorithm::ML_DSA_65 => Ok(MlDsaVariant::MlDsa65),
iana::Algorithm::ML_DSA_87 => Ok(MlDsaVariant::MlDsa87),
_ => Err(CoseError::OutOfRangeIntegerValue),
}
}
}
impl CoseKey {
pub fn canonicalize(&mut self, ordering: CborOrdering) {
match ordering {
CborOrdering::Lexicographic => self.params.sort_by(|l, r| l.0.cmp(&r.0)),
CborOrdering::LengthFirstLexicographic => {
self.params.sort_by(|l, r| l.0.cmp_canonical(&r.0))
}
}
}
pub fn to_sec1_octet_string(&self) -> Result<Vec<u8>, ToSec1OctetStringError> {
if self.kty != KeyType::Assigned(iana::KeyType::EC2) {
return Err(ToSec1OctetStringError::NotEcKey);
}
let x_param = self
.params
.iter()
.find(|(k, _)| k == &Label::Int(iana::Ec2KeyParameter::X as i64))
.ok_or(ToSec1OctetStringError::MissingCoordinate)?;
let y_param = self
.params
.iter()
.find(|(k, _)| k == &Label::Int(iana::Ec2KeyParameter::Y as i64))
.ok_or(ToSec1OctetStringError::MissingCoordinate)?;
let x = x_param
.1
.as_bytes()
.ok_or(ToSec1OctetStringError::InvalidCoordinateType)?
.as_slice();
match &y_param.1 {
Value::Bool(false) => Ok([&[SEC1_COMPRESSED_SIGN_0], x].concat()),
Value::Bool(true) => Ok([&[SEC1_COMPRESSED_SIGN_1], x].concat()),
Value::Bytes(y) if x.len() == y.len() => Ok([&[SEC1_UNCOMPRESSED], x, y].concat()),
Value::Bytes(_) => Err(ToSec1OctetStringError::UnequalCoordinateLength),
_ => Err(ToSec1OctetStringError::InvalidCoordinateType),
}
}
}
// Marker impl: opts `CoseKey` in to the crate's `CborSerializable` trait without overriding
// anything here.
impl crate::CborSerializable for CoseKey {}
/// Map label under which the key type (`kty` field) is encoded.
const KTY: Label = Label::Int(iana::KeyParameter::Kty as i64);
/// Map label under which the key identifier (`key_id` field) is encoded.
const KID: Label = Label::Int(iana::KeyParameter::Kid as i64);
/// Map label under which the algorithm (`alg` field) is encoded.
const ALG: Label = Label::Int(iana::KeyParameter::Alg as i64);
/// Map label under which the key operations (`key_ops` field) are encoded.
const KEY_OPS: Label = Label::Int(iana::KeyParameter::KeyOps as i64);
/// Map label under which the base IV (`base_iv` field) is encoded.
const BASE_IV: Label = Label::Int(iana::KeyParameter::BaseIv as i64);
impl AsCborValue for CoseKey {
    /// Decode a `CoseKey` from a CBOR map.
    ///
    /// Entries labelled `KTY`, `KID`, `ALG`, `KEY_OPS` and `BASE_IV` populate the corresponding
    /// named fields; every other entry is appended to `params` in the order encountered.
    ///
    /// # Errors
    ///
    /// - `CoseError::DuplicateMapKey` if any label occurs more than once in the map.
    /// - `CoseError::UnexpectedItem` if the key-operations array is empty or contains a repeated
    ///   entry, or if the key type is still `Reserved` once every entry has been processed.
    /// - Any error produced while converting the map, a label, or a field value.
    fn from_cbor_value(value: Value) -> Result<Self> {
        let m = value.try_as_map()?;
        let mut key = Self::default();
        let mut seen = BTreeSet::new();
        for (l, value) in m {
            let label = Label::from_cbor_value(l)?;
            // `insert` returns `false` when the label was already present, i.e. a duplicate.
            if !seen.insert(label.clone()) {
                return Err(CoseError::DuplicateMapKey);
            }
            match label {
                KTY => key.kty = KeyType::from_cbor_value(value)?,
                KID => key.key_id = value.try_as_nonempty_bytes()?,
                ALG => key.alg = Some(Algorithm::from_cbor_value(value)?),
                KEY_OPS => {
                    let key_ops = value.try_as_array()?;
                    for key_op in key_ops {
                        if !key.key_ops.insert(KeyOperation::from_cbor_value(key_op)?) {
                            return Err(CoseError::UnexpectedItem(
                                "repeated array entry",
                                "unique array label",
                            ));
                        }
                    }
                    if key.key_ops.is_empty() {
                        return Err(CoseError::UnexpectedItem("empty array", "non-empty array"));
                    }
                }
                BASE_IV => key.base_iv = value.try_as_nonempty_bytes()?,
                label => key.params.push((label, value)),
            }
        }

        // `Reserved` is the `Default` key type, so seeing it here means no usable key type was
        // supplied.
        if key.kty == KeyType::Assigned(iana::KeyType::Reserved) {
            return Err(CoseError::UnexpectedItem(
                "no kty label",
                "mandatory kty label",
            ));
        }
        Ok(key)
    }

    /// Encode the key as a CBOR map.
    ///
    /// The key type is always emitted first. The key id, algorithm, key operations and base IV
    /// follow, each only when non-empty / `Some`, and finally the `params` entries in their
    /// stored order.
    ///
    /// # Errors
    ///
    /// - `CoseError::DuplicateMapKey` if `params` contains the same label twice, or a label that
    ///   collides with one of the named fields emitted for this key. Either case would otherwise
    ///   produce a map with duplicate keys, which `from_cbor_value` refuses to decode.
    /// - Any error produced while converting a label or field value.
    fn to_cbor_value(self) -> Result<Value> {
        let mut map: Vec<(Value, Value)> = vec![(KTY.to_cbor_value()?, self.kty.to_cbor_value()?)];
        // Track every label written so far, including the named fields, so that `params` cannot
        // introduce a duplicate of any of them.
        let mut seen = BTreeSet::new();
        seen.insert(KTY);
        if !self.key_id.is_empty() {
            map.push((KID.to_cbor_value()?, Value::Bytes(self.key_id)));
            seen.insert(KID);
        }
        if let Some(alg) = self.alg {
            map.push((ALG.to_cbor_value()?, alg.to_cbor_value()?));
            seen.insert(ALG);
        }
        if !self.key_ops.is_empty() {
            map.push((KEY_OPS.to_cbor_value()?, to_cbor_array(self.key_ops)?));
            seen.insert(KEY_OPS);
        }
        if !self.base_iv.is_empty() {
            map.push((BASE_IV.to_cbor_value()?, Value::Bytes(self.base_iv)));
            seen.insert(BASE_IV);
        }
        for (label, value) in self.params {
            if !seen.insert(label.clone()) {
                return Err(CoseError::DuplicateMapKey);
            }
            map.push((label.to_cbor_value()?, value));
        }
        Ok(Value::Map(map))
    }
}
/// Builder for `CoseKey` objects; wraps the key under construction.
#[derive(Debug, Default)]
pub struct CoseKeyBuilder(CoseKey);
impl CoseKeyBuilder {
builder! {CoseKey}
builder_set! {kty: KeyType}
builder_set! {key_id: Vec<u8>}
builder_set! {base_iv: Vec<u8>}
pub fn new_ec2_pub_key(curve: iana::EllipticCurve, x: Vec<u8>, y: Vec<u8>) -> Self {
Self(CoseKey {
kty: KeyType::Assigned(iana::KeyType::EC2),
params: vec![
(
Label::Int(iana::Ec2KeyParameter::Crv as i64),
Value::from(curve as u64),
),
(Label::Int(iana::Ec2KeyParameter::X as i64), Value::Bytes(x)),
(Label::Int(iana::Ec2KeyParameter::Y as i64), Value::Bytes(y)),
],
..Default::default()
})
}
pub fn new_ec2_pub_key_y_sign(curve: iana::EllipticCurve, x: Vec<u8>, y_sign: bool) -> Self {
Self(CoseKey {
kty: KeyType::Assigned(iana::KeyType::EC2),
params: vec![
(
Label::Int(iana::Ec2KeyParameter::Crv as i64),
Value::from(curve as u64),
),
(Label::Int(iana::Ec2KeyParameter::X as i64), Value::Bytes(x)),
(
Label::Int(iana::Ec2KeyParameter::Y as i64),
Value::Bool(y_sign),
),
],
..Default::default()
})
}
pub fn new_ec2_pub_key_sec1_octet_string(
curve: iana::EllipticCurve,
sec1: &[u8],
) -> Result<Self, ParseSec1OctetStringError> {
let (first, rest) = sec1.split_first().ok_or(ParseSec1OctetStringError)?;
match *first {
SEC1_COMPRESSED_SIGN_0 => Ok(Self::new_ec2_pub_key_y_sign(curve, rest.to_vec(), false)),
SEC1_COMPRESSED_SIGN_1 => Ok(Self::new_ec2_pub_key_y_sign(curve, rest.to_vec(), true)),
SEC1_UNCOMPRESSED if rest.len() % 2 == 0 => {
let (x, y) = rest.split_at(rest.len() / 2);
Ok(Self::new_ec2_pub_key(curve, x.to_vec(), y.to_vec()))
}
_ => Err(ParseSec1OctetStringError),
}
}
pub fn new_ec2_priv_key(
curve: iana::EllipticCurve,
x: Vec<u8>,
y: Vec<u8>,
d: Vec<u8>,
) -> Self {
let mut builder = Self::new_ec2_pub_key(curve, x, y);
builder
.0
.params
.push((Label::Int(iana::Ec2KeyParameter::D as i64), Value::Bytes(d)));
builder
}
pub fn new_mldsa_pub_key(variant: MlDsaVariant, k: Vec<u8>) -> Self {
Self(CoseKey {
kty: KeyType::Assigned(iana::KeyType::AKP),
alg: Some(Algorithm::Assigned(variant.into())),
params: vec![(
Label::Int(iana::AkpKeyParameter::Pub as i64),
Value::Bytes(k),
)],
..Default::default()
})
}
pub fn new_symmetric_key(k: Vec<u8>) -> Self {
Self(CoseKey {
kty: KeyType::Assigned(iana::KeyType::Symmetric),
params: vec![(
Label::Int(iana::SymmetricKeyParameter::K as i64),
Value::Bytes(k),
)],
..Default::default()
})
}
pub fn new_okp_key() -> Self {
Self(CoseKey {
kty: KeyType::Assigned(iana::KeyType::OKP),
..Default::default()
})
}
#[must_use]
pub fn key_type(mut self, key_type: iana::KeyType) -> Self {
self.0.kty = KeyType::Assigned(key_type);
self
}
#[must_use]
pub fn algorithm(mut self, alg: iana::Algorithm) -> Self {
self.0.alg = Some(Algorithm::Assigned(alg));
self
}
#[must_use]
pub fn add_key_op(mut self, op: iana::KeyOperation) -> Self {
self.0.key_ops.insert(KeyOperation::Assigned(op));
self
}
#[must_use]
pub fn param(mut self, label: i64, value: Value) -> Self {
if iana::KeyParameter::from_i64(label).is_some() {
panic!("param() method used to set KeyParameter"); }
self.0.params.push((Label::Int(label), value));
self
}
}