#![allow(clippy::large_futures)]
use std::sync::Arc;
use std::time::Duration;
use meerkat_core::agent::CommsRuntime;
#[allow(unused_imports)]
use meerkat_core::comms::{CommsCommand, PeerId, PeerRoute, TrustedPeerDescriptor};
use meerkat_core::event::AgentEvent;
use meerkat_core::interaction::{
InteractionContent, PeerIngressFact, PeerInputCandidate, PeerInputClass,
};
use meerkat_core::types::SessionId;
use meerkat_contracts::wire::supervisor_bridge::{
BridgeAck, BridgeBindResponse, BridgeCapabilities, BridgeCommand, BridgeDeliveryOutcome,
BridgeDeliveryPayload, BridgeDeliveryRejectionCause, BridgeDeliveryResponse,
BridgeDestroyResponse, BridgeMemberRuntimeState, BridgeObservationResponse,
BridgePeerConnectivity, BridgePeerIdentity, BridgePeerSpec, BridgeRejectionCause, BridgeReply,
BridgeRetireResponse, BridgeSupervisorPayload, SUPERVISOR_BRIDGE_INTENT,
canonicalize_bridge_address, decode_bridge_command,
};
#[cfg(test)]
use meerkat_contracts::wire::supervisor_bridge::{
SUPERVISOR_BRIDGE_BOOTSTRAP_TOKEN_PARAM, SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
};
use crate::comms_bridge::classified_interaction_to_runtime_input;
use crate::completion::CompletionOutcome;
use crate::identifiers::IdempotencyKey;
#[cfg(test)]
use crate::identifiers::LogicalRuntimeId;
use crate::input::{
Input, InputDurability, InputHeader, InputOrigin, InputVisibility, PeerConvention, PeerInput,
};
use crate::meerkat_machine::{
DrainExitReason, MeerkatMachine, SupervisorBinding, SupervisorBindingStageError,
};
use crate::service_ext::SessionServiceRuntimeExt as _;
use crate::tokio::sync::mpsc;
use crate::traits::RuntimeControlPlane;
pub const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(300);
pub fn spawn_comms_drain(
adapter: Arc<MeerkatMachine>,
session_id: SessionId,
comms_runtime: Arc<dyn CommsRuntime>,
idle_timeout: Option<Duration>,
) -> crate::tokio::task::JoinHandle<()> {
let timeout_dur = idle_timeout.unwrap_or(DEFAULT_IDLE_TIMEOUT);
let runtime_id = MeerkatMachine::logical_runtime_id(&session_id);
crate::tokio::spawn(async move {
if std::env::var_os("RKAT_TRACE_COMMS_DRAIN_BIND").is_some() {
tracing::info!(
%session_id,
comms_ptr = ?Arc::as_ptr(&comms_runtime),
"comms_drain task started"
);
}
let inbox_notify = comms_runtime.inbox_notify();
loop {
let notified = inbox_notify.notified();
let candidates = comms_runtime.drain_peer_input_candidates().await;
if candidates.is_empty() {
if comms_runtime.dismiss_received() {
tracing::info!("comms_drain: DISMISS received, stopping");
let _ = adapter
.stop_runtime_executor(&session_id, "peer DISMISS")
.await;
adapter
.notify_comms_drain_exited(&session_id, DrainExitReason::Dismissed)
.await;
return;
}
if crate::tokio::time::timeout(timeout_dur, notified)
.await
.is_err()
{
tracing::info!("comms_drain: idle timeout expired, stopping");
adapter
.notify_comms_drain_exited(&session_id, DrainExitReason::IdleTimeout)
.await;
return;
}
continue;
}
for candidate in candidates {
if try_handle_supervisor_bridge_command(
&adapter,
&session_id,
&comms_runtime,
&candidate,
)
.await
{
continue;
}
let candidate_class = candidate.class();
match candidate_class {
PeerInputClass::Ack => {
}
PeerInputClass::PeerLifecycleAdded
| PeerInputClass::PeerLifecycleRetired
| PeerInputClass::PeerLifecycleUnwired => {
tracing::debug!(
session_id = %session_id,
class = ?candidate_class,
lifecycle_peer = ?candidate.lifecycle_peer,
"comms_drain: consumed silent peer lifecycle notice"
);
}
PeerInputClass::ResponseProgress | PeerInputClass::ResponseTerminal => {
let terminal_status = match candidate.response_terminality {
Some(meerkat_core::interaction::TerminalityClass::Terminal {
disposition,
}) => match disposition {
meerkat_core::interaction::TerminalDisposition::Completed => {
Some(meerkat_core::handles::PeerTerminalDisposition::Completed)
}
meerkat_core::interaction::TerminalDisposition::Failed => {
Some(meerkat_core::handles::PeerTerminalDisposition::Failed)
}
_ => None,
},
Some(meerkat_core::interaction::TerminalityClass::Progress) | None => {
None
}
Some(_) => None,
};
let is_terminal = terminal_status.is_some();
if is_terminal {
let interaction_id = match &candidate.interaction.content {
meerkat_core::interaction::InteractionContent::Response {
in_reply_to,
..
} => *in_reply_to,
other => {
tracing::warn!(
content = ?other,
"comms_drain: terminal response candidate missing response content"
);
continue;
}
};
let subscriber = comms_runtime.interaction_subscriber(&interaction_id);
let peer_interaction_handle = comms_runtime.peer_interaction_handle();
let dsl_installed = peer_interaction_handle.is_some();
if let (Some(handle), Some(disposition)) =
(peer_interaction_handle.as_ref(), terminal_status)
{
let corr_id =
meerkat_core::PeerCorrelationId::from_uuid(interaction_id.0);
if let Err(err) = handle.response_terminal(corr_id, disposition) {
tracing::warn!(
error = %err,
corr_id = %corr_id,
"PeerInteractionHandle::response_terminal rejected (no DSL entry — classified drain saw an unknown corr_id)"
);
}
}
let content_input = match classified_interaction_to_runtime_input(
&candidate,
&runtime_id,
) {
Ok(input) => input,
Err(err) => {
tracing::warn!(
error = %err,
interaction_id = %interaction_id,
"comms_drain: rejected malformed terminal peer ingress"
);
if !dsl_installed {
comms_runtime.mark_interaction_complete(&interaction_id);
}
continue;
}
};
let result = adapter
.accept_input_with_completion(&session_id, content_input)
.await;
match result {
Ok((_outcome, handle)) => {
if subscriber.is_some() || handle.is_some() {
spawn_completion_bridge(
Some(comms_runtime.clone()),
interaction_id,
subscriber,
handle,
);
} else if !dsl_installed {
comms_runtime.mark_interaction_complete(&interaction_id);
}
}
Err(err) => {
tracing::warn!(
error = %err,
"comms_drain: failed to inject terminal response"
);
if !dsl_installed {
comms_runtime.mark_interaction_complete(&interaction_id);
}
}
}
} else {
if let Some(handle) = comms_runtime.peer_interaction_handle()
&& let meerkat_core::interaction::InteractionContent::Response {
in_reply_to,
..
} = &candidate.interaction.content
{
let corr_id =
meerkat_core::PeerCorrelationId::from_uuid(in_reply_to.0);
if let Err(err) = handle.response_progress(corr_id) {
tracing::warn!(
error = %err,
corr_id = %corr_id,
"PeerInteractionHandle::response_progress rejected"
);
}
}
let input = match classified_interaction_to_runtime_input(
&candidate,
&runtime_id,
) {
Ok(input) => input,
Err(err) => {
tracing::warn!(
error = %err,
interaction_id = %candidate.interaction.id,
"comms_drain: rejected malformed progress peer ingress"
);
continue;
}
};
if let Err(err) = adapter.accept_input(&session_id, input).await {
tracing::warn!(
error = %err,
"comms_drain: failed to inject progress response"
);
}
}
}
PeerInputClass::SilentRequest
| PeerInputClass::PeerLifecycleKickoffFailed
| PeerInputClass::PeerLifecycleKickoffCancelled
| PeerInputClass::ActionableMessage
| PeerInputClass::ActionableRequest
| PeerInputClass::PlainEvent => {
let interaction_id = candidate.interaction.id;
let is_inbound_peer_request = matches!(
candidate_class,
PeerInputClass::SilentRequest | PeerInputClass::ActionableRequest
);
if is_inbound_peer_request {
let Some(handle) =
comms_runtime.peer_request_response_authority_handle()
else {
tracing::warn!(
interaction_id = %interaction_id,
class = ?candidate_class,
"comms_drain: rejected inbound peer request without complete peer request authority"
);
comms_runtime.mark_interaction_complete(&interaction_id);
continue;
};
let corr_id =
meerkat_core::PeerCorrelationId::from_uuid(interaction_id.0);
if handle.inbound_state(corr_id).is_none()
&& let Err(err) = handle.request_received(corr_id)
{
tracing::warn!(
error = %err,
corr_id = %corr_id,
"PeerInteractionHandle::request_received rejected"
);
comms_runtime.mark_interaction_complete(&interaction_id);
continue;
}
}
let subscriber = comms_runtime.interaction_subscriber(&interaction_id);
let input = match classified_interaction_to_runtime_input(
&candidate,
&runtime_id,
) {
Ok(input) => input,
Err(err) => {
tracing::warn!(
error = %err,
interaction_id = %interaction_id,
"comms_drain: rejected malformed peer ingress"
);
comms_runtime.mark_interaction_complete(&interaction_id);
continue;
}
};
let result = adapter
.accept_input_with_completion(&session_id, input)
.await;
match result {
Ok((_outcome, handle)) => {
if subscriber.is_some() || handle.is_some() {
spawn_completion_bridge(
Some(comms_runtime.clone()),
interaction_id,
subscriber,
handle,
);
} else {
comms_runtime.mark_interaction_complete(&interaction_id);
}
}
Err(err) => {
tracing::warn!(
error = %err,
"comms_drain: failed to accept peer input"
);
comms_runtime.mark_interaction_complete(&interaction_id);
}
}
}
}
}
}
})
}
fn bridge_peer_identity(
peer: &BridgePeerSpec,
context: &str,
) -> Result<BridgePeerIdentity, (BridgeRejectionCause, String)> {
BridgePeerIdentity::try_from(peer).map_err(|error| {
(
BridgeRejectionCause::InvalidSupervisorSpec,
format!("{context}: invalid supervisor peer spec: {error}"),
)
})
}
fn bound_supervisor_peer_id(
peer_id: &str,
context: &str,
) -> Result<PeerId, (BridgeRejectionCause, String)> {
PeerId::parse(peer_id).map_err(|error| {
(
BridgeRejectionCause::Internal,
format!("{context}: invalid bound supervisor peer_id: {error}"),
)
})
}
fn sender_peer_label(sender: &PeerIngressFact) -> String {
sender.diagnostic_label()
}
fn sender_matches_bound_supervisor(sender: &PeerIngressFact, peer_id: &PeerId) -> bool {
sender
.canonical_peer_id
.is_some_and(|sender_peer_id| sender_peer_id == *peer_id)
}
fn sender_matches_bridge_peer(sender: &PeerIngressFact, peer: &BridgePeerIdentity) -> bool {
sender
.canonical_peer_id
.is_some_and(|sender_peer_id| sender_peer_id == peer.peer_id)
|| (!peer.pubkey.is_zero() && sender.signing_pubkey == Some(*peer.pubkey.as_bytes()))
}
fn require_authorized_supervisor(
sender: &PeerIngressFact,
payload: &BridgeSupervisorPayload,
current: &SupervisorBinding,
) -> Result<BridgePeerIdentity, (BridgeRejectionCause, String)> {
let SupervisorBinding::Bound {
peer_id: current_peer_id,
epoch: current_epoch,
..
} = current
else {
return Err((
BridgeRejectionCause::NotBound,
"no authorized supervisor registered".to_string(),
));
};
let supervisor = bridge_peer_identity(&payload.supervisor, "supervisor bridge request")?;
let current_peer_id = bound_supervisor_peer_id(current_peer_id, "supervisor bridge request")?;
if payload.epoch < *current_epoch {
return Err((
BridgeRejectionCause::StaleSupervisor,
format!(
"stale supervisor epoch {} (current {})",
payload.epoch, current_epoch
),
));
}
if payload.epoch != *current_epoch {
return Err((
BridgeRejectionCause::StaleSupervisor,
format!(
"unexpected supervisor epoch {} (current {})",
payload.epoch, current_epoch
),
));
}
if supervisor.peer_id != current_peer_id {
return Err((
BridgeRejectionCause::StaleSupervisor,
format!(
"stale supervisor peer '{}' (current '{current_peer_id}')",
supervisor.peer_id
),
));
}
if !sender_matches_bound_supervisor(sender, ¤t_peer_id) {
let sender_label = sender_peer_label(sender);
return Err((
BridgeRejectionCause::SenderMismatch,
format!(
"request sender '{sender_label}' does not match authorized supervisor '{current_peer_id}'"
),
));
}
Ok(supervisor)
}
fn bridge_capabilities() -> BridgeCapabilities {
BridgeCapabilities {
deliver_member_input: true,
observe_member: true,
interrupt_member: true,
hard_cancel_member: false,
retire_member: true,
destroy_member: true,
wire_member: true,
unwire_member: true,
..BridgeCapabilities::default()
}
}
fn peer_input_from_delivery_payload(
session_id: &SessionId,
sender_peer_id: PeerId,
request_id: meerkat_core::interaction::InteractionId,
payload: BridgeDeliveryPayload,
) -> Input {
let (body, blocks) = match payload.content {
meerkat_core::types::ContentInput::Text(body) => (body, None),
meerkat_core::types::ContentInput::Blocks(blocks) => {
let body = meerkat_core::types::text_content(&blocks);
(body, Some(blocks))
}
};
Input::Peer(PeerInput {
header: InputHeader {
id: meerkat_core::lifecycle::InputId::new(),
timestamp: chrono::Utc::now(),
source: InputOrigin::Peer {
peer_id: sender_peer_id.as_str(),
display_identity: Some(sender_peer_id.as_str()),
runtime_id: Some(MeerkatMachine::logical_runtime_id(session_id)),
},
durability: InputDurability::Durable,
visibility: InputVisibility {
transcript_eligible: true,
operator_eligible: true,
},
idempotency_key: Some(IdempotencyKey::new(payload.input_id)),
supersession_key: None,
correlation_id: Some(crate::identifiers::CorrelationId::from_uuid(request_id.0)),
},
convention: Some(PeerConvention::Message),
body,
payload: None,
blocks,
handling_mode: Some(payload.handling_mode),
})
}
fn bridge_delivery_rejection_cause(
reason: &crate::accept::RejectReason,
) -> BridgeDeliveryRejectionCause {
match reason {
crate::accept::RejectReason::NotReady { state } => BridgeDeliveryRejectionCause::NotReady {
state: runtime_state_to_bridge(*state),
},
crate::accept::RejectReason::DurabilityViolation { detail } => {
BridgeDeliveryRejectionCause::DurabilityViolation {
detail: detail.clone(),
}
}
crate::accept::RejectReason::PeerHandlingModeInvalid { detail } => {
BridgeDeliveryRejectionCause::PeerHandlingModeInvalid {
detail: detail.clone(),
}
}
crate::accept::RejectReason::PeerResponseTerminalInvalid { detail } => {
BridgeDeliveryRejectionCause::Internal {
detail: detail.clone(),
}
}
}
}
fn advertised_bind_bootstrap_token(
comms_runtime: &Arc<dyn CommsRuntime>,
) -> Result<String, (BridgeRejectionCause, String)> {
if let Some(token) = comms_runtime.bridge_bootstrap_token()
&& !token.is_empty()
{
return Ok(token);
}
Err((
BridgeRejectionCause::InvalidBootstrapToken,
"runtime does not expose a typed bridge bootstrap token".to_string(),
))
}
fn validate_bind_request(
comms_runtime: &Arc<dyn CommsRuntime>,
sender: &PeerIngressFact,
payload: &meerkat_contracts::wire::supervisor_bridge::BridgeBindPayload,
) -> Result<(TrustedPeerDescriptor, String), (BridgeRejectionCause, String)> {
let expected_bootstrap_token = advertised_bind_bootstrap_token(comms_runtime)?;
let advertised_address = comms_runtime.advertised_address().ok_or_else(|| {
(
BridgeRejectionCause::Internal,
"runtime does not expose an advertised address for bind bootstrap".to_string(),
)
})?;
if canonicalize_bridge_address(&payload.expected_address)
!= canonicalize_bridge_address(&advertised_address)
{
return Err((
BridgeRejectionCause::AddressMismatch,
format!(
"bind address mismatch: expected '{}', actual '{}'",
payload.expected_address, advertised_address
),
));
}
let supervisor = bridge_peer_identity(&payload.supervisor, "bind member failed")?;
if !sender_matches_bridge_peer(sender, &supervisor) {
let sender_label = sender_peer_label(sender);
return Err((
BridgeRejectionCause::SenderMismatch,
format!(
"request sender '{sender_label}' does not match supervisor '{}'",
supervisor.peer_id
),
));
}
let Some(actual_peer_id) = comms_runtime.peer_id().map(|peer_id| peer_id.as_str()) else {
return Err((
BridgeRejectionCause::Internal,
"bind member failed: runtime peer_id unavailable".to_string(),
));
};
if actual_peer_id != payload.expected_peer_id {
return Err((
BridgeRejectionCause::InvalidPeerSpec,
format!(
"bind peer_id mismatch: expected '{}', actual '{actual_peer_id}'",
payload.expected_peer_id
),
));
}
if payload.bootstrap_token.as_str() != expected_bootstrap_token {
return Err((
BridgeRejectionCause::InvalidBootstrapToken,
"bind member failed: invalid bootstrap token".to_string(),
));
}
Ok((
supervisor.into_trusted_peer_descriptor(),
advertised_address,
))
}
#[derive(Debug)]
enum AuthorizeSupervisorGate {
IdempotentAck,
Proceed,
}
#[derive(Clone, Debug)]
struct BoundSupervisorState {
name: String,
peer_id: String,
address: String,
epoch: u64,
}
impl BoundSupervisorState {
fn from_binding(binding: &SupervisorBinding) -> Option<Self> {
let SupervisorBinding::Bound {
name,
peer_id,
address,
epoch,
} = binding
else {
return None;
};
Some(Self {
name: name.clone(),
peer_id: peer_id.clone(),
address: address.clone(),
epoch: *epoch,
})
}
}
async fn rollback_bind_after_trust_publication_failure(
adapter: &Arc<MeerkatMachine>,
session_id: &SessionId,
peer_id: &str,
epoch: u64,
) -> Result<(), SupervisorBindingStageError> {
adapter
.stage_supervisor_revoke(session_id, peer_id.to_string(), epoch)
.await
}
async fn rollback_authorize_after_trust_publication_failure(
adapter: &Arc<MeerkatMachine>,
session_id: &SessionId,
previous: &BoundSupervisorState,
) -> Result<(), SupervisorBindingStageError> {
adapter
.stage_supervisor_authorize(
session_id,
previous.name.clone(),
previous.peer_id.clone(),
previous.address.clone(),
previous.epoch,
)
.await
.map(|_| ())
}
#[derive(Debug)]
enum BindMemberGate {
Bootstrap,
IdempotentAck,
}
fn validate_bind_request_against_state(
sender: &PeerIngressFact,
payload: &meerkat_contracts::wire::supervisor_bridge::BridgeBindPayload,
current: &SupervisorBinding,
) -> Result<BindMemberGate, (BridgeRejectionCause, String)> {
let SupervisorBinding::Bound {
peer_id: current_peer_id,
epoch: current_epoch,
..
} = current
else {
return Ok(BindMemberGate::Bootstrap);
};
let supervisor = bridge_peer_identity(&payload.supervisor, "bind member failed")?;
let current_peer_id = bound_supervisor_peer_id(current_peer_id, "bind member failed")?;
if supervisor.peer_id != current_peer_id {
return Err((
BridgeRejectionCause::AlreadyBound,
format!(
"bind member failed: supervisor already bound as '{current_peer_id}'; use authorize_supervisor to rotate"
),
));
}
if payload.epoch != *current_epoch {
return Err((
BridgeRejectionCause::AlreadyBound,
format!(
"bind member failed: epoch {} does not match bound supervisor epoch {current_epoch}; use authorize_supervisor to rotate",
payload.epoch
),
));
}
if !sender_matches_bound_supervisor(sender, ¤t_peer_id) {
let sender_label = sender_peer_label(sender);
return Err((
BridgeRejectionCause::SenderMismatch,
format!(
"bind member failed: request sender '{sender_label}' does not match authorized supervisor '{current_peer_id}'"
),
));
}
Ok(BindMemberGate::IdempotentAck)
}
fn validate_authorize_supervisor_request(
sender: &PeerIngressFact,
payload: &BridgeSupervisorPayload,
current: &SupervisorBinding,
) -> Result<AuthorizeSupervisorGate, (BridgeRejectionCause, String)> {
let supervisor = bridge_peer_identity(&payload.supervisor, "authorize supervisor failed")?;
if let SupervisorBinding::Bound {
peer_id: current_peer_id,
epoch: current_epoch,
..
} = current
{
if payload.epoch < *current_epoch {
return Err((
BridgeRejectionCause::StaleSupervisor,
format!(
"authorize supervisor failed: stale supervisor epoch {} (current {current_epoch})",
payload.epoch
),
));
}
let current_peer_id =
bound_supervisor_peer_id(current_peer_id, "authorize supervisor failed")?;
if !sender_matches_bound_supervisor(sender, ¤t_peer_id) {
let sender_label = sender_peer_label(sender);
return Err((
BridgeRejectionCause::SenderMismatch,
format!(
"authorize supervisor failed: request sender '{sender_label}' does not match authorized supervisor '{current_peer_id}'"
),
));
}
if payload.epoch == *current_epoch && supervisor.peer_id == current_peer_id {
return Ok(AuthorizeSupervisorGate::IdempotentAck);
}
return Ok(AuthorizeSupervisorGate::Proceed);
}
if !sender_matches_bridge_peer(sender, &supervisor) {
let sender_label = sender_peer_label(sender);
return Err((
BridgeRejectionCause::SenderMismatch,
format!(
"authorize supervisor failed: request sender '{sender_label}' does not match supervisor '{}'",
supervisor.peer_id
),
));
}
Err((
BridgeRejectionCause::NotBound,
"authorize supervisor failed: use bind_member to establish initial supervisor authority"
.to_string(),
))
}
async fn send_bridge_response(
comms_runtime: &Arc<dyn CommsRuntime>,
candidate: &PeerInputCandidate,
status: meerkat_core::interaction::ResponseStatus,
reply: BridgeReply,
) {
let result = serde_json::to_value(&reply).unwrap_or_else(|error| {
tracing::error!(
interaction_id = %candidate.interaction.id,
error = %error,
"comms_drain: BridgeReply serialization failed; falling back to minimal rejection"
);
serde_json::json!({
"result": "rejected",
"cause": "internal",
"reason": "bridge reply serialization failed",
})
});
let to = match resolve_bridge_response_route(comms_runtime, candidate).await {
Some(route) => route,
None => {
tracing::warn!(
from = %candidate.ingress.diagnostic_label(),
interaction_id = %candidate.interaction.id,
"comms_drain: failed to resolve bridge response peer route"
);
comms_runtime.mark_interaction_complete(&candidate.interaction.id);
return;
}
};
if let Err(error) = comms_runtime
.send(CommsCommand::PeerResponse {
to,
in_reply_to: candidate.interaction.id,
status,
result,
blocks: None,
handling_mode: None,
})
.await
{
tracing::warn!(
from = %candidate.ingress.diagnostic_label(),
interaction_id = %candidate.interaction.id,
error = %error,
"comms_drain: failed to send bridge response"
);
}
comms_runtime.mark_interaction_complete(&candidate.interaction.id);
}
async fn resolve_bridge_response_route(
comms_runtime: &Arc<dyn CommsRuntime>,
candidate: &PeerInputCandidate,
) -> Option<PeerRoute> {
if let Some(sender_route) = candidate.ingress.route.clone() {
if let Some(route) = resolve_peer_route(comms_runtime, sender_route.peer_id).await {
return Some(route);
}
return Some(sender_route);
}
if let Some(sender_peer_id) = candidate.ingress.canonical_peer_id {
return resolve_peer_route(comms_runtime, sender_peer_id)
.await
.or_else(|| Some(PeerRoute::new(sender_peer_id)));
}
None
}
async fn resolve_peer_route(
comms_runtime: &Arc<dyn CommsRuntime>,
peer_id: PeerId,
) -> Option<PeerRoute> {
let peers = comms_runtime.peers().await;
peers
.iter()
.find(|entry| entry.peer_id == peer_id)
.map(|entry| PeerRoute::with_display_name(entry.peer_id, entry.name.clone()))
}
fn record_bridge_inbound_peer_request(
comms_runtime: &Arc<dyn CommsRuntime>,
candidate: &PeerInputCandidate,
) -> bool {
let Some(handle) = comms_runtime.peer_request_response_authority_handle() else {
tracing::warn!(
interaction_id = %candidate.interaction.id,
"comms_drain: rejected supervisor bridge request without complete peer request authority"
);
comms_runtime.mark_interaction_complete(&candidate.interaction.id);
return false;
};
let corr_id = meerkat_core::PeerCorrelationId::from_uuid(candidate.interaction.id.0);
if handle.inbound_state(corr_id).is_some() {
return true;
}
if let Err(err) = handle.request_received(corr_id) {
tracing::warn!(
error = %err,
corr_id = %corr_id,
"PeerInteractionHandle::request_received rejected for supervisor bridge command"
);
comms_runtime.mark_interaction_complete(&candidate.interaction.id);
return false;
}
true
}
async fn send_bridge_failure(
comms_runtime: &Arc<dyn CommsRuntime>,
candidate: &PeerInputCandidate,
cause: BridgeRejectionCause,
message: impl Into<String>,
) {
send_bridge_response(
comms_runtime,
candidate,
meerkat_core::interaction::ResponseStatus::Failed,
BridgeReply::Rejected {
cause,
reason: message.into(),
},
)
.await;
}
async fn try_handle_supervisor_bridge_command(
adapter: &Arc<MeerkatMachine>,
session_id: &SessionId,
comms_runtime: &Arc<dyn CommsRuntime>,
candidate: &PeerInputCandidate,
) -> bool {
let InteractionContent::Request { intent, params, .. } = &candidate.interaction.content else {
return false;
};
if intent != SUPERVISOR_BRIDGE_INTENT {
return false;
}
if !record_bridge_inbound_peer_request(comms_runtime, candidate) {
return true;
}
let command: BridgeCommand = match decode_bridge_command(params.clone()) {
Ok(cmd) => cmd,
Err(error) => {
let cause = match &error {
meerkat_contracts::wire::supervisor_bridge::BridgeCommandDecodeError::UnsupportedProtocolVersion(_) => {
BridgeRejectionCause::UnsupportedProtocolVersion
}
meerkat_contracts::wire::supervisor_bridge::BridgeCommandDecodeError::Invalid(_) => {
BridgeRejectionCause::Unsupported
}
};
send_bridge_failure(
comms_runtime,
candidate,
cause,
format!("invalid bridge command: {error}"),
)
.await;
return true;
}
};
let sender = &candidate.ingress;
let current_binding = adapter.supervisor_binding(session_id).await;
match command {
BridgeCommand::BindMember(payload) => {
let gate = match validate_bind_request_against_state(sender, &payload, ¤t_binding)
{
Ok(gate) => gate,
Err((cause, reason)) => {
send_bridge_failure(comms_runtime, candidate, cause, reason).await;
return true;
}
};
match gate {
BindMemberGate::IdempotentAck => {
let Some(advertised) = comms_runtime.advertised_address() else {
send_bridge_failure(
comms_runtime,
candidate,
BridgeRejectionCause::Internal,
"idempotent ack invariant violated",
)
.await;
return true;
};
let Some(peer_id) = comms_runtime.peer_id().map(|peer_id| peer_id.as_str())
else {
send_bridge_failure(
comms_runtime,
candidate,
BridgeRejectionCause::Internal,
"idempotent ack invariant violated",
)
.await;
return true;
};
send_bridge_response(
comms_runtime,
candidate,
meerkat_core::interaction::ResponseStatus::Completed,
BridgeReply::BindMember(BridgeBindResponse {
peer_id,
address: canonicalize_bridge_address(&advertised),
capabilities: bridge_capabilities(),
}),
)
.await;
return true;
}
BindMemberGate::Bootstrap => {}
}
let (supervisor_spec, advertised_address) =
match validate_bind_request(comms_runtime, sender, &payload) {
Ok(binding) => binding,
Err((cause, reason)) => {
send_bridge_failure(comms_runtime, candidate, cause, reason).await;
return true;
}
};
let Some(peer_id) = comms_runtime.peer_id().map(|peer_id| peer_id.as_str()) else {
send_bridge_failure(
comms_runtime,
candidate,
BridgeRejectionCause::Internal,
"bind member failed: runtime peer_id unavailable",
)
.await;
return true;
};
if let Err(error) = adapter
.stage_supervisor_bind(
session_id,
supervisor_spec.name.as_str().to_owned(),
supervisor_spec.peer_id.as_str(),
advertised_address.clone(),
payload.epoch,
)
.await
{
send_bridge_failure(
comms_runtime,
candidate,
BridgeRejectionCause::Internal,
format!("bind member failed: DSL rejected binding: {error}"),
)
.await;
return true;
}
if let Err(error) = comms_runtime
.add_trusted_peer(supervisor_spec.clone())
.await
{
let peer_id_str = supervisor_spec.peer_id.as_str();
let reason = match rollback_bind_after_trust_publication_failure(
adapter,
session_id,
&peer_id_str,
payload.epoch,
)
.await
{
Ok(()) => format!(
"bind member failed: trust publication failed after DSL commit: {error}"
),
Err(rollback_error) => format!(
"bind member failed: trust publication failed after DSL commit: {error}; rollback failed: {rollback_error}"
),
};
send_bridge_failure(
comms_runtime,
candidate,
BridgeRejectionCause::Internal,
reason,
)
.await;
return true;
}
if let Err(error) = adapter
.stage_supervisor_trust_published(
session_id,
supervisor_spec.peer_id.as_str(),
payload.epoch,
)
.await
{
tracing::debug!(
%session_id,
epoch = payload.epoch,
%error,
"supervisor_trust_publish ack rejected by DSL (binding rotated?)"
);
}
send_bridge_response(
comms_runtime,
candidate,
meerkat_core::interaction::ResponseStatus::Completed,
BridgeReply::BindMember(BridgeBindResponse {
peer_id,
address: canonicalize_bridge_address(&advertised_address),
capabilities: bridge_capabilities(),
}),
)
.await;
true
}
BridgeCommand::AuthorizeSupervisor(payload) => {
match validate_authorize_supervisor_request(sender, &payload, ¤t_binding) {
Ok(AuthorizeSupervisorGate::IdempotentAck) => {
send_bridge_response(
comms_runtime,
candidate,
meerkat_core::interaction::ResponseStatus::Completed,
BridgeReply::Ack(BridgeAck { ok: true }),
)
.await;
return true;
}
Ok(AuthorizeSupervisorGate::Proceed) => {}
Err((cause, reason)) => {
send_bridge_failure(comms_runtime, candidate, cause, reason).await;
return true;
}
}
let Some(previous_binding) = BoundSupervisorState::from_binding(¤t_binding)
else {
send_bridge_failure(
comms_runtime,
candidate,
BridgeRejectionCause::Internal,
"authorize supervisor failed: missing current binding",
)
.await;
return true;
};
let supervisor_spec = match TrustedPeerDescriptor::try_from(payload.supervisor.clone())
{
Ok(spec) => spec,
Err(error) => {
send_bridge_failure(
comms_runtime,
candidate,
BridgeRejectionCause::InvalidSupervisorSpec,
format!(
"authorize supervisor failed: invalid supervisor peer spec: {error}"
),
)
.await;
return true;
}
};
if let Err(error) = adapter
.stage_supervisor_authorize(
session_id,
supervisor_spec.name.as_str().to_owned(),
supervisor_spec.peer_id.as_str(),
supervisor_spec.address.to_string(),
payload.epoch,
)
.await
{
send_bridge_failure(
comms_runtime,
candidate,
BridgeRejectionCause::Internal,
format!("authorize supervisor failed: DSL rejected rotation: {error}"),
)
.await;
return true;
}
if let Err(error) = comms_runtime
.add_trusted_peer(supervisor_spec.clone())
.await
{
let reason = match rollback_authorize_after_trust_publication_failure(
adapter,
session_id,
&previous_binding,
)
.await
{
Ok(()) => format!(
"authorize supervisor failed: trust publication failed after DSL commit: {error}"
),
Err(rollback_error) => format!(
"authorize supervisor failed: trust publication failed after DSL commit: {error}; rollback failed: {rollback_error}"
),
};
send_bridge_failure(
comms_runtime,
candidate,
BridgeRejectionCause::Internal,
reason,
)
.await;
return true;
}
if let Err(error) = adapter
.stage_supervisor_trust_published(
session_id,
supervisor_spec.peer_id.as_str(),
payload.epoch,
)
.await
{
tracing::debug!(
%session_id,
epoch = payload.epoch,
%error,
"supervisor_trust_publish ack rejected by DSL (binding rotated?)"
);
}
if previous_binding.peer_id != payload.supervisor.peer_id
&& let Err(error) = comms_runtime
.remove_trusted_peer(&previous_binding.peer_id)
.await
{
let rollback_result = rollback_authorize_after_trust_publication_failure(
adapter,
session_id,
&previous_binding,
)
.await;
let supervisor_peer_id_str = supervisor_spec.peer_id.as_str();
let cleanup_result = comms_runtime
.remove_trusted_peer(&supervisor_peer_id_str)
.await;
let mut reason = format!(
"authorize supervisor failed: previous supervisor trust removal failed after DSL commit: {error}"
);
if let Err(rollback_error) = rollback_result {
reason.push_str(&format!("; rollback failed: {rollback_error}"));
}
if let Err(cleanup_error) = cleanup_result {
reason.push_str(&format!(
"; cleanup failed while removing new supervisor trust: {cleanup_error}"
));
}
send_bridge_failure(
comms_runtime,
candidate,
BridgeRejectionCause::Internal,
reason,
)
.await;
return true;
}
send_bridge_response(
comms_runtime,
candidate,
meerkat_core::interaction::ResponseStatus::Completed,
BridgeReply::Ack(BridgeAck { ok: true }),
)
.await;
true
}
BridgeCommand::RevokeSupervisor(payload) => {
let authorized_supervisor =
match require_authorized_supervisor(sender, &payload, ¤t_binding) {
Ok(supervisor) => supervisor,
Err((cause, reason)) => {
send_bridge_failure(comms_runtime, candidate, cause, reason).await;
return true;
}
};
let supervisor_peer_id = authorized_supervisor.peer_id.as_str();
if let Err(error) = comms_runtime.remove_trusted_peer(&supervisor_peer_id).await {
send_bridge_failure(
comms_runtime,
candidate,
BridgeRejectionCause::Internal,
format!("revoke supervisor failed: {error}"),
)
.await;
return true;
}
if let Err(error) = adapter
.stage_supervisor_trust_revoked(
session_id,
supervisor_peer_id.clone(),
payload.epoch,
)
.await
{
tracing::debug!(
%session_id,
epoch = payload.epoch,
%error,
"supervisor_trust_revoke ack rejected by DSL (binding rotated?)"
);
}
if let Err(error) = adapter
.stage_supervisor_revoke(session_id, supervisor_peer_id, payload.epoch)
.await
{
send_bridge_failure(
comms_runtime,
candidate,
BridgeRejectionCause::Internal,
format!("revoke supervisor failed: DSL rejected revoke: {error}"),
)
.await;
return true;
}
send_bridge_response(
comms_runtime,
candidate,
meerkat_core::interaction::ResponseStatus::Completed,
BridgeReply::Ack(BridgeAck { ok: true }),
)
.await;
true
}
BridgeCommand::DeliverMemberInput(payload) => {
let sup_payload = BridgeSupervisorPayload {
supervisor: payload.supervisor.clone(),
epoch: payload.epoch,
protocol_version: payload.protocol_version,
};
let authorized_supervisor =
match require_authorized_supervisor(sender, &sup_payload, ¤t_binding) {
Ok(supervisor) => supervisor,
Err((cause, reason)) => {
send_bridge_failure(comms_runtime, candidate, cause, reason).await;
return true;
}
};
let request_input_id = payload.input_id.clone();
let input = peer_input_from_delivery_payload(
session_id,
authorized_supervisor.peer_id,
candidate.interaction.id,
payload,
);
match adapter.accept_input(session_id, input).await {
Ok(outcome) => {
let response = match outcome {
crate::accept::AcceptOutcome::Accepted { input_id, .. } => {
BridgeDeliveryResponse {
input_id: request_input_id,
canonical_input_id: Some(input_id.to_string()),
outcome: BridgeDeliveryOutcome::Accepted,
}
}
crate::accept::AcceptOutcome::Deduplicated { existing_id, .. } => {
let existing_id = existing_id.to_string();
BridgeDeliveryResponse {
input_id: request_input_id,
canonical_input_id: Some(existing_id.clone()),
outcome: BridgeDeliveryOutcome::Deduplicated {
existing_input_id: existing_id,
},
}
}
crate::accept::AcceptOutcome::Rejected { reason } => {
let cause = bridge_delivery_rejection_cause(&reason);
BridgeDeliveryResponse {
input_id: request_input_id,
canonical_input_id: None,
outcome: BridgeDeliveryOutcome::Rejected {
cause,
reason: reason.to_string(),
},
}
}
};
send_bridge_response(
comms_runtime,
candidate,
meerkat_core::interaction::ResponseStatus::Completed,
BridgeReply::Delivery(response),
)
.await;
}
Err(error) => {
send_bridge_failure(
comms_runtime,
candidate,
BridgeRejectionCause::Internal,
format!("deliver member input failed: {error}"),
)
.await;
}
}
true
}
BridgeCommand::InterruptMember(payload) => {
if let Err((cause, reason)) =
require_authorized_supervisor(sender, &payload, ¤t_binding)
{
send_bridge_failure(comms_runtime, candidate, cause, reason).await;
return true;
}
match adapter.cancel_after_boundary(session_id).await {
Ok(()) => {
send_bridge_response(
comms_runtime,
candidate,
meerkat_core::interaction::ResponseStatus::Completed,
BridgeReply::Ack(BridgeAck { ok: true }),
)
.await;
}
Err(error) => {
send_bridge_failure(
comms_runtime,
candidate,
BridgeRejectionCause::Internal,
format!("interrupt member failed: {error}"),
)
.await;
}
}
true
}
BridgeCommand::RetireMember(payload) => {
if let Err((cause, reason)) =
require_authorized_supervisor(sender, &payload, ¤t_binding)
{
send_bridge_failure(comms_runtime, candidate, cause, reason).await;
return true;
}
match adapter.retire_runtime(session_id).await {
Ok(report) => {
send_bridge_response(
comms_runtime,
candidate,
meerkat_core::interaction::ResponseStatus::Completed,
BridgeReply::Retire(BridgeRetireResponse {
inputs_abandoned: report.inputs_abandoned,
inputs_pending_drain: report.inputs_pending_drain,
}),
)
.await;
}
Err(error) => {
send_bridge_failure(
comms_runtime,
candidate,
BridgeRejectionCause::Internal,
format!("retire member failed: {error}"),
)
.await;
}
}
true
}
BridgeCommand::DestroyMember(payload) => {
if let Err((cause, reason)) =
require_authorized_supervisor(sender, &payload, ¤t_binding)
{
send_bridge_failure(comms_runtime, candidate, cause, reason).await;
return true;
}
let runtime_id = MeerkatMachine::logical_runtime_id(session_id);
match RuntimeControlPlane::destroy(adapter.as_ref(), &runtime_id).await {
Ok(report) => {
send_bridge_response(
comms_runtime,
candidate,
meerkat_core::interaction::ResponseStatus::Completed,
BridgeReply::Destroy(BridgeDestroyResponse {
inputs_abandoned: report.inputs_abandoned,
}),
)
.await;
}
Err(error) => {
send_bridge_failure(
comms_runtime,
candidate,
BridgeRejectionCause::Internal,
format!("destroy member failed: {error}"),
)
.await;
}
}
true
}
BridgeCommand::ObserveMember(payload) => {
if let Err((cause, reason)) =
require_authorized_supervisor(sender, &payload, ¤t_binding)
{
send_bridge_failure(comms_runtime, candidate, cause, reason).await;
return true;
}
match crate::service_ext::SessionServiceRuntimeExt::runtime_state(
adapter.as_ref(),
session_id,
)
.await
{
Ok(state) => {
let current_run_id = adapter
.meerkat_machine_spine_snapshot(session_id)
.await
.and_then(|snapshot| {
snapshot
.control
.current_run_id
.map(|run_id| run_id.to_string())
});
let bridge_state = runtime_state_to_bridge(state);
send_bridge_response(
comms_runtime,
candidate,
meerkat_core::interaction::ResponseStatus::Completed,
BridgeReply::Observation(BridgeObservationResponse::new(
bridge_state,
Some(state.can_accept_input()),
current_run_id,
Some(BridgePeerConnectivity::Reachable),
None,
chrono::Utc::now().to_rfc3339(),
)),
)
.await;
}
Err(error) => {
send_bridge_failure(
comms_runtime,
candidate,
BridgeRejectionCause::Internal,
format!("observe member failed: {error}"),
)
.await;
}
}
true
}
BridgeCommand::WireMember(payload) => {
let sup_payload = BridgeSupervisorPayload {
supervisor: payload.supervisor.clone(),
epoch: payload.epoch,
protocol_version: payload.protocol_version,
};
if let Err((cause, reason)) =
require_authorized_supervisor(sender, &sup_payload, ¤t_binding)
{
send_bridge_failure(comms_runtime, candidate, cause, reason).await;
return true;
}
let peer_spec = match TrustedPeerDescriptor::try_from(payload.peer_spec) {
Ok(spec) => spec,
Err(error) => {
send_bridge_failure(
comms_runtime,
candidate,
BridgeRejectionCause::InvalidPeerSpec,
format!("wire member failed: invalid trusted peer spec: {error}"),
)
.await;
return true;
}
};
let endpoint = crate::meerkat_machine::dsl::PeerEndpoint::from(&peer_spec);
match adapter
.stage_add_direct_peer_endpoint(session_id, endpoint, Arc::clone(comms_runtime))
.await
{
Ok(()) => {
send_bridge_response(
comms_runtime,
candidate,
meerkat_core::interaction::ResponseStatus::Completed,
BridgeReply::Ack(BridgeAck { ok: true }),
)
.await;
}
Err(error) => {
send_bridge_failure(
comms_runtime,
candidate,
BridgeRejectionCause::Internal,
format!("wire member failed: {error}"),
)
.await;
}
}
true
}
BridgeCommand::UnwireMember(payload) => {
let sup_payload = BridgeSupervisorPayload {
supervisor: payload.supervisor.clone(),
epoch: payload.epoch,
protocol_version: payload.protocol_version,
};
if let Err((cause, reason)) =
require_authorized_supervisor(sender, &sup_payload, ¤t_binding)
{
send_bridge_failure(comms_runtime, candidate, cause, reason).await;
return true;
}
let peer_spec = match TrustedPeerDescriptor::try_from(payload.peer_spec) {
Ok(spec) => spec,
Err(error) => {
send_bridge_failure(
comms_runtime,
candidate,
BridgeRejectionCause::InvalidPeerSpec,
format!("unwire member failed: invalid trusted peer spec: {error}"),
)
.await;
return true;
}
};
let endpoint = crate::meerkat_machine::dsl::PeerEndpoint::from(&peer_spec);
match adapter
.stage_remove_direct_peer_endpoint(session_id, endpoint, Arc::clone(comms_runtime))
.await
{
Ok(()) => {
send_bridge_response(
comms_runtime,
candidate,
meerkat_core::interaction::ResponseStatus::Completed,
BridgeReply::Ack(BridgeAck { ok: true }),
)
.await;
}
Err(error) => {
send_bridge_failure(
comms_runtime,
candidate,
BridgeRejectionCause::Internal,
format!("unwire member failed: {error}"),
)
.await;
}
}
true
}
_ => {
send_bridge_failure(
comms_runtime,
candidate,
BridgeRejectionCause::Unsupported,
"unsupported supervisor bridge command".to_string(),
)
.await;
true
}
}
}
fn runtime_state_to_bridge(state: crate::RuntimeState) -> BridgeMemberRuntimeState {
match state {
crate::RuntimeState::Initializing => BridgeMemberRuntimeState::Initializing,
crate::RuntimeState::Idle => BridgeMemberRuntimeState::Idle,
crate::RuntimeState::Attached => BridgeMemberRuntimeState::Attached,
crate::RuntimeState::Running => BridgeMemberRuntimeState::Running,
crate::RuntimeState::Retired => BridgeMemberRuntimeState::Retired,
crate::RuntimeState::Stopped => BridgeMemberRuntimeState::Stopped,
crate::RuntimeState::Destroyed => BridgeMemberRuntimeState::Destroyed,
}
}
fn interaction_terminal_event(
interaction_id: meerkat_core::interaction::InteractionId,
outcome: CompletionOutcome,
) -> AgentEvent {
match outcome {
CompletionOutcome::Completed(result) => AgentEvent::InteractionComplete {
interaction_id,
result: result.text,
structured_output: result.structured_output,
},
CompletionOutcome::CompletedWithoutResult => AgentEvent::InteractionComplete {
interaction_id,
result: String::new(),
structured_output: None,
},
CompletionOutcome::CallbackPending { tool_name, args } => {
AgentEvent::InteractionCallbackPending {
interaction_id,
tool_name,
args,
}
}
CompletionOutcome::Cancelled => AgentEvent::InteractionFailed {
interaction_id,
error: "cancelled".to_string(),
},
CompletionOutcome::Abandoned(reason)
| CompletionOutcome::AbandonedWithError { reason, .. }
| CompletionOutcome::RuntimeTerminated(reason) => AgentEvent::InteractionFailed {
interaction_id,
error: reason,
},
CompletionOutcome::CompletedWithFinalizationFailure { error, .. } => {
AgentEvent::InteractionFailed {
interaction_id,
error: error
.detail
.unwrap_or_else(|| "turn finalization failed".to_string()),
}
}
}
}
#[cfg(test)]
#[allow(clippy::items_after_test_module)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
use super::*;
use meerkat_contracts::BridgePeerWiringPayload;
use meerkat_contracts::wire::supervisor_bridge::{
supervisor_bridge_current_protocol_version, supervisor_bridge_default_protocol_version,
supervisor_bridge_supported_protocol_versions,
};
use meerkat_core::InteractionId;
use meerkat_core::SendError;
use meerkat_core::interaction::InboxInteraction;
use meerkat_core::interaction::{PeerIngressConvention, PeerIngressIdentity};
use meerkat_core::types::HandlingMode;
use serde_json::json;
use std::collections::{HashMap, HashSet};
use uuid::Uuid;
const PEER_ID_RECEIVER: &str = "00000000-0000-0000-0000-00000000aaaa"; const PEER_ID_SUPERVISOR: &str = "00000000-0000-0000-0000-00000000bbbb"; const PEER_ID_OLD_SUPERVISOR: &str = "00000000-0000-0000-0000-00000000dddd";
fn test_pubkey(seed: u8) -> meerkat_comms::PubKey {
assert_ne!(seed, 0, "test pubkey seed must be non-zero");
meerkat_comms::PubKey::new([seed; 32])
}
fn bridge_peer_spec_with_seed(name: &str, seed: u8, address: &str) -> BridgePeerSpec {
let pubkey = test_pubkey(seed);
BridgePeerSpec {
name: name.to_string(),
peer_id: pubkey.to_peer_id().as_str(),
address: address.to_string(),
pubkey: *pubkey.as_bytes(),
}
}
fn supervisor_bridge_spec() -> BridgePeerSpec {
bridge_peer_spec_with_seed(
"mob/__mob_supervisor__",
0xbb,
"inproc://mob/__mob_supervisor__",
)
}
fn current_supervisor_bridge_spec() -> BridgePeerSpec {
bridge_peer_spec_with_seed(
"mob/__mob_supervisor__",
0xcc,
"inproc://mob/__mob_supervisor__",
)
}
fn old_supervisor_bridge_spec() -> BridgePeerSpec {
bridge_peer_spec_with_seed(
"mob/__mob_supervisor__",
0xdd,
"inproc://mob/__mob_supervisor__",
)
}
fn trusted_supervisor_descriptor(seed: u8) -> TrustedPeerDescriptor {
TrustedPeerDescriptor::try_from(bridge_peer_spec_with_seed(
"mob/__mob_supervisor__",
seed,
"inproc://mob/__mob_supervisor__",
))
.expect("valid supervisor spec")
}
fn trusted_peer_from_runtime(
name: &str,
runtime: &meerkat_comms::CommsRuntime,
) -> TrustedPeerDescriptor {
let pubkey = runtime.public_key();
TrustedPeerDescriptor::unsigned_with_pubkey(
name,
pubkey.to_peer_id().as_str(),
*pubkey.as_bytes(),
format!("inproc://{name}"),
)
.expect("valid non-zero trusted peer descriptor")
}
#[derive(Default)]
struct CountingPeerInteractionHandle {
inbound: std::sync::Mutex<HashSet<meerkat_core::PeerCorrelationId>>,
request_received_count: std::sync::atomic::AtomicUsize,
response_replied_count: std::sync::atomic::AtomicUsize,
reject_request_received: std::sync::atomic::AtomicBool,
}
impl CountingPeerInteractionHandle {
fn rejecting_request_received() -> Self {
let handle = Self::default();
handle
.reject_request_received
.store(true, std::sync::atomic::Ordering::SeqCst);
handle
}
fn rejected(
context: &'static str,
corr_id: meerkat_core::PeerCorrelationId,
) -> meerkat_core::handles::DslTransitionError {
meerkat_core::handles::DslTransitionError::guard_rejected(
context,
format!("test authority rejected corr_id {corr_id}"),
)
}
fn request_received_count(&self) -> usize {
self.request_received_count
.load(std::sync::atomic::Ordering::SeqCst)
}
fn response_replied_count(&self) -> usize {
self.response_replied_count
.load(std::sync::atomic::Ordering::SeqCst)
}
}
impl meerkat_core::handles::PeerInteractionHandle for CountingPeerInteractionHandle {
fn request_sent(
&self,
_corr_id: meerkat_core::PeerCorrelationId,
_to: String,
) -> Result<(), meerkat_core::handles::DslTransitionError> {
Ok(())
}
fn response_progress(
&self,
_corr_id: meerkat_core::PeerCorrelationId,
) -> Result<(), meerkat_core::handles::DslTransitionError> {
Ok(())
}
fn response_terminal(
&self,
_corr_id: meerkat_core::PeerCorrelationId,
_disposition: meerkat_core::handles::PeerTerminalDisposition,
) -> Result<(), meerkat_core::handles::DslTransitionError> {
Ok(())
}
fn request_timed_out(
&self,
_corr_id: meerkat_core::PeerCorrelationId,
) -> Result<(), meerkat_core::handles::DslTransitionError> {
Ok(())
}
fn request_received(
&self,
corr_id: meerkat_core::PeerCorrelationId,
) -> Result<(), meerkat_core::handles::DslTransitionError> {
if self
.reject_request_received
.load(std::sync::atomic::Ordering::SeqCst)
{
return Err(Self::rejected(
"PeerInteractionHandle::request_received",
corr_id,
));
}
let mut inbound = self.inbound.lock().expect("inbound mutex");
if !inbound.insert(corr_id) {
return Err(Self::rejected(
"PeerInteractionHandle::request_received",
corr_id,
));
}
self.request_received_count
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Ok(())
}
fn response_replied(
&self,
corr_id: meerkat_core::PeerCorrelationId,
) -> Result<(), meerkat_core::handles::DslTransitionError> {
let mut inbound = self.inbound.lock().expect("inbound mutex");
if !inbound.remove(&corr_id) {
return Err(Self::rejected(
"PeerInteractionHandle::response_replied",
corr_id,
));
}
self.response_replied_count
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Ok(())
}
fn outbound_state(
&self,
_corr_id: meerkat_core::PeerCorrelationId,
) -> Option<meerkat_core::OutboundPeerRequestState> {
None
}
fn inbound_state(
&self,
corr_id: meerkat_core::PeerCorrelationId,
) -> Option<meerkat_core::InboundPeerRequestState> {
self.inbound
.lock()
.expect("inbound mutex")
.contains(&corr_id)
.then_some(meerkat_core::InboundPeerRequestState::Received)
}
fn install_cleanup_observer(
&self,
_observer: Arc<dyn meerkat_core::handles::PeerInteractionCleanupObserver>,
) {
}
}
struct OneShotPeerRequestRuntime {
notify: Arc<tokio::sync::Notify>,
candidate: std::sync::Mutex<Option<PeerInputCandidate>>,
peer_handle: Option<Arc<dyn meerkat_core::handles::PeerInteractionHandle>>,
peer_request_response_handle: Option<Arc<dyn meerkat_core::handles::PeerInteractionHandle>>,
completed_count: std::sync::atomic::AtomicUsize,
}
impl OneShotPeerRequestRuntime {
fn new(
candidate: PeerInputCandidate,
peer_handle: Option<Arc<dyn meerkat_core::handles::PeerInteractionHandle>>,
) -> Self {
Self {
notify: Arc::new(tokio::sync::Notify::new()),
candidate: std::sync::Mutex::new(Some(candidate)),
peer_handle,
peer_request_response_handle: None,
completed_count: std::sync::atomic::AtomicUsize::new(0),
}
}
fn with_complete_authority(
candidate: PeerInputCandidate,
peer_handle: Arc<dyn meerkat_core::handles::PeerInteractionHandle>,
) -> Self {
Self {
notify: Arc::new(tokio::sync::Notify::new()),
candidate: std::sync::Mutex::new(Some(candidate)),
peer_handle: Some(peer_handle.clone()),
peer_request_response_handle: Some(peer_handle),
completed_count: std::sync::atomic::AtomicUsize::new(0),
}
}
fn completed_count(&self) -> usize {
self.completed_count
.load(std::sync::atomic::Ordering::SeqCst)
}
}
#[async_trait::async_trait]
impl CommsRuntime for OneShotPeerRequestRuntime {
async fn drain_messages(&self) -> Vec<String> {
Vec::new()
}
fn inbox_notify(&self) -> Arc<tokio::sync::Notify> {
self.notify.clone()
}
fn dismiss_received(&self) -> bool {
self.candidate.lock().expect("candidate mutex").is_none()
}
fn peer_interaction_handle(
&self,
) -> Option<Arc<dyn meerkat_core::handles::PeerInteractionHandle>> {
self.peer_handle.clone()
}
fn peer_request_response_authority_handle(
&self,
) -> Option<Arc<dyn meerkat_core::handles::PeerInteractionHandle>> {
self.peer_request_response_handle.clone()
}
async fn drain_peer_input_candidates(&self) -> Vec<PeerInputCandidate> {
self.candidate
.lock()
.expect("candidate mutex")
.take()
.into_iter()
.collect()
}
fn mark_interaction_complete(&self, _id: &InteractionId) {
self.completed_count
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
}
}
fn inbound_peer_request_candidate(class: PeerInputClass) -> PeerInputCandidate {
assert!(
matches!(
class,
PeerInputClass::SilentRequest | PeerInputClass::ActionableRequest
),
"test helper only builds inbound peer requests"
);
let id = InteractionId(Uuid::new_v4());
let intent = format!("test.inbound.{class:?}");
let sender = "partial-authority-peer";
PeerInputCandidate {
interaction: InboxInteraction {
id,
from_route: None,
from: sender.to_string(),
content: InteractionContent::Request {
intent: intent.clone(),
params: json!({ "ok": true }),
blocks: None,
},
rendered_text: String::new(),
handling_mode: HandlingMode::Queue,
render_metadata: None,
},
ingress: PeerIngressFact::peer(
id,
class,
meerkat_core::PeerIngressKind::Request,
Some(meerkat_core::PeerIngressAuthDecision::Required),
PeerIngressIdentity::new(
PeerId::new(),
sender,
PeerIngressConvention::Request {
request_id: id.to_string(),
intent,
},
),
),
lifecycle_peer: None,
response_terminality: None,
}
}
#[tokio::test]
async fn rejected_peer_request_recording_rejects_inbound_request_before_machine_admission() {
for class in [
PeerInputClass::SilentRequest,
PeerInputClass::ActionableRequest,
] {
let adapter = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
adapter.register_session(session_id.clone()).await;
let peer_handle = Arc::new(CountingPeerInteractionHandle::rejecting_request_received());
let peer_authority: Arc<dyn meerkat_core::handles::PeerInteractionHandle> = peer_handle;
let runtime = Arc::new(OneShotPeerRequestRuntime::with_complete_authority(
inbound_peer_request_candidate(class),
peer_authority,
));
let drain = spawn_comms_drain(
adapter.clone(),
session_id.clone(),
runtime.clone(),
Some(Duration::from_millis(10)),
);
tokio::time::timeout(Duration::from_secs(1), drain)
.await
.expect("drain should exit after one candidate")
.expect("drain task should not panic");
let snapshot = adapter
.meerkat_machine_spine_snapshot(&session_id)
.await
.expect("registered session snapshot");
assert_eq!(
snapshot.ledger.input_count, 0,
"rejected PeerRequestReceived must not admit {class:?} into machine work"
);
assert_eq!(
runtime.completed_count(),
1,
"rejected {class:?} should be closed at the comms boundary"
);
}
}
#[tokio::test]
async fn incomplete_peer_request_authority_rejects_inbound_request_before_machine_admission() {
for class in [
PeerInputClass::SilentRequest,
PeerInputClass::ActionableRequest,
] {
let adapter = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
adapter.register_session(session_id.clone()).await;
let peer_handle = Arc::new(CountingPeerInteractionHandle::default());
let peer_authority: Arc<dyn meerkat_core::handles::PeerInteractionHandle> =
peer_handle.clone();
let runtime = Arc::new(OneShotPeerRequestRuntime::new(
inbound_peer_request_candidate(class),
Some(peer_authority),
));
let drain = spawn_comms_drain(
adapter.clone(),
session_id.clone(),
runtime.clone(),
Some(Duration::from_millis(10)),
);
tokio::time::timeout(Duration::from_secs(1), drain)
.await
.expect("drain should exit after one candidate")
.expect("drain task should not panic");
let snapshot = adapter
.meerkat_machine_spine_snapshot(&session_id)
.await
.expect("registered session snapshot");
assert_eq!(
peer_handle.request_received_count(),
0,
"partial authority must not fire PeerRequestReceived for {class:?}"
);
assert_eq!(
snapshot.ledger.input_count, 0,
"partial authority must not admit {class:?} into machine work"
);
assert_eq!(
runtime.completed_count(),
1,
"rejected {class:?} should be closed at the comms boundary"
);
}
}
#[tokio::test]
async fn absent_peer_request_authority_rejects_inbound_request_before_machine_admission() {
for class in [
PeerInputClass::SilentRequest,
PeerInputClass::ActionableRequest,
] {
let adapter = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
adapter.register_session(session_id.clone()).await;
let runtime = Arc::new(OneShotPeerRequestRuntime::new(
inbound_peer_request_candidate(class),
None,
));
let drain = spawn_comms_drain(
adapter.clone(),
session_id.clone(),
runtime.clone(),
Some(Duration::from_millis(10)),
);
tokio::time::timeout(Duration::from_secs(1), drain)
.await
.expect("drain should exit after one candidate")
.expect("drain task should not panic");
let snapshot = adapter
.meerkat_machine_spine_snapshot(&session_id)
.await
.expect("registered session snapshot");
assert_eq!(
snapshot.ledger.input_count, 0,
"missing authority must not admit {class:?} into machine work"
);
assert_eq!(
runtime.completed_count(),
1,
"rejected {class:?} should be closed at the comms boundary"
);
}
}
fn bridge_sender_fact(sender: &str) -> PeerIngressFact {
let id = InteractionId(Uuid::new_v4());
bridge_sender_fact_with_id(id, sender)
}
fn bridge_sender_fact_with_display(
canonical_peer_id: PeerId,
display_label: &str,
) -> PeerIngressFact {
let id = InteractionId(Uuid::new_v4());
PeerIngressFact::peer(
id,
PeerInputClass::ActionableRequest,
meerkat_core::PeerIngressKind::Request,
Some(meerkat_core::PeerIngressAuthDecision::Exempt(
meerkat_core::PeerIngressAuthExemption::SupervisorBridge,
)),
PeerIngressIdentity::new(
canonical_peer_id,
display_label,
meerkat_core::PeerIngressConvention::Request {
request_id: id.to_string(),
intent: SUPERVISOR_BRIDGE_INTENT.to_string(),
},
),
)
}
fn bridge_sender_fact_with_id(id: InteractionId, sender: &str) -> PeerIngressFact {
if let Ok(pubkey) = meerkat_comms::PubKey::from_pubkey_string(sender) {
return PeerIngressFact::peer(
id,
PeerInputClass::ActionableRequest,
meerkat_core::PeerIngressKind::Request,
Some(meerkat_core::PeerIngressAuthDecision::Exempt(
meerkat_core::PeerIngressAuthExemption::SupervisorBridge,
)),
PeerIngressIdentity::new(
pubkey.to_peer_id(),
sender,
meerkat_core::PeerIngressConvention::Request {
request_id: id.to_string(),
intent: SUPERVISOR_BRIDGE_INTENT.to_string(),
},
)
.with_signing_pubkey(*pubkey.as_bytes()),
);
}
if let Ok(peer_id) = PeerId::parse(sender) {
return PeerIngressFact::peer(
id,
PeerInputClass::ActionableRequest,
meerkat_core::PeerIngressKind::Request,
Some(meerkat_core::PeerIngressAuthDecision::Exempt(
meerkat_core::PeerIngressAuthExemption::SupervisorBridge,
)),
PeerIngressIdentity::new(
peer_id,
sender,
meerkat_core::PeerIngressConvention::Request {
request_id: id.to_string(),
intent: SUPERVISOR_BRIDGE_INTENT.to_string(),
},
),
);
}
PeerIngressFact::peer(
id,
PeerInputClass::ActionableRequest,
meerkat_core::PeerIngressKind::Request,
Some(meerkat_core::PeerIngressAuthDecision::Exempt(
meerkat_core::PeerIngressAuthExemption::SupervisorBridge,
)),
PeerIngressIdentity::new(
PeerId::new(),
sender,
meerkat_core::PeerIngressConvention::Request {
request_id: id.to_string(),
intent: SUPERVISOR_BRIDGE_INTENT.to_string(),
},
),
)
}
#[test]
fn sender_matches_bound_supervisor_rejects_same_display_name_with_different_peer_id() {
let current_peer_id = PeerId::parse(PEER_ID_SUPERVISOR).expect("valid supervisor peer id");
let attacker_peer_id =
PeerId::parse(PEER_ID_OLD_SUPERVISOR).expect("valid attacker peer id");
let sender = bridge_sender_fact_with_display(attacker_peer_id, "mob/__mob_supervisor__");
assert!(
!sender_matches_bound_supervisor(&sender, ¤t_peer_id),
"display label must not prove current supervisor authority"
);
}
#[test]
fn sender_matches_bridge_peer_rejects_same_display_name_with_different_peer_id() {
let attacker_peer_id =
PeerId::parse(PEER_ID_OLD_SUPERVISOR).expect("valid attacker peer id");
let peer = bridge_peer_identity(&supervisor_bridge_spec(), "test")
.expect("valid bridge peer identity");
let sender = bridge_sender_fact_with_display(attacker_peer_id, peer.name.as_str());
assert!(
!sender_matches_bridge_peer(&sender, &peer),
"display label must not prove bridge peer authority"
);
}
#[tokio::test]
async fn bridge_response_route_requires_known_canonical_peer_id() {
let peer = meerkat_comms::Keypair::generate();
let peer_pubkey = peer.public_key();
let peer_spec = TrustedPeerDescriptor::unsigned_with_pubkey(
"bridge-response-peer",
peer_pubkey.to_peer_id().as_str(),
*peer_pubkey.as_bytes(),
"inproc://bridge-response-peer",
)
.expect("valid peer spec");
let runtime: Arc<dyn CommsRuntime> = Arc::new(
meerkat_comms::CommsRuntime::inproc_only("bridge-response-route").expect("runtime"),
);
runtime
.add_trusted_peer(peer_spec.clone())
.await
.expect("trust peer");
let route = resolve_peer_route(&runtime, peer_spec.peer_id)
.await
.expect("canonical peer id should resolve through the peer directory");
assert_eq!(route.peer_id, peer_spec.peer_id);
let unknown_peer_id = PeerId::new();
assert!(
resolve_peer_route(&runtime, unknown_peer_id)
.await
.is_none(),
"unknown PeerIds must not synthesize bridge response routes"
);
}
#[tokio::test]
async fn bridge_response_route_uses_pubkey_sender_not_spoofed_payload_peer_id() {
let runtime: Arc<dyn CommsRuntime> = Arc::new(
meerkat_comms::CommsRuntime::inproc_only("bridge-response-spoof").expect("runtime"),
);
let spoofed_key = meerkat_comms::Keypair::generate();
let spoofed_pubkey = spoofed_key.public_key();
let spoofed_peer_id = spoofed_pubkey.to_peer_id();
runtime
.add_trusted_peer(
TrustedPeerDescriptor::unsigned_with_pubkey(
"spoofed-target",
spoofed_peer_id.as_str(),
*spoofed_pubkey.as_bytes(),
"inproc://spoofed-target",
)
.expect("valid spoofed target"),
)
.await
.expect("trust spoofed target");
let sender_key = meerkat_comms::Keypair::generate();
let sender_pubkey = sender_key.public_key();
let spoofed_peer_id = spoofed_peer_id.as_str();
let command = BridgeCommand::BindMember(
meerkat_contracts::wire::supervisor_bridge::BridgeBindPayload {
supervisor: BridgePeerSpec {
name: "mob/__mob_supervisor__".to_string(),
peer_id: spoofed_peer_id.clone(),
address: "inproc://mob/__mob_supervisor__".to_string(),
pubkey: *sender_pubkey.as_bytes(),
},
epoch: 1,
protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
expected_peer_id: PEER_ID_RECEIVER.to_string(),
expected_address: "inproc://receiver".to_string(),
bootstrap_token: "bootstrap".to_string().into(),
},
);
let id = InteractionId(Uuid::new_v4());
let sender_label = sender_pubkey.to_pubkey_string();
let candidate = PeerInputCandidate {
interaction: InboxInteraction {
id,
from_route: None,
from: sender_pubkey.to_pubkey_string(),
content: InteractionContent::Request {
intent: SUPERVISOR_BRIDGE_INTENT.to_string(),
params: serde_json::to_value(command).expect("serialize bridge command"),
blocks: None,
},
rendered_text: String::new(),
handling_mode: HandlingMode::Queue,
render_metadata: None,
},
ingress: bridge_sender_fact_with_id(id, &sender_label),
lifecycle_peer: None,
response_terminality: None,
};
let route = resolve_bridge_response_route(&runtime, &candidate)
.await
.expect("raw pubkey sender should resolve to its derived route");
assert_eq!(route.peer_id, sender_pubkey.to_peer_id());
assert_ne!(
route.peer_id.as_str(),
spoofed_peer_id,
"bridge replies must not route to the caller-supplied supervisor.peer_id"
);
}
#[tokio::test]
async fn bridge_response_route_accepts_trusted_display_name_sender() {
let peer = meerkat_comms::Keypair::generate();
let peer_pubkey = peer.public_key();
let peer_spec = TrustedPeerDescriptor::unsigned_with_pubkey(
"mob/__mob_supervisor__",
peer_pubkey.to_peer_id().as_str(),
*peer_pubkey.as_bytes(),
"inproc://mob/__mob_supervisor__",
)
.expect("valid peer spec");
let runtime: Arc<dyn CommsRuntime> = Arc::new(
meerkat_comms::CommsRuntime::inproc_only("bridge-response-display").expect("runtime"),
);
runtime
.add_trusted_peer(peer_spec.clone())
.await
.expect("trust peer");
let command = BridgeCommand::ObserveMember(BridgeSupervisorPayload {
supervisor: BridgePeerSpec::from(peer_spec.clone()),
epoch: 1,
protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
});
let id = InteractionId(Uuid::new_v4());
let ingress = PeerIngressFact::peer(
id,
PeerInputClass::ActionableRequest,
meerkat_core::PeerIngressKind::Request,
Some(meerkat_core::PeerIngressAuthDecision::Exempt(
meerkat_core::PeerIngressAuthExemption::SupervisorBridge,
)),
PeerIngressIdentity::new(
peer_pubkey.to_peer_id(),
peer_spec.name.as_str(),
meerkat_core::PeerIngressConvention::Request {
request_id: id.to_string(),
intent: SUPERVISOR_BRIDGE_INTENT.to_string(),
},
)
.with_signing_pubkey(*peer_pubkey.as_bytes()),
);
let candidate = bridge_candidate_with_ingress(peer_spec.name.as_str(), &command, ingress);
let route = resolve_bridge_response_route(&runtime, &candidate)
.await
.expect("trusted display-name sender should resolve through peer directory");
assert_eq!(route.peer_id, peer_spec.peer_id);
assert_eq!(
route.display_name.as_ref().map(|name| name.as_str()),
Some(peer_spec.name.as_str())
);
}
#[tokio::test]
async fn bridge_response_route_uses_typed_sender_peer_id_not_display_or_payload_identity() {
let peer = meerkat_comms::Keypair::generate();
let peer_pubkey = peer.public_key();
let peer_spec = TrustedPeerDescriptor::unsigned_with_pubkey(
"mob/__mob_supervisor__",
peer_pubkey.to_peer_id().as_str(),
*peer_pubkey.as_bytes(),
"inproc://mob/__mob_supervisor__",
)
.expect("valid peer spec");
let runtime: Arc<dyn CommsRuntime> = Arc::new(
meerkat_comms::CommsRuntime::inproc_only("bridge-response-typed-sender")
.expect("runtime"),
);
runtime
.add_trusted_peer(peer_spec.clone())
.await
.expect("trust peer");
let payload_pubkey = meerkat_comms::Keypair::generate().public_key();
let spoofed_payload_peer_id = PeerId::new();
let command = BridgeCommand::ObserveMember(BridgeSupervisorPayload {
supervisor: BridgePeerSpec {
name: peer_spec.name.to_string(),
peer_id: spoofed_payload_peer_id.as_str(),
address: "inproc://mob/__mob_supervisor__".to_string(),
pubkey: *payload_pubkey.as_bytes(),
},
epoch: 1,
protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
});
let id = InteractionId(Uuid::new_v4());
let ingress = PeerIngressFact::peer(
id,
PeerInputClass::ActionableRequest,
meerkat_core::PeerIngressKind::Request,
Some(meerkat_core::PeerIngressAuthDecision::Exempt(
meerkat_core::PeerIngressAuthExemption::SupervisorBridge,
)),
PeerIngressIdentity::new(
peer_pubkey.to_peer_id(),
peer_spec.name.as_str(),
meerkat_core::PeerIngressConvention::Request {
request_id: id.to_string(),
intent: SUPERVISOR_BRIDGE_INTENT.to_string(),
},
)
.with_signing_pubkey(*peer_pubkey.as_bytes()),
);
let spoofed_candidate =
bridge_candidate_with_ingress(peer_spec.name.as_str(), &command, ingress);
let route = resolve_bridge_response_route(&runtime, &spoofed_candidate)
.await
.expect("typed ingress route should ignore spoofed payload peer_id");
assert_eq!(route.peer_id, peer_spec.peer_id);
assert_ne!(route.peer_id, spoofed_payload_peer_id);
assert_eq!(
route.display_name.as_ref().map(|name| name.as_str()),
Some(peer_spec.name.as_str())
);
}
#[tokio::test]
async fn bridge_response_seeds_inbound_request_before_peer_response_send() {
let suffix = Uuid::new_v4().simple().to_string();
let member_name = format!("bridge-response-member-{suffix}");
let supervisor_name = format!("bridge-response-supervisor-{suffix}");
let member_runtime =
Arc::new(meerkat_comms::CommsRuntime::inproc_only(&member_name).expect("member"));
let supervisor_runtime = Arc::new(
meerkat_comms::CommsRuntime::inproc_only(&supervisor_name).expect("supervisor"),
);
let peer_handle = Arc::new(CountingPeerInteractionHandle::default());
member_runtime.install_peer_request_response_authority(
meerkat_comms::PeerRequestResponseAuthority::new(
peer_handle.clone(),
Arc::new(crate::handles::RuntimeInteractionStreamHandle::ephemeral()),
),
);
let supervisor_pubkey = supervisor_runtime.public_key();
let supervisor_spec = TrustedPeerDescriptor::unsigned_with_pubkey(
&supervisor_name,
supervisor_pubkey.to_peer_id().as_str(),
*supervisor_pubkey.as_bytes(),
format!("inproc://{supervisor_name}"),
)
.expect("valid supervisor spec");
member_runtime
.add_trusted_peer(supervisor_spec.clone())
.await
.expect("member should trust supervisor");
let member_pubkey = member_runtime.public_key();
let member_spec = TrustedPeerDescriptor::unsigned_with_pubkey(
&member_name,
member_pubkey.to_peer_id().as_str(),
*member_pubkey.as_bytes(),
format!("inproc://{member_name}"),
)
.expect("valid member spec");
supervisor_runtime
.add_trusted_peer(member_spec)
.await
.expect("supervisor should trust member");
let adapter = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
adapter.register_session(session_id.clone()).await;
adapter
.stage_supervisor_bind(
&session_id,
supervisor_spec.name.to_string(),
supervisor_spec.peer_id.as_str(),
supervisor_spec.address.to_string(),
1,
)
.await
.expect("pre-bind supervisor");
let command = BridgeCommand::ObserveMember(BridgeSupervisorPayload {
supervisor: BridgePeerSpec::from(supervisor_spec.clone()),
epoch: 1,
protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
});
let interaction_id = InteractionId(Uuid::new_v4());
let ingress = PeerIngressFact::peer(
interaction_id,
PeerInputClass::ActionableRequest,
meerkat_core::PeerIngressKind::Request,
Some(meerkat_core::PeerIngressAuthDecision::Exempt(
meerkat_core::PeerIngressAuthExemption::SupervisorBridge,
)),
PeerIngressIdentity::new(
supervisor_spec.peer_id,
supervisor_spec.name.as_str(),
meerkat_core::PeerIngressConvention::Request {
request_id: interaction_id.to_string(),
intent: SUPERVISOR_BRIDGE_INTENT.to_string(),
},
)
.with_signing_pubkey(supervisor_spec.pubkey),
);
let candidate =
bridge_candidate_with_ingress(supervisor_spec.name.as_str(), &command, ingress);
assert!(
try_handle_supervisor_bridge_command(
&adapter,
&session_id,
&(member_runtime.clone() as Arc<dyn CommsRuntime>),
&candidate,
)
.await,
"bridge handler must own ObserveMember"
);
assert_eq!(
peer_handle.request_received_count(),
1,
"bridge handler must seed inbound peer request state before replying"
);
assert_eq!(
peer_handle.response_replied_count(),
1,
"real CommsRuntime::send(PeerResponse) should pass the inbound-state guard"
);
}
#[test]
fn bridge_response_route_source_does_not_use_interaction_from_ladder() {
let source = include_str!("comms_drain.rs");
let route_start = source
.find("async fn resolve_bridge_response_route")
.expect("route function exists");
let route_end = source[route_start..]
.find("async fn resolve_peer_route")
.map(|offset| route_start + offset)
.expect("peer route function follows bridge response route");
let route_source = &source[route_start..route_end];
let forbidden_from = ["candidate", "interaction", "from"].join(".");
assert!(
!route_source.contains(&forbidden_from),
"bridge response routing must use typed ingress facts, not candidate.interaction.from"
);
for removed_helper in [
["bridge_response", "route_from_sender"].join("_"),
["resolve_bridge", "display_name_sender_route"].join("_"),
] {
assert!(
!source.contains(&format!("fn {removed_helper}")),
"bridge response routing must not revive {removed_helper}"
);
}
}
struct BootstrapRuntime {
peer_id: String,
address: String,
bootstrap_token: Option<String>,
inbox_notify: Arc<tokio::sync::Notify>,
add_trusted_peer_errors: HashMap<String, String>,
remove_trusted_peer_errors: HashMap<String, String>,
trusted_peer_ids: Arc<tokio::sync::Mutex<HashSet<String>>>,
peer_handle: Option<Arc<dyn meerkat_core::handles::PeerInteractionHandle>>,
peer_request_response_handle: Option<Arc<dyn meerkat_core::handles::PeerInteractionHandle>>,
completed_count: Arc<std::sync::atomic::AtomicUsize>,
}
#[async_trait::async_trait]
impl CommsRuntime for BootstrapRuntime {
fn public_key(&self) -> Option<String> {
Some(self.peer_id.clone())
}
fn advertised_address(&self) -> Option<String> {
Some(self.address.clone())
}
fn bridge_bootstrap_token(&self) -> Option<String> {
self.bootstrap_token.clone()
}
async fn drain_messages(&self) -> Vec<String> {
Vec::new()
}
fn inbox_notify(&self) -> Arc<tokio::sync::Notify> {
self.inbox_notify.clone()
}
fn peer_interaction_handle(
&self,
) -> Option<Arc<dyn meerkat_core::handles::PeerInteractionHandle>> {
self.peer_handle.clone()
}
fn peer_request_response_authority_handle(
&self,
) -> Option<Arc<dyn meerkat_core::handles::PeerInteractionHandle>> {
self.peer_request_response_handle.clone()
}
fn mark_interaction_complete(&self, _id: &InteractionId) {
self.completed_count
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
}
async fn add_trusted_peer(&self, peer: TrustedPeerDescriptor) -> Result<(), SendError> {
let peer_id_str = peer.peer_id.as_str();
if let Some(message) = self.add_trusted_peer_errors.get(&peer_id_str) {
return Err(SendError::Internal(message.clone()));
}
self.trusted_peer_ids.lock().await.insert(peer_id_str);
Ok(())
}
async fn remove_trusted_peer(&self, peer_id: &str) -> Result<bool, SendError> {
match self.remove_trusted_peer_errors.get(peer_id) {
Some(message) => Err(SendError::Internal(message.clone())),
None => Ok(self.trusted_peer_ids.lock().await.remove(peer_id)),
}
}
}
fn bootstrap_runtime(
peer_id: &str,
address: &str,
bootstrap_token: Option<&str>,
) -> BootstrapRuntime {
let peer_handle = Arc::new(CountingPeerInteractionHandle::default());
let peer_handle: Arc<dyn meerkat_core::handles::PeerInteractionHandle> = peer_handle;
BootstrapRuntime {
peer_id: peer_id.to_string(),
address: address.to_string(),
bootstrap_token: bootstrap_token.map(ToString::to_string),
inbox_notify: Arc::new(tokio::sync::Notify::new()),
add_trusted_peer_errors: HashMap::new(),
remove_trusted_peer_errors: HashMap::new(),
trusted_peer_ids: Arc::new(tokio::sync::Mutex::new(HashSet::new())),
peer_handle: Some(peer_handle.clone()),
peer_request_response_handle: Some(peer_handle),
completed_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
}
}
fn lifecycle_candidate(
class: PeerInputClass,
intent: &str,
params: serde_json::Value,
) -> PeerInputCandidate {
let id = InteractionId(Uuid::new_v4());
let lifecycle_kind = match class {
PeerInputClass::PeerLifecycleRetired => {
meerkat_core::comms::PeerLifecycleKind::PeerRetired
}
PeerInputClass::PeerLifecycleUnwired => {
meerkat_core::comms::PeerLifecycleKind::PeerUnwired
}
_ => meerkat_core::comms::PeerLifecycleKind::PeerAdded,
};
PeerInputCandidate {
interaction: InboxInteraction {
id,
from_route: None,
from: "test-mob/__mob_supervisor__".to_string(),
content: InteractionContent::Request {
intent: intent.to_string(),
params,
blocks: None,
},
rendered_text: String::new(),
handling_mode: HandlingMode::Queue,
render_metadata: None,
},
ingress: PeerIngressFact::peer(
id,
class,
meerkat_core::PeerIngressKind::Request,
Some(meerkat_core::PeerIngressAuthDecision::Required),
PeerIngressIdentity::new(
PeerId::new(),
"test-mob/__mob_supervisor__",
meerkat_core::PeerIngressConvention::Lifecycle {
kind: lifecycle_kind,
peer: "peer-1".to_string(),
},
),
),
lifecycle_peer: Some("peer-1".to_string()),
response_terminality: None,
}
}
#[test]
fn completed_outcome_maps_structured_output_to_interaction_complete() {
let interaction_id = InteractionId(Uuid::new_v4());
let event = interaction_terminal_event(
interaction_id,
CompletionOutcome::Completed(Box::new(meerkat_core::RunResult {
text: "{\"answer\":42}".to_string(),
session_id: meerkat_core::SessionId::new(),
usage: meerkat_core::Usage::default(),
turns: 1,
tool_calls: 0,
terminal_cause_kind: None,
structured_output: Some(json!({"answer": 42})),
extraction_error: None,
schema_warnings: None,
skill_diagnostics: None,
})),
);
match event {
AgentEvent::InteractionComplete {
interaction_id: actual_id,
result,
structured_output,
} => {
assert_eq!(actual_id, interaction_id);
assert_eq!(result, "{\"answer\":42}");
assert_eq!(structured_output, Some(json!({"answer": 42})));
}
other => panic!("expected InteractionComplete, got {other:?}"),
}
}
#[test]
fn callback_pending_maps_to_interaction_callback_pending_terminal_event() {
let interaction_id = InteractionId(Uuid::new_v4());
let event = interaction_terminal_event(
interaction_id,
CompletionOutcome::CallbackPending {
tool_name: "external_mock".to_string(),
args: json!({ "value": "browser" }),
},
);
assert!(
matches!(event, AgentEvent::InteractionCallbackPending { .. }),
"expected callback-pending interaction event"
);
if let AgentEvent::InteractionCallbackPending {
interaction_id: actual_id,
tool_name,
args,
} = event
{
assert_eq!(actual_id, interaction_id);
assert_eq!(tool_name, "external_mock");
assert_eq!(args, json!({ "value": "browser" }));
}
}
#[tokio::test]
async fn peer_lifecycle_added_does_not_change_comms_trust() {
let runtime: Arc<dyn CommsRuntime> =
Arc::new(meerkat_comms::CommsRuntime::inproc_only("receiver-added").unwrap());
let peer = meerkat_comms::CommsRuntime::inproc_only("peer-added").unwrap();
let peer_spec = trusted_peer_from_runtime("peer-added", &peer);
let candidate = lifecycle_candidate(
PeerInputClass::PeerLifecycleAdded,
"mob.peer_added",
json!({
"peer": "peer-1",
"peer_spec": peer_spec,
}),
);
let peers_before = runtime.peers().await;
assert!(
peers_before.is_empty(),
"test runtime should start without trust"
);
let input =
classified_interaction_to_runtime_input(&candidate, &LogicalRuntimeId::new("s-1"))
.expect("lifecycle candidate should project to runtime input");
assert!(
matches!(input, Input::Peer(_)),
"lifecycle candidate should still route as peer input"
);
let peers = runtime.peers().await;
assert!(
peers.is_empty(),
"peer lifecycle add must not materialize comms trust before topology validation"
);
}
#[tokio::test]
async fn peer_lifecycle_unwired_and_retired_do_not_revoke_comms_trust() {
let runtime: Arc<dyn CommsRuntime> =
Arc::new(meerkat_comms::CommsRuntime::inproc_only("receiver-removed").unwrap());
let peer = meerkat_comms::CommsRuntime::inproc_only("peer-removed").unwrap();
let peer_spec = trusted_peer_from_runtime("peer-removed", &peer);
runtime.add_trusted_peer(peer_spec.clone()).await.unwrap();
let unwired = lifecycle_candidate(
PeerInputClass::PeerLifecycleUnwired,
"mob.peer_unwired",
json!({
"peer": "peer-1",
"peer_spec": peer_spec.clone(),
}),
);
let _ = classified_interaction_to_runtime_input(&unwired, &LogicalRuntimeId::new("s-1"))
.expect("unwired lifecycle candidate should project");
assert!(
runtime
.peers()
.await
.iter()
.any(|entry| entry.name.as_str() == "peer-removed"),
"peer lifecycle unwire must not revoke comms trust before topology validation"
);
runtime.add_trusted_peer(peer_spec.clone()).await.unwrap();
let retired = lifecycle_candidate(
PeerInputClass::PeerLifecycleRetired,
"mob.peer_retired",
json!({
"peer": "peer-1",
"peer_spec": peer_spec,
}),
);
let _ = classified_interaction_to_runtime_input(&retired, &LogicalRuntimeId::new("s-1"))
.expect("retired lifecycle candidate should project");
assert!(
runtime
.peers()
.await
.iter()
.any(|entry| entry.name.as_str() == "peer-removed"),
"peer lifecycle retire must not revoke comms trust before topology validation"
);
}
#[test]
fn bridge_capabilities_report_canonical_protocol_versions() {
let capabilities = bridge_capabilities();
assert_eq!(
capabilities.current_protocol_version,
supervisor_bridge_current_protocol_version()
);
assert_eq!(
capabilities.default_protocol_version,
supervisor_bridge_default_protocol_version()
);
assert_eq!(
capabilities.supported_protocol_versions,
supervisor_bridge_supported_protocol_versions()
);
assert!(
!capabilities.hard_cancel_member,
"supervisor bridge must not advertise live hard-cancel authority"
);
}
#[test]
fn validate_bind_request_rejects_missing_or_wrong_bootstrap_token() {
let runtime: Arc<dyn CommsRuntime> = Arc::new(bootstrap_runtime(
PEER_ID_RECEIVER,
"inproc://receiver",
Some("expected-token"),
));
let supervisor = supervisor_bridge_spec();
let payload = meerkat_contracts::wire::supervisor_bridge::BridgeBindPayload {
supervisor: supervisor.clone(),
epoch: 0,
protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
expected_peer_id: PEER_ID_RECEIVER.to_string(),
expected_address: runtime.advertised_address().unwrap(),
bootstrap_token: "wrong-token".into(),
};
let (cause, error) =
validate_bind_request(&runtime, &bridge_sender_fact(&supervisor.peer_id), &payload)
.expect_err("bind must reject incorrect bootstrap token");
assert_eq!(cause, BridgeRejectionCause::InvalidBootstrapToken);
assert!(
error.contains("invalid bootstrap token"),
"bind rejection should explain the bootstrap proof failure, got: {error}"
);
}
#[test]
fn validate_bind_request_accepts_matching_bootstrap_token() {
let runtime: Arc<dyn CommsRuntime> = Arc::new(bootstrap_runtime(
PEER_ID_RECEIVER,
"inproc://receiver",
Some("expected-token"),
));
let supervisor = supervisor_bridge_spec();
let payload = meerkat_contracts::wire::supervisor_bridge::BridgeBindPayload {
supervisor: supervisor.clone(),
epoch: 0,
protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
expected_peer_id: PEER_ID_RECEIVER.to_string(),
expected_address: runtime.advertised_address().unwrap(),
bootstrap_token: "expected-token".into(),
};
let (authorized, advertised_address) =
validate_bind_request(&runtime, &bridge_sender_fact(&supervisor.peer_id), &payload)
.expect("bind should accept the configured bootstrap token");
assert_eq!(authorized.peer_id.as_str(), supervisor.peer_id);
assert_eq!(advertised_address, runtime.advertised_address().unwrap());
}
#[test]
fn bridge_delivery_payload_preserves_explicit_queue_handling_mode() {
let input = peer_input_from_delivery_payload(
&SessionId::new(),
PeerId::parse(PEER_ID_SUPERVISOR).expect("valid supervisor peer id"),
InteractionId(Uuid::new_v4()),
BridgeDeliveryPayload {
supervisor: supervisor_bridge_spec(),
epoch: 1,
protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
input_id: "bridge-delivery-queue-test".to_string(),
content: meerkat_core::types::ContentInput::Text("queued follow-up".to_string()),
handling_mode: HandlingMode::Queue,
},
);
let Input::Peer(peer) = input else {
panic!("bridge delivery must project to peer input");
};
assert_eq!(
peer.handling_mode,
Some(HandlingMode::Queue),
"bridge delivery explicit queue must survive into MeerkatMachine admission"
);
}
#[test]
fn validate_bind_request_rejects_same_display_name_different_canonical_sender() {
let runtime: Arc<dyn CommsRuntime> = Arc::new(bootstrap_runtime(
PEER_ID_RECEIVER,
"inproc://receiver",
Some("expected-token"),
));
let supervisor = supervisor_bridge_spec();
let payload = meerkat_contracts::wire::supervisor_bridge::BridgeBindPayload {
supervisor: supervisor.clone(),
epoch: 0,
protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
expected_peer_id: PEER_ID_RECEIVER.to_string(),
expected_address: runtime.advertised_address().unwrap(),
bootstrap_token: "expected-token".into(),
};
let attacker_peer_id =
PeerId::parse(PEER_ID_OLD_SUPERVISOR).expect("valid attacker peer id");
let sender = bridge_sender_fact_with_display(attacker_peer_id, &supervisor.name);
let (cause, error) = validate_bind_request(&runtime, &sender, &payload)
.expect_err("same display name with a different canonical sender must reject");
assert_eq!(cause, BridgeRejectionCause::SenderMismatch);
assert!(
error.contains("does not match supervisor"),
"bind rejection should explain the sender mismatch, got: {error}"
);
}
#[test]
fn validate_bind_request_accepts_pubkey_sender_with_canonical_supervisor_peer_id() {
let runtime: Arc<dyn CommsRuntime> = Arc::new(bootstrap_runtime(
PEER_ID_RECEIVER,
"inproc://receiver",
Some("expected-token"),
));
let supervisor_key = meerkat_comms::Keypair::generate();
let supervisor_pubkey = supervisor_key.public_key();
let supervisor = BridgePeerSpec {
name: "mob/__mob_supervisor__".to_string(),
peer_id: supervisor_pubkey.to_peer_id().as_str(),
address: "inproc://mob/__mob_supervisor__".to_string(),
pubkey: *supervisor_pubkey.as_bytes(),
};
let payload = meerkat_contracts::wire::supervisor_bridge::BridgeBindPayload {
supervisor: supervisor.clone(),
epoch: 0,
protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
expected_peer_id: PEER_ID_RECEIVER.to_string(),
expected_address: runtime.advertised_address().unwrap(),
bootstrap_token: "expected-token".into(),
};
let (authorized, advertised_address) = validate_bind_request(
&runtime,
&bridge_sender_fact(&supervisor_pubkey.to_pubkey_string()),
&payload,
)
.expect("bind should accept raw transport sender when payload carries pubkey");
assert_eq!(authorized.peer_id.as_str(), supervisor.peer_id);
assert_eq!(authorized.pubkey, supervisor.pubkey);
assert_eq!(advertised_address, runtime.advertised_address().unwrap());
}
#[test]
fn validate_bind_request_returns_runtime_advertised_address() {
let runtime: Arc<dyn CommsRuntime> = Arc::new(bootstrap_runtime(
PEER_ID_RECEIVER,
&format!(
"inproc://receiver-real?{SUPERVISOR_BRIDGE_BOOTSTRAP_TOKEN_PARAM}=expected-token"
),
Some("expected-token"),
));
let supervisor = supervisor_bridge_spec();
let payload = meerkat_contracts::wire::supervisor_bridge::BridgeBindPayload {
supervisor: supervisor.clone(),
epoch: 0,
protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
expected_peer_id: PEER_ID_RECEIVER.to_string(),
expected_address: "inproc://receiver-real".to_string(),
bootstrap_token: "expected-token".into(),
};
let (_, advertised_address) =
validate_bind_request(&runtime, &bridge_sender_fact(&supervisor.peer_id), &payload)
.expect("bind should canonicalize to the callee's advertised address");
assert_eq!(advertised_address, runtime.advertised_address().unwrap());
}
#[test]
fn validate_bind_request_rejects_mismatched_expected_address() {
let runtime: Arc<dyn CommsRuntime> = Arc::new(bootstrap_runtime(
PEER_ID_RECEIVER,
"inproc://receiver-real",
Some("expected-token"),
));
let supervisor = supervisor_bridge_spec();
let payload = meerkat_contracts::wire::supervisor_bridge::BridgeBindPayload {
supervisor: supervisor.clone(),
epoch: 0,
protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
expected_peer_id: PEER_ID_RECEIVER.to_string(),
expected_address: "inproc://receiver-stale".to_string(),
bootstrap_token: "expected-token".into(),
};
let (cause, error) =
validate_bind_request(&runtime, &bridge_sender_fact(&supervisor.peer_id), &payload)
.expect_err("bind should reject mismatched expected addresses");
assert_eq!(cause, BridgeRejectionCause::AddressMismatch);
assert!(
error.contains("bind address mismatch"),
"bind rejection should explain the address mismatch, got: {error}"
);
}
#[tokio::test]
async fn bridge_handler_rejects_unsupported_protocol_version_before_bind_validation() {
let sent: Arc<tokio::sync::Mutex<Vec<CommsCommand>>> =
Arc::new(tokio::sync::Mutex::new(Vec::new()));
let runtime: Arc<dyn CommsRuntime> = Arc::new(CapturingRuntime {
peer_id: PEER_ID_RECEIVER.to_string(),
advertised_address: Some("inproc://receiver".to_string()),
bootstrap_token: Some("expected-token".to_string()),
inbox_notify: Arc::new(tokio::sync::Notify::new()),
sent: sent.clone(),
});
let adapter = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
adapter.register_session(session_id.clone()).await;
let params = json!({
"command": "bind_member",
"supervisor": {
"name": "mob/__mob_supervisor__",
"peer_id": PEER_ID_SUPERVISOR,
"address": "inproc://mob/__mob_supervisor__",
"pubkey": vec![0u8; 32],
},
"epoch": 0,
"protocol_version": 999,
"expected_peer_id": PEER_ID_RECEIVER,
"expected_address": "inproc://receiver",
"bootstrap_token": "expected-token",
});
let interaction_id = InteractionId(Uuid::new_v4());
let candidate = PeerInputCandidate {
interaction: InboxInteraction {
id: interaction_id,
from_route: None,
from: PEER_ID_SUPERVISOR.to_string(),
content: InteractionContent::Request {
intent: SUPERVISOR_BRIDGE_INTENT.to_string(),
params,
blocks: None,
},
rendered_text: String::new(),
handling_mode: HandlingMode::Queue,
render_metadata: None,
},
ingress: bridge_sender_fact_with_id(interaction_id, PEER_ID_SUPERVISOR),
lifecycle_peer: None,
response_terminality: None,
};
assert!(
try_handle_supervisor_bridge_command(&adapter, &session_id, &runtime, &candidate).await,
"bridge handler must own malformed supervisor bridge commands"
);
let (result, status) = sent
.lock()
.await
.iter()
.find_map(|cmd| match cmd {
CommsCommand::PeerResponse { result, status, .. } => {
Some((result.clone(), *status))
}
_ => None,
})
.expect("handler must send a typed rejection response");
assert!(matches!(
status,
meerkat_core::interaction::ResponseStatus::Failed
));
let reply: BridgeReply = serde_json::from_value(result).expect("typed bridge reply");
match reply {
BridgeReply::Rejected { cause, reason } => {
assert_eq!(cause, BridgeRejectionCause::UnsupportedProtocolVersion);
assert!(
reason.contains("unsupported supervisor bridge protocol version"),
"rejection should explain the typed version failure, got: {reason}"
);
}
other => unreachable!("expected Rejected reply, got {other:?}"),
}
}
#[test]
fn validate_bind_request_rejects_invalid_supervisor_peer_name() {
let runtime: Arc<dyn CommsRuntime> = Arc::new(bootstrap_runtime(
PEER_ID_RECEIVER,
"inproc://receiver",
Some("expected-token"),
));
let mut supervisor = supervisor_bridge_spec();
supervisor.name = String::new();
let payload = meerkat_contracts::wire::supervisor_bridge::BridgeBindPayload {
supervisor,
epoch: 0,
protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
expected_peer_id: PEER_ID_RECEIVER.to_string(),
expected_address: runtime.advertised_address().unwrap(),
bootstrap_token: "expected-token".into(),
};
let (cause, error) = validate_bind_request(
&runtime,
&bridge_sender_fact(&payload.supervisor.peer_id),
&payload,
)
.expect_err("bind should reject invalid supervisor peer names");
assert_eq!(cause, BridgeRejectionCause::InvalidSupervisorSpec);
assert!(
error.contains("invalid supervisor peer spec"),
"bind rejection should explain invalid supervisor identity, got: {error}"
);
}
fn sample_bind_payload() -> meerkat_contracts::wire::supervisor_bridge::BridgeBindPayload {
meerkat_contracts::wire::supervisor_bridge::BridgeBindPayload {
supervisor: supervisor_bridge_spec(),
epoch: 1,
protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
expected_peer_id: PEER_ID_RECEIVER.to_string(),
expected_address: "inproc://receiver".to_string(),
bootstrap_token: "expected-token".into(),
}
}
#[tokio::test]
async fn bridge_request_without_complete_peer_authority_fails_before_dispatch() {
for install_peer_handle in [false, true] {
let payload = sample_bind_payload();
let sender = payload.supervisor.peer_id.clone();
let mut runtime_impl = bootstrap_runtime(
PEER_ID_RECEIVER,
"inproc://receiver",
Some("expected-token"),
);
let peer_handle = Arc::new(CountingPeerInteractionHandle::default());
if install_peer_handle {
let peer_handle: Arc<dyn meerkat_core::handles::PeerInteractionHandle> =
peer_handle.clone();
runtime_impl.peer_handle = Some(peer_handle);
} else {
runtime_impl.peer_handle = None;
}
runtime_impl.peer_request_response_handle = None;
let completed_count = runtime_impl.completed_count.clone();
let runtime: Arc<dyn CommsRuntime> = Arc::new(runtime_impl);
let adapter = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
adapter.register_session(session_id.clone()).await;
let candidate = bridge_candidate(&sender, &BridgeCommand::BindMember(payload));
assert!(
try_handle_supervisor_bridge_command(&adapter, &session_id, &runtime, &candidate)
.await,
"bridge handler must own bridge requests even when rejecting authority"
);
assert!(
matches!(
adapter.supervisor_binding(&session_id).await,
SupervisorBinding::Unbound
),
"bridge request must not mutate supervisor binding without complete authority"
);
assert_eq!(
peer_handle.request_received_count(),
0,
"peer-only authority must not record PeerRequestReceived through bridge dispatch"
);
assert_eq!(
completed_count.load(std::sync::atomic::Ordering::SeqCst),
1,
"bridge request rejected at the authority boundary should be marked complete"
);
}
}
#[tokio::test]
async fn bridge_request_rejected_by_peer_authority_fails_before_dispatch() {
let payload = sample_bind_payload();
let sender = payload.supervisor.peer_id.clone();
let mut runtime_impl = bootstrap_runtime(
PEER_ID_RECEIVER,
"inproc://receiver",
Some("expected-token"),
);
let peer_handle = Arc::new(CountingPeerInteractionHandle::rejecting_request_received());
let peer_handle: Arc<dyn meerkat_core::handles::PeerInteractionHandle> = peer_handle;
runtime_impl.peer_handle = Some(peer_handle.clone());
runtime_impl.peer_request_response_handle = Some(peer_handle);
let completed_count = runtime_impl.completed_count.clone();
let runtime: Arc<dyn CommsRuntime> = Arc::new(runtime_impl);
let adapter = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
adapter.register_session(session_id.clone()).await;
let candidate = bridge_candidate(&sender, &BridgeCommand::BindMember(payload));
assert!(
try_handle_supervisor_bridge_command(&adapter, &session_id, &runtime, &candidate).await,
"bridge handler must own bridge requests even when rejecting authority"
);
assert!(
matches!(
adapter.supervisor_binding(&session_id).await,
SupervisorBinding::Unbound
),
"bridge request must not mutate supervisor binding when PeerRequestReceived is rejected"
);
assert_eq!(
completed_count.load(std::sync::atomic::Ordering::SeqCst),
1,
"bridge request rejected by peer authority should be marked complete"
);
}
fn authorized_state_for(
payload: &meerkat_contracts::wire::supervisor_bridge::BridgeBindPayload,
) -> SupervisorBinding {
let spec = TrustedPeerDescriptor::try_from(payload.supervisor.clone())
.expect("valid supervisor spec");
SupervisorBinding::Bound {
name: spec.name.to_string(),
peer_id: spec.peer_id.as_str(),
address: spec.address.to_string(),
epoch: payload.epoch,
}
}
#[test]
fn validate_bind_request_against_state_allows_bootstrap_when_unbound() {
let payload = sample_bind_payload();
let gate = validate_bind_request_against_state(
&bridge_sender_fact(&payload.supervisor.peer_id),
&payload,
&SupervisorBinding::Unbound,
)
.expect("unbound state should accept bootstrap bind");
assert!(matches!(gate, BindMemberGate::Bootstrap));
}
#[test]
fn validate_bind_request_against_state_rejects_different_supervisor_takeover() {
let current_payload = sample_bind_payload();
let state = authorized_state_for(¤t_payload);
let mut takeover = sample_bind_payload();
takeover.supervisor = old_supervisor_bridge_spec();
let (cause, error) = validate_bind_request_against_state(
&bridge_sender_fact(&takeover.supervisor.peer_id),
&takeover,
&state,
)
.expect_err("rebind with a different supervisor must be rejected");
assert_eq!(cause, BridgeRejectionCause::AlreadyBound);
assert!(
error.contains("supervisor already bound"),
"rejection should call out the already-bound supervisor, got: {error}"
);
assert!(
error.contains("authorize_supervisor"),
"rejection should direct callers to authorize_supervisor for rotation, got: {error}"
);
}
#[test]
fn validate_bind_request_against_state_rejects_lower_epoch_replay() {
let current_payload = sample_bind_payload();
let state = authorized_state_for(¤t_payload);
let mut replay = sample_bind_payload();
replay.epoch = current_payload.epoch - 1;
let (cause, error) = validate_bind_request_against_state(
&bridge_sender_fact(&replay.supervisor.peer_id),
&replay,
&state,
)
.expect_err("lower-epoch rebind must be rejected as a stale replay");
assert_eq!(cause, BridgeRejectionCause::AlreadyBound);
assert!(
error.contains("does not match bound supervisor epoch"),
"rejection should explain the epoch mismatch, got: {error}"
);
}
#[test]
fn validate_bind_request_against_state_rejects_higher_epoch_same_supervisor_rebind() {
let current_payload = sample_bind_payload();
let state = authorized_state_for(¤t_payload);
let mut advance = sample_bind_payload();
advance.epoch = current_payload.epoch + 5;
let (cause, error) = validate_bind_request_against_state(
&bridge_sender_fact(&advance.supervisor.peer_id),
&advance,
&state,
)
.expect_err("higher-epoch rebind with same supervisor must be rejected");
assert_eq!(cause, BridgeRejectionCause::AlreadyBound);
assert!(
error.contains("does not match bound supervisor epoch"),
"rejection should explain the epoch mismatch, got: {error}"
);
}
#[test]
fn validate_bind_request_against_state_rejects_spoofed_sender() {
let current_payload = sample_bind_payload();
let state = authorized_state_for(¤t_payload);
let retry = sample_bind_payload();
let attacker_peer_id = PeerId::new().as_str();
let (cause, error) = validate_bind_request_against_state(
&bridge_sender_fact(&attacker_peer_id),
&retry,
&state,
)
.expect_err("bind from an unauthorized sender must be rejected");
assert_eq!(cause, BridgeRejectionCause::SenderMismatch);
assert!(
error.contains("request sender"),
"rejection should surface the sender mismatch, got: {error}"
);
}
#[test]
fn validate_bind_request_against_state_rejects_same_display_name_different_canonical_sender() {
let current_payload = sample_bind_payload();
let state = authorized_state_for(¤t_payload);
let retry = sample_bind_payload();
let attacker_peer_id =
PeerId::parse(PEER_ID_OLD_SUPERVISOR).expect("valid attacker peer id");
let sender = bridge_sender_fact_with_display(attacker_peer_id, &retry.supervisor.name);
let (cause, error) = validate_bind_request_against_state(&sender, &retry, &state)
.expect_err("same display name with a different canonical sender must reject");
assert_eq!(cause, BridgeRejectionCause::SenderMismatch);
assert!(
error.contains("request sender"),
"stateful bind rejection should explain the sender mismatch, got: {error}"
);
}
#[test]
fn validate_bind_request_against_state_idempotently_acks_retry_from_current_supervisor() {
let current_payload = sample_bind_payload();
let state = authorized_state_for(¤t_payload);
let retry = sample_bind_payload();
let gate = validate_bind_request_against_state(
&bridge_sender_fact(&retry.supervisor.peer_id),
&retry,
&state,
)
.expect("exact-match retry should be idempotent");
assert!(matches!(gate, BindMemberGate::IdempotentAck));
}
#[tokio::test]
async fn bind_member_handler_rejects_rebind_after_supervisor_bound() {
let runtime: Arc<dyn CommsRuntime> = Arc::new(
meerkat_comms::CommsRuntime::inproc_only("bind-rebind-receiver")
.expect("receiver runtime"),
);
let supervisor_runtime = Arc::new(
meerkat_comms::CommsRuntime::inproc_only("mob/__mob_supervisor__")
.expect("supervisor runtime"),
);
let adapter = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
adapter.register_session(session_id.clone()).await;
let current_supervisor =
trusted_peer_from_runtime("mob/__mob_supervisor__", &supervisor_runtime);
adapter
.stage_supervisor_bind(
&session_id,
current_supervisor.name.to_string(),
current_supervisor.peer_id.as_str(),
current_supervisor.address.to_string(),
1,
)
.await
.expect("initial bind must succeed");
let adversary_supervisor = old_supervisor_bridge_spec();
let adversary_peer_id = adversary_supervisor.peer_id.clone();
let command = BridgeCommand::BindMember(
meerkat_contracts::wire::supervisor_bridge::BridgeBindPayload {
supervisor: adversary_supervisor,
epoch: 2,
protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
expected_peer_id: runtime
.peer_id()
.map(|peer_id| peer_id.as_str())
.unwrap_or_else(|| PEER_ID_RECEIVER.to_string()),
expected_address: runtime
.advertised_address()
.unwrap_or_else(|| "inproc://bind-rebind-receiver".to_string()),
bootstrap_token: runtime.bridge_bootstrap_token().unwrap_or_default().into(),
},
);
let candidate = bridge_candidate(&adversary_peer_id, &command);
assert!(
try_handle_supervisor_bridge_command(&adapter, &session_id, &runtime, &candidate,)
.await,
"bridge handler must own the BindMember command"
);
let binding = adapter.supervisor_binding(&session_id).await;
let SupervisorBinding::Bound { peer_id, epoch, .. } = binding else {
panic!("supervisor binding must be preserved as Bound");
};
assert_eq!(
peer_id,
current_supervisor.peer_id.as_str(),
"rebind attempt must not replace the authorized supervisor"
);
assert_eq!(
epoch, 1,
"rebind attempt must not advance the authorized epoch"
);
}
#[tokio::test]
async fn bind_member_handler_rebind_reply_is_typed_already_bound() {
let sent: Arc<tokio::sync::Mutex<Vec<CommsCommand>>> =
Arc::new(tokio::sync::Mutex::new(Vec::new()));
let runtime: Arc<dyn CommsRuntime> = Arc::new(CapturingRuntime {
peer_id: PEER_ID_RECEIVER.to_string(),
advertised_address: Some("inproc://receiver".to_string()),
bootstrap_token: Some("expected-token".to_string()),
inbox_notify: Arc::new(tokio::sync::Notify::new()),
sent: sent.clone(),
});
let adapter = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
adapter.register_session(session_id.clone()).await;
let current = trusted_supervisor_descriptor(0xbb);
adapter
.stage_supervisor_bind(
&session_id,
current.name.to_string(),
current.peer_id.as_str(),
current.address.to_string(),
1,
)
.await
.expect("initial bind must succeed");
let adversary = old_supervisor_bridge_spec();
let adversary_peer_id = adversary.peer_id.clone();
let command = BridgeCommand::BindMember(
meerkat_contracts::wire::supervisor_bridge::BridgeBindPayload {
supervisor: adversary,
epoch: 2,
protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
expected_peer_id: PEER_ID_RECEIVER.to_string(),
expected_address: "inproc://receiver".to_string(),
bootstrap_token: "expected-token".into(),
},
);
let candidate = bridge_candidate(&adversary_peer_id, &command);
assert!(
try_handle_supervisor_bridge_command(&adapter, &session_id, &runtime, &candidate,)
.await,
"bridge handler must own the BindMember command"
);
let binding = adapter.supervisor_binding(&session_id).await;
let SupervisorBinding::Bound { peer_id, .. } = binding else {
panic!("binding preserved");
};
assert_eq!(peer_id, current.peer_id.as_str());
let (result, status) = sent
.lock()
.await
.iter()
.find_map(|cmd| match cmd {
CommsCommand::PeerResponse { result, status, .. } => {
Some((result.clone(), *status))
}
_ => None,
})
.expect("handler must send a PeerResponse for the rejection");
assert!(
matches!(status, meerkat_core::interaction::ResponseStatus::Failed),
"rebind rejection must surface as Failed status"
);
let reply: BridgeReply = serde_json::from_value(result).expect("typed bridge reply");
match reply {
BridgeReply::Rejected { cause, .. } => {
assert_eq!(
cause,
BridgeRejectionCause::AlreadyBound,
"different-supervisor rebind must be rejected as AlreadyBound",
);
}
other => unreachable!("expected Rejected reply, got {other:?}"),
}
}
struct CapturingRuntime {
peer_id: String,
advertised_address: Option<String>,
bootstrap_token: Option<String>,
inbox_notify: Arc<tokio::sync::Notify>,
sent: Arc<tokio::sync::Mutex<Vec<CommsCommand>>>,
}
#[async_trait::async_trait]
impl CommsRuntime for CapturingRuntime {
fn public_key(&self) -> Option<String> {
Some(self.peer_id.clone())
}
fn advertised_address(&self) -> Option<String> {
self.advertised_address.clone()
}
fn bridge_bootstrap_token(&self) -> Option<String> {
self.bootstrap_token.clone()
}
async fn drain_messages(&self) -> Vec<String> {
Vec::new()
}
fn inbox_notify(&self) -> Arc<tokio::sync::Notify> {
self.inbox_notify.clone()
}
fn peer_request_response_authority_handle(
&self,
) -> Option<Arc<dyn meerkat_core::handles::PeerInteractionHandle>> {
Some(Arc::new(CountingPeerInteractionHandle::default()))
}
async fn add_trusted_peer(&self, _peer: TrustedPeerDescriptor) -> Result<(), SendError> {
Ok(())
}
async fn remove_trusted_peer(&self, _peer_id: &str) -> Result<bool, SendError> {
Ok(true)
}
async fn send(
&self,
cmd: CommsCommand,
) -> Result<meerkat_core::comms::SendReceipt, SendError> {
let receipt = match &cmd {
CommsCommand::PeerResponse { in_reply_to, .. } => {
meerkat_core::comms::SendReceipt::PeerResponseSent {
envelope_id: Uuid::new_v4(),
in_reply_to: *in_reply_to,
}
}
_ => meerkat_core::comms::SendReceipt::PeerMessageSent {
envelope_id: Uuid::new_v4(),
acked: true,
},
};
self.sent.lock().await.push(cmd);
Ok(receipt)
}
}
#[tokio::test]
async fn bind_member_handler_response_reports_canonical_protocol_versions() {
let sent: Arc<tokio::sync::Mutex<Vec<CommsCommand>>> =
Arc::new(tokio::sync::Mutex::new(Vec::new()));
let runtime: Arc<dyn CommsRuntime> = Arc::new(CapturingRuntime {
peer_id: PEER_ID_RECEIVER.to_string(),
advertised_address: Some("inproc://receiver".to_string()),
bootstrap_token: Some("expected-token".to_string()),
inbox_notify: Arc::new(tokio::sync::Notify::new()),
sent: sent.clone(),
});
let adapter = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
adapter.register_session(session_id.clone()).await;
let supervisor = supervisor_bridge_spec();
let command = BridgeCommand::BindMember(
meerkat_contracts::wire::supervisor_bridge::BridgeBindPayload {
supervisor: supervisor.clone(),
epoch: 0,
protocol_version: supervisor_bridge_default_protocol_version(),
expected_peer_id: PEER_ID_RECEIVER.to_string(),
expected_address: "inproc://receiver".to_string(),
bootstrap_token: "expected-token".into(),
},
);
let candidate = bridge_candidate(&supervisor.peer_id, &command);
assert!(
try_handle_supervisor_bridge_command(&adapter, &session_id, &runtime, &candidate,)
.await,
"bridge handler must own the BindMember command"
);
let (result, status) = sent
.lock()
.await
.iter()
.find_map(|cmd| match cmd {
CommsCommand::PeerResponse { result, status, .. } => {
Some((result.clone(), *status))
}
_ => None,
})
.expect("handler must send a bind response");
assert!(matches!(
status,
meerkat_core::interaction::ResponseStatus::Completed
));
let reply: BridgeReply = serde_json::from_value(result).expect("typed bridge reply");
let BridgeReply::BindMember(response) = reply else {
panic!("expected bind response");
};
assert_eq!(
response.capabilities.current_protocol_version,
supervisor_bridge_current_protocol_version()
);
assert_eq!(
response.capabilities.default_protocol_version,
supervisor_bridge_default_protocol_version()
);
assert_eq!(
response.capabilities.supported_protocol_versions,
supervisor_bridge_supported_protocol_versions()
);
assert!(
!response.capabilities.hard_cancel_member,
"bind response must not advertise live hard-cancel authority"
);
}
#[tokio::test]
async fn hard_cancel_member_bridge_command_is_rejected_as_unsupported() {
let sent: Arc<tokio::sync::Mutex<Vec<CommsCommand>>> =
Arc::new(tokio::sync::Mutex::new(Vec::new()));
let runtime: Arc<dyn CommsRuntime> = Arc::new(CapturingRuntime {
peer_id: PEER_ID_RECEIVER.to_string(),
advertised_address: Some("inproc://receiver".to_string()),
bootstrap_token: Some("expected-token".to_string()),
inbox_notify: Arc::new(tokio::sync::Notify::new()),
sent: sent.clone(),
});
let adapter = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
adapter.register_session(session_id.clone()).await;
let authorized = TrustedPeerDescriptor::test_only_unsigned_typed(
"mob/__mob_supervisor__",
PeerId::new(),
"inproc://mob/__mob_supervisor__",
)
.expect("valid supervisor spec");
adapter
.stage_supervisor_bind(
&session_id,
authorized.name.to_string(),
authorized.peer_id.as_str(),
authorized.address.to_string(),
11,
)
.await
.expect("pre-bind supervisor");
let command = BridgeCommand::HardCancelMember(
meerkat_contracts::wire::supervisor_bridge::BridgeHardCancelPayload {
supervisor: BridgePeerSpec::from(authorized.clone()),
epoch: 11,
protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
reason: "must not cross supervisor bridge".to_string(),
},
);
let candidate = bridge_candidate(&authorized.peer_id.as_str(), &command);
assert!(
try_handle_supervisor_bridge_command(&adapter, &session_id, &runtime, &candidate,)
.await,
"bridge handler must reject the known but unsupported HardCancelMember command"
);
let (result, status) = sent
.lock()
.await
.iter()
.find_map(|cmd| match cmd {
CommsCommand::PeerResponse { result, status, .. } => {
Some((result.clone(), *status))
}
_ => None,
})
.expect("handler must send a hard-cancel rejection");
assert!(
matches!(status, meerkat_core::interaction::ResponseStatus::Failed),
"unsupported hard-cancel bridge command must fail at the comms boundary"
);
let reply: BridgeReply = serde_json::from_value(result).expect("typed bridge reply");
match reply {
BridgeReply::Rejected { cause, .. } => {
assert_eq!(cause, BridgeRejectionCause::Unsupported);
}
other => unreachable!("expected Rejected reply, got {other:?}"),
}
}
#[tokio::test]
async fn idempotent_ack_invariant_rejects_without_echoing_attacker_fields() {
let sent: Arc<tokio::sync::Mutex<Vec<CommsCommand>>> =
Arc::new(tokio::sync::Mutex::new(Vec::new()));
let runtime: Arc<dyn CommsRuntime> = Arc::new(CapturingRuntime {
peer_id: PEER_ID_RECEIVER.to_string(),
advertised_address: None,
bootstrap_token: Some("expected-token".to_string()),
inbox_notify: Arc::new(tokio::sync::Notify::new()),
sent: sent.clone(),
});
let adapter = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
adapter.register_session(session_id.clone()).await;
let authorized = trusted_supervisor_descriptor(0xbb);
adapter
.stage_supervisor_bind(
&session_id,
authorized.name.to_string(),
authorized.peer_id.as_str(),
authorized.address.to_string(),
7,
)
.await
.expect("pre-bind supervisor");
let attacker_address = "inproc://ATTACKER-ADDRESS-DO-NOT-ECHO".to_string();
let attacker_peer_id = format!("{}-ATTACKER-PEER-ID-DO-NOT-ECHO", PeerId::new().as_str());
let command = BridgeCommand::BindMember(
meerkat_contracts::wire::supervisor_bridge::BridgeBindPayload {
supervisor: BridgePeerSpec::from(authorized.clone()),
epoch: 7,
protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
expected_peer_id: attacker_peer_id.clone(),
expected_address: attacker_address.clone(),
bootstrap_token: "expected-token".into(),
},
);
let candidate = bridge_candidate(&authorized.peer_id.as_str(), &command);
assert!(
try_handle_supervisor_bridge_command(&adapter, &session_id, &runtime, &candidate,)
.await,
"bridge handler must own the BindMember command"
);
let binding = adapter.supervisor_binding(&session_id).await;
let SupervisorBinding::Bound {
peer_id: stored_peer_id,
epoch: stored_epoch,
..
} = binding
else {
panic!("binding must survive invariant failure");
};
assert_eq!(stored_peer_id, authorized.peer_id.as_str());
assert_eq!(stored_epoch, 7);
let sent_commands = sent.lock().await.clone();
let (result, status) = sent_commands
.into_iter()
.find_map(|cmd| match cmd {
CommsCommand::PeerResponse { result, status, .. } => Some((result, status)),
_ => None,
})
.expect("handler must send a PeerResponse for the invariant violation");
assert!(
matches!(status, meerkat_core::interaction::ResponseStatus::Failed),
"invariant violation must surface as Failed status"
);
let reply: BridgeReply =
serde_json::from_value(result.clone()).expect("typed bridge reply");
assert!(
matches!(reply, BridgeReply::Rejected { .. }),
"expected Rejected reply for invariant violation, got: {reply:?}"
);
if let BridgeReply::Rejected { cause, reason } = reply {
assert_eq!(cause, BridgeRejectionCause::Internal);
assert!(
!reason.contains(&attacker_address),
"rejection reason must not echo attacker-supplied address: {reason}"
);
assert!(
!reason.contains(&attacker_peer_id),
"rejection reason must not echo attacker-supplied peer_id: {reason}"
);
}
let raw = serde_json::to_string(&result).expect("serialize reply for attacker-field check");
assert!(
!raw.contains(&attacker_address),
"reply payload must not contain attacker-supplied address: {raw}"
);
assert!(
!raw.contains(&attacker_peer_id),
"reply payload must not contain attacker-supplied peer_id: {raw}"
);
}
#[test]
fn validate_authorize_supervisor_rejects_initial_claim_without_bind() {
let payload = BridgeSupervisorPayload {
supervisor: supervisor_bridge_spec(),
epoch: 0,
protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
};
let (cause, error) = validate_authorize_supervisor_request(
&bridge_sender_fact(&payload.supervisor.peer_id),
&payload,
&SupervisorBinding::Unbound,
)
.expect_err("first supervisor claim must go through bind_member");
assert_eq!(cause, BridgeRejectionCause::NotBound);
assert!(
error.contains("bind_member"),
"initial authorize rejection should direct callers to bind_member, got: {error}"
);
}
#[test]
fn validate_authorize_supervisor_rejects_same_display_name_different_canonical_sender() {
let current_payload = sample_bind_payload();
let state = authorized_state_for(¤t_payload);
let payload = BridgeSupervisorPayload {
supervisor: current_supervisor_bridge_spec(),
epoch: current_payload.epoch + 1,
protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
};
let attacker_peer_id =
PeerId::parse(PEER_ID_OLD_SUPERVISOR).expect("valid attacker peer id");
let sender =
bridge_sender_fact_with_display(attacker_peer_id, ¤t_payload.supervisor.name);
let (cause, error) = validate_authorize_supervisor_request(&sender, &payload, &state)
.expect_err("same display name with a different canonical sender must reject");
assert_eq!(cause, BridgeRejectionCause::SenderMismatch);
assert!(
error.contains("request sender"),
"authorize rejection should explain the sender mismatch, got: {error}"
);
}
#[test]
fn require_authorized_supervisor_rejects_same_display_name_different_canonical_sender() {
let current_payload = sample_bind_payload();
let current = authorized_state_for(¤t_payload);
let payload = BridgeSupervisorPayload {
supervisor: current_payload.supervisor.clone(),
epoch: current_payload.epoch,
protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
};
let attacker_peer_id =
PeerId::parse(PEER_ID_OLD_SUPERVISOR).expect("valid attacker peer id");
let sender =
bridge_sender_fact_with_display(attacker_peer_id, ¤t_payload.supervisor.name);
let (cause, error) = require_authorized_supervisor(&sender, &payload, ¤t)
.expect_err("same display name with a different canonical sender must reject");
assert_eq!(cause, BridgeRejectionCause::SenderMismatch);
assert!(
error.contains("request sender"),
"supervisor command rejection should explain the sender mismatch, got: {error}"
);
}
#[test]
fn validate_bind_request_rejects_empty_bootstrap_token_at_runtime() {
let runtime: Arc<dyn CommsRuntime> = Arc::new(bootstrap_runtime(
PEER_ID_RECEIVER,
"inproc://receiver",
Some(""),
));
let supervisor = supervisor_bridge_spec();
let payload = meerkat_contracts::wire::supervisor_bridge::BridgeBindPayload {
supervisor: supervisor.clone(),
epoch: 0,
protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
expected_peer_id: PEER_ID_RECEIVER.to_string(),
expected_address: runtime.advertised_address().unwrap(),
bootstrap_token: "whatever".into(),
};
let (cause, _error) =
validate_bind_request(&runtime, &bridge_sender_fact(&supervisor.peer_id), &payload)
.expect_err("runtime with empty token must refuse to validate");
assert_eq!(cause, BridgeRejectionCause::InvalidBootstrapToken);
}
#[test]
fn validate_bind_request_rejects_query_string_bootstrap_without_typed_runtime_token() {
let runtime: Arc<dyn CommsRuntime> = Arc::new(bootstrap_runtime(
PEER_ID_RECEIVER,
&format!("inproc://receiver?{SUPERVISOR_BRIDGE_BOOTSTRAP_TOKEN_PARAM}=expected-token"),
None,
));
let supervisor = supervisor_bridge_spec();
let payload = meerkat_contracts::wire::supervisor_bridge::BridgeBindPayload {
supervisor: supervisor.clone(),
epoch: 0,
protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
expected_peer_id: PEER_ID_RECEIVER.to_string(),
expected_address: "inproc://receiver".to_string(),
bootstrap_token: "expected-token".into(),
};
let (cause, error) =
validate_bind_request(&runtime, &bridge_sender_fact(&supervisor.peer_id), &payload)
.expect_err("query-string token must not satisfy typed bootstrap proof");
assert_eq!(cause, BridgeRejectionCause::InvalidBootstrapToken);
assert!(
error.contains("typed bridge bootstrap token"),
"runtime should explain that the typed token field is required, got: {error}"
);
}
fn bridge_candidate(sender: &str, command: &BridgeCommand) -> PeerInputCandidate {
let id = InteractionId(Uuid::new_v4());
let ingress = bridge_sender_fact_with_id(id, sender);
bridge_candidate_with_ingress(sender, command, ingress)
}
fn bridge_candidate_with_ingress(
sender: &str,
command: &BridgeCommand,
ingress: PeerIngressFact,
) -> PeerInputCandidate {
let id = ingress.interaction_id;
PeerInputCandidate {
interaction: InboxInteraction {
id,
from_route: None,
from: sender.to_string(),
content: InteractionContent::Request {
intent: SUPERVISOR_BRIDGE_INTENT.to_string(),
params: serde_json::to_value(command).expect("serialize bridge command"),
blocks: None,
},
rendered_text: String::new(),
handling_mode: HandlingMode::Queue,
render_metadata: None,
},
ingress,
lifecycle_peer: None,
response_terminality: None,
}
}
#[tokio::test]
async fn bind_member_rolls_back_binding_when_trust_publication_fails() {
let payload = sample_bind_payload();
let sender = payload.supervisor.peer_id.clone();
let mut runtime_impl = bootstrap_runtime(
PEER_ID_RECEIVER,
"inproc://receiver",
Some("expected-token"),
);
runtime_impl
.add_trusted_peer_errors
.insert(payload.supervisor.peer_id.clone(), "boom".to_string());
let trusted_peer_ids = runtime_impl.trusted_peer_ids.clone();
let runtime: Arc<dyn CommsRuntime> = Arc::new(runtime_impl);
let adapter = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
adapter.register_session(session_id.clone()).await;
let candidate = bridge_candidate(&sender, &BridgeCommand::BindMember(payload));
assert!(
try_handle_supervisor_bridge_command(&adapter, &session_id, &runtime, &candidate).await
);
assert!(
matches!(
adapter.supervisor_binding(&session_id).await,
SupervisorBinding::Unbound
),
"failed trust publication must roll BindMember back to Unbound"
);
assert!(
trusted_peer_ids.lock().await.is_empty(),
"failed trust publication must not leave the supervisor trusted"
);
}
#[tokio::test]
async fn authorize_supervisor_restores_old_binding_when_new_trust_publish_fails() {
let old_supervisor = trusted_supervisor_descriptor(0xbb);
let new_supervisor = current_supervisor_bridge_spec();
let payload = BridgeSupervisorPayload {
supervisor: new_supervisor.clone(),
epoch: 2,
protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
};
let mut runtime_impl = bootstrap_runtime(
PEER_ID_RECEIVER,
"inproc://receiver",
Some("expected-token"),
);
runtime_impl
.add_trusted_peer_errors
.insert(new_supervisor.peer_id.clone(), "boom".to_string());
let trusted_peer_ids = runtime_impl.trusted_peer_ids.clone();
trusted_peer_ids
.lock()
.await
.insert(old_supervisor.peer_id.as_str());
let runtime: Arc<dyn CommsRuntime> = Arc::new(runtime_impl);
let adapter = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
adapter.register_session(session_id.clone()).await;
adapter
.stage_supervisor_bind(
&session_id,
old_supervisor.name.to_string(),
old_supervisor.peer_id.as_str(),
old_supervisor.address.to_string(),
1,
)
.await
.expect("pre-bind old supervisor");
let candidate = bridge_candidate(
&old_supervisor.peer_id.as_str(),
&BridgeCommand::AuthorizeSupervisor(payload),
);
assert!(
try_handle_supervisor_bridge_command(&adapter, &session_id, &runtime, &candidate).await
);
match adapter.supervisor_binding(&session_id).await {
SupervisorBinding::Bound { peer_id, epoch, .. } => {
assert_eq!(peer_id, old_supervisor.peer_id.as_str());
assert_eq!(epoch, 1);
}
SupervisorBinding::Unbound => panic!("old supervisor must remain bound"),
}
let trusted = trusted_peer_ids.lock().await.clone();
assert!(
trusted.contains(&old_supervisor.peer_id.as_str()),
"old supervisor trust must stay active on failed rotation"
);
assert!(
!trusted.contains(&new_supervisor.peer_id),
"new supervisor trust must not publish on failed rotation"
);
}
#[tokio::test]
async fn authorize_supervisor_rolls_back_when_old_trust_removal_fails() {
let old_supervisor = trusted_supervisor_descriptor(0xbb);
let new_supervisor = current_supervisor_bridge_spec();
let payload = BridgeSupervisorPayload {
supervisor: new_supervisor.clone(),
epoch: 2,
protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
};
let mut runtime_impl = bootstrap_runtime(
PEER_ID_RECEIVER,
"inproc://receiver",
Some("expected-token"),
);
runtime_impl
.remove_trusted_peer_errors
.insert(old_supervisor.peer_id.as_str(), "boom".to_string());
let trusted_peer_ids = runtime_impl.trusted_peer_ids.clone();
trusted_peer_ids
.lock()
.await
.insert(old_supervisor.peer_id.as_str());
let runtime: Arc<dyn CommsRuntime> = Arc::new(runtime_impl);
let adapter = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
adapter.register_session(session_id.clone()).await;
adapter
.stage_supervisor_bind(
&session_id,
old_supervisor.name.to_string(),
old_supervisor.peer_id.as_str(),
old_supervisor.address.to_string(),
1,
)
.await
.expect("pre-bind old supervisor");
let candidate = bridge_candidate(
&old_supervisor.peer_id.as_str(),
&BridgeCommand::AuthorizeSupervisor(payload),
);
assert!(
try_handle_supervisor_bridge_command(&adapter, &session_id, &runtime, &candidate).await
);
match adapter.supervisor_binding(&session_id).await {
SupervisorBinding::Bound { peer_id, epoch, .. } => {
assert_eq!(peer_id, old_supervisor.peer_id.as_str());
assert_eq!(epoch, 1);
}
SupervisorBinding::Unbound => panic!("old supervisor must be restored after rollback"),
}
let trusted = trusted_peer_ids.lock().await.clone();
assert!(
trusted.contains(&old_supervisor.peer_id.as_str()),
"old supervisor trust must remain after rollback"
);
assert!(
!trusted.contains(&new_supervisor.peer_id),
"new supervisor trust must be cleaned up after rollback"
);
}
#[tokio::test]
async fn revoke_supervisor_keeps_authority_when_trust_removal_fails() {
let supervisor = supervisor_bridge_spec();
let mut runtime_impl = bootstrap_runtime(
PEER_ID_RECEIVER,
"inproc://receiver",
Some("expected-token"),
);
runtime_impl
.remove_trusted_peer_errors
.insert(supervisor.peer_id.clone(), "boom".to_string());
let runtime: Arc<dyn CommsRuntime> = Arc::new(runtime_impl);
let adapter = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
adapter.register_session(session_id.clone()).await;
let payload = BridgeSupervisorPayload {
supervisor: supervisor.clone(),
epoch: 1,
protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
};
let candidate = bridge_candidate(
&supervisor.peer_id,
&BridgeCommand::RevokeSupervisor(payload.clone()),
);
let spec =
TrustedPeerDescriptor::try_from(supervisor.clone()).expect("valid supervisor spec");
adapter
.stage_supervisor_bind(
&session_id,
spec.name.to_string(),
spec.peer_id.as_str(),
spec.address.to_string(),
payload.epoch,
)
.await
.expect("pre-bind supervisor");
assert!(
try_handle_supervisor_bridge_command(&adapter, &session_id, &runtime, &candidate,)
.await,
"revoke command should be handled"
);
assert!(
matches!(
adapter.supervisor_binding(&session_id).await,
SupervisorBinding::Bound { .. }
),
"failed revoke must preserve supervisor authority until trust removal succeeds"
);
}
#[tokio::test]
async fn dsl_supervisor_guards_block_rebind_and_stale_revoke() {
let adapter = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
adapter.register_session(session_id.clone()).await;
adapter
.stage_supervisor_bind(
&session_id,
"super-a".to_string(),
"ed25519:super-a".to_string(),
"inproc://super-a".to_string(),
1,
)
.await
.expect("initial bind");
let rebind = adapter
.stage_supervisor_bind(
&session_id,
"super-b".to_string(),
"ed25519:super-b".to_string(),
"inproc://super-b".to_string(),
2,
)
.await;
assert!(
matches!(rebind, Err(SupervisorBindingStageError::Dsl(_))),
"double bind must be rejected by DSL guard, got: {rebind:?}"
);
let stale_peer_revoke = adapter
.stage_supervisor_revoke(&session_id, "ed25519:super-b".to_string(), 1)
.await;
assert!(
matches!(stale_peer_revoke, Err(SupervisorBindingStageError::Dsl(_))),
"revoke with mismatched peer_id must be DSL-rejected, got: {stale_peer_revoke:?}"
);
let stale_epoch_revoke = adapter
.stage_supervisor_revoke(&session_id, "ed25519:super-a".to_string(), 99)
.await;
assert!(
matches!(stale_epoch_revoke, Err(SupervisorBindingStageError::Dsl(_))),
"revoke with mismatched epoch must be DSL-rejected, got: {stale_epoch_revoke:?}"
);
adapter
.stage_supervisor_authorize(
&session_id,
"super-c".to_string(),
"ed25519:super-c".to_string(),
"inproc://super-c".to_string(),
2,
)
.await
.expect("rotation must succeed");
match adapter.supervisor_binding(&session_id).await {
SupervisorBinding::Bound { peer_id, epoch, .. } => {
assert_eq!(peer_id, "ed25519:super-c");
assert_eq!(epoch, 2);
}
SupervisorBinding::Unbound => panic!("expected Bound after rotation"),
}
adapter
.stage_supervisor_revoke(&session_id, "ed25519:super-c".to_string(), 2)
.await
.expect("matching revoke must succeed");
assert!(
matches!(
adapter.supervisor_binding(&session_id).await,
SupervisorBinding::Unbound
),
"binding must be Unbound after matching revoke"
);
}
#[tokio::test]
async fn dsl_supervisor_trust_publish_ack_stale_epoch_is_rejected() {
let adapter = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
adapter.register_session(session_id.clone()).await;
adapter
.stage_supervisor_bind(
&session_id,
"super-a".to_string(),
"ed25519:super-a".to_string(),
"inproc://super-a".to_string(),
1,
)
.await
.expect("initial bind");
adapter
.stage_supervisor_authorize(
&session_id,
"super-a".to_string(),
"ed25519:super-a".to_string(),
"inproc://super-a".to_string(),
2,
)
.await
.expect("rotation to epoch 2");
let stale = adapter
.stage_supervisor_trust_published(&session_id, "ed25519:super-a".to_string(), 1)
.await;
assert!(
matches!(stale, Err(SupervisorBindingStageError::Dsl(_))),
"stale-epoch publish ack must be DSL-rejected, got: {stale:?}"
);
match adapter.supervisor_binding(&session_id).await {
SupervisorBinding::Bound { epoch, .. } => {
assert_eq!(epoch, 2, "binding must still be at epoch 2");
}
SupervisorBinding::Unbound => panic!("stale ack must not unbind"),
}
adapter
.stage_supervisor_trust_published(&session_id, "ed25519:super-a".to_string(), 2)
.await
.expect("matching-epoch ack must be accepted");
}
#[tokio::test]
async fn dsl_supervisor_trust_revoke_ack_stale_epoch_is_rejected() {
let adapter = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
adapter.register_session(session_id.clone()).await;
adapter
.stage_supervisor_bind(
&session_id,
"super-a".to_string(),
"ed25519:super-a".to_string(),
"inproc://super-a".to_string(),
1,
)
.await
.expect("initial bind");
adapter
.stage_supervisor_authorize(
&session_id,
"super-a".to_string(),
"ed25519:super-a".to_string(),
"inproc://super-a".to_string(),
2,
)
.await
.expect("rotation to epoch 2");
let stale = adapter
.stage_supervisor_trust_revoked(&session_id, "ed25519:super-a".to_string(), 1)
.await;
assert!(
matches!(stale, Err(SupervisorBindingStageError::Dsl(_))),
"stale-epoch revoke ack must be DSL-rejected, got: {stale:?}"
);
match adapter.supervisor_binding(&session_id).await {
SupervisorBinding::Bound { epoch, .. } => {
assert_eq!(epoch, 2, "binding must still be at epoch 2");
}
SupervisorBinding::Unbound => panic!("stale revoke ack must not unbind"),
}
adapter
.stage_supervisor_trust_revoked(&session_id, "ed25519:super-a".to_string(), 2)
.await
.expect("matching-epoch revoke ack must be accepted");
}
#[tokio::test]
async fn dsl_supervisor_trust_ack_rejects_mismatched_peer_id() {
let adapter = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
adapter.register_session(session_id.clone()).await;
adapter
.stage_supervisor_bind(
&session_id,
"super-a".to_string(),
"ed25519:super-a".to_string(),
"inproc://super-a".to_string(),
7,
)
.await
.expect("initial bind");
let wrong_peer = adapter
.stage_supervisor_trust_published(&session_id, "ed25519:unrelated".to_string(), 7)
.await;
assert!(
matches!(wrong_peer, Err(SupervisorBindingStageError::Dsl(_))),
"mismatched-peer ack must be DSL-rejected, got: {wrong_peer:?}"
);
}
#[tokio::test]
async fn dsl_supervisor_trust_ack_rejected_when_unbound() {
let adapter = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
adapter.register_session(session_id.clone()).await;
let unbound = adapter
.stage_supervisor_trust_published(&session_id, "ed25519:super-a".to_string(), 1)
.await;
assert!(
matches!(unbound, Err(SupervisorBindingStageError::Dsl(_))),
"publish ack must be rejected when Unbound, got: {unbound:?}"
);
}
#[tokio::test]
async fn wire_member_rejects_invalid_peer_spec_before_trusting_it() {
let runtime = Arc::new(meerkat_comms::CommsRuntime::inproc_only("receiver-wire").unwrap());
let supervisor_runtime =
Arc::new(meerkat_comms::CommsRuntime::inproc_only("mob/__mob_supervisor__").unwrap());
let adapter = Arc::new(MeerkatMachine::ephemeral());
let session_id = SessionId::new();
adapter.register_session(session_id.clone()).await;
let supervisor = trusted_peer_from_runtime("mob/__mob_supervisor__", &supervisor_runtime);
runtime
.add_trusted_peer(supervisor.clone())
.await
.expect("trust supervisor");
adapter
.stage_supervisor_bind(
&session_id,
supervisor.name.to_string(),
supervisor.peer_id.as_str(),
supervisor.address.to_string(),
1,
)
.await
.expect("pre-bind supervisor");
let candidate = bridge_candidate(
&supervisor.peer_id.as_str(),
&BridgeCommand::WireMember(BridgePeerWiringPayload {
supervisor: supervisor.clone().into(),
epoch: 1,
protocol_version: SUPERVISOR_BRIDGE_PROTOCOL_VERSION,
peer_spec: BridgePeerSpec {
name: "".to_string(),
peer_id: PeerId::new().as_str(),
address: "inproc://peer".to_string(),
pubkey: [0u8; 32],
},
}),
);
assert!(
try_handle_supervisor_bridge_command(
&adapter,
&session_id,
&(runtime.clone() as Arc<dyn CommsRuntime>),
&candidate,
)
.await,
"wire bridge command should be handled"
);
let peers = runtime.peers().await;
assert!(
peers.iter().all(|entry| !entry.name.as_str().is_empty()),
"invalid wire peer specs must not be materialized in comms trust"
);
}
}
fn spawn_completion_bridge(
comms_runtime: Option<Arc<dyn CommsRuntime>>,
interaction_id: meerkat_core::interaction::InteractionId,
subscriber: Option<mpsc::Sender<AgentEvent>>,
handle: Option<crate::completion::CompletionHandle>,
) {
crate::tokio::spawn(async move {
let outcome = match handle {
Some(handle) => handle.wait().await,
None => CompletionOutcome::CompletedWithoutResult,
};
if let Some(tx) = subscriber {
let event = interaction_terminal_event(interaction_id, outcome);
if crate::tokio::time::timeout(std::time::Duration::from_secs(5), tx.send(event))
.await
.is_err()
{
tracing::warn!(
%interaction_id,
"completion bridge dropped terminal event: subscriber send timed out after 5s"
);
}
}
if let Some(runtime) = comms_runtime {
runtime.mark_interaction_complete(&interaction_id);
}
});
}