use crate::{
cbor::value::Value,
common::AsCborValue,
iana,
iana::EnumI64,
util::{cbor_type_error, to_cbor_array, ValueTryAs},
Algorithm, CborSerializable, CoseError, CoseSignature, Label, RegisteredLabelWithPrivate,
Result,
};
use alloc::{collections::BTreeSet, string::String, vec, vec::Vec};
#[cfg(test)]
mod tests;
pub type ContentType = crate::RegisteredLabel<iana::CoapContentFormat>;
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Header {
pub alg: Option<Algorithm>,
pub crit: Vec<RegisteredLabelWithPrivate<iana::HeaderParameter>>,
pub content_type: Option<ContentType>,
pub key_id: Vec<u8>,
pub iv: Vec<u8>,
pub partial_iv: Vec<u8>,
pub counter_signatures: Vec<CoseSignature>,
pub rest: Vec<(Label, Value)>,
}
impl Header {
pub fn is_empty(&self) -> bool {
self.alg.is_none()
&& self.crit.is_empty()
&& self.content_type.is_none()
&& self.key_id.is_empty()
&& self.iv.is_empty()
&& self.partial_iv.is_empty()
&& self.counter_signatures.is_empty()
&& self.rest.is_empty()
}
}
impl crate::CborSerializable for Header {}
const ALG: Label = Label::Int(iana::HeaderParameter::Alg as i64);
const CRIT: Label = Label::Int(iana::HeaderParameter::Crit as i64);
const CONTENT_TYPE: Label = Label::Int(iana::HeaderParameter::ContentType as i64);
const KID: Label = Label::Int(iana::HeaderParameter::Kid as i64);
const IV: Label = Label::Int(iana::HeaderParameter::Iv as i64);
const PARTIAL_IV: Label = Label::Int(iana::HeaderParameter::PartialIv as i64);
const COUNTER_SIG: Label = Label::Int(iana::HeaderParameter::CounterSignature as i64);
impl AsCborValue for Header {
fn from_cbor_value(value: Value) -> Result<Self> {
let m = value.try_as_map()?;
let mut headers = Self::default();
let mut seen = BTreeSet::new();
for (l, value) in m.into_iter() {
let label = Label::from_cbor_value(l)?;
if seen.contains(&label) {
return Err(CoseError::DuplicateMapKey);
}
seen.insert(label.clone());
match label {
ALG => headers.alg = Some(Algorithm::from_cbor_value(value)?),
CRIT => match value {
Value::Array(a) => {
if a.is_empty() {
return Err(CoseError::UnexpectedItem(
"empty array",
"non-empty array",
));
}
for v in a {
headers.crit.push(
RegisteredLabelWithPrivate::<iana::HeaderParameter>::from_cbor_value(v)?,
);
}
}
v => return cbor_type_error(&v, "array value"),
},
CONTENT_TYPE => {
headers.content_type = Some(ContentType::from_cbor_value(value)?);
if let Some(ContentType::Text(text)) = &headers.content_type {
if text.is_empty() {
return Err(CoseError::UnexpectedItem("empty tstr", "non-empty tstr"));
}
if text.trim() != text {
return Err(CoseError::UnexpectedItem(
"leading/trailing whitespace",
"no leading/trailing whitespace",
));
}
if text.matches('/').count() != 1 {
return Err(CoseError::UnexpectedItem(
"arbitrary text",
"text of form type/subtype",
));
}
}
}
KID => {
headers.key_id = value.try_as_nonempty_bytes()?;
}
IV => {
headers.iv = value.try_as_nonempty_bytes()?;
}
PARTIAL_IV => {
headers.partial_iv = value.try_as_nonempty_bytes()?;
}
COUNTER_SIG => {
let sig_or_sigs = value.try_as_array()?;
if sig_or_sigs.is_empty() {
return Err(CoseError::UnexpectedItem(
"empty sig array",
"non-empty sig array",
));
}
match &sig_or_sigs[0] {
Value::Bytes(_) => headers
.counter_signatures
.push(CoseSignature::from_cbor_value(Value::Array(sig_or_sigs))?),
Value::Array(_) => {
for sig in sig_or_sigs.into_iter() {
headers
.counter_signatures
.push(CoseSignature::from_cbor_value(sig)?);
}
}
v => return cbor_type_error(v, "array or bstr value"),
}
}
label => headers.rest.push((label, value)),
}
if !headers.iv.is_empty() && !headers.partial_iv.is_empty() {
return Err(CoseError::UnexpectedItem(
"IV and partial-IV specified",
"only one of IV and partial IV",
));
}
}
Ok(headers)
}
fn to_cbor_value(mut self) -> Result<Value> {
let mut map = Vec::<(Value, Value)>::new();
if let Some(alg) = self.alg {
map.push((ALG.to_cbor_value()?, alg.to_cbor_value()?));
}
if !self.crit.is_empty() {
map.push((CRIT.to_cbor_value()?, to_cbor_array(self.crit)?));
}
if let Some(content_type) = self.content_type {
map.push((CONTENT_TYPE.to_cbor_value()?, content_type.to_cbor_value()?));
}
if !self.key_id.is_empty() {
map.push((KID.to_cbor_value()?, Value::Bytes(self.key_id)));
}
if !self.iv.is_empty() {
map.push((IV.to_cbor_value()?, Value::Bytes(self.iv)));
}
if !self.partial_iv.is_empty() {
map.push((PARTIAL_IV.to_cbor_value()?, Value::Bytes(self.partial_iv)));
}
if !self.counter_signatures.is_empty() {
if self.counter_signatures.len() == 1 {
map.push((
COUNTER_SIG.to_cbor_value()?,
self.counter_signatures.remove(0).to_cbor_value()?,
));
} else {
map.push((
COUNTER_SIG.to_cbor_value()?,
to_cbor_array(self.counter_signatures)?,
));
}
}
let mut seen = BTreeSet::new();
for (label, value) in self.rest.into_iter() {
if seen.contains(&label) {
return Err(CoseError::DuplicateMapKey);
}
seen.insert(label.clone());
map.push((label.to_cbor_value()?, value));
}
Ok(Value::Map(map))
}
}
#[derive(Debug, Default)]
pub struct HeaderBuilder(Header);
impl HeaderBuilder {
builder! {Header}
builder_set! {key_id: Vec<u8>}
#[must_use]
pub fn algorithm(self, alg: iana::Algorithm) -> Self {
self.algorithm_label(alg.into())
}
#[must_use]
pub fn algorithm_label(mut self, label: RegisteredLabelWithPrivate<iana::Algorithm>) -> Self {
self.0.alg = Some(label);
self
}
#[must_use]
pub fn add_critical(self, param: iana::HeaderParameter) -> Self {
self.add_critical_label(param.into())
}
#[must_use]
pub fn add_critical_label(
mut self,
label: RegisteredLabelWithPrivate<iana::HeaderParameter>,
) -> Self {
self.0.crit.push(label);
self
}
#[must_use]
pub fn content_format(mut self, content_type: iana::CoapContentFormat) -> Self {
self.0.content_type = Some(ContentType::Assigned(content_type));
self
}
#[must_use]
pub fn content_type(mut self, content_type: String) -> Self {
self.0.content_type = Some(ContentType::Text(content_type));
self
}
#[must_use]
pub fn iv(mut self, iv: Vec<u8>) -> Self {
self.0.iv = iv;
self.0.partial_iv.clear();
self
}
#[must_use]
pub fn partial_iv(mut self, iv: Vec<u8>) -> Self {
self.0.partial_iv = iv;
self.0.iv.clear();
self
}
#[must_use]
pub fn add_counter_signature(mut self, sig: CoseSignature) -> Self {
self.0.counter_signatures.push(sig);
self
}
#[must_use]
pub fn value(mut self, label: i64, value: Value) -> Self {
if label >= iana::HeaderParameter::Alg.to_i64()
&& label <= iana::HeaderParameter::CounterSignature.to_i64()
{
panic!("value() method used to set core header parameter"); }
self.0.rest.push((Label::Int(label), value));
self
}
#[must_use]
pub fn text_value(mut self, label: String, value: Value) -> Self {
self.0.rest.push((Label::Text(label), value));
self
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ProtectedHeader {
pub original_data: Option<Vec<u8>>,
pub header: Header,
}
impl ProtectedHeader {
#[inline]
pub fn from_cbor_bstr(val: Value) -> Result<Self> {
let data = val.try_as_bytes()?;
let header = if data.is_empty() {
Header::default()
} else {
Header::from_slice(&data)?
};
Ok(ProtectedHeader {
original_data: Some(data),
header,
})
}
#[inline]
pub fn cbor_bstr(self) -> Result<Value> {
Ok(Value::Bytes(
if let Some(original_data) = self.original_data {
original_data
} else if self.is_empty() {
vec![]
} else {
self.to_vec()?
},
))
}
pub fn is_empty(&self) -> bool {
self.header.is_empty()
}
}
impl crate::CborSerializable for ProtectedHeader {}
impl AsCborValue for ProtectedHeader {
fn from_cbor_value(value: Value) -> Result<Self> {
Ok(ProtectedHeader {
original_data: None,
header: Header::from_cbor_value(value)?,
})
}
fn to_cbor_value(self) -> Result<Value> {
self.header.to_cbor_value()
}
}