use std::time::Duration;
use aes::cipher::{block_padding::NoPadding, BlockDecryptMut};
use async_trait::async_trait;
use cbc::cipher::{BlockEncryptMut, KeyIvInit};
use cosey as cose;
use hkdf::Hkdf;
use hmac::Mac;
use p256::{
ecdh::EphemeralSecret, elliptic_curve::sec1::FromEncodedPoint, EncodedPoint,
PublicKey as P256PublicKey,
};
use rand::{rngs::OsRng, thread_rng, Rng, SeedableRng};
use sha2::{Digest, Sha256};
use tracing::{error, instrument, warn};
use x509_parser::nom::AsBytes;
use crate::{
proto::{
ctap2::{Ctap2, Ctap2ClientPinRequest, Ctap2GetInfoResponse, Ctap2PinUvAuthProtocol},
CtapError,
},
transport::Channel,
webauthn::{
error::{Error, PlatformError},
pin_uv_auth_token::{obtain_pin, obtain_shared_secret, select_uv_proto},
},
};
type Aes256CbcEncryptor = cbc::Encryptor<aes::Aes256>;
type Aes256CbcDecryptor = cbc::Decryptor<aes::Aes256>;
type HmacSha256 = hmac::Hmac<Sha256>;
#[derive(Default, Debug)]
pub struct PinUvAuthToken {
pub rpid: Option<String>,
pub user_verified: bool,
pub user_present: bool,
}
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum PinRequestReason {
RelyingPartyRequest,
AuthenticatorPolicy,
FallbackFromUV,
}
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum PinNotSetReason {
PinNotSet,
PinTooShort,
PinTooLong,
PinPolicyViolation,
}
pub trait PinUvAuthProtocol: Send + Sync {
fn version(&self) -> Ctap2PinUvAuthProtocol;
fn encapsulate(
&self,
peer_public_key: &cose::PublicKey,
) -> Result<(cose::PublicKey, Vec<u8>), Error>;
fn encrypt(&self, key: &[u8], plaintext: &[u8]) -> Result<Vec<u8>, Error>;
fn decrypt(&self, key: &[u8], ciphertext: &[u8]) -> Result<Vec<u8>, Error>;
fn authenticate(&self, key: &[u8], message: &[u8]) -> Result<Vec<u8>, Error>;
}
trait ECPrivateKeyPinUvAuthProtocol {
fn private_key(&self) -> &EphemeralSecret;
fn public_key(&self) -> &P256PublicKey;
fn kdf(&self, bytes: &[u8]) -> Result<Vec<u8>, Error>;
}
trait ECDHPinUvAuthProtocol {
fn ecdh(&self, peer_public_key: &cose::PublicKey) -> Result<Vec<u8>, Error>;
fn encapsulate(
&self,
peer_public_key: &cose::PublicKey,
) -> Result<(cose::PublicKey, Vec<u8>), Error>;
fn get_public_key(&self) -> Result<cose::PublicKey, Error>;
}
pub struct PinUvAuthProtocolOne {
private_key: EphemeralSecret,
public_key: P256PublicKey,
}
impl Default for PinUvAuthProtocolOne {
fn default() -> Self {
Self::new()
}
}
impl PinUvAuthProtocolOne {
pub fn new() -> Self {
let private_key = if cfg!(test) {
let mut rng = rand::rngs::StdRng::seed_from_u64(42);
EphemeralSecret::random(&mut rng)
} else {
EphemeralSecret::random(&mut OsRng)
};
let public_key = private_key.public_key();
Self {
private_key,
public_key,
}
}
}
impl ECPrivateKeyPinUvAuthProtocol for PinUvAuthProtocolOne {
fn private_key(&self) -> &EphemeralSecret {
&self.private_key
}
fn public_key(&self) -> &P256PublicKey {
&self.public_key
}
fn kdf(&self, bytes: &[u8]) -> Result<Vec<u8>, Error> {
let mut hasher = Sha256::default();
hasher.update(bytes);
Ok(hasher.finalize().to_vec())
}
}
impl<P> ECDHPinUvAuthProtocol for P
where
P: ECPrivateKeyPinUvAuthProtocol,
{
#[instrument(skip_all)]
fn encapsulate(
&self,
peer_public_key: &cose::PublicKey,
) -> Result<(cose::PublicKey, Vec<u8>), Error> {
let shared_secret = self.ecdh(peer_public_key)?;
Ok((self.get_public_key()?, shared_secret))
}
fn ecdh(&self, peer_public_key: &cose::PublicKey) -> Result<Vec<u8>, Error> {
let cose::PublicKey::EcdhEsHkdf256Key(peer_public_key) = peer_public_key else {
error!(
?peer_public_key,
"Unsupported peerCoseKey format. Only EcdhEsHkdf256Key is supported."
);
return Err(Error::Ctap(CtapError::Other));
};
let x: &[u8; 32] = peer_public_key.x.as_bytes().try_into().map_err(|_| {
error!(
x_len = peer_public_key.x.as_bytes().len(),
"Peer public key x coordinate is not 32 bytes"
);
Error::Ctap(CtapError::Other)
})?;
let y: &[u8; 32] = peer_public_key.y.as_bytes().try_into().map_err(|_| {
error!(
y_len = peer_public_key.y.as_bytes().len(),
"Peer public key y coordinate is not 32 bytes"
);
Error::Ctap(CtapError::Other)
})?;
let encoded_point = EncodedPoint::from_affine_coordinates(x.into(), y.into(), false);
let Some(peer_public_key) = P256PublicKey::from_encoded_point(&encoded_point).into() else {
error!("Failed to parse public key.");
return Err(Error::Ctap(CtapError::Other));
};
let shared = self.private_key().diffie_hellman(&peer_public_key);
self.kdf(shared.raw_secret_bytes().as_bytes())
}
fn get_public_key(&self) -> Result<cose::PublicKey, Error> {
let point = EncodedPoint::from(self.public_key());
let x_bytes = point.x().ok_or_else(|| {
error!("Public key is the identity point");
Error::Platform(PlatformError::CryptoError(
"public key is the identity point".into(),
))
})?;
let y_bytes = point.y().ok_or_else(|| {
error!("Public key is identity or compressed");
Error::Platform(PlatformError::CryptoError(
"public key is identity or compressed".into(),
))
})?;
let x: heapless::Vec<u8, 32> =
heapless::Vec::from_slice(x_bytes.as_bytes()).map_err(|_| {
Error::Platform(PlatformError::CryptoError(
"x coordinate exceeds 32 bytes".into(),
))
})?;
let y: heapless::Vec<u8, 32> =
heapless::Vec::from_slice(y_bytes.as_bytes()).map_err(|_| {
Error::Platform(PlatformError::CryptoError(
"y coordinate exceeds 32 bytes".into(),
))
})?;
Ok(cose::PublicKey::EcdhEsHkdf256Key(
cose::EcdhEsHkdf256PublicKey {
x: x.into(),
y: y.into(),
},
))
}
}
impl PinUvAuthProtocol for PinUvAuthProtocolOne {
fn version(&self) -> Ctap2PinUvAuthProtocol {
Ctap2PinUvAuthProtocol::One
}
#[instrument(skip_all)]
fn encrypt(&self, key: &[u8], plaintext: &[u8]) -> Result<Vec<u8>, Error> {
let iv: &[u8] = &[0; 16];
let Ok(enc) = Aes256CbcEncryptor::new_from_slices(key, iv) else {
error!(?key, "Invalid key for AES-256 encryption");
return Err(Error::Ctap(CtapError::Other));
};
Ok(enc.encrypt_padded_vec_mut::<NoPadding>(plaintext))
}
#[instrument(skip_all)]
fn authenticate(&self, key: &[u8], message: &[u8]) -> Result<Vec<u8>, Error> {
let hmac = hmac_sha256(key, message)?;
let truncated = hmac.get(..16).ok_or_else(|| {
error!(len = hmac.len(), "HMAC output shorter than 16 bytes");
Error::Platform(PlatformError::CryptoError(
"HMAC output shorter than 16 bytes".into(),
))
})?;
Ok(Vec::from(truncated))
}
#[instrument(skip_all)]
fn decrypt(&self, key: &[u8], ciphertext: &[u8]) -> Result<Vec<u8>, Error> {
if !ciphertext.len().is_multiple_of(16) {
error!(
?ciphertext,
"Ciphertext length is not a multiple of AES block length"
);
return Err(Error::Ctap(CtapError::Other));
}
let iv: &[u8] = &[0; 16];
let Ok(dec) = Aes256CbcDecryptor::new_from_slices(key, iv) else {
error!(?key, "Invalid key for AES-256 decryption");
return Err(Error::Ctap(CtapError::Other));
};
let Ok(plaintext) = dec.decrypt_padded_vec_mut::<NoPadding>(ciphertext) else {
error!("Unpad error while decrypting");
return Err(Error::Ctap(CtapError::Other));
};
Ok(plaintext)
}
fn encapsulate(
&self,
peer_public_key: &cose::PublicKey,
) -> Result<(cose::PublicKey, Vec<u8>), Error> {
<Self as ECDHPinUvAuthProtocol>::encapsulate(self, peer_public_key)
}
}
pub struct PinUvAuthProtocolTwo {
private_key: EphemeralSecret,
public_key: P256PublicKey,
}
impl Default for PinUvAuthProtocolTwo {
fn default() -> Self {
Self::new()
}
}
impl PinUvAuthProtocolTwo {
pub fn new() -> Self {
let private_key = if cfg!(test) {
let mut rng = rand::rngs::StdRng::seed_from_u64(42);
EphemeralSecret::random(&mut rng)
} else {
EphemeralSecret::random(&mut OsRng)
};
let public_key = private_key.public_key();
Self {
private_key,
public_key,
}
}
}
impl ECPrivateKeyPinUvAuthProtocol for PinUvAuthProtocolTwo {
fn private_key(&self) -> &EphemeralSecret {
&self.private_key
}
fn public_key(&self) -> &P256PublicKey {
&self.public_key
}
fn kdf(&self, ikm: &[u8]) -> Result<Vec<u8>, Error> {
let salt: &[u8] = &[0u8; 32];
let mut output = hkdf_sha256(Some(salt), ikm, "CTAP2 HMAC key".as_bytes())?;
output.extend(hkdf_sha256(Some(salt), ikm, "CTAP2 AES key".as_bytes())?);
Ok(output)
}
}
impl PinUvAuthProtocol for PinUvAuthProtocolTwo {
fn version(&self) -> Ctap2PinUvAuthProtocol {
Ctap2PinUvAuthProtocol::Two
}
#[instrument(skip_all)]
fn encapsulate(
&self,
peer_public_key: &cose::PublicKey,
) -> Result<(cose::PublicKey, Vec<u8>), Error> {
<Self as ECDHPinUvAuthProtocol>::encapsulate(self, peer_public_key)
}
fn encrypt(&self, key: &[u8], plaintext: &[u8]) -> Result<Vec<u8>, Error> {
let key = key.get(32..).ok_or_else(|| {
error!(
key_len = key.len(),
"key shorter than 32 bytes; cannot select AES-key portion"
);
Error::Ctap(CtapError::Other)
})?;
let iv: [u8; 16] = thread_rng().gen();
let Ok(enc) = Aes256CbcEncryptor::new_from_slices(key, &iv) else {
error!(?key, "Invalid key for AES-256 encryption");
return Err(Error::Ctap(CtapError::Other));
};
let ct = enc.encrypt_padded_vec_mut::<NoPadding>(plaintext);
let mut out = Vec::from(iv);
out.extend(ct);
Ok(out)
}
fn decrypt(&self, key: &[u8], ciphertext: &[u8]) -> Result<Vec<u8>, Error> {
let key = key.get(32..).ok_or_else(|| {
error!(
key_len = key.len(),
"key shorter than 32 bytes; cannot select AES-key portion"
);
Error::Ctap(CtapError::Other)
})?;
if ciphertext.len() < 16 {
error!({ len = ciphertext.len() }, "Invalid length for ciphertext");
return Err(Error::Ctap(CtapError::Other));
};
let (iv, ciphertext) = ciphertext.split_at(16);
let Ok(dec) = Aes256CbcDecryptor::new_from_slices(key, iv) else {
error!(?key, "Invalid key for AES-256 decryption");
return Err(Error::Ctap(CtapError::Other));
};
let Ok(plaintext) = dec.decrypt_padded_vec_mut::<NoPadding>(ciphertext) else {
error!("Unpad error while decrypting");
return Err(Error::Ctap(CtapError::Other));
};
Ok(plaintext)
}
fn authenticate(&self, key: &[u8], message: &[u8]) -> Result<Vec<u8>, Error> {
let key = key.get(..32).ok_or_else(|| {
error!(
key_len = key.len(),
"key shorter than 32 bytes; cannot select HMAC-key portion"
);
Error::Ctap(CtapError::Other)
})?;
hmac_sha256(key, message)
}
}
pub fn pin_hash(pin: &[u8]) -> Vec<u8> {
let mut hasher = Sha256::default();
hasher.update(pin);
let hashed = hasher.finalize();
hashed.into_iter().take(16).collect()
}
pub fn hmac_sha256(key: &[u8], message: &[u8]) -> Result<Vec<u8>, Error> {
let mut hmac = HmacSha256::new_from_slice(key).map_err(|e| {
error!("HMAC key error: {e}");
Error::Platform(PlatformError::CryptoError(format!("HMAC key error: {e}")))
})?;
hmac.update(message);
Ok(hmac.finalize().into_bytes().to_vec())
}
pub fn hkdf_sha256(salt: Option<&[u8]>, ikm: &[u8], info: &[u8]) -> Result<Vec<u8>, Error> {
let hk = Hkdf::<Sha256>::new(salt, ikm);
let mut okm = [0u8; 32]; hk.expand(info, &mut okm).map_err(|e| {
error!("HKDF expand error: {e}");
Error::Platform(PlatformError::CryptoError(format!(
"HKDF expand error: {e}"
)))
})?;
Ok(Vec::from(okm))
}
pub(crate) mod internal {
use super::*;
#[async_trait]
pub trait PinManagementInternal {
async fn change_pin_internal(
&mut self,
get_info_response: &Ctap2GetInfoResponse,
new_pin: String,
timeout: Duration,
) -> Result<(), Error>;
}
#[async_trait]
impl<C> PinManagementInternal for C
where
C: Channel,
{
async fn change_pin_internal(
&mut self,
get_info_response: &Ctap2GetInfoResponse,
new_pin: String,
timeout: Duration,
) -> Result<(), Error> {
if new_pin.len() < get_info_response.min_pin_length.unwrap_or(4) as usize {
return Err(Error::Platform(PlatformError::PinTooShort));
}
if new_pin.len() >= 64 {
return Err(Error::Platform(PlatformError::PinTooLong));
}
let Some(uv_proto) = select_uv_proto(
#[cfg(feature = "virt")]
self.get_forced_pin_protocol(),
get_info_response,
)
.await
else {
error!("No supported PIN/UV auth protocols found");
return Err(Error::Ctap(CtapError::Other));
};
let current_pin = match get_info_response
.options
.as_ref()
.ok_or(Error::Platform(PlatformError::InvalidDeviceResponse))?
.get("clientPin")
{
Some(true) => Some(
obtain_pin(
self,
get_info_response,
uv_proto.version(),
PinRequestReason::AuthenticatorPolicy,
timeout,
)
.await?,
),
Some(false) => None,
None => {
return Err(Error::Platform(PlatformError::PinNotSupported));
}
};
let (public_key, shared_secret) =
obtain_shared_secret(self, uv_proto.as_ref(), timeout).await?;
let mut padded_new_pin = new_pin.as_bytes().to_vec();
padded_new_pin.resize(64, 0x00);
let new_pin_enc = uv_proto.encrypt(&shared_secret, &padded_new_pin)?;
let req = match current_pin {
Some(curr_pin) => {
let pin_hash = pin_hash(&curr_pin);
let pin_hash_enc = uv_proto.encrypt(&shared_secret, &pin_hash)?;
let uv_auth_param = uv_proto.authenticate(
&shared_secret,
&[new_pin_enc.as_slice(), pin_hash_enc.as_slice()].concat(),
)?;
Ctap2ClientPinRequest::new_change_pin(
uv_proto.version(),
&new_pin_enc,
&pin_hash_enc,
public_key,
&uv_auth_param,
)
}
None => {
let uv_auth_param = uv_proto.authenticate(&shared_secret, &new_pin_enc)?;
Ctap2ClientPinRequest::new_set_pin(
uv_proto.version(),
&new_pin_enc,
public_key,
&uv_auth_param,
)
}
};
let _ = self.ctap2_client_pin(&req, timeout).await?;
Ok(())
}
}
}
use internal::PinManagementInternal;
#[async_trait]
pub trait PinManagement: PinManagementInternal {
async fn change_pin(&mut self, new_pin: String, timeout: Duration) -> Result<(), Error>;
}
#[async_trait]
impl<C> PinManagement for C
where
C: Channel,
{
async fn change_pin(&mut self, new_pin: String, timeout: Duration) -> Result<(), Error> {
let get_info_response = self.ctap2_get_info().await?;
self.change_pin_internal(&get_info_response, new_pin, timeout)
.await
}
}
#[cfg(test)]
mod tests {
use super::*;
use cosey::{Bytes, EcdhEsHkdf256PublicKey};
fn make_peer_key(x: &[u8], y: &[u8]) -> cose::PublicKey {
cose::PublicKey::EcdhEsHkdf256Key(EcdhEsHkdf256PublicKey {
x: Bytes::from_slice(x).unwrap(),
y: Bytes::from_slice(y).unwrap(),
})
}
#[test]
fn ecdh_rejects_short_x() {
let proto = PinUvAuthProtocolOne::new();
let x = vec![0x01u8; 31];
let y = vec![0x02u8; 32];
let key = make_peer_key(&x, &y);
let result = PinUvAuthProtocol::encapsulate(&proto, &key);
assert!(matches!(result, Err(Error::Ctap(CtapError::Other))));
}
#[test]
fn ecdh_rejects_empty_x() {
let proto = PinUvAuthProtocolOne::new();
let x: Vec<u8> = Vec::new();
let y = vec![0x02u8; 32];
let key = make_peer_key(&x, &y);
let result = PinUvAuthProtocol::encapsulate(&proto, &key);
assert!(matches!(result, Err(Error::Ctap(CtapError::Other))));
}
#[test]
fn ecdh_rejects_short_y() {
let proto = PinUvAuthProtocolTwo::new();
let x = vec![0x01u8; 32];
let y = vec![0x02u8; 16];
let key = make_peer_key(&x, &y);
let result = PinUvAuthProtocol::encapsulate(&proto, &key);
assert!(matches!(result, Err(Error::Ctap(CtapError::Other))));
}
#[test]
fn proto_two_authenticate_rejects_empty_key() {
let proto = PinUvAuthProtocolTwo::new();
let result = proto.authenticate(&[], b"clientDataHash");
assert!(matches!(result, Err(Error::Ctap(CtapError::Other))));
}
#[test]
fn proto_two_authenticate_rejects_short_key() {
let proto = PinUvAuthProtocolTwo::new();
let short_key = [0u8; 16];
let result = proto.authenticate(&short_key, b"hello");
assert!(matches!(result, Err(Error::Ctap(CtapError::Other))));
}
#[test]
fn proto_two_encrypt_rejects_short_key() {
let proto = PinUvAuthProtocolTwo::new();
let short_key = [0u8; 16];
let plaintext = [0u8; 16];
let result = proto.encrypt(&short_key, &plaintext);
assert!(matches!(result, Err(Error::Ctap(CtapError::Other))));
}
#[test]
fn proto_two_decrypt_rejects_short_key() {
let proto = PinUvAuthProtocolTwo::new();
let short_key = [0u8; 16];
let ct = [0u8; 32];
let result = proto.decrypt(&short_key, &ct);
assert!(matches!(result, Err(Error::Ctap(CtapError::Other))));
}
}