use crate::error::{IdentityError, Result};
use crate::groups::kem_envelope::{AgentKemKeypair, KEM_VARIANT};
use crate::identity::{AgentId, MachineId};
use saorsa_gossip_types::TopicId;
use saorsa_pqc::api::kem::{MlKem, MlKemPublicKey};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use chacha20poly1305::aead::{Aead, KeyInit, Payload};
use chacha20poly1305::{ChaCha20Poly1305, Nonce};
pub const DM_PROTOCOL_VERSION: u16 = 1;
pub const MAX_ENVELOPE_BYTES: usize = 65_536;
pub const MAX_PAYLOAD_BYTES: usize = 49_152;
pub const MAX_ENVELOPE_LIFETIME_MS: u64 = 600_000;
pub const CLOCK_SKEW_TOLERANCE_MS: u64 = 30_000;
const DM_SIGN_DOMAIN: &[u8] = b"x0x-dm-v1";
const DM_AEAD_DOMAIN: &[u8] = b"x0x-dm-payload-v1";
const DM_INBOX_TOPIC_PREFIX: &[u8] = b"x0x/dm/v1/inbox/";
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DmCapabilities {
pub max_protocol_version: u16,
pub gossip_inbox: bool,
pub kem_algorithm: String,
pub max_envelope_bytes: usize,
#[serde(default)]
pub kem_public_key: Vec<u8>,
}
impl DmCapabilities {
#[must_use]
pub fn pending() -> Self {
Self {
max_protocol_version: DM_PROTOCOL_VERSION,
gossip_inbox: false,
kem_algorithm: "ML-KEM-768".to_string(),
max_envelope_bytes: MAX_ENVELOPE_BYTES,
kem_public_key: Vec::new(),
}
}
#[must_use]
pub fn v1_gossip_ready(kem_public_key: Vec<u8>) -> Self {
Self {
max_protocol_version: DM_PROTOCOL_VERSION,
gossip_inbox: true,
kem_algorithm: "ML-KEM-768".to_string(),
max_envelope_bytes: MAX_ENVELOPE_BYTES,
kem_public_key,
}
}
#[must_use]
pub fn with_kem_public_key(mut self, kem_public_key: Vec<u8>) -> Self {
self.kem_public_key = kem_public_key;
self.gossip_inbox = true;
self
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DmEnvelope {
pub protocol_version: u16,
pub request_id: [u8; 16],
pub sender_agent_id: [u8; 32],
pub sender_machine_id: [u8; 32],
pub recipient_agent_id: [u8; 32],
pub created_at_unix_ms: u64,
pub expires_at_unix_ms: u64,
pub body: DmBody,
pub signature: Vec<u8>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DmBody {
Payload(DmPayload),
Ack(DmAckBody),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DmPayload {
pub kem_ciphertext: Vec<u8>,
pub body_nonce: [u8; 12],
pub body_ciphertext: Vec<u8>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DmPlaintext {
pub request_id: [u8; 16],
pub payload: Vec<u8>,
pub content_type: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DmAckBody {
pub acks_request_id: [u8; 16],
pub outcome: DmAckOutcome,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum DmAckOutcome {
Accepted,
RejectedByPolicy { reason: String },
}
#[derive(Debug, thiserror::Error)]
pub enum DmError {
#[error("recipient key material unavailable: {0}")]
RecipientKeyUnavailable(String),
#[error("timed out after {retries} retries over {elapsed:?}")]
Timeout { retries: u8, elapsed: Duration },
#[error("recipient rejected: {reason}")]
RecipientRejected { reason: String },
#[error("local gossip publish failed: {0}")]
LocalGossipUnavailable(String),
#[error("envelope construction failed: {0}")]
EnvelopeConstruction(String),
#[error("gossip transport error: {0}")]
PublishFailed(String),
}
impl From<IdentityError> for DmError {
fn from(value: IdentityError) -> Self {
Self::EnvelopeConstruction(value.to_string())
}
}
#[derive(Debug, Clone)]
pub struct DmReceipt {
pub request_id: [u8; 16],
pub accepted_at: Instant,
pub retries_used: u8,
pub path: DmPath,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DmPath {
GossipInbox,
RawQuic,
}
#[derive(Debug, Clone)]
pub struct DmSendConfig {
pub timeout_per_attempt: Duration,
pub max_retries: u8,
pub backoff: BackoffPolicy,
pub require_gossip: bool,
pub prefer_raw_quic_if_connected: bool,
}
impl Default for DmSendConfig {
fn default() -> Self {
Self {
timeout_per_attempt: Duration::from_secs(5),
max_retries: 1,
backoff: BackoffPolicy::Fixed(Duration::from_millis(500)),
require_gossip: false,
prefer_raw_quic_if_connected: false,
}
}
}
#[derive(Debug, Clone, Copy)]
pub enum BackoffPolicy {
Fixed(Duration),
ExponentialFromTimeout { factor: u32 },
}
impl BackoffPolicy {
#[must_use]
pub fn delay(&self, base_timeout: Duration, attempt_idx: u8) -> Duration {
match self {
Self::Fixed(d) => *d,
Self::ExponentialFromTimeout { factor } => {
let mut delay = base_timeout;
for _ in 0..attempt_idx {
delay = delay.saturating_mul(*factor);
}
delay
}
}
}
}
#[must_use]
pub fn dm_inbox_topic(agent_id: &AgentId) -> TopicId {
let mut hasher = blake3::Hasher::new();
hasher.update(DM_INBOX_TOPIC_PREFIX);
hasher.update(agent_id.as_bytes());
TopicId::new(hasher.finalize().into())
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct DedupeKey {
pub sender_agent_id: [u8; 32],
pub request_id: [u8; 16],
}
impl DedupeKey {
#[must_use]
pub fn new(sender_agent_id: [u8; 32], request_id: [u8; 16]) -> Self {
Self {
sender_agent_id,
request_id,
}
}
}
pub struct RecentDeliveryCache {
inner: Mutex<RecentDeliveryCacheInner>,
}
struct RecentDeliveryCacheInner {
order: VecDeque<DedupeKey>,
entries: HashMap<DedupeKey, CachedOutcome>,
ttl: Duration,
max_size: usize,
}
#[derive(Debug, Clone)]
pub struct CachedOutcome {
pub outcome: DmAckOutcome,
pub first_seen: Instant,
}
impl RecentDeliveryCache {
#[must_use]
pub fn with_defaults() -> Self {
Self::new(Duration::from_secs(300), 10_000)
}
#[must_use]
pub fn new(ttl: Duration, max_size: usize) -> Self {
Self {
inner: Mutex::new(RecentDeliveryCacheInner {
order: VecDeque::new(),
entries: HashMap::new(),
ttl,
max_size,
}),
}
}
pub fn lookup(&self, key: &DedupeKey) -> Option<CachedOutcome> {
let Ok(mut inner) = self.inner.lock() else {
return None;
};
let now = Instant::now();
let entry = inner.entries.get(key)?.clone();
if now.duration_since(entry.first_seen) > inner.ttl {
inner.entries.remove(key);
if let Some(pos) = inner.order.iter().position(|k| k == key) {
inner.order.remove(pos);
}
return None;
}
Some(entry)
}
pub fn insert(&self, key: DedupeKey, outcome: DmAckOutcome) {
let Ok(mut inner) = self.inner.lock() else {
return;
};
if inner.entries.contains_key(&key) {
return;
}
inner.entries.insert(
key,
CachedOutcome {
outcome,
first_seen: Instant::now(),
},
);
inner.order.push_back(key);
while inner.entries.len() > inner.max_size {
let Some(oldest) = inner.order.pop_front() else {
break;
};
inner.entries.remove(&oldest);
}
}
pub fn len(&self) -> usize {
self.inner
.lock()
.map(|g| g.entries.len())
.unwrap_or_default()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
pub fn validate_timestamp_window(
created_at_unix_ms: u64,
expires_at_unix_ms: u64,
now_unix_ms: u64,
) -> std::result::Result<(), TimestampValidationError> {
if created_at_unix_ms > now_unix_ms.saturating_add(CLOCK_SKEW_TOLERANCE_MS) {
return Err(TimestampValidationError::FromFuture);
}
if now_unix_ms >= expires_at_unix_ms {
return Err(TimestampValidationError::Expired);
}
if expires_at_unix_ms > created_at_unix_ms.saturating_add(MAX_ENVELOPE_LIFETIME_MS) {
return Err(TimestampValidationError::LifetimeExceedsMax);
}
Ok(())
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimestampValidationError {
FromFuture,
Expired,
LifetimeExceedsMax,
}
impl std::fmt::Display for TimestampValidationError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::FromFuture => write!(f, "envelope created_at is too far in the future"),
Self::Expired => write!(f, "envelope has expired"),
Self::LifetimeExceedsMax => write!(
f,
"envelope lifetime exceeds protocol max ({} ms)",
MAX_ENVELOPE_LIFETIME_MS
),
}
}
}
#[allow(clippy::too_many_arguments)]
fn build_signed_bytes(
protocol_version: u16,
request_id: &[u8; 16],
sender_agent_id: &[u8; 32],
sender_machine_id: &[u8; 32],
recipient_agent_id: &[u8; 32],
created_at_unix_ms: u64,
expires_at_unix_ms: u64,
body: &DmBody,
) -> Result<Vec<u8>> {
let body_bytes = postcard::to_stdvec(body)
.map_err(|e| IdentityError::Serialization(format!("DM body postcard: {e}")))?;
let mut out =
Vec::with_capacity(DM_SIGN_DOMAIN.len() + 2 + 16 + 32 * 3 + 8 * 2 + body_bytes.len());
out.extend_from_slice(DM_SIGN_DOMAIN);
out.extend_from_slice(&protocol_version.to_be_bytes());
out.extend_from_slice(request_id);
out.extend_from_slice(sender_agent_id);
out.extend_from_slice(sender_machine_id);
out.extend_from_slice(recipient_agent_id);
out.extend_from_slice(&created_at_unix_ms.to_be_bytes());
out.extend_from_slice(&expires_at_unix_ms.to_be_bytes());
out.extend_from_slice(&body_bytes);
Ok(out)
}
fn build_aead_aad(
request_id: &[u8; 16],
sender_agent_id: &[u8; 32],
recipient_agent_id: &[u8; 32],
created_at_unix_ms: u64,
) -> Vec<u8> {
let mut aad = Vec::with_capacity(DM_AEAD_DOMAIN.len() + 16 + 32 * 2 + 8);
aad.extend_from_slice(DM_AEAD_DOMAIN);
aad.extend_from_slice(request_id);
aad.extend_from_slice(sender_agent_id);
aad.extend_from_slice(recipient_agent_id);
aad.extend_from_slice(&created_at_unix_ms.to_be_bytes());
aad
}
pub fn encrypt_payload(
recipient_kem_pubkey_bytes: &[u8],
plaintext: &DmPlaintext,
aad: &[u8],
) -> Result<DmPayload> {
let plaintext_bytes = postcard::to_stdvec(plaintext)
.map_err(|e| IdentityError::Serialization(format!("DM plaintext postcard: {e}")))?;
let pk = MlKemPublicKey::from_bytes(KEM_VARIANT, recipient_kem_pubkey_bytes)
.map_err(|e| IdentityError::Serialization(format!("recipient KEM pubkey decode: {e}")))?;
let kem = MlKem::new(KEM_VARIANT);
let (shared, kem_ct) = kem
.encapsulate(&pk)
.map_err(|e| IdentityError::Serialization(format!("KEM encap: {e}")))?;
use rand::RngCore;
let mut nonce = [0u8; 12];
rand::thread_rng().fill_bytes(&mut nonce);
let cipher = ChaCha20Poly1305::new_from_slice(shared.as_bytes())
.map_err(|e| IdentityError::Serialization(format!("AEAD init (encrypt): {e}")))?;
let ct = cipher
.encrypt(
Nonce::from_slice(&nonce),
Payload {
msg: &plaintext_bytes,
aad,
},
)
.map_err(|e| IdentityError::Serialization(format!("AEAD encrypt: {e}")))?;
Ok(DmPayload {
kem_ciphertext: kem_ct.to_bytes(),
body_nonce: nonce,
body_ciphertext: ct,
})
}
pub fn decrypt_payload(
receiver_kem_keypair: &AgentKemKeypair,
payload: &DmPayload,
aad: &[u8],
) -> Result<DmPlaintext> {
let shared = receiver_kem_keypair.decapsulate(&payload.kem_ciphertext)?;
let cipher = ChaCha20Poly1305::new_from_slice(&shared)
.map_err(|e| IdentityError::Serialization(format!("AEAD init (decrypt): {e}")))?;
let plaintext_bytes = cipher
.decrypt(
Nonce::from_slice(&payload.body_nonce),
Payload {
msg: &payload.body_ciphertext,
aad,
},
)
.map_err(|e| IdentityError::Serialization(format!("AEAD decrypt: {e}")))?;
let plaintext: DmPlaintext = postcard::from_bytes(&plaintext_bytes)
.map_err(|e| IdentityError::Serialization(format!("DM plaintext postcard decode: {e}")))?;
Ok(plaintext)
}
impl DmEnvelope {
pub fn signed_bytes(&self) -> Result<Vec<u8>> {
build_signed_bytes(
self.protocol_version,
&self.request_id,
&self.sender_agent_id,
&self.sender_machine_id,
&self.recipient_agent_id,
self.created_at_unix_ms,
self.expires_at_unix_ms,
&self.body,
)
}
#[must_use]
pub fn aead_aad(&self) -> Vec<u8> {
build_aead_aad(
&self.request_id,
&self.sender_agent_id,
&self.recipient_agent_id,
self.created_at_unix_ms,
)
}
pub fn to_wire_bytes(&self) -> Result<Vec<u8>> {
postcard::to_stdvec(self)
.map_err(|e| IdentityError::Serialization(format!("DM envelope postcard: {e}")))
}
pub fn from_wire_bytes(bytes: &[u8]) -> Result<Self> {
if bytes.len() > MAX_ENVELOPE_BYTES {
return Err(IdentityError::Serialization(format!(
"DM envelope exceeds MAX_ENVELOPE_BYTES ({} > {})",
bytes.len(),
MAX_ENVELOPE_BYTES
)));
}
postcard::from_bytes(bytes)
.map_err(|e| IdentityError::Serialization(format!("DM envelope decode: {e}")))
}
#[must_use]
pub fn dedupe_key(&self) -> DedupeKey {
DedupeKey::new(self.sender_agent_id, self.request_id)
}
}
pub struct EnvelopeBuilder;
impl EnvelopeBuilder {
pub fn build_payload_body(
request_id: &[u8; 16],
sender_agent_id: &[u8; 32],
recipient_agent_id: &[u8; 32],
created_at_unix_ms: u64,
payload: Vec<u8>,
content_type: Option<String>,
recipient_kem_pubkey_bytes: &[u8],
) -> Result<DmBody> {
if payload.len() > MAX_PAYLOAD_BYTES {
return Err(IdentityError::Serialization(format!(
"DM payload exceeds MAX_PAYLOAD_BYTES ({} > {})",
payload.len(),
MAX_PAYLOAD_BYTES
)));
}
let plaintext = DmPlaintext {
request_id: *request_id,
payload,
content_type,
};
let aad = build_aead_aad(
request_id,
sender_agent_id,
recipient_agent_id,
created_at_unix_ms,
);
let ciphertext = encrypt_payload(recipient_kem_pubkey_bytes, &plaintext, &aad)?;
Ok(DmBody::Payload(ciphertext))
}
#[must_use]
pub fn build_ack_body(acks_request_id: [u8; 16], outcome: DmAckOutcome) -> DmBody {
DmBody::Ack(DmAckBody {
acks_request_id,
outcome,
})
}
}
#[derive(Default)]
pub struct InFlightAcks {
inner: Arc<dashmap::DashMap<[u8; 16], tokio::sync::oneshot::Sender<DmAckOutcome>>>,
}
impl Clone for InFlightAcks {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
impl InFlightAcks {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn register(&self, request_id: [u8; 16]) -> tokio::sync::oneshot::Receiver<DmAckOutcome> {
let (tx, rx) = tokio::sync::oneshot::channel();
self.inner.insert(request_id, tx);
rx
}
pub fn resolve(&self, request_id: &[u8; 16], outcome: DmAckOutcome) -> bool {
if let Some((_, tx)) = self.inner.remove(request_id) {
let _ = tx.send(outcome);
true
} else {
false
}
}
pub fn cancel(&self, request_id: &[u8; 16]) {
self.inner.remove(request_id);
}
pub fn outstanding(&self) -> usize {
self.inner.len()
}
}
#[must_use]
pub fn now_unix_ms() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or_default()
}
#[allow(dead_code)]
fn _type_witness(_: AgentId, _: MachineId) {}
#[cfg(test)]
mod tests {
use super::*;
fn dummy_agent_id(seed: u8) -> [u8; 32] {
[seed; 32]
}
#[test]
fn timestamp_window_accepts_valid() {
let now = now_unix_ms();
let created = now - 1000;
let expires = now + 30_000;
assert!(validate_timestamp_window(created, expires, now).is_ok());
}
#[test]
fn timestamp_window_rejects_future_created_beyond_skew() {
let now = now_unix_ms();
let created = now + CLOCK_SKEW_TOLERANCE_MS + 1000;
let expires = created + 30_000;
assert_eq!(
validate_timestamp_window(created, expires, now),
Err(TimestampValidationError::FromFuture)
);
}
#[test]
fn timestamp_window_accepts_within_skew() {
let now = now_unix_ms();
let created = now + CLOCK_SKEW_TOLERANCE_MS - 100;
let expires = created + 30_000;
assert!(validate_timestamp_window(created, expires, now).is_ok());
}
#[test]
fn timestamp_window_rejects_expired() {
let now = now_unix_ms();
let created = now - 60_000;
let expires = now - 1000;
assert_eq!(
validate_timestamp_window(created, expires, now),
Err(TimestampValidationError::Expired)
);
}
#[test]
fn timestamp_window_rejects_lifetime_over_max() {
let now = now_unix_ms();
let created = now;
let expires = created + MAX_ENVELOPE_LIFETIME_MS + 1000;
assert_eq!(
validate_timestamp_window(created, expires, now),
Err(TimestampValidationError::LifetimeExceedsMax)
);
}
#[test]
fn dedupe_cache_insert_and_lookup() {
let cache = RecentDeliveryCache::with_defaults();
let k = DedupeKey::new(dummy_agent_id(1), [9; 16]);
assert!(cache.lookup(&k).is_none());
cache.insert(k, DmAckOutcome::Accepted);
let hit = cache.lookup(&k).expect("cache hit");
assert_eq!(hit.outcome, DmAckOutcome::Accepted);
}
#[test]
fn dedupe_cache_ttl_expiry() {
let cache = RecentDeliveryCache::new(Duration::from_millis(50), 100);
let k = DedupeKey::new(dummy_agent_id(1), [9; 16]);
cache.insert(k, DmAckOutcome::Accepted);
std::thread::sleep(Duration::from_millis(100));
assert!(cache.lookup(&k).is_none());
}
#[test]
fn dedupe_cache_lru_eviction() {
let cache = RecentDeliveryCache::new(Duration::from_secs(600), 3);
for i in 0..5u8 {
cache.insert(
DedupeKey::new(dummy_agent_id(i), [i; 16]),
DmAckOutcome::Accepted,
);
}
assert_eq!(cache.len(), 3);
assert!(cache
.lookup(&DedupeKey::new(dummy_agent_id(0), [0; 16]))
.is_none());
assert!(cache
.lookup(&DedupeKey::new(dummy_agent_id(1), [1; 16]))
.is_none());
for i in 2..5u8 {
assert!(cache
.lookup(&DedupeKey::new(dummy_agent_id(i), [i; 16]))
.is_some());
}
}
#[test]
fn kem_payload_round_trip() {
let kp = AgentKemKeypair::generate().expect("keygen");
let rid = [7u8; 16];
let sender = dummy_agent_id(1);
let recipient = dummy_agent_id(2);
let now = now_unix_ms();
let plaintext = DmPlaintext {
request_id: rid,
payload: b"hello direct".to_vec(),
content_type: Some("text/plain".to_string()),
};
let aad = build_aead_aad(&rid, &sender, &recipient, now);
let ct = encrypt_payload(&kp.public_bytes, &plaintext, &aad).expect("encrypt");
let decrypted = decrypt_payload(&kp, &ct, &aad).expect("decrypt");
assert_eq!(decrypted.request_id, rid);
assert_eq!(decrypted.payload, b"hello direct");
assert_eq!(decrypted.content_type.as_deref(), Some("text/plain"));
}
#[test]
fn kem_payload_aad_tamper_fails() {
let kp = AgentKemKeypair::generate().expect("keygen");
let rid = [7u8; 16];
let sender = dummy_agent_id(1);
let recipient = dummy_agent_id(2);
let now = now_unix_ms();
let plaintext = DmPlaintext {
request_id: rid,
payload: b"hi".to_vec(),
content_type: None,
};
let aad = build_aead_aad(&rid, &sender, &recipient, now);
let ct = encrypt_payload(&kp.public_bytes, &plaintext, &aad).expect("encrypt");
let mut bad_aad = aad.clone();
bad_aad[0] ^= 1;
assert!(decrypt_payload(&kp, &ct, &bad_aad).is_err());
}
#[test]
fn inbox_topic_is_deterministic_and_unique() {
let a = AgentId([1u8; 32]);
let b = AgentId([2u8; 32]);
let ta1 = dm_inbox_topic(&a);
let ta2 = dm_inbox_topic(&a);
let tb = dm_inbox_topic(&b);
assert_eq!(ta1, ta2);
assert_ne!(ta1, tb);
}
#[test]
fn envelope_wire_round_trip() {
let ack_body = EnvelopeBuilder::build_ack_body([3u8; 16], DmAckOutcome::Accepted);
let env = DmEnvelope {
protocol_version: DM_PROTOCOL_VERSION,
request_id: [5u8; 16],
sender_agent_id: dummy_agent_id(1),
sender_machine_id: dummy_agent_id(11),
recipient_agent_id: dummy_agent_id(2),
created_at_unix_ms: now_unix_ms(),
expires_at_unix_ms: now_unix_ms() + 120_000,
body: ack_body,
signature: vec![0u8; 64],
};
let bytes = env.to_wire_bytes().expect("encode");
let back = DmEnvelope::from_wire_bytes(&bytes).expect("decode");
assert_eq!(back.request_id, env.request_id);
assert_eq!(back.sender_agent_id, env.sender_agent_id);
match back.body {
DmBody::Ack(a) => {
assert_eq!(a.acks_request_id, [3u8; 16]);
assert_eq!(a.outcome, DmAckOutcome::Accepted);
}
DmBody::Payload(_) => panic!("expected Ack body"),
}
}
#[test]
fn envelope_oversized_rejected() {
let bytes = vec![0u8; MAX_ENVELOPE_BYTES + 1];
assert!(DmEnvelope::from_wire_bytes(&bytes).is_err());
}
#[test]
fn backoff_exponential_schedule() {
let base = Duration::from_secs(10);
let b = BackoffPolicy::ExponentialFromTimeout { factor: 2 };
assert_eq!(b.delay(base, 0), Duration::from_secs(10));
assert_eq!(b.delay(base, 1), Duration::from_secs(20));
assert_eq!(b.delay(base, 2), Duration::from_secs(40));
}
#[test]
fn in_flight_acks_resolve_and_cancel() {
let acks = InFlightAcks::new();
let rid = [1u8; 16];
let rx = acks.register(rid);
assert!(acks.resolve(&rid, DmAckOutcome::Accepted));
let received = tokio::runtime::Runtime::new().expect("rt").block_on(rx);
assert_eq!(received.expect("ok"), DmAckOutcome::Accepted);
assert!(!acks.resolve(&rid, DmAckOutcome::Accepted));
let rid2 = [2u8; 16];
let _rx2 = acks.register(rid2);
acks.cancel(&rid2);
assert_eq!(acks.outstanding(), 0);
}
}