use std::collections::BTreeSet;
use std::sync::Arc;
use crate::meerkat_machine::dsl::PeerEndpoint;
use meerkat_core::agent::{CommsCapabilityError, CommsRuntime};
use meerkat_core::comms::{PeerAddress, PeerId, PeerName, SendError, TrustedPeerDescriptor};
/// Failure modes of [`CommsTrustReconciler::reconcile`].
///
/// NOTE(review): the `AddTrustFailed`, `RemoveTrustFailed` and
/// `TrustSnapshotUnavailable` messages embed the underlying error in their
/// display text *and* expose it as the error source, so reporters that print
/// the whole source chain will show it twice.
#[derive(Debug, thiserror::Error)]
pub enum CommsTrustReconcileError {
    /// The comms runtime rejected `add_trusted_peer` for this peer.
    #[error("add_trusted_peer for `{peer_id}` failed: {source}")]
    AddTrustFailed {
        /// Raw peer id string of the endpoint that was being added.
        peer_id: String,
        /// Error returned by the comms runtime.
        #[source]
        source: SendError,
    },

    /// The comms runtime rejected `remove_trusted_peer` for this peer.
    #[error("remove_trusted_peer for `{peer_id}` failed: {source}")]
    RemoveTrustFailed {
        /// Raw peer id string of the endpoint that was being removed.
        peer_id: String,
        /// Error returned by the comms runtime.
        #[source]
        source: SendError,
    },

    /// An endpoint could not be converted into a `TrustedPeerDescriptor`
    /// (its name, peer id or address failed to parse).
    #[error("invalid peer endpoint `{peer_id}`: {detail}")]
    InvalidEndpoint {
        /// Raw peer id string of the offending endpoint.
        peer_id: String,
        /// Human-readable description of what failed to parse.
        detail: String,
    },

    /// The runtime's trust-store snapshot could not be read.
    #[error("canonical trust-store snapshot unavailable: {0}")]
    TrustSnapshotUnavailable(#[from] CommsCapabilityError),
}
/// Summary of one successful [`CommsTrustReconciler::reconcile`] call.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ReconcileReport {
    /// Endpoints handed to `add_trusted_peer`, in the order they were applied.
    pub added: Vec<PeerEndpoint>,
    /// Endpoints whose peer id was handed to `remove_trusted_peer`, in the
    /// order they were applied.
    pub removed: Vec<PeerEndpoint>,
    /// The `epoch` argument of the call that produced this report.
    pub applied_epoch: u64,
}
/// Drives a [`CommsRuntime`]'s trusted-peer set toward a desired set of
/// endpoints by diffing against the runtime's own trust-store snapshot.
pub struct CommsTrustReconciler {
    /// Runtime whose trust store is both read (snapshot) and mutated
    /// (add/remove trusted peer).
    comms: Arc<dyn CommsRuntime>,
}
impl std::fmt::Debug for CommsTrustReconciler {
    /// Renders as `CommsTrustReconciler { .. }`; the runtime handle is not
    /// printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("CommsTrustReconciler");
        out.finish_non_exhaustive()
    }
}
impl CommsTrustReconciler {
pub fn new(comms: Arc<dyn CommsRuntime>) -> Self {
Self { comms }
}
pub async fn reconcile(
&self,
epoch: u64,
effective_peers: BTreeSet<PeerEndpoint>,
) -> Result<ReconcileReport, CommsTrustReconcileError> {
let previous_peers = self.canonical_trusted_peer_snapshot().await?;
let to_add: Vec<PeerEndpoint> = effective_peers
.iter()
.filter(|ep| !previous_peers.contains(*ep))
.cloned()
.collect();
let to_remove: Vec<PeerEndpoint> = previous_peers
.iter()
.filter(|ep| !effective_peers.contains(*ep))
.cloned()
.collect();
let mut added = Vec::new();
let mut removed = Vec::new();
for endpoint in to_add {
let descriptor = endpoint_to_descriptor(&endpoint)?;
self.comms
.add_trusted_peer(descriptor)
.await
.map_err(|source| CommsTrustReconcileError::AddTrustFailed {
peer_id: endpoint.peer_id.0.clone(),
source,
})?;
added.push(endpoint);
}
for endpoint in to_remove {
self.comms
.remove_trusted_peer(endpoint.peer_id.0.as_str())
.await
.map_err(|source| CommsTrustReconcileError::RemoveTrustFailed {
peer_id: endpoint.peer_id.0.clone(),
source,
})?;
removed.push(endpoint);
}
Ok(ReconcileReport {
added,
removed,
applied_epoch: epoch,
})
}
async fn canonical_trusted_peer_snapshot(
&self,
) -> Result<BTreeSet<PeerEndpoint>, CommsTrustReconcileError> {
let snapshot = self.comms.peer_ingress_runtime_snapshot().await?;
snapshot
.trusted_peers
.iter()
.map(descriptor_to_endpoint)
.collect()
}
}
/// Converts a machine-level [`PeerEndpoint`] into the runtime's
/// [`TrustedPeerDescriptor`].
///
/// The name, peer id and address are parsed in that order; the first failure
/// wins. The signing key is copied through unchanged.
///
/// # Errors
///
/// [`CommsTrustReconcileError::InvalidEndpoint`] tagged with the endpoint's raw
/// peer id. The detail is prefixed with `invalid name: ` or `invalid peer_id: `,
/// or is the address parser's message as-is.
fn endpoint_to_descriptor(
    endpoint: &PeerEndpoint,
) -> Result<TrustedPeerDescriptor, CommsTrustReconcileError> {
    let raw_peer_id = endpoint.peer_id.0.as_str();
    // Every failure below is reported against the same raw peer id.
    let invalid = |detail: String| CommsTrustReconcileError::InvalidEndpoint {
        peer_id: raw_peer_id.to_owned(),
        detail,
    };

    let name = PeerName::new(endpoint.name.0.as_str())
        .map_err(|why| invalid(format!("invalid name: {why}")))?;
    let peer_id =
        PeerId::parse(raw_peer_id).map_err(|why| invalid(format!("invalid peer_id: {why}")))?;
    let address = parse_peer_address(endpoint.address.0.as_str()).map_err(invalid)?;

    let descriptor = TrustedPeerDescriptor {
        name,
        peer_id,
        address,
        pubkey: endpoint.signing_key.0,
    };
    Ok(descriptor)
}
/// Parses a raw address string into a [`PeerAddress`].
///
/// The parse error is flattened to its `Display` text so it can be used
/// directly as an `InvalidEndpoint` detail.
fn parse_peer_address(raw: &str) -> Result<PeerAddress, String> {
    let parsed = PeerAddress::parse(raw);
    parsed.map_err(|parse_err| format!("{parse_err}"))
}
fn descriptor_to_endpoint(
descriptor: &TrustedPeerDescriptor,
) -> Result<PeerEndpoint, CommsTrustReconcileError> {
Ok(PeerEndpoint {
name: crate::meerkat_machine::dsl::PeerName(descriptor.name.as_str().to_owned()),
peer_id: crate::meerkat_machine::dsl::PeerId(descriptor.peer_id.to_string()),
address: crate::meerkat_machine::dsl::PeerAddress(descriptor.address.to_string()),
signing_key: crate::meerkat_machine::dsl::PeerSigningKey(descriptor.pubkey),
})
}
// Unit tests for `CommsTrustReconciler`, run against the in-memory
// `RecordingCommsRuntime` double defined below. The double's `trusted` set
// stands in for the canonical trust store returned by
// `peer_ingress_runtime_snapshot`.
// Test-only code: unwrap/expect/panic are allowed for fail-fast assertions.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
use async_trait::async_trait;
use meerkat_core::{
PeerIngressAuthorityPhase, PeerIngressQueueSnapshot, PeerIngressRuntimeSnapshot,
};
use std::sync::atomic::{AtomicBool, Ordering};
/// Builds a test endpoint named `ep-{name}` at `inproc://{name}`, whose
/// signing key is 32 copies of the first byte of `name`.
///
/// Panics if `name` is empty (it indexes `as_bytes()[0]`).
fn endpoint(name: &str, peer_id_uuid: &str) -> PeerEndpoint {
PeerEndpoint {
name: crate::meerkat_machine::dsl::PeerName(format!("ep-{name}")),
peer_id: crate::meerkat_machine::dsl::PeerId(peer_id_uuid.to_string()),
address: crate::meerkat_machine::dsl::PeerAddress(format!("inproc://{name}")),
signing_key: crate::meerkat_machine::dsl::PeerSigningKey([name.as_bytes()[0]; 32]),
}
}
// UUID-formatted peer ids. The success-path tests below need these to be
// accepted by `PeerId::parse`.
const UUID_A: &str = "aaaaaaaa-0000-4000-8000-000000000001";
const UUID_B: &str = "bbbbbbbb-0000-4000-8000-000000000002";
const UUID_C: &str = "cccccccc-0000-4000-8000-000000000003";
/// In-memory `CommsRuntime` double.
///
/// - `adds` / `removes` record every successful add/remove call, in call order.
/// - `trusted` is the simulated trust store reported by
///   `peer_ingress_runtime_snapshot`. Adds insert the whole endpoint (so this
///   is not an upsert by peer id); removes drop every entry with the given
///   peer id.
/// - `fail_next_add` / `fail_next_remove` are one-shot failure switches. When
///   set, the next matching call returns an error (without recording or
///   mutating anything) and clears the flag.
#[derive(Default)]
struct RecordingCommsRuntime {
adds: std::sync::Mutex<Vec<TrustedPeerDescriptor>>,
removes: std::sync::Mutex<Vec<String>>,
trusted: std::sync::Mutex<BTreeSet<PeerEndpoint>>,
fail_next_add: AtomicBool,
fail_next_remove: AtomicBool,
}
// Manual impl: only the type name is printed, no field contents.
impl std::fmt::Debug for RecordingCommsRuntime {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RecordingCommsRuntime").finish()
}
}
#[async_trait]
impl CommsRuntime for RecordingCommsRuntime {
// The inbox is always empty.
async fn drain_messages(&self) -> Vec<String> {
Vec::new()
}
// Returns a new `Notify` on every call; nothing else holds it.
fn inbox_notify(&self) -> Arc<tokio::sync::Notify> {
Arc::new(tokio::sync::Notify::new())
}
async fn add_trusted_peer(&self, peer: TrustedPeerDescriptor) -> Result<(), SendError> {
// `swap(false)` both reads and clears the one-shot failure flag.
if self.fail_next_add.swap(false, Ordering::SeqCst) {
return Err(SendError::Unsupported("synthetic failure".into()));
}
self.adds
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.push(peer.clone());
self.trusted
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.insert(descriptor_to_endpoint(&peer).expect("test descriptor should map"));
Ok(())
}
async fn remove_trusted_peer(&self, peer_id: &str) -> Result<bool, SendError> {
// `swap(false)` both reads and clears the one-shot failure flag.
if self.fail_next_remove.swap(false, Ordering::SeqCst) {
return Err(SendError::Unsupported("synthetic failure".into()));
}
self.removes
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.push(peer_id.to_string());
// Drops every stored endpoint with this peer id. Always reports `true`,
// even if nothing matched.
self.trusted
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.retain(|endpoint| endpoint.peer_id.0 != peer_id);
Ok(true)
}
// Rebuilds descriptors from `trusted` on every call. A conversion failure is
// surfaced as `CommsCapabilityError::Unsupported`. All other snapshot
// fields are fixed placeholder values.
async fn peer_ingress_runtime_snapshot(
&self,
) -> Result<PeerIngressRuntimeSnapshot, CommsCapabilityError> {
let trusted_peers = self
.trusted
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.iter()
.map(endpoint_to_descriptor)
.collect::<Result<Vec<_>, _>>()
.map_err(|err| CommsCapabilityError::Unsupported(err.to_string()))?;
Ok(PeerIngressRuntimeSnapshot {
self_peer_id: PeerId::parse("00000000-0000-4000-8000-000000000000")
.expect("valid test peer id"),
auth_required: true,
authority_phase: PeerIngressAuthorityPhase::Received,
trusted_peers,
submission_queue_len: 0,
queue: PeerIngressQueueSnapshot::default(),
})
}
}
impl RecordingCommsRuntime {
/// Snapshot of every successful `add_trusted_peer` call so far.
fn add_calls(&self) -> Vec<TrustedPeerDescriptor> {
self.adds
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
}
/// Snapshot of every peer id passed to a successful `remove_trusted_peer`.
fn remove_calls(&self) -> Vec<String> {
self.removes
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
}
}
// Empty store plus two effective peers: both are added and nothing is
// removed. Each descriptor carries the parsed name/peer id and the
// endpoint's signing key.
#[tokio::test]
async fn first_reconcile_registers_all_effective_peers() {
let comms = Arc::new(RecordingCommsRuntime::default());
let reconciler = CommsTrustReconciler::new(comms.clone());
let peers = BTreeSet::from([endpoint("A", UUID_A), endpoint("B", UUID_B)]);
let report = reconciler
.reconcile(1, peers.clone())
.await
.expect("first reconcile succeeds");
assert_eq!(report.applied_epoch, 1);
assert_eq!(report.added.len(), 2);
assert!(report.removed.is_empty());
let add_calls = comms.add_calls();
assert_eq!(add_calls.len(), 2);
assert_eq!(comms.remove_calls().len(), 0);
assert!(
add_calls
.iter()
.any(|d| d.peer_id.to_string() == UUID_A && d.name.as_str() == "ep-A"),
"add_trusted_peer must be called with the parsed (name, peer_id, address) triple",
);
assert!(
add_calls
.iter()
.any(|d| d.peer_id.to_string() == UUID_A && d.pubkey == [b'A'; 32]),
"reconciler must forward the machine-owned signing key for peer A",
);
assert!(
add_calls
.iter()
.any(|d| d.peer_id.to_string() == UUID_B && d.pubkey == [b'B'; 32]),
"reconciler must forward the machine-owned signing key for peer B",
);
}
// {A, B} -> {A, C}: only C is added and only B is removed; A is untouched.
// That gives 3 add calls in total across both passes.
#[tokio::test]
async fn subsequent_reconcile_adds_new_and_removes_departed() {
let comms = Arc::new(RecordingCommsRuntime::default());
let reconciler = CommsTrustReconciler::new(comms.clone());
let peers_v1 = BTreeSet::from([endpoint("A", UUID_A), endpoint("B", UUID_B)]);
reconciler
.reconcile(1, peers_v1)
.await
.expect("v1 reconcile");
let peers_v2 = BTreeSet::from([endpoint("A", UUID_A), endpoint("C", UUID_C)]);
let report = reconciler
.reconcile(2, peers_v2)
.await
.expect("v2 reconcile");
assert_eq!(report.added, vec![endpoint("C", UUID_C)]);
assert_eq!(report.removed, vec![endpoint("B", UUID_B)]);
assert_eq!(report.applied_epoch, 2);
assert_eq!(comms.add_calls().len(), 3);
assert_eq!(comms.remove_calls(), vec![UUID_B.to_string()]);
}
// After the first pass, the store contents are replaced with {B} behind the
// reconciler's back. The second pass must diff against that snapshot: it
// re-adds A and removes B. An epoch lower than the previous one (4 < 5) is
// still applied.
#[tokio::test]
async fn reconcile_reads_canonical_store_not_local_applied_view() {
let comms = Arc::new(RecordingCommsRuntime::default());
let reconciler = CommsTrustReconciler::new(comms.clone());
reconciler
.reconcile(5, BTreeSet::from([endpoint("A", UUID_A)]))
.await
.expect("first reconcile");
comms
.trusted
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clear();
comms
.trusted
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.insert(endpoint("B", UUID_B));
let report = reconciler
.reconcile(4, BTreeSet::from([endpoint("A", UUID_A)]))
.await
.expect("reconcile should read canonical trust store");
assert_eq!(report.added, vec![endpoint("A", UUID_A)]);
assert_eq!(report.removed, vec![endpoint("B", UUID_B)]);
assert_eq!(report.applied_epoch, 4);
assert_eq!(comms.add_calls().len(), 2);
assert_eq!(comms.remove_calls(), vec![UUID_B.to_string()]);
}
// An empty effective set removes every previously trusted peer. The removal
// order is not asserted; the recorded ids are sorted before comparing.
#[tokio::test]
async fn empty_effective_set_clears_all_previously_trusted_peers() {
let comms = Arc::new(RecordingCommsRuntime::default());
let reconciler = CommsTrustReconciler::new(comms.clone());
reconciler
.reconcile(
1,
BTreeSet::from([endpoint("A", UUID_A), endpoint("B", UUID_B)]),
)
.await
.expect("seed");
let report = reconciler
.reconcile(2, BTreeSet::new())
.await
.expect("clear all");
assert_eq!(report.removed.len(), 2);
assert!(report.added.is_empty());
let mut removes = comms.remove_calls();
removes.sort();
assert_eq!(removes, vec![UUID_A.to_string(), UUID_B.to_string()]);
}
// A non-UUID peer id fails descriptor conversion: the error is
// `InvalidEndpoint` and no add call reaches the runtime.
#[tokio::test]
async fn invalid_endpoint_surfaces_typed_error_without_touching_trust_store() {
let comms = Arc::new(RecordingCommsRuntime::default());
let reconciler = CommsTrustReconciler::new(comms.clone());
let bad = PeerEndpoint {
name: crate::meerkat_machine::dsl::PeerName("ep-bad".into()),
peer_id: crate::meerkat_machine::dsl::PeerId("not-a-uuid".into()),
address: crate::meerkat_machine::dsl::PeerAddress("inproc://bad".into()),
signing_key: crate::meerkat_machine::dsl::PeerSigningKey([9u8; 32]),
};
let err = reconciler
.reconcile(1, BTreeSet::from([bad]))
.await
.expect_err("invalid endpoint must surface a typed error");
assert!(
matches!(err, CommsTrustReconcileError::InvalidEndpoint { .. }),
"expected InvalidEndpoint, got {err:?}",
);
assert!(comms.add_calls().is_empty());
}
// An `http://` address is rejected during descriptor conversion. The detail
// mentions the unknown transport and no add call reaches the runtime.
#[tokio::test]
async fn unknown_address_scheme_surfaces_typed_error_without_touching_trust_store() {
let comms = Arc::new(RecordingCommsRuntime::default());
let reconciler = CommsTrustReconciler::new(comms.clone());
let mut bad = endpoint("bad", UUID_A);
bad.address = crate::meerkat_machine::dsl::PeerAddress("http://127.0.0.1:4200".into());
let err = reconciler
.reconcile(1, BTreeSet::from([bad]))
.await
.expect_err("unknown address scheme must surface a typed error");
match err {
CommsTrustReconcileError::InvalidEndpoint { detail, .. } => assert!(
detail.contains("unknown peer address transport"),
"unexpected detail: {detail}",
),
other => panic!("expected InvalidEndpoint, got {other:?}"),
}
assert!(comms.add_calls().is_empty());
}
}