use crate::event::{AgentEvent, EventEnvelope};
use crate::interaction::{InteractionId, ResponseStatus};
use crate::types::{ContentBlock, HandlingMode};
use futures::Stream;
use serde::{Deserialize, Serialize};
use std::any::Any;
use std::collections::BTreeMap;
use std::pin::Pin;
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use uuid::Uuid;
/// Wire name of the supervisor-bridge peer request intent.
///
/// `CommsPeerRequestIntent::as_str` returns this constant for the `SupervisorBridge` variant.
pub const SUPERVISOR_BRIDGE_INTENT: &str = "supervisor.bridge";
/// Intent carried by a peer request.
///
/// Each variant serializes to the string named in its `#[serde(rename = ...)]` attribute.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum CommsPeerRequestIntent {
/// Serialized as `"supervisor.bridge"`.
#[serde(rename = "supervisor.bridge")]
SupervisorBridge,
/// Serialized as `"checksum_token"`.
#[serde(rename = "checksum_token")]
ChecksumToken,
}
impl CommsPeerRequestIntent {
    /// Returns the wire string for this intent (the same text as its serde rename).
    pub const fn as_str(&self) -> &'static str {
        match *self {
            Self::ChecksumToken => "checksum_token",
            Self::SupervisorBridge => SUPERVISOR_BRIDGE_INTENT,
        }
    }
}
impl std::fmt::Display for CommsPeerRequestIntent {
    /// Writes the wire string from `as_str` verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let wire_name: &'static str = self.as_str();
        f.write_str(wire_name)
    }
}
/// Identifier of a peer, stored as a UUID.
///
/// With the `schema` feature enabled, the generated schema describes the inner value as a
/// `String`.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct PeerId(#[cfg_attr(feature = "schema", schemars(with = "String"))] pub Uuid);
/// Namespace UUID hashed ahead of the public key by `PeerId::from_ed25519_pubkey`.
///
/// Its sixteen bytes are the ASCII text `meerkatPeerId0` followed by `0x00 0x01`.
const PEER_ID_ED25519_PUBKEY_NAMESPACE: Uuid =
Uuid::from_u128(0x6d65_6572_6b61_7450_6565_7249_6430_0001);
impl PeerId {
pub fn new() -> Self {
Self(crate::time_compat::new_uuid_v7())
}
pub const fn from_uuid(uuid: Uuid) -> Self {
Self(uuid)
}
pub fn parse(s: &str) -> Result<Self, PeerIdError> {
Uuid::parse_str(s)
.map(Self)
.map_err(|source| PeerIdError::Invalid {
input: s.to_string(),
source,
})
}
pub fn from_ed25519_pubkey(pubkey: &[u8; 32]) -> Self {
Self(uuid_v5_from_bytes(
&PEER_ID_ED25519_PUBKEY_NAMESPACE,
pubkey,
))
}
pub fn as_str(&self) -> String {
self.0.to_string()
}
pub const fn as_uuid(&self) -> &Uuid {
&self.0
}
}
/// Builds a name-based UUID from the SHA-1 of `namespace` bytes followed by `name`.
///
/// The first 16 digest bytes are kept; the high nibble of byte 6 is forced to `5` (version)
/// and the top two bits of byte 8 to `10` (variant).
fn uuid_v5_from_bytes(namespace: &Uuid, name: &[u8]) -> Uuid {
    let sha1 = sha1_digest_bytes(&[namespace.as_bytes(), name]);
    let mut uuid_bytes = [0u8; 16];
    // `zip` stops after 16 items, dropping the last four digest bytes.
    for (dst, src) in uuid_bytes.iter_mut().zip(sha1) {
        *dst = src;
    }
    uuid_bytes[6] = 0x50 | (uuid_bytes[6] & 0x0f);
    uuid_bytes[8] = 0x80 | (uuid_bytes[8] & 0x3f);
    Uuid::from_bytes(uuid_bytes)
}
/// Computes the SHA-1 digest of the concatenation of `parts`.
///
/// The parts are hashed as one contiguous message; how the bytes are split across slices does
/// not affect the result.
fn sha1_digest_bytes(parts: &[&[u8]]) -> [u8; 20] {
    const BLOCK_LEN: usize = 64;

    let payload_len: usize = parts.iter().map(|part| part.len()).sum();
    // Room for the payload, the 0x80 marker and the 8-byte length, rounded up to whole blocks.
    let padded_len = (payload_len + 9).div_ceil(BLOCK_LEN) * BLOCK_LEN;

    let mut padded: Vec<u8> = Vec::with_capacity(padded_len);
    parts.iter().for_each(|part| padded.extend_from_slice(part));
    let payload_bits = (payload_len as u64) * 8;
    padded.push(0x80);
    // Zero-fill up to the last 8 bytes of the final block, which hold the bit length.
    padded.resize(padded_len - 8, 0);
    padded.extend_from_slice(&payload_bits.to_be_bytes());

    let mut state: [u32; 5] = [
        0x6745_2301,
        0xefcd_ab89,
        0x98ba_dcfe,
        0x1032_5476,
        0xc3d2_e1f0,
    ];

    for block in padded.chunks_exact(BLOCK_LEN) {
        // Message schedule: 16 big-endian words from the block, expanded to 80.
        let mut words = [0u32; 80];
        for (slot, quad) in words.iter_mut().zip(block.chunks_exact(4)) {
            *slot = u32::from_be_bytes([quad[0], quad[1], quad[2], quad[3]]);
        }
        for index in 16..80 {
            let mixed =
                words[index - 3] ^ words[index - 8] ^ words[index - 14] ^ words[index - 16];
            words[index] = mixed.rotate_left(1);
        }

        let [mut reg_a, mut reg_b, mut reg_c, mut reg_d, mut reg_e] = state;
        for (round, &word) in words.iter().enumerate() {
            let (mix, constant) = if round < 20 {
                ((reg_b & reg_c) | (!reg_b & reg_d), 0x5a82_7999u32)
            } else if round < 40 {
                (reg_b ^ reg_c ^ reg_d, 0x6ed9_eba1)
            } else if round < 60 {
                (
                    (reg_b & reg_c) | (reg_b & reg_d) | (reg_c & reg_d),
                    0x8f1b_bcdc,
                )
            } else {
                (reg_b ^ reg_c ^ reg_d, 0xca62_c1d6)
            };
            let next_a = reg_a
                .rotate_left(5)
                .wrapping_add(mix)
                .wrapping_add(reg_e)
                .wrapping_add(constant)
                .wrapping_add(word);
            reg_e = reg_d;
            reg_d = reg_c;
            reg_c = reg_b.rotate_left(30);
            reg_b = reg_a;
            reg_a = next_a;
        }

        let round_output = [reg_a, reg_b, reg_c, reg_d, reg_e];
        for (accumulator, addend) in state.iter_mut().zip(round_output) {
            *accumulator = accumulator.wrapping_add(addend);
        }
    }

    // Serialize the five state words big-endian.
    let mut digest = [0u8; 20];
    for (out, word) in digest.chunks_exact_mut(4).zip(state) {
        out.copy_from_slice(&word.to_be_bytes());
    }
    digest
}
impl Default for PeerId {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Display for PeerId {
// Forwards to the inner UUID's formatter, passing the caller's `Formatter` through unchanged.
// NOTE(review): `fmt` resolves through whichever formatting trait is in scope in this module —
// presumably `Display`; confirm.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
/// Error returned by `PeerId::parse`.
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum PeerIdError {
/// The text was rejected by the UUID parser; `input` is the rejected text and `source` the
/// parser's error.
#[error("invalid peer id {input:?}: {source}")]
Invalid {
input: String,
#[source]
source: uuid::Error,
},
}
/// Transport kind of a peer address.
///
/// Serialized in snake case (`inproc`, `uds`, `tcp`), the same strings `as_scheme` returns.
/// Marked `#[non_exhaustive]`, so code outside this crate must handle unknown variants.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum PeerTransport {
/// Address scheme `inproc`.
Inproc,
/// Address scheme `uds`.
Uds,
/// Address scheme `tcp`.
Tcp,
}
impl PeerTransport {
    /// Returns the address scheme for this transport (the text before `://`).
    pub const fn as_scheme(&self) -> &'static str {
        match *self {
            Self::Tcp => "tcp",
            Self::Uds => "uds",
            Self::Inproc => "inproc",
        }
    }
}
impl std::fmt::Display for PeerTransport {
    /// Writes the scheme string from `as_scheme` verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let scheme: &'static str = self.as_scheme();
        f.write_str(scheme)
    }
}
/// Address of a peer: a transport plus a transport-specific endpoint string.
///
/// The textual form is `<scheme>://<endpoint>` (see the `Display` impl and `PeerAddress::parse`).
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct PeerAddress {
/// Transport used to reach the peer.
pub transport: PeerTransport,
/// Everything after `://` in the textual form; stored as given.
pub endpoint: String,
}
/// Error returned by `PeerAddress::parse` and the conversions built on it.
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum PeerAddressParseError {
/// The input contains no `://` separator.
#[error("peer address missing transport scheme: {input}")]
MissingTransportScheme { input: String },
/// The text before `://` is not one of the known schemes.
#[error("unknown peer address transport {scheme:?} in address {input:?}")]
UnknownTransport { input: String, scheme: String },
}
impl PeerAddress {
pub fn new(transport: PeerTransport, endpoint: impl Into<String>) -> Self {
Self {
transport,
endpoint: endpoint.into(),
}
}
pub const fn transport(&self) -> PeerTransport {
self.transport
}
pub fn endpoint(&self) -> &str {
&self.endpoint
}
pub fn parse(raw: impl AsRef<str>) -> Result<Self, PeerAddressParseError> {
let raw = raw.as_ref();
let (scheme, endpoint) =
raw.split_once("://")
.ok_or_else(|| PeerAddressParseError::MissingTransportScheme {
input: raw.to_string(),
})?;
let transport = match scheme {
"inproc" => PeerTransport::Inproc,
"uds" => PeerTransport::Uds,
"tcp" => PeerTransport::Tcp,
other => {
return Err(PeerAddressParseError::UnknownTransport {
input: raw.to_string(),
scheme: other.to_string(),
});
}
};
Ok(Self::new(transport, endpoint))
}
}
impl std::fmt::Display for PeerAddress {
    /// Renders the address as `<scheme>://<endpoint>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let scheme = self.transport.as_scheme();
        let endpoint = &self.endpoint;
        write!(f, "{scheme}://{endpoint}")
    }
}
impl std::str::FromStr for PeerAddress {
type Err = PeerAddressParseError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Self::parse(s)
}
}
impl TryFrom<&str> for PeerAddress {
type Error = PeerAddressParseError;
fn try_from(value: &str) -> Result<Self, Self::Error> {
Self::parse(value)
}
}
impl TryFrom<String> for PeerAddress {
type Error = PeerAddressParseError;
fn try_from(value: String) -> Result<Self, Self::Error> {
Self::parse(value)
}
}
/// Display name of a peer, wrapping a `String`.
///
/// `PeerName::new` rejects blank names and names containing control characters.
/// NOTE(review): `Deserialize` is derived on the newtype with no `try_from` hook, so
/// deserialized values presumably bypass those checks — confirm whether that is intended.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct PeerName(String);
impl PeerName {
pub fn new(name: impl Into<String>) -> Result<Self, String> {
let name = name.into();
if name.trim().is_empty() {
return Err("peer name cannot be empty".to_string());
}
if name.chars().any(char::is_control) {
return Err("peer name cannot contain control characters".to_string());
}
Ok(Self(name))
}
pub fn as_str(&self) -> &str {
&self.0
}
pub fn as_string(&self) -> String {
self.0.clone()
}
}
impl AsRef<str> for PeerName {
fn as_ref(&self) -> &str {
self.as_str()
}
}
impl std::fmt::Display for PeerName {
// Forwards to the inner `String`'s formatter, passing the caller's `Formatter` through unchanged.
// NOTE(review): `fmt` resolves through whichever formatting trait is in scope in this module —
// presumably `Display` (a `Debug` resolution would add quotes); confirm.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl From<PeerName> for String {
fn from(peer_name: PeerName) -> Self {
peer_name.0
}
}
/// A peer id paired with an optional display name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerRoute {
/// Identifier of the peer.
pub peer_id: PeerId,
/// Name preferred by `label` when present.
pub display_name: Option<PeerName>,
}
impl PeerRoute {
pub fn new(peer_id: PeerId) -> Self {
Self {
peer_id,
display_name: None,
}
}
pub fn with_display_name(peer_id: PeerId, display_name: PeerName) -> Self {
Self {
peer_id,
display_name: Some(display_name),
}
}
pub fn label(&self) -> String {
self.display_name
.as_ref()
.map(PeerName::as_string)
.unwrap_or_else(|| self.peer_id.to_string())
}
}
/// Description of a trusted peer: identity, name, address and public key.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TrustedPeerDescriptor {
/// Identifier of the peer.
pub peer_id: PeerId,
/// Display name of the peer.
pub name: PeerName,
/// Address at which the peer is reached.
pub address: PeerAddress,
/// 32-byte public key; the `test_only_unsigned*` constructors leave it all-zero.
pub pubkey: [u8; 32],
}
/// Authority value that accompanies a trust mutation (see `CommsTrustMutation`).
///
/// All fields are private and the only constructor in this block's impl, `from_generated_parts`,
/// is private too. The `validate_*` methods succeed at most once per authority; clones share
/// the same `consumed` flag because it sits behind an `Arc`.
#[derive(Debug, Clone)]
pub struct CommsTrustMutationAuthority {
/// Which generated source produced this authority.
source_kind: GeneratedCommsTrustAuthoritySourceKind,
/// Epoch value supplied by the source; exposed through `source_epoch()`.
source_epoch: u64,
/// Opaque owner token compared by pointer identity in the owner-token validators.
source_owner_token: Option<Arc<dyn Any + Send + Sync>>,
/// Source kind recorded as owner of the trust row; exposed through `trust_row_owner_kind()`.
trust_row_owner_kind: GeneratedCommsTrustAuthoritySourceKind,
/// The single operation this authority permits.
operation: GeneratedCommsTrustAuthorityOperation,
/// Text form of the peer id the mutation must target.
peer_id: String,
/// When set, the text form of the trust-store peer id the target must expose.
trust_store_peer_id: Option<String>,
/// Descriptor the mutation must match; present for add operations, absent for removes.
peer_descriptor: Option<TrustedPeerDescriptor>,
/// Set to `true` by the first successful `validate_*` call.
consumed: Arc<AtomicBool>,
}
/// Opaque owner token whose identity is the address of the wrapped `Arc` allocation.
///
/// Clones share the allocation and therefore compare as the same owner.
#[derive(Clone)]
pub struct GeneratedPeerCommsOwnerToken {
/// Type-erased payload; only its pointer identity is inspected in this file.
inner: Arc<dyn Any + Send + Sync>,
}
impl std::fmt::Debug for GeneratedPeerCommsOwnerToken {
    /// Prints only the type name; the wrapped token is not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("GeneratedPeerCommsOwnerToken");
        builder.finish()
    }
}
impl GeneratedPeerCommsOwnerToken {
#[cfg_attr(
any(test, not(meerkat_internal_generated_authority_bridge)),
allow(dead_code)
)]
pub(crate) fn from_generated_owner_token(inner: Arc<dyn Any + Send + Sync>) -> Self {
Self { inner }
}
pub fn same_owner(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.inner, &other.inner)
}
fn matches_raw_owner(&self, other: &Arc<dyn Any + Send + Sync>) -> bool {
Arc::ptr_eq(&self.inner, other)
}
}
/// Kind of generated source that can produce a `CommsTrustMutationAuthority`.
///
/// The `MeerkatMachine*` variants are the ones accepted by `is_meerkat_machine_trust_source`;
/// the `MobMachine*` variants are the ones accepted by `is_mob_machine_trust_source`.
#[doc(hidden)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum GeneratedCommsTrustAuthoritySourceKind {
MeerkatMachinePeerProjection,
MeerkatMachineSupervisorPublish,
MeerkatMachineSupervisorRevoke,
MobMachineMemberTrustWiring,
MobMachineMemberTrustUnwiring,
MobMachineExternalPeerTrustWiring,
MobMachineExternalPeerTrustUnwiring,
MobMachineExternalPeerTrustRepair,
MobMachineExternalPeerReciprocalTrust,
}
/// The single trust-store operation an authority permits.
///
/// The `*Add` operations require a peer descriptor and the `*Remove` operations forbid one
/// (enforced in `CommsTrustMutationAuthority::from_generated_parts`).
#[doc(hidden)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum GeneratedCommsTrustAuthorityOperation {
PublicAdd,
PublicRemove,
PrivateAdd,
PrivateRemove,
}
impl CommsTrustMutationAuthority {
#[cfg_attr(not(meerkat_internal_generated_authority_bridge), allow(dead_code))]
#[allow(clippy::too_many_arguments)]
fn from_generated_parts(
source_kind: GeneratedCommsTrustAuthoritySourceKind,
source_epoch: u64,
source_owner_token: Option<Arc<dyn Any + Send + Sync>>,
trust_row_owner_kind: GeneratedCommsTrustAuthoritySourceKind,
operation: GeneratedCommsTrustAuthorityOperation,
peer_id: impl Into<String>,
trust_store_peer_id: Option<String>,
peer_descriptor: Option<TrustedPeerDescriptor>,
) -> Result<Self, String> {
let peer_id = peer_id.into();
if matches!(
operation,
GeneratedCommsTrustAuthorityOperation::PublicAdd
| GeneratedCommsTrustAuthorityOperation::PrivateAdd
) && peer_descriptor.is_none()
{
return Err(format!(
"generated comms trust add for peer {peer_id:?} requires a trusted peer descriptor"
));
}
if let Some(peer) = peer_descriptor.as_ref()
&& peer.peer_id.to_string() != peer_id
{
return Err(format!(
"generated comms trust descriptor peer_id {} does not match requested {:?}",
peer.peer_id, peer_id,
));
}
if matches!(
operation,
GeneratedCommsTrustAuthorityOperation::PublicRemove
| GeneratedCommsTrustAuthorityOperation::PrivateRemove
) && peer_descriptor.is_some()
{
return Err(format!(
"generated comms trust remove for peer {peer_id:?} must not carry a trusted peer descriptor"
));
}
Ok(Self {
source_kind,
source_epoch,
source_owner_token,
trust_row_owner_kind,
operation,
peer_id,
trust_store_peer_id,
peer_descriptor,
consumed: Arc::new(AtomicBool::new(false)),
})
}
pub fn validate_public_add(
&self,
trust_store_peer_id: Option<PeerId>,
peer: &TrustedPeerDescriptor,
) -> Result<(), String> {
self.validate_add_operation(
GeneratedCommsTrustAuthorityOperation::PublicAdd,
trust_store_peer_id,
peer,
"add a public trusted peer",
)
}
pub fn validate_public_remove(
&self,
trust_store_peer_id: Option<PeerId>,
peer_id: PeerId,
) -> Result<(), String> {
self.validate_operation(
GeneratedCommsTrustAuthorityOperation::PublicRemove,
trust_store_peer_id,
peer_id,
"remove a public trusted peer",
)
}
pub fn validate_private_add(
&self,
trust_store_peer_id: Option<PeerId>,
peer: &TrustedPeerDescriptor,
) -> Result<(), String> {
self.validate_add_operation(
GeneratedCommsTrustAuthorityOperation::PrivateAdd,
trust_store_peer_id,
peer,
"add a private trusted peer",
)
}
pub fn validate_private_remove(
&self,
trust_store_peer_id: Option<PeerId>,
peer_id: PeerId,
) -> Result<(), String> {
self.validate_operation(
GeneratedCommsTrustAuthorityOperation::PrivateRemove,
trust_store_peer_id,
peer_id,
"remove a private trusted peer",
)
}
pub fn preflight_public_add(
&self,
trust_store_peer_id: Option<PeerId>,
peer: &TrustedPeerDescriptor,
) -> Result<(), String> {
self.preflight_add_operation(
GeneratedCommsTrustAuthorityOperation::PublicAdd,
trust_store_peer_id,
peer,
"add a public trusted peer",
)
}
pub fn preflight_public_remove(
&self,
trust_store_peer_id: Option<PeerId>,
peer_id: PeerId,
) -> Result<(), String> {
self.preflight_operation(
GeneratedCommsTrustAuthorityOperation::PublicRemove,
trust_store_peer_id,
peer_id,
"remove a public trusted peer",
)
}
fn validate_operation(
&self,
operation: GeneratedCommsTrustAuthorityOperation,
trust_store_peer_id: Option<PeerId>,
peer_id: PeerId,
action: &'static str,
) -> Result<(), String> {
if self.operation != operation {
return Err(format!(
"trust authority from {:?} for {:?} cannot {action}",
self.source_kind, self.operation,
));
}
self.validate_peer_match(peer_id)?;
self.validate_trust_store_peer_match(trust_store_peer_id)?;
self.consume_once()
}
fn preflight_operation(
&self,
operation: GeneratedCommsTrustAuthorityOperation,
trust_store_peer_id: Option<PeerId>,
peer_id: PeerId,
action: &'static str,
) -> Result<(), String> {
if self.operation != operation {
return Err(format!(
"trust authority from {:?} for {:?} cannot {action}",
self.source_kind, self.operation,
));
}
self.validate_peer_match(peer_id)?;
self.validate_trust_store_peer_match(trust_store_peer_id)
}
fn validate_add_operation(
&self,
operation: GeneratedCommsTrustAuthorityOperation,
trust_store_peer_id: Option<PeerId>,
peer: &TrustedPeerDescriptor,
action: &'static str,
) -> Result<(), String> {
if self.operation != operation {
return Err(format!(
"trust authority from {:?} for {:?} cannot {action}",
self.source_kind, self.operation,
));
}
self.validate_peer_match(peer.peer_id)?;
self.validate_peer_descriptor_match(peer)?;
self.validate_trust_store_peer_match(trust_store_peer_id)?;
self.consume_once()
}
fn preflight_add_operation(
&self,
operation: GeneratedCommsTrustAuthorityOperation,
trust_store_peer_id: Option<PeerId>,
peer: &TrustedPeerDescriptor,
action: &'static str,
) -> Result<(), String> {
if self.operation != operation {
return Err(format!(
"trust authority from {:?} for {:?} cannot {action}",
self.source_kind, self.operation,
));
}
self.validate_peer_match(peer.peer_id)?;
self.validate_peer_descriptor_match(peer)?;
self.validate_trust_store_peer_match(trust_store_peer_id)
}
fn consume_once(&self) -> Result<(), String> {
self.consumed
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.map(|_| ())
.map_err(|_| "generated comms trust authority was already consumed".to_string())
}
fn validate_peer_match(&self, peer_id: PeerId) -> Result<(), String> {
let expected = self.peer_id();
if expected == peer_id.to_string() {
Ok(())
} else {
Err(format!(
"trust authority peer_id {expected:?} does not match mutation peer_id {peer_id}"
))
}
}
fn validate_trust_store_peer_match(
&self,
trust_store_peer_id: Option<PeerId>,
) -> Result<(), String> {
let Some(expected) = self.trust_store_peer_id.as_deref() else {
return Ok(());
};
let Some(actual) = trust_store_peer_id else {
return Err(format!(
"trust authority from {:?} requires trust-store peer_id {expected:?}, but the target runtime did not expose one",
self.source_kind,
));
};
if expected == actual.to_string() {
Ok(())
} else {
Err(format!(
"trust authority from {:?} for peer {:?} targets trust-store peer_id {expected:?}, not {actual}",
self.source_kind,
self.peer_id(),
))
}
}
fn validate_peer_descriptor_match(&self, peer: &TrustedPeerDescriptor) -> Result<(), String> {
let Some(expected) = self.peer_descriptor.as_ref() else {
return Err(format!(
"trust authority from {:?} for {:?} did not carry a generated peer descriptor",
self.source_kind, self.operation,
));
};
if expected == peer {
Ok(())
} else {
Err(format!(
"trust authority descriptor for peer {:?} does not match mutation descriptor",
self.peer_id()
))
}
}
fn peer_id(&self) -> &str {
self.peer_id.as_str()
}
pub fn source_epoch(&self) -> u64 {
self.source_epoch
}
pub fn validate_source_owner_token(
&self,
expected: Option<&GeneratedPeerCommsOwnerToken>,
) -> Result<(), String> {
let Some(actual) = self.source_owner_token.as_ref() else {
return Err(format!(
"trust authority from {:?} did not carry a generated owner token",
self.source_kind,
));
};
let Some(expected) = expected else {
return Err(format!(
"trust authority from {:?} requires the target runtime's generated owner token",
self.source_kind,
));
};
if expected.matches_raw_owner(actual) {
Ok(())
} else {
Err(format!(
"trust authority from {:?} was minted by a different generated owner",
self.source_kind,
))
}
}
pub fn validate_target_source_owner_token(
&self,
expected_meerkat_machine_owner: Option<&GeneratedPeerCommsOwnerToken>,
expected_mob_machine_owner: Option<&Arc<dyn Any + Send + Sync>>,
) -> Result<(), String> {
if is_meerkat_machine_trust_source(self.source_kind) {
self.validate_source_owner_token(expected_meerkat_machine_owner)
} else if is_mob_machine_trust_source(self.source_kind) {
self.validate_raw_source_owner_token(expected_mob_machine_owner)
} else {
Err(format!(
"trust authority from {:?} has no target owner validator",
self.source_kind,
))
}
}
pub fn validate_raw_source_owner_token(
&self,
expected: Option<&Arc<dyn Any + Send + Sync>>,
) -> Result<(), String> {
let Some(actual) = self.source_owner_token.as_ref() else {
return Err(format!(
"trust authority from {:?} did not carry a generated owner token",
self.source_kind,
));
};
let Some(expected) = expected else {
return Err(format!(
"trust authority from {:?} requires the target runtime's generated owner token",
self.source_kind,
));
};
if Arc::ptr_eq(actual, expected) {
Ok(())
} else {
Err(format!(
"trust authority from {:?} was minted by a different generated owner",
self.source_kind,
))
}
}
pub fn is_mob_machine_source(&self) -> bool {
is_mob_machine_trust_source(self.source_kind)
}
pub fn trust_row_owner_kind(&self) -> GeneratedCommsTrustAuthoritySourceKind {
self.trust_row_owner_kind
}
}
// Token validators defined outside this file and resolved by symbol name at link time.
//
// Each symbol name is a fixed prefix plus the compile-time value of the
// `MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX` environment variable. The block is only
// compiled when the `meerkat_internal_generated_authority_bridge` cfg is set and the build is
// not a test build. Every function takes a type-erased token and reports whether it is valid.
#[cfg(all(meerkat_internal_generated_authority_bridge, not(test)))]
#[allow(improper_ctypes_definitions, unsafe_code)]
unsafe extern "Rust" {
// Validator used for `MeerkatMachinePeerProjection`.
#[link_name = concat!(
"__meerkat_runtime_generated_authority_bridge_token_is_valid_v1_comms_trust_reconcile_",
env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
)]
fn runtime_comms_trust_reconcile_generated_authority_bridge_token_is_valid(
token: &(dyn std::any::Any + Send + Sync),
) -> bool;
// Validator used for `MeerkatMachineSupervisorPublish`.
#[link_name = concat!(
"__meerkat_runtime_generated_authority_bridge_token_is_valid_v1_supervisor_trust_publish_",
env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
)]
fn runtime_supervisor_trust_publish_generated_authority_bridge_token_is_valid(
token: &(dyn std::any::Any + Send + Sync),
) -> bool;
// Validator used for `MeerkatMachineSupervisorRevoke`.
#[link_name = concat!(
"__meerkat_runtime_generated_authority_bridge_token_is_valid_v1_supervisor_trust_revoke_",
env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
)]
fn runtime_supervisor_trust_revoke_generated_authority_bridge_token_is_valid(
token: &(dyn std::any::Any + Send + Sync),
) -> bool;
// Validator used for `MobMachineMemberTrustWiring`.
#[link_name = concat!(
"__meerkat_mob_generated_authority_bridge_token_is_valid_v1_mob_member_trust_wiring_",
env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
)]
fn mob_member_trust_wiring_generated_authority_bridge_token_is_valid(
token: &(dyn std::any::Any + Send + Sync),
) -> bool;
// Validator used for `MobMachineMemberTrustUnwiring`.
#[link_name = concat!(
"__meerkat_mob_generated_authority_bridge_token_is_valid_v1_mob_member_trust_unwiring_",
env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
)]
fn mob_member_trust_unwiring_generated_authority_bridge_token_is_valid(
token: &(dyn std::any::Any + Send + Sync),
) -> bool;
// Validator used for `MobMachineExternalPeerTrustWiring`.
#[link_name = concat!(
"__meerkat_mob_generated_authority_bridge_token_is_valid_v1_mob_external_peer_trust_wiring_",
env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
)]
fn mob_external_peer_trust_wiring_generated_authority_bridge_token_is_valid(
token: &(dyn std::any::Any + Send + Sync),
) -> bool;
// Validator used for `MobMachineExternalPeerTrustUnwiring`.
#[link_name = concat!(
"__meerkat_mob_generated_authority_bridge_token_is_valid_v1_mob_external_peer_trust_unwiring_",
env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
)]
fn mob_external_peer_trust_unwiring_generated_authority_bridge_token_is_valid(
token: &(dyn std::any::Any + Send + Sync),
) -> bool;
// Validator used for `MobMachineExternalPeerTrustRepair`.
#[link_name = concat!(
"__meerkat_mob_generated_authority_bridge_token_is_valid_v1_mob_external_peer_trust_repair_",
env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
)]
fn mob_external_peer_trust_repair_generated_authority_bridge_token_is_valid(
token: &(dyn std::any::Any + Send + Sync),
) -> bool;
// Validator used for `MobMachineExternalPeerReciprocalTrust`.
#[link_name = concat!(
"__meerkat_mob_generated_authority_bridge_token_is_valid_v1_mob_external_peer_reciprocal_trust_",
env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
)]
fn mob_external_peer_reciprocal_trust_generated_authority_bridge_token_is_valid(
token: &(dyn std::any::Any + Send + Sync),
) -> bool;
}
/// Exported entry point that builds an authority for a Meerkat-machine source.
///
/// The bridge token is validated for `source_kind` first, then both kinds must be
/// Meerkat-machine kinds; only after that are the parts handed to
/// `CommsTrustMutationAuthority::from_generated_parts`. The export symbol is a fixed prefix plus
/// the compile-time `MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX` value.
///
/// # Errors
///
/// Returns the first failing check's message.
#[cfg(all(meerkat_internal_generated_authority_bridge, not(test)))]
#[doc(hidden)]
#[allow(improper_ctypes_definitions, unsafe_code)]
#[allow(clippy::too_many_arguments)]
#[unsafe(export_name = concat!(
    "__meerkat_core_runtime_generated_comms_trust_authority_build_v1_",
    env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
))]
pub(crate) extern "Rust" fn runtime_generated_comms_trust_authority_build(
    token: &'static (dyn std::any::Any + Send + Sync),
    source_kind: GeneratedCommsTrustAuthoritySourceKind,
    source_epoch: u64,
    source_owner_token: Option<Arc<dyn Any + Send + Sync>>,
    trust_row_owner_kind: GeneratedCommsTrustAuthoritySourceKind,
    operation: GeneratedCommsTrustAuthorityOperation,
    peer_id: String,
    trust_store_peer_id: Option<String>,
    peer_descriptor: Option<TrustedPeerDescriptor>,
) -> Result<CommsTrustMutationAuthority, String> {
    // Token check runs before the kind check, so a bad token is reported first.
    validate_runtime_generated_authority_bridge_token(source_kind, token)
        .and_then(|()| validate_meerkat_machine_trust_source(source_kind, trust_row_owner_kind))?;

    CommsTrustMutationAuthority::from_generated_parts(
        source_kind,
        source_epoch,
        source_owner_token,
        trust_row_owner_kind,
        operation,
        peer_id,
        trust_store_peer_id,
        peer_descriptor,
    )
}
/// Exported entry point that builds an authority for a Mob-machine source.
///
/// The bridge token is validated for `source_kind` first, then both kinds must be Mob-machine
/// kinds; only after that are the parts handed to
/// `CommsTrustMutationAuthority::from_generated_parts`. The export symbol is a fixed prefix plus
/// the compile-time `MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX` value.
///
/// # Errors
///
/// Returns the first failing check's message.
#[cfg(all(meerkat_internal_generated_authority_bridge, not(test)))]
#[doc(hidden)]
#[allow(improper_ctypes_definitions, unsafe_code)]
#[allow(clippy::too_many_arguments)]
#[unsafe(export_name = concat!(
    "__meerkat_core_mob_generated_comms_trust_authority_build_v1_",
    env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
))]
pub(crate) extern "Rust" fn mob_generated_comms_trust_authority_build(
    token: &'static (dyn std::any::Any + Send + Sync),
    source_kind: GeneratedCommsTrustAuthoritySourceKind,
    source_epoch: u64,
    source_owner_token: Option<Arc<dyn Any + Send + Sync>>,
    trust_row_owner_kind: GeneratedCommsTrustAuthoritySourceKind,
    operation: GeneratedCommsTrustAuthorityOperation,
    peer_id: String,
    trust_store_peer_id: Option<String>,
    peer_descriptor: Option<TrustedPeerDescriptor>,
) -> Result<CommsTrustMutationAuthority, String> {
    // Token check runs before the kind check, so a bad token is reported first.
    validate_mob_generated_authority_bridge_token(source_kind, token)
        .and_then(|()| validate_mob_machine_trust_source(source_kind, trust_row_owner_kind))?;

    CommsTrustMutationAuthority::from_generated_parts(
        source_kind,
        source_epoch,
        source_owner_token,
        trust_row_owner_kind,
        operation,
        peer_id,
        trust_store_peer_id,
        peer_descriptor,
    )
}
/// Asks the externally linked validator for `source_kind` whether `token` is valid.
///
/// Only the three Meerkat-machine kinds have a validator here; every other kind is rejected.
///
/// # Errors
///
/// Returns a fixed message when the token is rejected or the kind has no validator.
#[cfg(all(meerkat_internal_generated_authority_bridge, not(test)))]
fn validate_runtime_generated_authority_bridge_token(
    source_kind: GeneratedCommsTrustAuthoritySourceKind,
    token: &(dyn std::any::Any + Send + Sync),
) -> Result<(), String> {
    use GeneratedCommsTrustAuthoritySourceKind as Kind;

    // SAFETY: the callees are `extern "Rust"` declarations resolved by symbol name at link
    // time; this relies on the exporting crate defining them with exactly the declared
    // signature, which cannot be checked from this file.
    #[allow(unsafe_code)]
    let token_accepted = unsafe {
        match source_kind {
            Kind::MeerkatMachinePeerProjection => {
                runtime_comms_trust_reconcile_generated_authority_bridge_token_is_valid(token)
            }
            Kind::MeerkatMachineSupervisorPublish => {
                runtime_supervisor_trust_publish_generated_authority_bridge_token_is_valid(token)
            }
            Kind::MeerkatMachineSupervisorRevoke => {
                runtime_supervisor_trust_revoke_generated_authority_bridge_token_is_valid(token)
            }
            // Kinds without a runtime validator are never accepted.
            _ => false,
        }
    };
    if !token_accepted {
        return Err("generated comms trust authority requires the matching generated runtime protocol bridge token".into());
    }
    Ok(())
}
/// Asks the externally linked validator for `source_kind` whether `token` is valid.
///
/// Only the six Mob-machine kinds have a validator here; every other kind is rejected.
///
/// # Errors
///
/// Returns a fixed message when the token is rejected or the kind has no validator.
#[cfg(all(meerkat_internal_generated_authority_bridge, not(test)))]
fn validate_mob_generated_authority_bridge_token(
    source_kind: GeneratedCommsTrustAuthoritySourceKind,
    token: &(dyn std::any::Any + Send + Sync),
) -> Result<(), String> {
    use GeneratedCommsTrustAuthoritySourceKind as Kind;

    // SAFETY: the callees are `extern "Rust"` declarations resolved by symbol name at link
    // time; this relies on the exporting crate defining them with exactly the declared
    // signature, which cannot be checked from this file.
    #[allow(unsafe_code)]
    let token_accepted = unsafe {
        match source_kind {
            Kind::MobMachineMemberTrustWiring => {
                mob_member_trust_wiring_generated_authority_bridge_token_is_valid(token)
            }
            Kind::MobMachineMemberTrustUnwiring => {
                mob_member_trust_unwiring_generated_authority_bridge_token_is_valid(token)
            }
            Kind::MobMachineExternalPeerTrustWiring => {
                mob_external_peer_trust_wiring_generated_authority_bridge_token_is_valid(token)
            }
            Kind::MobMachineExternalPeerTrustUnwiring => {
                mob_external_peer_trust_unwiring_generated_authority_bridge_token_is_valid(token)
            }
            Kind::MobMachineExternalPeerTrustRepair => {
                mob_external_peer_trust_repair_generated_authority_bridge_token_is_valid(token)
            }
            Kind::MobMachineExternalPeerReciprocalTrust => {
                mob_external_peer_reciprocal_trust_generated_authority_bridge_token_is_valid(token)
            }
            // Kinds without a Mob validator are never accepted.
            _ => false,
        }
    };
    if !token_accepted {
        return Err("generated comms trust authority requires the matching generated MobMachine protocol bridge token".into());
    }
    Ok(())
}
/// Requires both the source kind and the row-owner kind to be Meerkat-machine kinds.
///
/// # Errors
///
/// Returns a message naming both kinds when either one is not a Meerkat-machine kind.
#[cfg(all(meerkat_internal_generated_authority_bridge, not(test)))]
fn validate_meerkat_machine_trust_source(
    source_kind: GeneratedCommsTrustAuthoritySourceKind,
    trust_row_owner_kind: GeneratedCommsTrustAuthoritySourceKind,
) -> Result<(), String> {
    let both_meerkat = is_meerkat_machine_trust_source(source_kind)
        && is_meerkat_machine_trust_source(trust_row_owner_kind);
    if !both_meerkat {
        return Err(format!(
            "runtime generated comms trust authority cannot package source {source_kind:?} with row owner {trust_row_owner_kind:?}"
        ));
    }
    Ok(())
}
/// Requires both the source kind and the row-owner kind to be Mob-machine kinds.
///
/// # Errors
///
/// Returns a message naming both kinds when either one is not a Mob-machine kind.
#[cfg(all(meerkat_internal_generated_authority_bridge, not(test)))]
fn validate_mob_machine_trust_source(
    source_kind: GeneratedCommsTrustAuthoritySourceKind,
    trust_row_owner_kind: GeneratedCommsTrustAuthoritySourceKind,
) -> Result<(), String> {
    let both_mob = is_mob_machine_trust_source(source_kind)
        && is_mob_machine_trust_source(trust_row_owner_kind);
    if !both_mob {
        return Err(format!(
            "mob generated comms trust authority cannot package source {source_kind:?} with row owner {trust_row_owner_kind:?}"
        ));
    }
    Ok(())
}
fn is_meerkat_machine_trust_source(kind: GeneratedCommsTrustAuthoritySourceKind) -> bool {
matches!(
kind,
GeneratedCommsTrustAuthoritySourceKind::MeerkatMachinePeerProjection
| GeneratedCommsTrustAuthoritySourceKind::MeerkatMachineSupervisorPublish
| GeneratedCommsTrustAuthoritySourceKind::MeerkatMachineSupervisorRevoke
)
}
fn is_mob_machine_trust_source(kind: GeneratedCommsTrustAuthoritySourceKind) -> bool {
matches!(
kind,
GeneratedCommsTrustAuthoritySourceKind::MobMachineMemberTrustWiring
| GeneratedCommsTrustAuthoritySourceKind::MobMachineMemberTrustUnwiring
| GeneratedCommsTrustAuthoritySourceKind::MobMachineExternalPeerTrustWiring
| GeneratedCommsTrustAuthoritySourceKind::MobMachineExternalPeerTrustUnwiring
| GeneratedCommsTrustAuthoritySourceKind::MobMachineExternalPeerTrustRepair
| GeneratedCommsTrustAuthoritySourceKind::MobMachineExternalPeerReciprocalTrust
)
}
/// A requested change to the set of trusted peers, paired with the authority that permits it.
///
/// Add variants carry the full descriptor; remove variants carry only the peer id as text.
#[derive(Debug, Clone)]
pub enum CommsTrustMutation {
/// Add `peer` as a trusted peer.
AddTrustedPeer {
peer: TrustedPeerDescriptor,
authority: CommsTrustMutationAuthority,
},
/// Remove the trusted peer identified by `peer_id`.
RemoveTrustedPeer {
peer_id: String,
authority: CommsTrustMutationAuthority,
},
/// Add `peer` as a private trusted peer.
AddPrivateTrustedPeer {
peer: TrustedPeerDescriptor,
authority: CommsTrustMutationAuthority,
},
/// Remove the private trusted peer identified by `peer_id`.
RemovePrivateTrustedPeer {
peer_id: String,
authority: CommsTrustMutationAuthority,
},
}
/// Outcome of a trust mutation.
// NOTE(review): `created` / `removed` presumably report whether the store actually changed
// (as opposed to a no-op); confirm against the code that produces these values.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum CommsTrustMutationResult {
/// Result of an add mutation.
Added { created: bool },
/// Result of a remove mutation.
Removed { removed: bool },
}
impl TrustedPeerDescriptor {
pub fn pubkey_is_zero(pubkey: &[u8; 32]) -> bool {
*pubkey == [0u8; 32]
}
pub fn has_zero_pubkey(&self) -> bool {
Self::pubkey_is_zero(&self.pubkey)
}
pub fn validate_pubkey_for_peer_id(peer_id: PeerId, pubkey: &[u8; 32]) -> Result<(), String> {
if Self::pubkey_is_zero(pubkey) {
return Err("TrustedPeerDescriptor.pubkey must be non-zero".to_string());
}
let derived = PeerId::from_ed25519_pubkey(pubkey);
if derived != peer_id {
return Err(format!(
"peer_id {peer_id} does not match pubkey-derived id {derived}"
));
}
Ok(())
}
pub fn test_only_unsigned(
name: impl Into<String>,
peer_id: impl AsRef<str>,
address: impl AsRef<str>,
) -> Result<Self, String> {
let name = PeerName::new(name).map_err(|e| format!("invalid peer name: {e}"))?;
let peer_id =
PeerId::parse(peer_id.as_ref()).map_err(|e| format!("invalid peer_id: {e}"))?;
let address = PeerAddress::parse(address.as_ref()).map_err(|e| e.to_string())?;
Ok(Self {
peer_id,
name,
address,
pubkey: [0u8; 32],
})
}
pub fn test_only_unsigned_typed(
name: impl Into<String>,
peer_id: PeerId,
address: impl AsRef<str>,
) -> Result<Self, String> {
let name = PeerName::new(name).map_err(|e| format!("invalid peer name: {e}"))?;
let address = PeerAddress::parse(address.as_ref()).map_err(|e| e.to_string())?;
Ok(Self {
peer_id,
name,
address,
pubkey: [0u8; 32],
})
}
pub fn with_pubkey(mut self, pubkey: [u8; 32]) -> Self {
self.pubkey = pubkey;
self
}
pub fn unsigned_with_pubkey(
name: impl Into<String>,
peer_id: impl AsRef<str>,
pubkey: [u8; 32],
address: impl AsRef<str>,
) -> Result<Self, String> {
let mut descriptor = Self::test_only_unsigned(name, peer_id, address)?;
Self::validate_pubkey_for_peer_id(descriptor.peer_id, &pubkey)?;
descriptor.pubkey = pubkey;
Ok(descriptor)
}
}
/// Kind of a peer lifecycle notification.
///
/// Each variant serializes to the `mob.*` string in its `#[serde(rename = ...)]` attribute.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum PeerLifecycleKind {
/// Serialized as `"mob.peer_added"`.
#[serde(rename = "mob.peer_added")]
PeerAdded,
/// Serialized as `"mob.peer_retired"`.
#[serde(rename = "mob.peer_retired")]
PeerRetired,
/// Serialized as `"mob.peer_unwired"`.
#[serde(rename = "mob.peer_unwired")]
PeerUnwired,
/// Serialized as `"mob.dismiss"`.
#[serde(rename = "mob.dismiss")]
Dismiss,
}
impl PeerLifecycleKind {
pub const fn as_str(self) -> &'static str {
match self {
Self::PeerAdded => "mob.peer_added",
Self::PeerRetired => "mob.peer_retired",
Self::PeerUnwired => "mob.peer_unwired",
Self::Dismiss => "mob.dismiss",
}
}
}
impl std::fmt::Display for PeerLifecycleKind {
    /// Writes the wire string from `as_str` verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let wire_name: &'static str = self.as_str();
        f.write_str(wire_name)
    }
}
/// Serialized request form of a comms command.
///
/// The variant is selected by a `kind` tag in snake_case (`input`,
/// `peer_message`, `peer_lifecycle`, `peer_request`, `peer_response`), and
/// unknown fields are rejected during deserialization. Optional fields that
/// are `None` are omitted when serializing.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "kind", rename_all = "snake_case", deny_unknown_fields)]
pub enum CommsCommandRequest {
/// Input addressed to a session; the session id is supplied separately
/// when the request is converted into a `CommsCommand`.
Input {
/// Text body of the input.
body: String,
/// Optional structured content blocks accompanying the body.
#[serde(default, skip_serializing_if = "Option::is_none")]
blocks: Option<Vec<ContentBlock>>,
/// Optional origin of the input.
#[serde(default, skip_serializing_if = "Option::is_none")]
source: Option<InputSource>,
/// Optional stream mode.
#[serde(default, skip_serializing_if = "Option::is_none")]
stream: Option<InputStreamMode>,
/// Optional handling mode.
#[serde(default, skip_serializing_if = "Option::is_none")]
handling_mode: Option<HandlingMode>,
/// Optional flag.
// NOTE(review): exact semantics are not visible in this file section.
#[serde(default, skip_serializing_if = "Option::is_none")]
allow_self_session: Option<bool>,
},
/// Message addressed to a peer.
PeerMessage {
/// Destination peer.
to: PeerId,
/// Text body of the message.
body: String,
/// Optional structured content blocks accompanying the body.
#[serde(default, skip_serializing_if = "Option::is_none")]
blocks: Option<Vec<ContentBlock>>,
/// Optional handling mode.
#[serde(default, skip_serializing_if = "Option::is_none")]
handling_mode: Option<HandlingMode>,
},
/// Lifecycle notification addressed to a peer.
PeerLifecycle {
/// Destination peer.
to: PeerId,
/// Which lifecycle notification this is.
lifecycle_kind: PeerLifecycleKind,
/// Free-form JSON parameters; defaults to JSON `null` when absent.
#[serde(default)]
params: serde_json::Value,
},
/// Request addressed to a peer.
PeerRequest {
/// Destination peer.
to: PeerId,
/// Typed intent of the request.
intent: CommsPeerRequestIntent,
/// Free-form JSON parameters; defaults to JSON `null` when absent.
#[serde(default)]
params: serde_json::Value,
/// Optional structured content blocks.
#[serde(default, skip_serializing_if = "Option::is_none")]
blocks: Option<Vec<ContentBlock>>,
/// Optional handling mode.
#[serde(default, skip_serializing_if = "Option::is_none")]
handling_mode: Option<HandlingMode>,
/// Optional stream mode.
#[serde(default, skip_serializing_if = "Option::is_none")]
stream: Option<InputStreamMode>,
},
/// Response to an earlier interaction, addressed to a peer.
PeerResponse {
/// Destination peer.
to: PeerId,
/// Interaction this response answers.
in_reply_to: InteractionId,
/// Status reported by the response.
status: ResponseStatus,
/// Free-form JSON result; defaults to JSON `null` when absent.
#[serde(default)]
result: serde_json::Value,
/// Optional structured content blocks.
#[serde(default, skip_serializing_if = "Option::is_none")]
blocks: Option<Vec<ContentBlock>>,
/// Optional handling mode.
#[serde(default, skip_serializing_if = "Option::is_none")]
handling_mode: Option<HandlingMode>,
},
}
/// Error type returned by `CommsCommandRequest::into_command`.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum CommsCommandError {
/// A `handling_mode` was supplied on a progress peer response.
#[error("handling_mode is forbidden on progress peer responses")]
HandlingModeForbiddenForProgressResponse,
}
impl CommsCommandRequest {
    /// Converts this request into a [`CommsCommand`].
    ///
    /// Defaults applied during conversion:
    /// - `Input`: `handling_mode` falls back to its `Default`, `source` to
    ///   [`InputSource::Rpc`], `stream` to [`InputStreamMode::None`] and
    ///   `allow_self_session` to `false`; `session_id` is cloned in.
    /// - `PeerMessage` / `PeerRequest`: `handling_mode` falls back to its
    ///   `Default`; a request's `stream` falls back to
    ///   [`InputStreamMode::None`] and its intent is stored as a string.
    /// - `PeerResponse`: `handling_mode` is passed through as an `Option`.
    ///
    /// Peer ids are wrapped with `PeerRoute::new`.
    ///
    /// # Errors
    /// NOTE(review): no arm currently returns `Err`, so
    /// `CommsCommandError::HandlingModeForbiddenForProgressResponse` is never
    /// produced by this function. Confirm whether that check is enforced
    /// elsewhere or is missing here.
    pub fn into_command(
        self,
        session_id: &crate::types::SessionId,
    ) -> Result<CommsCommand, CommsCommandError> {
        let command = match self {
            Self::Input {
                body,
                blocks,
                source,
                stream,
                handling_mode,
                allow_self_session,
            } => {
                let handling_mode = handling_mode.unwrap_or_default();
                let source = source.unwrap_or(InputSource::Rpc);
                let stream = stream.unwrap_or(InputStreamMode::None);
                let allow_self_session = allow_self_session.unwrap_or(false);
                CommsCommand::Input {
                    session_id: session_id.clone(),
                    body,
                    blocks,
                    handling_mode,
                    source,
                    stream,
                    allow_self_session,
                }
            }
            Self::PeerMessage {
                to,
                body,
                blocks,
                handling_mode,
            } => {
                let handling_mode = handling_mode.unwrap_or_default();
                CommsCommand::PeerMessage {
                    to: PeerRoute::new(to),
                    body,
                    blocks,
                    handling_mode,
                }
            }
            Self::PeerLifecycle {
                to,
                lifecycle_kind,
                params,
            } => CommsCommand::PeerLifecycle {
                to: PeerRoute::new(to),
                kind: lifecycle_kind,
                params,
            },
            Self::PeerRequest {
                to,
                intent,
                params,
                blocks,
                handling_mode,
                stream,
            } => {
                let handling_mode = handling_mode.unwrap_or_default();
                let stream = stream.unwrap_or(InputStreamMode::None);
                CommsCommand::PeerRequest {
                    to: PeerRoute::new(to),
                    intent: intent.as_str().to_owned(),
                    params,
                    blocks,
                    handling_mode,
                    stream,
                }
            }
            Self::PeerResponse {
                to,
                in_reply_to,
                status,
                result,
                blocks,
                handling_mode,
            } => CommsCommand::PeerResponse {
                to: PeerRoute::new(to),
                in_reply_to,
                status,
                result,
                blocks,
                handling_mode,
            },
        };
        Ok(command)
    }

    /// Returns the snake_case `kind` tag of this request variant.
    pub fn kind(&self) -> &'static str {
        match *self {
            CommsCommandRequest::Input { .. } => "input",
            CommsCommandRequest::PeerMessage { .. } => "peer_message",
            CommsCommandRequest::PeerLifecycle { .. } => "peer_lifecycle",
            CommsCommandRequest::PeerRequest { .. } => "peer_request",
            CommsCommandRequest::PeerResponse { .. } => "peer_response",
        }
    }
}
/// Origin of an input command; serialized as the lowercase variant name.
///
/// Converts to and from `crate::config::PlainEventSource` variant-for-variant.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum InputSource {
/// Serialized as `tcp`.
Tcp,
/// Serialized as `uds`.
Uds,
/// Serialized as `stdin`.
Stdin,
/// Serialized as `webhook`.
Webhook,
/// Serialized as `rpc`; the fallback used by `into_command` when a request omits `source`.
Rpc,
}
impl From<crate::config::PlainEventSource> for InputSource {
fn from(source: crate::config::PlainEventSource) -> Self {
match source {
crate::config::PlainEventSource::Tcp => Self::Tcp,
crate::config::PlainEventSource::Uds => Self::Uds,
crate::config::PlainEventSource::Stdin => Self::Stdin,
crate::config::PlainEventSource::Webhook => Self::Webhook,
crate::config::PlainEventSource::Rpc => Self::Rpc,
}
}
}
impl From<InputSource> for crate::config::PlainEventSource {
fn from(source: InputSource) -> Self {
match source {
InputSource::Tcp => Self::Tcp,
InputSource::Uds => Self::Uds,
InputSource::Stdin => Self::Stdin,
InputSource::Webhook => Self::Webhook,
InputSource::Rpc => Self::Rpc,
}
}
}
/// Stream mode requested alongside an input or peer request; serialized in
/// snake_case (`none`, `reserve_interaction`).
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum InputStreamMode {
/// Fallback used by `into_command` when a request omits `stream`.
None,
/// Serialized as `reserve_interaction`.
// NOTE(review): presumably asks for an interaction stream to be reserved — confirm against the runtime.
ReserveInteraction,
}
/// Resolved comms command, as produced by `CommsCommandRequest::into_command`.
///
/// Compared with the request form: `Input` carries a session id, peers are
/// addressed via `PeerRoute`, and most optional request fields have concrete
/// values.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CommsCommand {
/// Input addressed to a session.
Input {
/// Session the input is addressed to.
session_id: crate::types::SessionId,
/// Text body of the input.
body: String,
/// Optional structured content blocks.
blocks: Option<Vec<ContentBlock>>,
/// Handling mode.
handling_mode: HandlingMode,
/// Origin of the input.
source: InputSource,
/// Stream mode.
stream: InputStreamMode,
/// Flag carried over from the request.
// NOTE(review): exact semantics are not visible in this file section.
allow_self_session: bool,
},
/// Message addressed to a peer.
PeerMessage {
/// Route to the destination peer.
to: PeerRoute,
/// Text body of the message.
body: String,
/// Optional structured content blocks.
blocks: Option<Vec<ContentBlock>>,
/// Handling mode.
handling_mode: HandlingMode,
},
/// Lifecycle notification addressed to a peer.
PeerLifecycle {
/// Route to the destination peer.
to: PeerRoute,
/// Which lifecycle notification this is.
kind: PeerLifecycleKind,
/// Free-form JSON parameters.
params: serde_json::Value,
},
/// Request addressed to a peer.
PeerRequest {
/// Route to the destination peer.
to: PeerRoute,
/// Intent as a string.
intent: String,
/// Free-form JSON parameters.
params: serde_json::Value,
/// Optional structured content blocks.
blocks: Option<Vec<ContentBlock>>,
/// Handling mode.
handling_mode: HandlingMode,
/// Stream mode.
stream: InputStreamMode,
},
/// Response to an earlier interaction, addressed to a peer.
PeerResponse {
/// Route to the destination peer.
to: PeerRoute,
/// Interaction this response answers.
in_reply_to: InteractionId,
/// Status reported by the response.
status: ResponseStatus,
/// Free-form JSON result.
result: serde_json::Value,
/// Optional structured content blocks.
blocks: Option<Vec<ContentBlock>>,
/// Optional handling mode; unlike the other variants it stays an `Option`.
handling_mode: Option<HandlingMode>,
},
}
impl CommsCommand {
pub fn command_kind(&self) -> &'static str {
match self {
Self::Input { .. } => "input",
Self::PeerMessage { .. } => "peer_message",
Self::PeerLifecycle { .. } => "peer_lifecycle",
Self::PeerRequest { .. } => "peer_request",
Self::PeerResponse { .. } => "peer_response",
}
}
}
/// Receipt describing the outcome of a send, with one variant per
/// `CommsCommand` kind.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SendReceipt {
/// Receipt for an `Input` command.
InputAccepted {
/// Interaction associated with the input.
interaction_id: InteractionId,
/// Whether a stream was reserved.
stream_reserved: bool,
},
/// Receipt for a `PeerMessage` command.
PeerMessageSent {
/// Id of the sent envelope.
envelope_id: uuid::Uuid,
/// Whether the message was acknowledged.
// NOTE(review): who acknowledges, and when, is not visible here.
acked: bool,
},
/// Receipt for a `PeerLifecycle` command.
PeerLifecycleSent {
/// Id of the sent envelope.
envelope_id: uuid::Uuid,
},
/// Receipt for a `PeerRequest` command.
PeerRequestSent {
/// Id of the sent envelope.
envelope_id: uuid::Uuid,
/// Interaction associated with the request.
interaction_id: InteractionId,
/// Whether a stream was reserved.
stream_reserved: bool,
},
/// Receipt for a `PeerResponse` command.
PeerResponseSent {
/// Id of the sent envelope.
envelope_id: uuid::Uuid,
/// Interaction the response answered.
in_reply_to: InteractionId,
},
}
/// Source recorded for a peer directory entry; serialized in snake_case.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PeerDirectorySource {
/// Serialized as `trusted`.
Trusted,
/// Serialized as `inproc`.
Inproc,
/// Serialized as `trusted_and_inproc`.
TrustedAndInproc,
/// Serialized as `unknown`.
Unknown,
}
impl PeerDirectorySource {
pub const fn as_str(&self) -> &'static str {
match self {
Self::Trusted => "trusted",
Self::Inproc => "inproc",
Self::TrustedAndInproc => "trusted_and_inproc",
Self::Unknown => "unknown",
}
}
}
/// Displays the directory source as its snake_case label.
impl std::fmt::Display for PeerDirectorySource {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = self.as_str();
        formatter.write_str(label)
    }
}
/// Kind of peer command that can be sent to a directory entry; serialized in
/// snake_case.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PeerSendability {
/// Serialized as `peer_message`.
PeerMessage,
/// Serialized as `peer_request`.
PeerRequest,
/// Serialized as `peer_response`.
PeerResponse,
}
impl PeerSendability {
pub const DIRECTORY_DEFAULTS: [Self; 3] =
[Self::PeerMessage, Self::PeerRequest, Self::PeerResponse];
pub fn directory_defaults() -> Vec<Self> {
Self::DIRECTORY_DEFAULTS.to_vec()
}
pub const fn as_str(&self) -> &'static str {
match self {
Self::PeerMessage => "peer_message",
Self::PeerRequest => "peer_request",
Self::PeerResponse => "peer_response",
}
}
}
/// Displays the sendability kind as its snake_case label.
impl std::fmt::Display for PeerSendability {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = self.as_str();
        formatter.write_str(label)
    }
}
/// Versioned set of peer capabilities with free-form extensions.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PeerCapabilitySet {
/// Capability-set version; defaults to `PeerCapabilitySet::CURRENT_VERSION`
/// when absent from the input.
#[serde(default = "PeerCapabilitySet::default_version")]
pub version: u16,
/// Extension values keyed by name; defaults to an empty map when absent.
#[serde(default)]
pub extensions: BTreeMap<String, serde_json::Value>,
}
impl PeerCapabilitySet {
    /// Version written by `Default` and assumed when `version` is missing
    /// during deserialization.
    pub const CURRENT_VERSION: u16 = 1;

    /// Serde default provider for the `version` field.
    const fn default_version() -> u16 {
        Self::CURRENT_VERSION
    }

    /// Builder-style helper that inserts (or overwrites) an extension entry.
    pub fn with_extension(self, key: impl Into<String>, value: serde_json::Value) -> Self {
        let mut updated = self;
        let extension_key: String = key.into();
        updated.extensions.insert(extension_key, value);
        updated
    }
}
impl Default for PeerCapabilitySet {
fn default() -> Self {
Self {
version: Self::CURRENT_VERSION,
extensions: BTreeMap::new(),
}
}
}
/// One peer as listed in a peer directory.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PeerDirectoryEntry {
/// Identifier of the peer.
pub peer_id: PeerId,
/// Validated peer name.
pub name: PeerName,
/// Address of the peer (transport plus endpoint).
pub address: PeerAddress,
/// Source recorded for this entry.
pub source: PeerDirectorySource,
/// Command kinds that can be sent to this peer.
pub sendable_kinds: Vec<PeerSendability>,
/// Capabilities advertised for this peer.
pub capabilities: PeerCapabilitySet,
/// Additional peer metadata.
pub meta: crate::PeerMeta,
}
/// A list of peer directory entries; serialized as an object with a `peers`
/// array.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PeerDirectoryListing {
/// Entries in the listing.
pub peers: Vec<PeerDirectoryEntry>,
}
impl PeerDirectoryListing {
pub fn new(peers: Vec<PeerDirectoryEntry>) -> Self {
Self { peers }
}
}
impl From<Vec<PeerDirectoryEntry>> for PeerDirectoryListing {
fn from(peers: Vec<PeerDirectoryEntry>) -> Self {
Self::new(peers)
}
}
/// Scope of an event stream: either a whole session or a single interaction.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum StreamScope {
/// Scoped to the given session.
Session(crate::types::SessionId),
/// Scoped to the given interaction.
Interaction(InteractionId),
}
/// Boxed, pinned, `Send` stream of agent events wrapped in `EventEnvelope`.
pub type EventStream = Pin<Box<dyn Stream<Item = EventEnvelope<AgentEvent>> + Send>>;
/// Errors related to event streams.
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum StreamError {
/// The interaction was not reserved.
#[error("interaction not reserved: {0}")]
NotReserved(InteractionId),
/// No stream was found; the payload is included in the message.
#[error("stream not found: {0}")]
NotFound(String),
/// The interaction already has an attachment.
#[error("already attached: {0}")]
AlreadyAttached(InteractionId),
/// The stream is closed.
#[error("stream closed")]
Closed,
/// Permission was denied; the payload is included in the message.
#[error("permission denied: {0}")]
PermissionDenied(String),
/// A timeout occurred; the payload is included in the message.
#[error("timeout: {0}")]
Timeout(String),
/// Internal failure; the payload is included in the message.
#[error("internal: {0}")]
Internal(String),
}
/// Reason carried by `SendError::AdmissionDropped`; serialized in snake_case.
///
/// Marked `#[non_exhaustive]`, so downstream crates must include a wildcard
/// arm when matching.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum AdmissionDropReason {
/// Serialized as `untrusted_sender`.
UntrustedSender,
/// Serialized as `classification_rejected`.
ClassificationRejected,
/// Serialized as `session_closed`.
SessionClosed,
/// Serialized as `inbox_full`.
InboxFull,
}
impl AdmissionDropReason {
pub fn as_code(&self) -> &'static str {
match self {
AdmissionDropReason::UntrustedSender => "untrusted_sender",
AdmissionDropReason::ClassificationRejected => "classification_rejected",
AdmissionDropReason::SessionClosed => "session_closed",
AdmissionDropReason::InboxFull => "inbox_full",
}
}
}
#[derive(Debug, Clone, thiserror::Error)]
#[non_exhaustive]
pub enum SendError {
#[error("peer not found: {0}")]
PeerNotFound(String),
#[error("peer offline")]
PeerOffline,
#[error("peer not sendable")]
PeerNotSendable(String),
#[error("input stream closed")]
InputClosed,
#[error("unsupported command: {0}")]
Unsupported(String),
#[error("validation failed: {0}")]
Validation(String),
#[error("internal: {0}")]
Internal(String),
#[error("transport error: {0}")]
Transport(String),
#[error("peer dropped at admission: {reason:?}")]
AdmissionDropped { reason: AdmissionDropReason },
}
/// Error for an operation that both sends a command and attaches a stream.
#[derive(Debug, Clone, thiserror::Error)]
pub enum SendAndStreamError {
/// The send itself failed; converts from `SendError` via `From`.
#[error("send failed: {0}")]
Send(#[from] SendError),
/// Attaching the stream failed; the receipt is carried along with the
/// stream error.
#[error("stream attach failed: receipt={receipt:?}, error={error}")]
StreamAttach {
/// Receipt carried with the failure.
receipt: SendReceipt,
/// The stream error that occurred.
error: StreamError,
},
}
// Unit tests for peer ids, names, addresses, directory serialization, trust
// authority checks and `CommsCommandRequest` deserialization.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
// The hand-rolled pubkey derivation must agree with `Uuid::new_v5` over the
// same namespace and bytes.
#[test]
fn peer_id_pubkey_derivation_matches_uuid_v5() {
let pubkey = [42u8; 32];
assert_eq!(
PeerId::from_ed25519_pubkey(&pubkey).as_uuid(),
&Uuid::new_v5(&PEER_ID_ED25519_PUBKEY_NAMESPACE, &pubkey)
);
}
// A plain name is accepted; an empty name and a name containing NUL are rejected.
#[test]
fn peer_name_validation() {
assert!(PeerName::new("alice").is_ok());
assert!(PeerName::new("".to_string()).is_err());
assert!(PeerName::new("bad\x00name").is_err());
}
// Constructing an entry directly and reading its fields back.
#[test]
fn peer_directory_entry_fields() -> Result<(), String> {
let entry = PeerDirectoryEntry {
peer_id: PeerId::new(),
name: PeerName::new("agent")?,
address: PeerAddress::new(PeerTransport::Inproc, "agent"),
source: PeerDirectorySource::Inproc,
sendable_kinds: vec![PeerSendability::PeerMessage],
capabilities: PeerCapabilitySet::default(),
meta: crate::PeerMeta::default(),
};
assert_eq!(entry.name.as_str(), "agent");
assert_eq!(entry.address.transport(), PeerTransport::Inproc);
assert_eq!(entry.address.endpoint(), "agent");
assert_eq!(entry.source, PeerDirectorySource::Inproc);
Ok(())
}
// Pins the JSON shape of a listing: snake_case source and sendable kinds,
// capability version 1, and extension values passed through.
#[test]
fn peer_directory_listing_serializes_typed_source_sendability_and_capabilities()
-> Result<(), String> {
let entry = PeerDirectoryEntry {
peer_id: PeerId::new(),
name: PeerName::new("agent")?,
address: PeerAddress::new(PeerTransport::Inproc, "agent"),
source: PeerDirectorySource::Inproc,
sendable_kinds: vec![PeerSendability::PeerMessage, PeerSendability::PeerRequest],
capabilities: PeerCapabilitySet::default()
.with_extension("vendor.echo", serde_json::json!({ "enabled": true })),
meta: crate::PeerMeta::default(),
};
let value = serde_json::to_value(PeerDirectoryListing::new(vec![entry]))
.map_err(|err| err.to_string())?;
let peer = &value["peers"][0];
assert_eq!(peer["source"], "inproc");
assert_eq!(
peer["sendable_kinds"],
serde_json::json!(["peer_message", "peer_request"])
);
assert_eq!(peer["capabilities"]["version"], 1);
assert_eq!(
peer["capabilities"]["extensions"]["vendor.echo"]["enabled"],
true
);
Ok(())
}
// A descriptor derived from one pubkey must not mint authority for a peer id
// derived from a different pubkey.
#[test]
fn generated_trust_authority_rejects_descriptor_peer_mismatch() {
let pubkey = [1u8; 32];
let descriptor_peer_id = PeerId::from_ed25519_pubkey(&pubkey);
let requested_peer_id = PeerId::from_ed25519_pubkey(&[2u8; 32]);
let descriptor = TrustedPeerDescriptor::unsigned_with_pubkey(
"fake",
descriptor_peer_id.to_string(),
pubkey,
"inproc://fake",
)
.expect("valid descriptor");
let err = CommsTrustMutationAuthority::from_generated_parts(
GeneratedCommsTrustAuthoritySourceKind::MeerkatMachinePeerProjection,
1,
None,
GeneratedCommsTrustAuthoritySourceKind::MeerkatMachinePeerProjection,
GeneratedCommsTrustAuthorityOperation::PublicAdd,
requested_peer_id.to_string(),
Some(requested_peer_id.to_string()),
Some(descriptor),
)
.expect_err("descriptor for another peer must not mint authority");
assert!(
err.contains("does not match requested"),
"unexpected rejection: {err}"
);
}
// A freshly generated id survives string formatting followed by parsing.
#[test]
fn peer_id_parse_round_trip() {
let id = PeerId::new();
let parsed = PeerId::parse(&id.as_str()).expect("parse");
assert_eq!(id, parsed);
}
// A non-UUID string is rejected and the error echoes the offending input.
#[test]
fn peer_id_parse_rejects_garbage() {
let err = PeerId::parse("not-a-uuid").expect_err("parse must reject");
match err {
PeerIdError::Invalid { input, .. } => assert_eq!(input, "not-a-uuid"),
}
}
// Addresses display as `<scheme>://<endpoint>`.
#[test]
fn peer_address_display() {
let addr = PeerAddress::new(PeerTransport::Tcp, "127.0.0.1:4200");
assert_eq!(addr.to_string(), "tcp://127.0.0.1:4200");
}
// inproc, uds and tcp addresses parse into the expected parts and format
// back to the original string.
#[test]
fn peer_address_parse_round_trips_supported_schemes() {
let cases = [
("inproc://agent-a", PeerTransport::Inproc, "agent-a"),
(
"uds:///tmp/meerkat.sock",
PeerTransport::Uds,
"/tmp/meerkat.sock",
),
("tcp://127.0.0.1:4200", PeerTransport::Tcp, "127.0.0.1:4200"),
];
for (raw, transport, endpoint) in cases {
let parsed = PeerAddress::parse(raw).expect("supported address parses");
assert_eq!(parsed.transport(), transport);
assert_eq!(parsed.endpoint(), endpoint);
assert_eq!(parsed.to_string(), raw);
}
}
// An unsupported scheme is an error rather than being accepted.
#[test]
fn peer_address_parse_rejects_unknown_scheme() {
let err = PeerAddress::parse("http://127.0.0.1:4200")
.expect_err("unknown transport schemes must fail closed");
assert!(
err.to_string().contains("unknown peer address transport"),
"unexpected error: {err}",
);
}
// An address without a scheme is an error.
#[test]
fn peer_address_parse_rejects_schemeless_input() {
let err = PeerAddress::parse("127.0.0.1:4200")
.expect_err("strict parser requires an address scheme");
assert!(
err.to_string().contains("missing transport scheme"),
"unexpected error: {err}",
);
}
// `ReserveInteraction` serializes to `reserve_interaction` and deserializes back.
#[test]
fn input_stream_mode_roundtrip() -> Result<(), serde_json::Error> {
let mode = InputStreamMode::ReserveInteraction;
let serialized = serde_json::to_value(mode)?;
assert_eq!(serialized.as_str(), Some("reserve_interaction"));
assert_eq!(serde_json::from_value::<InputStreamMode>(serialized)?, mode);
Ok(())
}
// An `input` request with typed `source` and `handling_mode` deserializes
// into the matching enum values.
#[test]
fn deserialize_input_with_typed_source() -> Result<(), serde_json::Error> {
let json = r#"{"kind":"input","body":"hello","source":"webhook","handling_mode":"steer"}"#;
let req: CommsCommandRequest = serde_json::from_str(json)?;
match req {
CommsCommandRequest::Input {
body,
source,
handling_mode,
..
} => {
assert_eq!(body, "hello");
assert_eq!(source, Some(InputSource::Webhook));
assert_eq!(handling_mode, Some(HandlingMode::Steer));
}
other => panic!("expected input command request, got {other:?}"),
}
Ok(())
}
// A misspelled source is rejected during deserialization, and the error
// names the bad value.
#[test]
fn deserialize_input_invalid_source_rejects_at_serde_boundary() {
let json = r#"{"kind":"input","body":"hello","source":"webhookd"}"#;
let err = serde_json::from_str::<CommsCommandRequest>(json)
.expect_err("invalid source must fail deserialization");
let msg = err.to_string();
assert!(
msg.contains("webhookd"),
"error should name the rejected value, got: {msg}"
);
}
// An unknown `kind` tag is rejected during deserialization.
#[test]
fn deserialize_unknown_kind_rejects_at_serde_boundary() {
let json = r#"{"kind":"foobar","body":"hello"}"#;
let err = serde_json::from_str::<CommsCommandRequest>(json)
.expect_err("unknown kind must fail deserialization");
let msg = err.to_string();
assert!(
msg.contains("foobar") || msg.contains("variant"),
"error should mention unknown variant, got: {msg}"
);
}
}