use std::collections::BTreeSet;
use std::sync::Arc;
use crate::meerkat_machine::dsl::PeerEndpoint;
use crate::protocol_comms_trust_reconcile::CommsTrustReconcileObligation;
use meerkat_core::agent::{CommsCapabilityError, CommsRuntime};
use meerkat_core::comms::{
CommsTrustMutation, CommsTrustMutationResult, GeneratedCommsTrustAuthoritySourceKind,
PeerAddress, PeerId, PeerName, SendError, TrustedPeerDescriptor,
};
#[derive(Debug, thiserror::Error)]
pub enum CommsTrustReconcileError {
#[error("add_trusted_peer for `{peer_id}` failed: {source}")]
AddTrustFailed {
peer_id: String,
#[source]
source: SendError,
},
#[error("remove_trusted_peer for `{peer_id}` failed: {source}")]
RemoveTrustFailed {
peer_id: String,
#[source]
source: SendError,
},
#[error("invalid peer endpoint `{peer_id}`: {detail}")]
InvalidEndpoint { peer_id: String, detail: String },
#[error("canonical trust-store snapshot unavailable: {0}")]
TrustSnapshotUnavailable(#[from] CommsCapabilityError),
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ReconcileReport {
pub added: Vec<PeerEndpoint>,
pub removed: Vec<PeerEndpoint>,
pub applied_epoch: u64,
}
pub struct CommsTrustReconciler {
comms: Arc<dyn CommsRuntime>,
}
impl std::fmt::Debug for CommsTrustReconciler {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CommsTrustReconciler")
.finish_non_exhaustive()
}
}
impl CommsTrustReconciler {
pub fn new(comms: Arc<dyn CommsRuntime>) -> Self {
Self { comms }
}
pub async fn reconcile(
&self,
obligation: &CommsTrustReconcileObligation,
) -> Result<ReconcileReport, CommsTrustReconcileError> {
let effective_peers = crate::protocol_comms_trust_reconcile::effective_peers(obligation);
let previous_peers = self.canonical_trusted_peer_snapshot().await?;
let to_add: Vec<PeerEndpoint> = effective_peers
.iter()
.filter(|ep| !previous_peers.contains(*ep))
.cloned()
.collect();
let effective_peer_ids = effective_peers
.iter()
.map(|endpoint| endpoint.peer_id.0.as_str())
.collect::<BTreeSet<_>>();
let to_remove: Vec<PeerEndpoint> = previous_peers
.iter()
.filter(|ep| !effective_peer_ids.contains(ep.peer_id.0.as_str()))
.cloned()
.collect();
let mut added = Vec::new();
let mut removed = Vec::new();
for endpoint in to_add {
let descriptor = endpoint_to_descriptor(&endpoint)?;
let authority = crate::protocol_comms_trust_reconcile::authority_for_endpoint(
obligation, &endpoint,
)
.map_err(|detail| CommsTrustReconcileError::AddTrustFailed {
peer_id: endpoint.peer_id.0.clone(),
source: SendError::Validation(detail),
})?;
self.comms
.apply_trust_mutation(CommsTrustMutation::AddTrustedPeer {
authority,
peer: descriptor,
})
.await
.map_err(|source| CommsTrustReconcileError::AddTrustFailed {
peer_id: endpoint.peer_id.0.clone(),
source,
})?;
added.push(endpoint);
}
for endpoint in to_remove {
let authority = crate::protocol_comms_trust_reconcile::removal_authority_for_peer_id(
obligation,
endpoint.peer_id.0.as_str(),
)
.map_err(|detail| CommsTrustReconcileError::RemoveTrustFailed {
peer_id: endpoint.peer_id.0.clone(),
source: SendError::Validation(detail),
})?;
let result = self
.comms
.apply_trust_mutation(CommsTrustMutation::RemoveTrustedPeer {
peer_id: endpoint.peer_id.0.clone(),
authority,
})
.await
.map_err(|source| CommsTrustReconcileError::RemoveTrustFailed {
peer_id: endpoint.peer_id.0.clone(),
source,
})?;
if !matches!(result, CommsTrustMutationResult::Removed { .. }) {
return Err(CommsTrustReconcileError::RemoveTrustFailed {
peer_id: endpoint.peer_id.0.clone(),
source: SendError::Internal(
"generated trust removal returned non-removal result".to_string(),
),
});
}
removed.push(endpoint);
}
Ok(ReconcileReport {
added,
removed,
applied_epoch: obligation.peer_projection_epoch(),
})
}
async fn canonical_trusted_peer_snapshot(
&self,
) -> Result<BTreeSet<PeerEndpoint>, CommsTrustReconcileError> {
self.comms
.trusted_peer_projection_snapshot_for_source(
GeneratedCommsTrustAuthoritySourceKind::MeerkatMachinePeerProjection,
)
.await?
.iter()
.map(descriptor_to_endpoint)
.collect()
}
}
pub(crate) fn endpoint_to_descriptor(
endpoint: &PeerEndpoint,
) -> Result<TrustedPeerDescriptor, CommsTrustReconcileError> {
let name = PeerName::new(endpoint.name.0.as_str()).map_err(|detail| {
CommsTrustReconcileError::InvalidEndpoint {
peer_id: endpoint.peer_id.0.clone(),
detail: format!("invalid name: {detail}"),
}
})?;
let peer_id = PeerId::parse(endpoint.peer_id.0.as_str()).map_err(|err| {
CommsTrustReconcileError::InvalidEndpoint {
peer_id: endpoint.peer_id.0.clone(),
detail: format!("invalid peer_id: {err}"),
}
})?;
let address = parse_peer_address(endpoint.address.0.as_str()).map_err(|detail| {
CommsTrustReconcileError::InvalidEndpoint {
peer_id: endpoint.peer_id.0.clone(),
detail,
}
})?;
Ok(TrustedPeerDescriptor {
name,
peer_id,
address,
pubkey: endpoint.signing_key.0,
})
}
fn parse_peer_address(raw: &str) -> Result<PeerAddress, String> {
PeerAddress::parse(raw).map_err(|err| err.to_string())
}
fn descriptor_to_endpoint(
descriptor: &TrustedPeerDescriptor,
) -> Result<PeerEndpoint, CommsTrustReconcileError> {
Ok(PeerEndpoint {
name: crate::meerkat_machine::dsl::PeerName(descriptor.name.as_str().to_owned()),
peer_id: crate::meerkat_machine::dsl::PeerId(descriptor.peer_id.to_string()),
address: crate::meerkat_machine::dsl::PeerAddress(descriptor.address.to_string()),
signing_key: crate::meerkat_machine::dsl::PeerSigningKey(descriptor.pubkey),
})
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
use async_trait::async_trait;
use meerkat_core::{
PeerIngressAuthorityPhase, PeerIngressQueueSnapshot, PeerIngressRuntimeSnapshot,
};
use std::sync::atomic::{AtomicBool, Ordering};
fn endpoint(name: &str, peer_id_uuid: &str) -> PeerEndpoint {
PeerEndpoint {
name: crate::meerkat_machine::dsl::PeerName(format!("ep-{name}")),
peer_id: crate::meerkat_machine::dsl::PeerId(peer_id_uuid.to_string()),
address: crate::meerkat_machine::dsl::PeerAddress(format!("inproc://{name}")),
signing_key: crate::meerkat_machine::dsl::PeerSigningKey([name.as_bytes()[0]; 32]),
}
}
const UUID_A: &str = "f805a14c-4089-5328-b4cb-39ede8b4464d";
const UUID_B: &str = "a576ebe3-ccd6-565d-8f48-5f29c0db055d";
const UUID_C: &str = "76f30618-7a02-578b-a8bc-6d82ae7ba8cf";
const LOCAL_UUID: &str = "00000000-0000-4000-8000-000000000000";
fn local_peer_id() -> PeerId {
PeerId::parse(LOCAL_UUID).expect("valid local test peer id")
}
#[test]
fn endpoint_to_descriptor_rejects_malformed_identity_atoms_at_boundary() {
assert!(endpoint_to_descriptor(&endpoint("ok", UUID_A)).is_ok());
let bad_peer_id = endpoint("bad-id", "not-a-uuid");
assert!(
matches!(
endpoint_to_descriptor(&bad_peer_id),
Err(CommsTrustReconcileError::InvalidEndpoint { .. })
),
"a non-UUID peer_id must be rejected at the ingress boundary"
);
let mut bad_address = endpoint("bad-addr", UUID_B);
bad_address.address =
crate::meerkat_machine::dsl::PeerAddress("::::not a valid url".to_string());
assert!(
matches!(
endpoint_to_descriptor(&bad_address),
Err(CommsTrustReconcileError::InvalidEndpoint { .. })
),
"a malformed address must be rejected at the ingress boundary"
);
}
#[derive(Default)]
struct RecordingCommsRuntime {
adds: std::sync::Mutex<Vec<TrustedPeerDescriptor>>,
removes: std::sync::Mutex<Vec<String>>,
trusted: std::sync::Mutex<BTreeSet<PeerEndpoint>>,
fail_next_add: AtomicBool,
fail_next_remove: AtomicBool,
}
impl std::fmt::Debug for RecordingCommsRuntime {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RecordingCommsRuntime").finish()
}
}
#[async_trait]
impl CommsRuntime for RecordingCommsRuntime {
async fn drain_messages(&self) -> Vec<String> {
Vec::new()
}
fn inbox_notify(&self) -> Arc<tokio::sync::Notify> {
Arc::new(tokio::sync::Notify::new())
}
async fn apply_trust_mutation(
&self,
mutation: CommsTrustMutation,
) -> Result<CommsTrustMutationResult, SendError> {
match mutation {
CommsTrustMutation::AddTrustedPeer { peer, authority } => {
authority
.validate_public_add(Some(local_peer_id()), &peer)
.map_err(SendError::Validation)?;
if self.fail_next_add.swap(false, Ordering::SeqCst) {
return Err(SendError::Unsupported("synthetic failure".into()));
}
self.adds
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.push(peer.clone());
let mut trusted = self
.trusted
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let created = !trusted
.iter()
.any(|endpoint| endpoint.peer_id.0 == peer.peer_id.to_string());
trusted.retain(|endpoint| endpoint.peer_id.0 != peer.peer_id.to_string());
trusted
.insert(descriptor_to_endpoint(&peer).expect("test descriptor should map"));
Ok(CommsTrustMutationResult::Added { created })
}
CommsTrustMutation::RemoveTrustedPeer { peer_id, authority } => {
let parsed_peer_id = PeerId::parse(&peer_id)
.map_err(|err| SendError::Validation(err.to_string()))?;
authority
.validate_public_remove(Some(local_peer_id()), parsed_peer_id)
.map_err(SendError::Validation)?;
if self.fail_next_remove.swap(false, Ordering::SeqCst) {
return Err(SendError::Unsupported("synthetic failure".into()));
}
self.removes
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.push(peer_id.clone());
self.trusted
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.retain(|endpoint| endpoint.peer_id.0 != peer_id);
Ok(CommsTrustMutationResult::Removed { removed: true })
}
_ => Err(SendError::Unsupported(
"test runtime only supports public trust projection".into(),
)),
}
}
async fn peer_ingress_runtime_snapshot(
&self,
) -> Result<PeerIngressRuntimeSnapshot, CommsCapabilityError> {
let trusted_peers = self
.trusted
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.iter()
.map(endpoint_to_descriptor)
.collect::<Result<Vec<_>, _>>()
.map_err(|err| CommsCapabilityError::Unsupported(err.to_string()))?;
Ok(PeerIngressRuntimeSnapshot {
self_peer_id: local_peer_id(),
auth_required: true,
authority_phase: PeerIngressAuthorityPhase::Received,
trusted_peers,
submission_queue_len: 0,
queue: PeerIngressQueueSnapshot::default(),
})
}
async fn public_trusted_peer_projection_snapshot(
&self,
) -> Result<Vec<TrustedPeerDescriptor>, CommsCapabilityError> {
self.trusted
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.iter()
.map(endpoint_to_descriptor)
.collect::<Result<Vec<_>, _>>()
.map_err(|err| CommsCapabilityError::Unsupported(err.to_string()))
}
async fn trusted_peer_projection_snapshot_for_source(
&self,
source_kind: GeneratedCommsTrustAuthoritySourceKind,
) -> Result<Vec<TrustedPeerDescriptor>, CommsCapabilityError> {
if source_kind != GeneratedCommsTrustAuthoritySourceKind::MeerkatMachinePeerProjection {
return Ok(Vec::new());
}
self.public_trusted_peer_projection_snapshot().await
}
}
impl RecordingCommsRuntime {
fn add_calls(&self) -> Vec<TrustedPeerDescriptor> {
self.adds
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
}
fn remove_calls(&self) -> Vec<String> {
self.removes
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
}
}
fn obligation(
epoch: u64,
direct_peer_endpoints: BTreeSet<PeerEndpoint>,
) -> CommsTrustReconcileObligation {
let authority = Arc::new(std::sync::Mutex::new(
crate::meerkat_machine::dsl::MeerkatMachineAuthority::new(),
));
let projection_epoch = epoch.max(1);
let transition = {
let mut guard = authority
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
guard
.apply_signal(crate::meerkat_machine::dsl::MeerkatMachineSignal::Initialize)
.expect("Initialize signal");
crate::meerkat_machine::dsl::MeerkatMachineMutator::apply(
&mut *guard,
crate::meerkat_machine::dsl::MeerkatMachineInput::RegisterSession {
session_id: crate::meerkat_machine::dsl::SessionId::from(
"comms-trust-reconcile-test",
),
},
)
.expect("RegisterSession input");
crate::meerkat_machine::dsl::MeerkatMachineMutator::apply(
&mut *guard,
crate::meerkat_machine::dsl::MeerkatMachineInput::PublishLocalEndpoint {
endpoint: endpoint("local", LOCAL_UUID),
},
)
.expect("PublishLocalEndpoint input");
let mut transition = None;
for overlay_epoch in 1..=projection_epoch {
transition = Some(
crate::meerkat_machine::dsl::MeerkatMachineMutator::apply(
&mut *guard,
crate::meerkat_machine::dsl::MeerkatMachineInput::ApplyMobPeerOverlay {
epoch: overlay_epoch,
endpoints: direct_peer_endpoints.clone(),
},
)
.expect("ApplyMobPeerOverlay input"),
);
}
transition.expect("projection epoch loop produces transition")
};
crate::protocol_comms_trust_reconcile::extract_obligations_with_freshness(
&transition,
crate::protocol_comms_trust_reconcile::PeerProjectionFreshnessAuthority::from_authority(
authority,
),
)
.into_iter()
.next()
.expect("generated reconcile obligation")
}
fn stale_obligation_pair(
first_peer_endpoints: BTreeSet<PeerEndpoint>,
second_peer_endpoints: BTreeSet<PeerEndpoint>,
) -> (CommsTrustReconcileObligation, CommsTrustReconcileObligation) {
let authority = Arc::new(std::sync::Mutex::new(
crate::meerkat_machine::dsl::MeerkatMachineAuthority::new(),
));
let (first_transition, second_transition) = {
let mut guard = authority
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
guard
.apply_signal(crate::meerkat_machine::dsl::MeerkatMachineSignal::Initialize)
.expect("Initialize signal");
crate::meerkat_machine::dsl::MeerkatMachineMutator::apply(
&mut *guard,
crate::meerkat_machine::dsl::MeerkatMachineInput::RegisterSession {
session_id: crate::meerkat_machine::dsl::SessionId::from(
"comms-trust-reconcile-stale-test",
),
},
)
.expect("RegisterSession input");
crate::meerkat_machine::dsl::MeerkatMachineMutator::apply(
&mut *guard,
crate::meerkat_machine::dsl::MeerkatMachineInput::PublishLocalEndpoint {
endpoint: endpoint("local", LOCAL_UUID),
},
)
.expect("PublishLocalEndpoint input");
let first = crate::meerkat_machine::dsl::MeerkatMachineMutator::apply(
&mut *guard,
crate::meerkat_machine::dsl::MeerkatMachineInput::ApplyMobPeerOverlay {
epoch: 1,
endpoints: first_peer_endpoints,
},
)
.expect("ApplyMobPeerOverlay first input");
let second = crate::meerkat_machine::dsl::MeerkatMachineMutator::apply(
&mut *guard,
crate::meerkat_machine::dsl::MeerkatMachineInput::ApplyMobPeerOverlay {
epoch: 2,
endpoints: second_peer_endpoints,
},
)
.expect("ApplyMobPeerOverlay second input");
(first, second)
};
let freshness =
crate::protocol_comms_trust_reconcile::PeerProjectionFreshnessAuthority::from_authority(
Arc::clone(&authority),
);
let first = crate::protocol_comms_trust_reconcile::extract_obligations_with_freshness(
&first_transition,
freshness.clone(),
)
.into_iter()
.next()
.expect("first generated reconcile obligation");
let second = crate::protocol_comms_trust_reconcile::extract_obligations_with_freshness(
&second_transition,
freshness,
)
.into_iter()
.next()
.expect("second generated reconcile obligation");
(first, second)
}
#[test]
fn generated_reconcile_obligation_mints_each_peer_operation_once() {
let endpoint_a = endpoint("A", UUID_A);
let obligation = obligation(1, BTreeSet::from([endpoint_a.clone()]));
crate::protocol_comms_trust_reconcile::authority_for_endpoint(&obligation, &endpoint_a)
.expect("first add authority mint succeeds");
let duplicate_add =
crate::protocol_comms_trust_reconcile::authority_for_endpoint(&obligation, &endpoint_a)
.expect_err("same obligation must not mint the same add authority twice");
assert!(
duplicate_add.contains("already minted"),
"unexpected duplicate add rejection: {duplicate_add}",
);
crate::protocol_comms_trust_reconcile::removal_authority_for_peer_id(&obligation, UUID_B)
.expect("first remove authority for a non-effective peer succeeds");
let duplicate_remove =
crate::protocol_comms_trust_reconcile::removal_authority_for_peer_id(
&obligation,
UUID_B,
)
.expect_err("same obligation must not mint the same remove authority twice");
assert!(
duplicate_remove.contains("already minted"),
"unexpected duplicate remove rejection: {duplicate_remove}",
);
}
#[test]
fn stale_generated_reconcile_obligation_cannot_mint_authority() {
let endpoint_a = endpoint("A", UUID_A);
let endpoint_b = endpoint("B", UUID_B);
let (stale, current) = stale_obligation_pair(
BTreeSet::from([endpoint_a.clone()]),
BTreeSet::from([endpoint_b.clone()]),
);
crate::protocol_comms_trust_reconcile::authority_for_endpoint(¤t, &endpoint_b)
.expect("current obligation mints authority");
let stale_error =
crate::protocol_comms_trust_reconcile::authority_for_endpoint(&stale, &endpoint_a)
.expect_err("stale obligation must fail closed");
assert!(
stale_error.contains("stale generated peer projection trust obligation"),
"unexpected stale-obligation rejection: {stale_error}",
);
}
#[tokio::test]
async fn first_reconcile_registers_all_effective_peers() {
let comms = Arc::new(RecordingCommsRuntime::default());
let reconciler = CommsTrustReconciler::new(comms.clone());
let peers = BTreeSet::from([endpoint("A", UUID_A), endpoint("B", UUID_B)]);
let report = reconciler
.reconcile(&obligation(1, peers.clone()))
.await
.expect("first reconcile succeeds");
assert_eq!(report.applied_epoch, 1);
assert_eq!(report.added.len(), 2);
assert!(report.removed.is_empty());
let add_calls = comms.add_calls();
assert_eq!(add_calls.len(), 2);
assert_eq!(comms.remove_calls().len(), 0);
assert!(
add_calls
.iter()
.any(|d| d.peer_id.to_string() == UUID_A && d.name.as_str() == "ep-A"),
"add_trusted_peer must be called with the parsed (name, peer_id, address) triple",
);
assert!(
add_calls
.iter()
.any(|d| d.peer_id.to_string() == UUID_A && d.pubkey == [b'A'; 32]),
"reconciler must forward the machine-owned signing key for peer A",
);
assert!(
add_calls
.iter()
.any(|d| d.peer_id.to_string() == UUID_B && d.pubkey == [b'B'; 32]),
"reconciler must forward the machine-owned signing key for peer B",
);
}
#[tokio::test]
async fn subsequent_reconcile_adds_new_and_removes_departed() {
let comms = Arc::new(RecordingCommsRuntime::default());
let reconciler = CommsTrustReconciler::new(comms.clone());
let peers_v1 = BTreeSet::from([endpoint("A", UUID_A), endpoint("B", UUID_B)]);
reconciler
.reconcile(&obligation(1, peers_v1))
.await
.expect("v1 reconcile");
let peers_v2 = BTreeSet::from([endpoint("A", UUID_A), endpoint("C", UUID_C)]);
let report = reconciler
.reconcile(&obligation(2, peers_v2))
.await
.expect("v2 reconcile");
assert_eq!(report.added, vec![endpoint("C", UUID_C)]);
assert_eq!(report.removed, vec![endpoint("B", UUID_B)]);
assert_eq!(report.applied_epoch, 2);
assert_eq!(comms.add_calls().len(), 3);
assert_eq!(comms.remove_calls(), vec![UUID_B.to_string()]);
}
#[tokio::test]
async fn peer_metadata_update_does_not_remove_still_desired_peer_id() {
let comms = Arc::new(RecordingCommsRuntime::default());
let reconciler = CommsTrustReconciler::new(comms.clone());
reconciler
.reconcile(&obligation(1, BTreeSet::from([endpoint("A", UUID_A)])))
.await
.expect("initial reconcile");
let updated = endpoint("A2", UUID_A);
let report = reconciler
.reconcile(&obligation(2, BTreeSet::from([updated.clone()])))
.await
.expect("metadata update reconcile");
assert_eq!(report.added, vec![updated]);
assert!(report.removed.is_empty());
assert!(comms.remove_calls().is_empty());
assert_eq!(comms.add_calls().len(), 2);
}
#[tokio::test]
async fn reconcile_reads_canonical_store_not_local_applied_view() {
let comms = Arc::new(RecordingCommsRuntime::default());
let reconciler = CommsTrustReconciler::new(comms.clone());
reconciler
.reconcile(&obligation(5, BTreeSet::from([endpoint("A", UUID_A)])))
.await
.expect("first reconcile");
comms
.trusted
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clear();
comms
.trusted
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.insert(endpoint("B", UUID_B));
let report = reconciler
.reconcile(&obligation(6, BTreeSet::from([endpoint("A", UUID_A)])))
.await
.expect("reconcile should read canonical trust store");
assert_eq!(report.added, vec![endpoint("A", UUID_A)]);
assert_eq!(report.removed, vec![endpoint("B", UUID_B)]);
assert_eq!(report.applied_epoch, 6);
assert_eq!(comms.add_calls().len(), 2);
assert_eq!(comms.remove_calls(), vec![UUID_B.to_string()]);
}
#[tokio::test]
async fn empty_effective_set_clears_all_previously_trusted_peers() {
let comms = Arc::new(RecordingCommsRuntime::default());
let reconciler = CommsTrustReconciler::new(comms.clone());
reconciler
.reconcile(&obligation(
1,
BTreeSet::from([endpoint("A", UUID_A), endpoint("B", UUID_B)]),
))
.await
.expect("seed");
let report = reconciler
.reconcile(&obligation(2, BTreeSet::new()))
.await
.expect("clear all");
assert_eq!(report.removed.len(), 2);
assert!(report.added.is_empty());
let mut removes = comms.remove_calls();
removes.sort();
let mut expected = vec![UUID_A.to_string(), UUID_B.to_string()];
expected.sort();
assert_eq!(removes, expected);
}
#[tokio::test]
async fn invalid_endpoint_surfaces_typed_error_without_touching_trust_store() {
let comms = Arc::new(RecordingCommsRuntime::default());
let reconciler = CommsTrustReconciler::new(comms.clone());
let bad = PeerEndpoint {
name: crate::meerkat_machine::dsl::PeerName("ep-bad".into()),
peer_id: crate::meerkat_machine::dsl::PeerId("not-a-uuid".into()),
address: crate::meerkat_machine::dsl::PeerAddress("inproc://bad".into()),
signing_key: crate::meerkat_machine::dsl::PeerSigningKey([9u8; 32]),
};
let err = reconciler
.reconcile(&obligation(1, BTreeSet::from([bad])))
.await
.expect_err("invalid endpoint must surface a typed error");
assert!(
matches!(err, CommsTrustReconcileError::InvalidEndpoint { .. }),
"expected InvalidEndpoint, got {err:?}",
);
assert!(comms.add_calls().is_empty());
}
#[tokio::test]
async fn unknown_address_scheme_surfaces_typed_error_without_touching_trust_store() {
let comms = Arc::new(RecordingCommsRuntime::default());
let reconciler = CommsTrustReconciler::new(comms.clone());
let mut bad = endpoint("bad", UUID_A);
bad.address = crate::meerkat_machine::dsl::PeerAddress("http://127.0.0.1:4200".into());
let err = reconciler
.reconcile(&obligation(1, BTreeSet::from([bad])))
.await
.expect_err("unknown address scheme must surface a typed error");
match err {
CommsTrustReconcileError::InvalidEndpoint { detail, .. } => assert!(
detail.contains("unknown peer address transport"),
"unexpected detail: {detail}",
),
other => panic!("expected InvalidEndpoint, got {other:?}"),
}
assert!(comms.add_calls().is_empty());
}
}