use meerkat_core::comms::TrustedPeerDescriptor;
use meerkat_core::types::HandlingMode;
use meerkat_mob::ids::MeerkatId;
use meerkat_mob::{MobHandle, PeerTarget};
use crate::auth::peer_keys::GatewayPeerKeys;
use crate::contact_directory::{ContactDirectory, ContactEntry, MobTransport};
use crate::runtime::cross_mob_remote::{RemoteMobError, RemoteMobProxy};
use super::UnifiedRuntime;
/// How a contact entry was resolved to a reachable mob (see `dispatch_for`).
enum LocalOrRemote {
/// A `MobHandle` that was registered in `peer_mob_handles` under the entry's mob id.
Local(MobHandle),
/// A `RemoteMobProxy` built from the contact entry when no handle is registered.
Remote(RemoteMobProxy),
}
/// Peer identity of a mob member as assembled by `get_member_peer_info`.
struct MemberPeerInfo {
/// String form of the member's peer id.
peer_id: String,
/// Comms name formatted as `<mob_id>/<role>/<member_id>`.
comms_name: String,
/// Public key bytes decoded from the member's base64 transport public key.
pubkey: [u8; 32],
}
/// Errors returned by the cross-mob wiring, unwiring and messaging operations.
#[derive(Debug)]
pub enum CrossMobError {
/// No contact directory has been set on the runtime.
NoContactDirectory,
/// The contact directory has no entry for the given mob id.
UnknownMob(String),
/// Neither a registered peer handle nor a remote proxy is available for the mob id.
NoPeerHandle(String),
/// A non-inproc peer was given without a usable signing pubkey (absent or all zeros).
/// `mob_id` names the affected mob when the reporting site knows it.
MissingPeerPubkey { mob_id: Option<String> },
/// The member does not exist in the given mob.
MemberNotFound { member_id: String, mob_id: String },
/// The member exists but has no comms runtime information (peer id or bridge session).
NoCommsInfo { member_id: String, mob_id: String },
/// An error reported by a mob handle operation.
Mob(meerkat_mob::MobError),
/// A peer descriptor (or data needed for it) could not be built; carries the reason.
PeerSpec(String),
/// An error reported by the cross-process remote mob proxy.
Remote(RemoteMobError),
}
/// Renders each error variant as a single human-readable line.
impl std::fmt::Display for CrossMobError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Arms follow the declaration order of the enum.
        match self {
            Self::NoContactDirectory => f.write_str("no contact directory configured"),
            Self::UnknownMob(id) => write!(f, "unknown mob: {id}"),
            Self::NoPeerHandle(id) => write!(f, "no peer mob handle registered for: {id}"),
            // With a known mob the message points at the contact directory.
            Self::MissingPeerPubkey { mob_id: Some(id) } => write!(
                f,
                "non-inproc peer for mob '{id}' has no signing pubkey; \
                 bootstrap via mobkit/peer_pubkey or populate the contact \
                 directory's pubkey field before wiring"
            ),
            Self::MissingPeerPubkey { mob_id: None } => f.write_str(
                "non-inproc peer has no signing pubkey; supply a 32-byte \
                 Ed25519 pubkey or use inproc transport",
            ),
            Self::MemberNotFound { member_id, mob_id } => {
                write!(f, "member '{member_id}' not found in mob '{mob_id}'")
            }
            Self::NoCommsInfo { member_id, mob_id } => write!(
                f,
                "member '{member_id}' in mob '{mob_id}' has no comms runtime"
            ),
            Self::Mob(err) => write!(f, "mob error: {err}"),
            Self::PeerSpec(reason) => write!(f, "peer spec error: {reason}"),
            Self::Remote(e) => write!(f, "cross-process cross-mob: {e}"),
        }
    }
}
// Marker impl: no `source()` override, so wrapped errors are only visible through `Display`.
impl std::error::Error for CrossMobError {}
impl From<meerkat_mob::MobError> for CrossMobError {
fn from(err: meerkat_mob::MobError) -> Self {
Self::Mob(err)
}
}
impl From<RemoteMobError> for CrossMobError {
fn from(err: RemoteMobError) -> Self {
Self::Remote(err)
}
}
impl UnifiedRuntime {
/// Registers `handle` as the handle for `mob_id`, replacing any handle
/// previously stored under the same id.
pub async fn register_peer_mob(&self, mob_id: &str, handle: MobHandle) {
    let mut registry = self.peer_mob_handles.write().await;
    registry.insert(mob_id.to_owned(), handle);
}
/// Installs `directory` as the runtime's contact directory, replacing any previous one.
pub fn set_contact_directory(&mut self, directory: ContactDirectory) {
self.contact_directory = Some(directory);
}
/// Installs `keys` as the runtime's gateway peer keys, replacing any previous ones.
pub fn set_gateway_peer_keys(&mut self, keys: GatewayPeerKeys) {
self.gateway_peer_keys = Some(keys);
}
/// Returns the gateway peer keys, or `None` if none have been set.
pub fn gateway_peer_keys(&self) -> Option<&GatewayPeerKeys> {
self.gateway_peer_keys.as_ref()
}
/// Wires `local_member_id` (a member of this runtime's mob) and
/// `remote_member_id` (a member of `remote_mob_id`) to each other as
/// external peers.
///
/// The remote mob is resolved through the contact directory and then reached
/// either through a registered in-process handle or through a remote proxy.
/// The local side is wired first. If wiring the remote side then fails, the
/// local wire is undone on a best-effort basis (a rollback failure is
/// ignored) and the remote-side error is returned.
///
/// # Errors
///
/// * [`CrossMobError::NoContactDirectory`], [`CrossMobError::UnknownMob`] or
///   [`CrossMobError::NoPeerHandle`] when the remote mob cannot be resolved.
/// * [`CrossMobError::MemberNotFound`], [`CrossMobError::NoCommsInfo`] or
///   [`CrossMobError::PeerSpec`] when peer information for a member cannot
///   be assembled.
/// * [`CrossMobError::MissingPeerPubkey`] carrying `remote_mob_id` when the
///   remote peer uses a non-inproc transport without a usable pubkey.
/// * [`CrossMobError::Mob`] / [`CrossMobError::Remote`] when a wire call fails.
pub async fn wire_cross_mob(
    &self,
    local_member_id: &str,
    remote_member_id: &str,
    remote_mob_id: &str,
) -> Result<(), CrossMobError> {
    // `build_peer_spec` has no mob context and reports a missing key with
    // `mob_id: None`. Attach the mob being wired so the error names it and
    // the contact-directory hint in the `Display` output is shown.
    let with_mob_context = |err: CrossMobError| match err {
        CrossMobError::MissingPeerPubkey { mob_id: None } => CrossMobError::MissingPeerPubkey {
            mob_id: Some(remote_mob_id.to_string()),
        },
        other => other,
    };
    let entry = self.resolve_contact(remote_mob_id)?;
    let remote = self.dispatch_for(&entry).await?;
    let local_handle = self.mob_runtime.handle();
    let local_mob_id = local_handle.mob_id().to_string();
    let local_mid = MeerkatId::from(local_member_id);
    let local_info = self
        .get_member_peer_info(&local_handle, &local_mid, &local_mob_id)
        .await?;
    match remote {
        LocalOrRemote::Local(remote_handle) => {
            let remote_mid = MeerkatId::from(remote_member_id);
            let remote_info = self
                .get_member_peer_info(&remote_handle, &remote_mid, remote_mob_id)
                .await?;
            // Both descriptors are built before anything is wired, so a bad
            // spec cannot leave a half-wired pair behind.
            let remote_spec = build_peer_spec(
                &remote_info.comms_name,
                &remote_info.peer_id,
                &entry.transport,
                Some(remote_info.pubkey),
            )
            .map_err(&with_mob_context)?;
            let local_spec = build_peer_spec(
                &local_info.comms_name,
                &local_info.peer_id,
                &MobTransport::Inproc,
                Some(local_info.pubkey),
            )?;
            local_handle
                .wire(local_mid.clone(), PeerTarget::External(remote_spec))
                .await
                .map_err(CrossMobError::Mob)?;
            if let Err(e) = remote_handle
                .wire(remote_mid.clone(), PeerTarget::External(local_spec))
                .await
            {
                // Best-effort rollback of the local wire; the descriptor is
                // rebuilt because the first one was consumed by `wire`.
                if let Ok(rollback_spec) = build_peer_spec(
                    &remote_info.comms_name,
                    &remote_info.peer_id,
                    &entry.transport,
                    Some(remote_info.pubkey),
                ) {
                    let _ = local_handle
                        .unwire(local_mid, PeerTarget::External(rollback_spec))
                        .await;
                }
                return Err(CrossMobError::Mob(e));
            }
            Ok(())
        }
        LocalOrRemote::Remote(proxy) => {
            let (remote_peer_id, remote_comms_name) = proxy
                .lookup_member(remote_member_id)
                .await
                .map_err(CrossMobError::Remote)?;
            // The remote member's key comes from the contact directory entry.
            let remote_spec = build_peer_spec(
                &remote_comms_name,
                &remote_peer_id,
                &entry.transport,
                entry.pubkey,
            )
            .map_err(&with_mob_context)?;
            local_handle
                .wire(local_mid.clone(), PeerTarget::External(remote_spec))
                .await
                .map_err(CrossMobError::Mob)?;
            let pubkey_b64 = self
                .gateway_peer_keys
                .as_ref()
                .map(crate::auth::peer_keys::GatewayPeerKeys::pubkey_b64);
            let local_spec_address = format!("inproc://{}", local_info.comms_name);
            if let Err(remote_err) = proxy
                .wire_remote(
                    remote_member_id,
                    &local_spec_address,
                    &local_info.comms_name,
                    &local_info.peer_id,
                    pubkey_b64,
                )
                .await
            {
                // Best-effort rollback of the local wire.
                let rollback_spec = build_peer_spec(
                    &remote_comms_name,
                    &remote_peer_id,
                    &entry.transport,
                    entry.pubkey,
                );
                if let Ok(spec) = rollback_spec {
                    let _ = local_handle
                        .unwire(local_mid, PeerTarget::External(spec))
                        .await;
                }
                return Err(CrossMobError::Remote(remote_err));
            }
            Ok(())
        }
    }
}
pub async fn unwire_cross_mob(
&self,
local_member_id: &str,
remote_member_id: &str,
remote_mob_id: &str,
) -> Result<(), CrossMobError> {
let entry = self.resolve_contact(remote_mob_id)?;
let remote = self.dispatch_for(&entry).await?;
let local_handle = self.mob_runtime.handle();
let local_mob_id = local_handle.mob_id().to_string();
let local_mid = MeerkatId::from(local_member_id);
let mut first_error: Option<CrossMobError> = None;
let local_info_opt = self
.get_member_peer_info(&local_handle, &local_mid, &local_mob_id)
.await
.ok();
match remote {
LocalOrRemote::Local(remote_handle) => {
let remote_mid = MeerkatId::from(remote_member_id);
if let Ok(remote_info) = self
.get_member_peer_info(&remote_handle, &remote_mid, remote_mob_id)
.await
&& let Ok(spec) = build_peer_spec(
&remote_info.comms_name,
&remote_info.peer_id,
&entry.transport,
Some(remote_info.pubkey),
)
&& let Err(e) = local_handle
.unwire(local_mid.clone(), PeerTarget::External(spec))
.await
{
first_error = Some(CrossMobError::Mob(e));
}
if let Some(local_info) = &local_info_opt
&& let Ok(spec) = build_peer_spec(
&local_info.comms_name,
&local_info.peer_id,
&MobTransport::Inproc,
Some(local_info.pubkey),
)
&& let Err(e) = remote_handle
.unwire(remote_mid.clone(), PeerTarget::External(spec))
.await
&& first_error.is_none()
{
first_error = Some(CrossMobError::Mob(e));
}
}
LocalOrRemote::Remote(proxy) => {
if let Ok((remote_peer_id, remote_comms_name)) =
proxy.lookup_member(remote_member_id).await
&& let Ok(spec) = build_peer_spec(
&remote_comms_name,
&remote_peer_id,
&entry.transport,
entry.pubkey,
)
&& let Err(e) = local_handle
.unwire(local_mid.clone(), PeerTarget::External(spec))
.await
{
first_error = Some(CrossMobError::Mob(e));
}
if let Some(local_info) = &local_info_opt {
let pubkey_b64 = self
.gateway_peer_keys
.as_ref()
.map(crate::auth::peer_keys::GatewayPeerKeys::pubkey_b64);
let local_spec_address = format!("inproc://{}", local_info.comms_name);
if let Err(e) = proxy
.unwire_remote(
remote_member_id,
&local_spec_address,
&local_info.comms_name,
&local_info.peer_id,
pubkey_b64,
)
.await
&& first_error.is_none()
{
first_error = Some(CrossMobError::Remote(e));
}
}
}
}
match first_error {
Some(e) => Err(e),
None => Ok(()),
}
}
/// Delivers `content` to `remote_member_id` in `remote_mob_id` and returns
/// the session id of the receiving member as a string.
///
/// With a registered in-process handle the content is sent with
/// `HandlingMode::Queue` and the session id is looked up afterwards. With a
/// remote proxy the content is serialized to JSON and injected remotely.
/// `from_local_member` is accepted but not used.
///
/// # Errors
///
/// Returns a resolution error when the remote mob cannot be reached,
/// [`CrossMobError::Mob`] / [`CrossMobError::Remote`] when delivery fails,
/// [`CrossMobError::NoCommsInfo`] when no session id can be resolved after a
/// local send, and [`CrossMobError::PeerSpec`] when the content cannot be
/// serialized.
pub async fn send_cross_mob(
    &self,
    from_local_member: &str,
    remote_member_id: &str,
    remote_mob_id: &str,
    content: impl Into<meerkat_core::ContentInput>,
) -> Result<String, CrossMobError> {
    let entry = self.resolve_contact(remote_mob_id)?;
    let target = self.dispatch_for(&entry).await?;
    let remote_mid = MeerkatId::from(remote_member_id);
    let payload = content.into();
    let _ = from_local_member;
    match target {
        LocalOrRemote::Local(remote_handle) => {
            let _receipt = {
                let member = remote_handle
                    .member(&remote_mid)
                    .await
                    .map_err(CrossMobError::Mob)?;
                member
                    .send(payload, HandlingMode::Queue)
                    .await
                    .map_err(CrossMobError::Mob)?
            };
            match remote_handle.resolve_bridge_session_id(&remote_mid).await {
                Some(session_id) => Ok(session_id.to_string()),
                None => Err(CrossMobError::NoCommsInfo {
                    member_id: remote_mid.to_string(),
                    mob_id: remote_mob_id.to_string(),
                }),
            }
        }
        LocalOrRemote::Remote(proxy) => {
            let content_json = match serde_json::to_value(&payload) {
                Ok(value) => value,
                Err(err) => {
                    return Err(CrossMobError::PeerSpec(format!(
                        "failed to serialize content for remote inject: {err}"
                    )));
                }
            };
            proxy
                .inject_message(remote_member_id, content_json)
                .await
                .map_err(CrossMobError::Remote)
        }
    }
}
/// Returns owned copies of every entry in the contact directory, or an empty
/// list when no directory is set.
pub fn list_external_mobs(&self) -> Vec<ContactEntry> {
    match self.contact_directory.as_ref() {
        Some(directory) => directory.list().into_iter().cloned().collect(),
        None => Vec::new(),
    }
}
/// Returns `true` when a contact directory has been set.
pub fn has_contact_directory(&self) -> bool {
self.contact_directory.is_some()
}
/// Returns `true` when at least one peer mob handle is registered.
pub async fn has_peer_mob_handles(&self) -> bool {
    let registry = self.peer_mob_handles.read().await;
    !registry.is_empty()
}
/// Returns `true` when the contact directory contains at least one entry
/// with the inproc transport; `false` when no directory is set.
pub fn has_inproc_contacts(&self) -> bool {
    let Some(directory) = self.contact_directory.as_ref() else {
        return false;
    };
    directory
        .list()
        .iter()
        .any(|contact| matches!(contact.transport, MobTransport::Inproc))
}
/// Returns `true` when the contact directory contains at least one entry
/// with a TCP or UDS transport; `false` when no directory is set.
pub fn has_remote_contacts(&self) -> bool {
    let Some(directory) = self.contact_directory.as_ref() else {
        return false;
    };
    directory.list().iter().any(|contact| {
        matches!(
            contact.transport,
            MobTransport::Tcp(_) | MobTransport::Uds(_)
        )
    })
}
/// Returns this runtime's own mob id as a string.
pub fn mob_id(&self) -> String {
    let handle = self.mob_runtime.handle();
    let mob_id = handle.mob_id();
    mob_id.to_string()
}
pub async fn local_member_peer_info(
&self,
member_id: &str,
) -> Result<(String, String, String), CrossMobError> {
let handle = self.mob_runtime.handle();
let mob_id = handle.mob_id().to_string();
let mid = MeerkatId::from(member_id);
let info = self.get_member_peer_info(&handle, &mid, &mob_id).await?;
let address = format!("inproc://{}", info.comms_name);
Ok((info.peer_id, info.comms_name, address))
}
/// Wires `local_member_id` to an external peer described by the given
/// comms name, peer id, address and optional pubkey. Only the local side
/// is touched.
///
/// # Errors
///
/// Returns the descriptor-construction error when the peer spec is invalid,
/// or [`CrossMobError::Mob`] when the wire call fails.
pub async fn wire_local(
    &self,
    local_member_id: &str,
    remote_comms_name: &str,
    remote_peer_id: &str,
    remote_address: &str,
    remote_pubkey: Option<[u8; 32]>,
) -> Result<(), CrossMobError> {
    // Validate the descriptor before touching the mob.
    let remote_peer = PeerTarget::External(build_external_peer_spec(
        remote_comms_name,
        remote_peer_id,
        remote_address,
        remote_pubkey,
    )?);
    let member = MeerkatId::from(local_member_id);
    let handle = self.mob_runtime.handle();
    handle
        .wire(member, remote_peer)
        .await
        .map_err(CrossMobError::Mob)
}
/// Unwires `local_member_id` from an external peer described by the given
/// comms name, peer id, address and optional pubkey. Only the local side
/// is touched.
///
/// # Errors
///
/// Returns the descriptor-construction error when the peer spec is invalid,
/// or [`CrossMobError::Mob`] when the unwire call fails.
pub async fn unwire_local(
    &self,
    local_member_id: &str,
    remote_comms_name: &str,
    remote_peer_id: &str,
    remote_address: &str,
    remote_pubkey: Option<[u8; 32]>,
) -> Result<(), CrossMobError> {
    // Validate the descriptor before touching the mob.
    let remote_peer = PeerTarget::External(build_external_peer_spec(
        remote_comms_name,
        remote_peer_id,
        remote_address,
        remote_pubkey,
    )?);
    let member = MeerkatId::from(local_member_id);
    let handle = self.mob_runtime.handle();
    handle
        .unwire(member, remote_peer)
        .await
        .map_err(CrossMobError::Mob)
}
/// Returns a copy of the contact directory entry for `mob_id`.
///
/// # Errors
///
/// [`CrossMobError::NoContactDirectory`] when no directory is set,
/// [`CrossMobError::UnknownMob`] when the directory has no such entry.
fn resolve_contact(&self, mob_id: &str) -> Result<ContactEntry, CrossMobError> {
    let Some(directory) = self.contact_directory.as_ref() else {
        return Err(CrossMobError::NoContactDirectory);
    };
    match directory.get(mob_id) {
        Some(entry) => Ok(entry.clone()),
        None => Err(CrossMobError::UnknownMob(mob_id.to_string())),
    }
}
/// Decides how the mob behind `entry` is reached: a registered handle takes
/// precedence, otherwise a remote proxy is built from the entry.
///
/// # Errors
///
/// Propagates the proxy-construction error, or returns
/// [`CrossMobError::NoPeerHandle`] when neither route is available.
async fn dispatch_for(&self, entry: &ContactEntry) -> Result<LocalOrRemote, CrossMobError> {
    let registered = self
        .peer_mob_handles
        .read()
        .await
        .get(&entry.mob_id)
        .cloned();
    if let Some(handle) = registered {
        return Ok(LocalOrRemote::Local(handle));
    }
    RemoteMobProxy::from_entry(entry)?
        .map(LocalOrRemote::Remote)
        .ok_or_else(|| CrossMobError::NoPeerHandle(entry.mob_id.clone()))
}
/// Assembles the peer identity of `meerkat_id` as known to `handle`.
///
/// `mob_id` is used for the comms name (`<mob_id>/<role>/<member_id>`) and
/// for error messages.
///
/// # Errors
///
/// * [`CrossMobError::MemberNotFound`] when the handle has no such member.
/// * [`CrossMobError::NoCommsInfo`] when the member has no peer id.
/// * [`CrossMobError::PeerSpec`] when the member's transport public key is
///   missing or cannot be decoded.
async fn get_member_peer_info(
    &self,
    handle: &MobHandle,
    meerkat_id: &MeerkatId,
    mob_id: &str,
) -> Result<MemberPeerInfo, CrossMobError> {
    let Some(member) = handle.get_member(meerkat_id).await else {
        return Err(CrossMobError::MemberNotFound {
            member_id: meerkat_id.to_string(),
            mob_id: mob_id.to_string(),
        });
    };
    let Some(peer_id) = member.peer_id() else {
        return Err(CrossMobError::NoCommsInfo {
            member_id: meerkat_id.to_string(),
            mob_id: mob_id.to_string(),
        });
    };
    let peer_id = peer_id.to_string();
    let Some(pubkey_b64) = member.transport_public_key() else {
        return Err(CrossMobError::PeerSpec(format!(
            "member '{meerkat_id}' in mob '{mob_id}' has no transport public key"
        )));
    };
    let pubkey = match crate::auth::peer_keys::decode_pubkey_b64(pubkey_b64) {
        Ok(bytes) => bytes,
        Err(err) => {
            return Err(CrossMobError::PeerSpec(format!(
                "member '{meerkat_id}' in mob '{mob_id}' has invalid transport public key: {err}"
            )));
        }
    };
    let comms_name = format!("{mob_id}/{}/{meerkat_id}", member.role);
    Ok(MemberPeerInfo {
        peer_id,
        comms_name,
        pubkey,
    })
}
}
/// Builds a peer descriptor whose address is derived from `transport`:
/// `inproc://<comms_name>`, `tcp://<addr>` or `uds://<path>`.
///
/// Pubkey requirements and errors are those of `build_external_peer_spec`.
fn build_peer_spec(
    comms_name: &str,
    peer_id: &str,
    transport: &MobTransport,
    pubkey: Option<[u8; 32]>,
) -> Result<TrustedPeerDescriptor, CrossMobError> {
    let (scheme, endpoint) = match transport {
        MobTransport::Inproc => ("inproc", comms_name),
        MobTransport::Tcp(addr) => ("tcp", addr.as_str()),
        MobTransport::Uds(path) => ("uds", path.as_str()),
    };
    let address = format!("{scheme}://{endpoint}");
    build_external_peer_spec(comms_name, peer_id, &address, pubkey)
}
/// Builds a peer descriptor for an explicit `address`.
///
/// Addresses starting with `inproc://` may omit the pubkey. Every other
/// address must come with a pubkey that is not all zeros.
///
/// # Errors
///
/// [`CrossMobError::MissingPeerPubkey`] (with `mob_id: None`) when a
/// non-inproc address has no pubkey or an all-zero one;
/// [`CrossMobError::PeerSpec`] when descriptor construction is rejected.
fn build_external_peer_spec(
    comms_name: &str,
    peer_id: &str,
    address: &str,
    pubkey: Option<[u8; 32]>,
) -> Result<TrustedPeerDescriptor, CrossMobError> {
    if address.starts_with("inproc://") {
        let descriptor = match pubkey {
            Some(bytes) => {
                TrustedPeerDescriptor::unsigned_with_pubkey(comms_name, peer_id, bytes, address)
            }
            None => TrustedPeerDescriptor::test_only_unsigned(comms_name, peer_id, address),
        };
        return descriptor.map_err(CrossMobError::PeerSpec);
    }
    // Off-process transports fail closed without a real key.
    match pubkey {
        Some(bytes) if bytes.iter().any(|&byte| byte != 0) => {
            TrustedPeerDescriptor::unsigned_with_pubkey(comms_name, peer_id, bytes, address)
                .map_err(CrossMobError::PeerSpec)
        }
        _ => Err(CrossMobError::MissingPeerPubkey { mob_id: None }),
    }
}
/// Builds an unsigned peer descriptor addressed at `tcp://<address>`.
///
/// NOTE(review): this goes through `test_only_unsigned` and takes no pubkey,
/// unlike `build_external_peer_spec`, which rejects non-inproc peers without
/// one — confirm callers are limited to tests or trusted setups.
///
/// # Errors
///
/// [`CrossMobError::PeerSpec`] when descriptor construction is rejected.
pub fn build_tcp_peer_spec(
    comms_name: &str,
    peer_id: &str,
    address: &str,
) -> Result<TrustedPeerDescriptor, CrossMobError> {
    let tcp_address = format!("tcp://{address}");
    TrustedPeerDescriptor::test_only_unsigned(comms_name, peer_id, tcp_address)
        .map_err(CrossMobError::PeerSpec)
}
/// Builds an unsigned peer descriptor addressed at `uds:///<path>`.
///
/// At most one leading `/` is stripped from `path` before formatting, so
/// `/tmp/x.sock` and `tmp/x.sock` produce the same address.
///
/// NOTE(review): this goes through `test_only_unsigned` and takes no pubkey,
/// unlike `build_external_peer_spec`, which rejects non-inproc peers without
/// one — confirm callers are limited to tests or trusted setups.
///
/// # Errors
///
/// [`CrossMobError::PeerSpec`] when descriptor construction is rejected.
pub fn build_uds_peer_spec(
    comms_name: &str,
    peer_id: &str,
    path: &str,
) -> Result<TrustedPeerDescriptor, CrossMobError> {
    let normalized = path.strip_prefix('/').unwrap_or(path);
    let uds_address = format!("uds:///{normalized}");
    TrustedPeerDescriptor::test_only_unsigned(comms_name, peer_id, uds_address)
        .map_err(CrossMobError::PeerSpec)
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    const TEST_PEER_ID: &str = "00000000-0000-4000-8000-000000000001";
    const TEST_PUBKEY: [u8; 32] = [42u8; 32];
    const COMMS_NAME: &str = "authors/coordinator/alice";

    /// Peer id derived from `TEST_PUBKEY`, used wherever a pubkey is supplied.
    fn derived_peer_id() -> String {
        meerkat_core::comms::PeerId::from_ed25519_pubkey(&TEST_PUBKEY).to_string()
    }

    fn tcp_transport() -> MobTransport {
        MobTransport::Tcp("127.0.0.1:9001".to_string())
    }

    fn uds_transport() -> MobTransport {
        MobTransport::Uds("/tmp/x.sock".to_string())
    }

    #[test]
    fn peer_spec_inproc_uses_comms_name_address() {
        let spec =
            build_peer_spec(COMMS_NAME, TEST_PEER_ID, &MobTransport::Inproc, None).expect("spec");
        assert_eq!(spec.address.endpoint(), "authors/coordinator/alice");
    }

    #[test]
    fn peer_spec_tcp_uses_tcp_scheme() {
        let peer_id = derived_peer_id();
        let spec = build_peer_spec(COMMS_NAME, &peer_id, &tcp_transport(), Some(TEST_PUBKEY))
            .expect("spec");
        assert_eq!(spec.address.endpoint(), "127.0.0.1:9001");
    }

    #[test]
    fn peer_spec_uds_uses_uds_scheme() {
        let peer_id = derived_peer_id();
        let spec = build_peer_spec(COMMS_NAME, &peer_id, &uds_transport(), Some(TEST_PUBKEY))
            .expect("spec");
        assert_eq!(spec.address.endpoint(), "/tmp/x.sock");
    }

    #[test]
    fn peer_spec_tcp_without_pubkey_rejected() {
        let result = build_peer_spec(COMMS_NAME, TEST_PEER_ID, &tcp_transport(), None);
        assert!(
            matches!(result, Err(CrossMobError::MissingPeerPubkey { .. })),
            "TCP peer spec without pubkey must fail closed, got {result:?}"
        );
    }

    #[test]
    fn peer_spec_uds_without_pubkey_rejected() {
        let result = build_peer_spec(COMMS_NAME, TEST_PEER_ID, &uds_transport(), None);
        assert!(
            matches!(result, Err(CrossMobError::MissingPeerPubkey { .. })),
            "UDS peer spec without pubkey must fail closed, got {result:?}"
        );
    }

    #[test]
    fn build_uds_peer_spec_handles_leading_slash() {
        // Both spellings of the path must yield the same absolute endpoint.
        let with = build_uds_peer_spec("a", TEST_PEER_ID, "/tmp/x.sock").expect("spec");
        let without = build_uds_peer_spec("a", TEST_PEER_ID, "tmp/x.sock").expect("spec");
        assert_eq!(with.address.endpoint(), "/tmp/x.sock");
        assert_eq!(without.address.endpoint(), "/tmp/x.sock");
    }
}