use crate::contact_directory::{ContactEntry, MobTransport};
/// Errors reported when driving a remote mob through its control channel.
///
/// The `Display` output is a complete, human-readable sentence for each
/// variant; `Debug` keeps the raw field values.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RemoteMobError {
    /// The mob `mob_id` uses a `transport` that is not supported.
    UnsupportedTransport { mob_id: String, transport: String },
    /// The named `operation` failed on the control channel at `endpoint`.
    ControlChannelUnavailable {
        mob_id: String,
        endpoint: String,
        operation: &'static str,
    },
    /// The control channel at `endpoint` rejected the request with `code`
    /// and `message`.
    Rejected {
        mob_id: String,
        endpoint: String,
        code: String,
        message: String,
    },
    /// A control request for `endpoint` could not be encoded.
    Encode { endpoint: String, message: String },
    /// A control response from `endpoint` could not be decoded.
    Decode { endpoint: String, message: String },
}

impl RemoteMobError {
    /// Supplies a mob id for an error that does not carry one yet.
    ///
    /// Only `ControlChannelUnavailable` with an empty `mob_id` is affected:
    /// its `mob_id` becomes `context`. Every other error, including a
    /// `ControlChannelUnavailable` that already names a mob, is returned
    /// unchanged.
    pub(crate) fn with_context(mut self, context: String) -> Self {
        if let Self::ControlChannelUnavailable { mob_id, .. } = &mut self {
            if mob_id.is_empty() {
                *mob_id = context;
            }
        }
        self
    }
}

impl std::fmt::Display for RemoteMobError {
    /// Writes the one-line, user-facing description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::UnsupportedTransport { mob_id, transport } => write!(
                f,
                "remote mob '{mob_id}' uses unsupported transport: {transport}"
            ),
            Self::ControlChannelUnavailable {
                mob_id,
                endpoint,
                operation,
            } => write!(
                f,
                "remote mob '{mob_id}' control channel ({endpoint}): \
                 operation '{operation}' failed — confirm the peer gateway is running \
                 with a control listener bound on this endpoint"
            ),
            Self::Rejected {
                mob_id,
                endpoint,
                code,
                message,
            } => write!(
                f,
                "remote mob '{mob_id}' control channel ({endpoint}) \
                 rejected request [{code}]: {message}"
            ),
            Self::Encode { endpoint, message } => write!(
                f,
                "control request encode failed for {endpoint}: {message}"
            ),
            Self::Decode { endpoint, message } => write!(
                f,
                "control response decode failed for {endpoint}: {message}"
            ),
        }
    }
}

impl std::error::Error for RemoteMobError {}
/// An endpoint string tagged with the scheme used to reach it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RemoteEndpoint {
    /// Endpoint addressed as `tcp://…`; holds the part after the scheme.
    Tcp(String),
    /// Endpoint addressed as `uds://…`; holds the part after the scheme.
    Uds(String),
}

impl RemoteEndpoint {
    /// Returns the scheme name of this endpoint: `"tcp"` or `"uds"`.
    pub fn scheme(&self) -> &'static str {
        match self {
            RemoteEndpoint::Tcp(..) => "tcp",
            RemoteEndpoint::Uds(..) => "uds",
        }
    }

    /// Returns the endpoint rendered as `<scheme>://<raw>`.
    pub fn comms_address(&self) -> String {
        format!("{}://{}", self.scheme(), self.raw())
    }

    /// Returns the stored string exactly as given, without a scheme prefix.
    pub fn raw(&self) -> &str {
        // Both variants wrap a single `String`, so one pattern covers them.
        let (Self::Tcp(inner) | Self::Uds(inner)) = self;
        inner
    }
}
/// Client-side handle for one remote mob: its id plus the endpoint that
/// control requests are sent to.
#[derive(Debug, Clone)]
pub struct RemoteMobProxy {
    /// Id of the mob this proxy addresses; stamped onto the errors it reports.
    mob_id: String,
    /// Endpoint every control request issued by this proxy is sent to.
    endpoint: RemoteEndpoint,
}

impl RemoteMobProxy {
    /// Builds a proxy from a contact entry.
    ///
    /// Returns `Ok(None)` when the entry's transport is
    /// `MobTransport::Inproc`, and `Ok(Some(proxy))` for the `Tcp` and `Uds`
    /// transports. No path in this function currently produces an `Err`.
    pub fn from_entry(entry: &ContactEntry) -> Result<Option<Self>, RemoteMobError> {
        let endpoint = match &entry.transport {
            MobTransport::Inproc => return Ok(None),
            MobTransport::Tcp(addr) => RemoteEndpoint::Tcp(addr.clone()),
            MobTransport::Uds(path) => RemoteEndpoint::Uds(path.clone()),
        };
        Ok(Some(Self {
            mob_id: entry.mob_id.clone(),
            endpoint,
        }))
    }

    /// Returns the id of the mob this proxy addresses.
    pub fn mob_id(&self) -> &str {
        &self.mob_id
    }

    /// Returns the endpoint control requests are sent to.
    pub fn endpoint(&self) -> &RemoteEndpoint {
        &self.endpoint
    }

    /// Sends a `ControlRequest::Wire` built from the arguments and expects
    /// `ControlResponse::Ok` in return.
    ///
    /// Each argument is copied into the request field of the same name.
    ///
    /// # Errors
    ///
    /// * `RemoteMobError::Rejected` when the peer answers with
    ///   `ControlResponse::Err`.
    /// * `RemoteMobError::Decode` when the peer answers with any other
    ///   response variant.
    /// * Any error returned by `RemoteControlClient::send`; a
    ///   `ControlChannelUnavailable` among them carries this proxy's mob id.
    pub async fn wire_remote(
        &self,
        remote_member: &str,
        local_peer_spec_address: &str,
        local_comms_name: &str,
        local_peer_id: &str,
        local_pubkey_b64: Option<String>,
    ) -> Result<(), RemoteMobError> {
        let request = super::cross_mob_control::ControlRequest::Wire {
            remote_member: remote_member.to_string(),
            local_peer_spec_address: local_peer_spec_address.to_string(),
            local_comms_name: local_comms_name.to_string(),
            local_peer_id: local_peer_id.to_string(),
            local_pubkey_b64,
        };
        self.dispatch_no_payload(request, "wire").await
    }

    /// Sends a `ControlRequest::Unwire` built from the arguments and expects
    /// `ControlResponse::Ok` in return.
    ///
    /// Each argument is copied into the request field of the same name.
    ///
    /// # Errors
    ///
    /// Same as [`RemoteMobProxy::wire_remote`].
    pub async fn unwire_remote(
        &self,
        remote_member: &str,
        local_peer_spec_address: &str,
        local_comms_name: &str,
        local_peer_id: &str,
        local_pubkey_b64: Option<String>,
    ) -> Result<(), RemoteMobError> {
        let request = super::cross_mob_control::ControlRequest::Unwire {
            remote_member: remote_member.to_string(),
            local_peer_spec_address: local_peer_spec_address.to_string(),
            local_comms_name: local_comms_name.to_string(),
            local_peer_id: local_peer_id.to_string(),
            local_pubkey_b64,
        };
        self.dispatch_no_payload(request, "unwire").await
    }

    /// Sends a `ControlRequest::Inject` carrying `content_json` for
    /// `remote_member` and returns the `session_id` from the peer's
    /// `ControlResponse::Injected` reply.
    ///
    /// # Errors
    ///
    /// * `RemoteMobError::Rejected` when the peer answers with
    ///   `ControlResponse::Err`.
    /// * `RemoteMobError::Decode` when the peer answers with any other
    ///   response variant.
    /// * Any error returned by `RemoteControlClient::send`.
    pub async fn inject_message(
        &self,
        remote_member: &str,
        content_json: serde_json::Value,
    ) -> Result<String, RemoteMobError> {
        let request = super::cross_mob_control::ControlRequest::Inject {
            remote_member: remote_member.to_string(),
            content: content_json,
        };
        let response = self.send_control(&request).await?;
        match response {
            super::cross_mob_control::ControlResponse::Injected { session_id } => Ok(session_id),
            super::cross_mob_control::ControlResponse::Err { code, message } => {
                Err(self.rejected(code, message))
            }
            other => Err(self.unexpected_response(format!(
                "expected Injected response, got {other:?}"
            ))),
        }
    }

    /// Sends `request` and accepts only `ControlResponse::Ok` as success.
    ///
    /// `operation` is used solely to label the `Decode` error raised when the
    /// peer answers with an unexpected response variant.
    async fn dispatch_no_payload(
        &self,
        request: super::cross_mob_control::ControlRequest,
        operation: &'static str,
    ) -> Result<(), RemoteMobError> {
        let response = self.send_control(&request).await?;
        match response {
            super::cross_mob_control::ControlResponse::Ok => Ok(()),
            super::cross_mob_control::ControlResponse::Err { code, message } => {
                Err(self.rejected(code, message))
            }
            other => Err(self.unexpected_response(format!(
                "expected Ok for {operation}, got {other:?}"
            ))),
        }
    }

    /// Sends a `ControlRequest::LookupMember` for `remote_member` and returns
    /// `(peer_id, comms_name)` from the peer's `ControlResponse::Member` reply.
    ///
    /// # Errors
    ///
    /// * `RemoteMobError::Rejected` when the peer answers with
    ///   `ControlResponse::Err`.
    /// * `RemoteMobError::Decode` when the peer answers with any other
    ///   response variant.
    /// * Any error returned by `RemoteControlClient::send`.
    pub async fn lookup_member(
        &self,
        remote_member: &str,
    ) -> Result<(String, String), RemoteMobError> {
        let request = super::cross_mob_control::ControlRequest::LookupMember {
            remote_member: remote_member.to_string(),
        };
        let response = self.send_control(&request).await?;
        match response {
            super::cross_mob_control::ControlResponse::Member {
                peer_id,
                comms_name,
            } => Ok((peer_id, comms_name)),
            super::cross_mob_control::ControlResponse::Err { code, message } => {
                Err(self.rejected(code, message))
            }
            other => Err(self.unexpected_response(format!(
                "expected Member response, got {other:?}"
            ))),
        }
    }

    /// Ensures a `ControlChannelUnavailable` error names this proxy's mob.
    ///
    /// Any `mob_id` that differs from this proxy's (including an empty one)
    /// is overwritten; all other error variants pass through untouched.
    fn attach_mob_id(&self, mut err: RemoteMobError) -> RemoteMobError {
        if let RemoteMobError::ControlChannelUnavailable { mob_id, .. } = &mut err {
            // An empty id that already equals `self.mob_id` needs no rewrite,
            // so a plain inequality check covers the "missing id" case too.
            if *mob_id != self.mob_id {
                mob_id.clone_from(&self.mob_id);
            }
        }
        err
    }

    /// Sends `request` to this proxy's endpoint with
    /// `DEFAULT_CONTROL_TIMEOUT` and stamps this proxy's mob id onto a
    /// `ControlChannelUnavailable` failure.
    async fn send_control(
        &self,
        request: &super::cross_mob_control::ControlRequest,
    ) -> Result<super::cross_mob_control::ControlResponse, RemoteMobError> {
        super::cross_mob_control::RemoteControlClient::send(
            &self.endpoint,
            request,
            super::cross_mob_control::DEFAULT_CONTROL_TIMEOUT,
        )
        .await
        .map_err(|err| self.attach_mob_id(err))
    }

    /// Builds the `Rejected` error for a `ControlResponse::Err` reply,
    /// tagged with this proxy's mob id and endpoint address.
    fn rejected(&self, code: String, message: String) -> RemoteMobError {
        RemoteMobError::Rejected {
            mob_id: self.mob_id.clone(),
            endpoint: self.endpoint.comms_address(),
            code,
            message,
        }
    }

    /// Builds the `Decode` error reported when the peer replies with a
    /// response variant the caller did not expect.
    fn unexpected_response(&self, message: String) -> RemoteMobError {
        RemoteMobError::Decode {
            endpoint: self.endpoint.comms_address(),
            message,
        }
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Builds a contact entry with the given id and transport and no pubkey.
    fn contact(mob_id: &str, transport: MobTransport) -> ContactEntry {
        ContactEntry {
            mob_id: mob_id.to_string(),
            transport,
            pubkey: None,
        }
    }

    /// An in-process entry yields `Ok(None)`: there is no proxy to build.
    #[test]
    fn from_entry_inproc_returns_none() {
        let entry = contact("demo", MobTransport::Inproc);
        let maybe_proxy = RemoteMobProxy::from_entry(&entry).expect("inproc is supported");
        assert!(maybe_proxy.is_none());
    }

    /// A TCP entry produces a proxy exposing the id, scheme, raw address and
    /// full `tcp://` address.
    #[test]
    fn from_entry_tcp_round_trip() {
        let entry = contact("remote", MobTransport::Tcp("127.0.0.1:9001".to_string()));
        let proxy = RemoteMobProxy::from_entry(&entry)
            .expect("tcp is supported")
            .expect("tcp returns Some(proxy)");
        let endpoint = proxy.endpoint();
        assert_eq!(proxy.mob_id(), "remote");
        assert_eq!(endpoint.scheme(), "tcp");
        assert_eq!(endpoint.raw(), "127.0.0.1:9001");
        assert_eq!(endpoint.comms_address(), "tcp://127.0.0.1:9001");
    }

    /// A UDS entry produces a proxy exposing the scheme, raw path and full
    /// `uds://` address.
    #[test]
    fn from_entry_uds_round_trip() {
        let entry = contact(
            "remote-uds",
            MobTransport::Uds("/tmp/cross-mob.sock".to_string()),
        );
        let proxy = RemoteMobProxy::from_entry(&entry)
            .expect("uds is supported")
            .expect("uds returns Some(proxy)");
        let endpoint = proxy.endpoint();
        assert_eq!(endpoint.scheme(), "uds");
        assert_eq!(endpoint.raw(), "/tmp/cross-mob.sock");
        assert_eq!(endpoint.comms_address(), "uds:///tmp/cross-mob.sock");
    }

    /// Wiring against an address expected to have no listener must fail with
    /// `ControlChannelUnavailable` carrying the proxy's mob id.
    #[tokio::test]
    async fn wire_remote_returns_unavailable_when_no_listener() {
        // Presumably nothing is listening on port 1 of the loopback interface.
        let entry = contact("remote", MobTransport::Tcp("127.0.0.1:1".to_string()));
        let proxy = RemoteMobProxy::from_entry(&entry)
            .expect("tcp ok")
            .expect("some");
        let outcome = proxy
            .wire_remote(
                "alice",
                "tcp://127.0.0.1:9000",
                "demo/role/alice",
                "00000000-0000-4000-8000-000000000001",
                None,
            )
            .await;
        let err = outcome.expect_err("no listener");
        let names_this_mob = matches!(
            &err,
            RemoteMobError::ControlChannelUnavailable { mob_id, .. } if mob_id == "remote"
        );
        assert!(names_this_mob, "got {err:?}");
    }
}