use crate::error::ClientError;
use crate::message::EmergentMessage;
use crate::stream::MessageStream;
use crate::subscribe::IntoSubscription;
use crate::types::{CorrelationId, PrimitiveName};
use crate::{DiscoveryInfo, PrimitiveInfo, Result};
use acton_reactive::ipc::protocol::{
Format, MAX_FRAME_SIZE, MSG_TYPE_DISCOVER, MSG_TYPE_PUSH, MSG_TYPE_REQUEST, MSG_TYPE_RESPONSE,
MSG_TYPE_SUBSCRIBE, MSG_TYPE_UNSUBSCRIBE, read_frame, write_frame,
};
use acton_reactive::ipc::{
IpcConfig, IpcDiscoverRequest, IpcDiscoverResponse, IpcEnvelope, IpcPushNotification,
IpcSubscribeRequest, IpcSubscriptionResponse, IpcUnsubscribeRequest, socket_exists,
socket_is_alive,
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::UnixStream;
use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
use tokio::sync::{Mutex, mpsc};
use tokio::time::timeout;
/// Maximum time the request helpers in this file wait for the engine's reply (30 seconds).
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
/// Serializable wrapper that nests an [`EmergentMessage`] under an `inner` field.
///
/// `publish_impl` converts this wrapper to a JSON value and uses it as the envelope payload.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct IpcEmergentMessage {
/// The wrapped message.
inner: EmergentMessage,
}
/// Determines the filesystem path of the engine's IPC socket.
///
/// If the `EMERGENT_SOCKET` environment variable can be read, its value is used verbatim.
/// Otherwise the path is taken from the loaded [`IpcConfig`] after forcing its socket
/// `app_name` to `"emergent"`.
///
/// `_name` is accepted for signature compatibility but is not consulted.
fn resolve_socket_path(_name: &str) -> Result<PathBuf> {
    match std::env::var("EMERGENT_SOCKET") {
        Ok(overridden) => Ok(PathBuf::from(overridden)),
        Err(_) => {
            let mut ipc_config = IpcConfig::load();
            ipc_config.socket.app_name = Some(String::from("emergent"));
            Ok(ipc_config.socket_path())
        }
    }
}
/// Opens a Unix-socket connection to the engine.
///
/// # Errors
///
/// * [`ClientError::SocketNotFound`] (carrying the displayed path) when `socket_exists`
///   reports that the resolved path has no socket.
/// * [`ClientError::ConnectionFailed`] when `socket_is_alive` returns `false`, or when the
///   actual `connect` call fails (the I/O error text is carried in the variant).
/// * Any error returned by `resolve_socket_path`.
async fn connect_to_engine(name: &str) -> Result<UnixStream> {
    let path = resolve_socket_path(name)?;

    if !socket_exists(&path) {
        let shown = path.display().to_string();
        return Err(ClientError::SocketNotFound(shown));
    }

    let alive = socket_is_alive(&path).await;
    if !alive {
        let reason = String::from("Engine socket exists but is not responding");
        return Err(ClientError::ConnectionFailed(reason));
    }

    match UnixStream::connect(&path).await {
        Ok(stream) => Ok(stream),
        Err(io_err) => Err(ClientError::ConnectionFailed(io_err.to_string())),
    }
}
/// Sends a DISCOVER request and decodes the engine's reply into a [`DiscoveryInfo`].
///
/// The request and the reply body are MessagePack-encoded. Exactly one frame is read back,
/// bounded by `DEFAULT_TIMEOUT`.
///
/// Every actor listed in the reply becomes a [`PrimitiveInfo`] whose `kind` is the literal
/// `"unknown"`; this function does not read a kind from the reply.
///
/// # Errors
///
/// * [`ClientError::Timeout`] if no frame arrives in time.
/// * [`ClientError::ProtocolError`] if the frame is not a RESPONSE frame.
/// * [`ClientError::DiscoveryFailed`] if the reply has `success == false`; the message is the
///   reply's `error` or `"Unknown error"`.
/// * Converted encode/decode and frame I/O errors.
async fn discover_impl(
    reader: &mut OwnedReadHalf,
    writer: &mut OwnedWriteHalf,
) -> Result<DiscoveryInfo> {
    let encoded = rmp_serde::to_vec(&IpcDiscoverRequest::new())?;
    write_frame(writer, MSG_TYPE_DISCOVER, Format::MessagePack, &encoded)
        .await
        .map_err(ClientError::from)?;

    let (msg_type, _, body) =
        match timeout(DEFAULT_TIMEOUT, read_frame(reader, MAX_FRAME_SIZE)).await {
            Ok(read_result) => read_result.map_err(ClientError::from)?,
            Err(_elapsed) => return Err(ClientError::Timeout),
        };

    if msg_type != MSG_TYPE_RESPONSE {
        let detail = format!("Expected RESPONSE, got message type {msg_type}");
        return Err(ClientError::ProtocolError(detail));
    }

    let reply: IpcDiscoverResponse = rmp_serde::from_slice(&body)?;
    if !reply.success {
        let reason = reply
            .error
            .unwrap_or_else(|| "Unknown error".to_string());
        return Err(ClientError::DiscoveryFailed(reason));
    }

    let message_types = reply.message_types.unwrap_or_default();
    let primitives = reply
        .actors
        .unwrap_or_default()
        .into_iter()
        .map(|actor| PrimitiveInfo {
            name: actor.name,
            kind: "unknown".to_string(),
        })
        .collect();

    Ok(DiscoveryInfo {
        message_types,
        primitives,
    })
}
/// Sends a SUBSCRIBE request for `types` and returns the `subscribed_types` list from the reply.
///
/// Exactly one frame is read back, bounded by `DEFAULT_TIMEOUT`.
///
/// # Errors
///
/// * [`ClientError::Timeout`] if no frame arrives in time.
/// * [`ClientError::ProtocolError`] if the frame is not a RESPONSE frame.
/// * [`ClientError::SubscriptionFailed`] if the reply has `success == false`; the message is
///   the reply's `error` or `"Unknown error"`.
/// * Converted encode/decode and frame I/O errors.
async fn subscribe_impl(
    reader: &mut OwnedReadHalf,
    writer: &mut OwnedWriteHalf,
    types: &[&str],
) -> Result<Vec<String>> {
    let wanted = types.iter().map(|t| String::from(*t)).collect();
    let encoded = rmp_serde::to_vec(&IpcSubscribeRequest::new(wanted))?;
    write_frame(writer, MSG_TYPE_SUBSCRIBE, Format::MessagePack, &encoded)
        .await
        .map_err(ClientError::from)?;

    let (msg_type, _, body) =
        match timeout(DEFAULT_TIMEOUT, read_frame(reader, MAX_FRAME_SIZE)).await {
            Ok(read_result) => read_result.map_err(ClientError::from)?,
            Err(_elapsed) => return Err(ClientError::Timeout),
        };

    if msg_type != MSG_TYPE_RESPONSE {
        let detail = format!("Expected RESPONSE, got message type {msg_type}");
        return Err(ClientError::ProtocolError(detail));
    }

    let reply: IpcSubscriptionResponse = rmp_serde::from_slice(&body)?;
    if reply.success {
        Ok(reply.subscribed_types)
    } else {
        let reason = reply
            .error
            .unwrap_or_else(|| "Unknown error".to_string());
        Err(ClientError::SubscriptionFailed(reason))
    }
}
/// Sends an UNSUBSCRIBE request and waits for the engine's acknowledgement.
///
/// An empty `types` slice produces an "unsubscribe all" request; otherwise only the listed
/// types are named in the request. Exactly one frame is read back, bounded by
/// `DEFAULT_TIMEOUT`.
///
/// # Errors
///
/// * [`ClientError::Timeout`] if no frame arrives in time.
/// * [`ClientError::ProtocolError`] if the frame is not a RESPONSE frame.
/// * [`ClientError::SubscriptionFailed`] if the reply has `success == false`; the message is
///   the reply's `error` or `"Unknown error"`.
/// * Converted encode/decode and frame I/O errors.
async fn unsubscribe_impl(
    reader: &mut OwnedReadHalf,
    writer: &mut OwnedWriteHalf,
    types: &[&str],
) -> Result<()> {
    let request = match types {
        [] => IpcUnsubscribeRequest::unsubscribe_all(),
        listed => IpcUnsubscribeRequest::new(listed.iter().map(|t| String::from(*t)).collect()),
    };
    let encoded = rmp_serde::to_vec(&request)?;
    write_frame(writer, MSG_TYPE_UNSUBSCRIBE, Format::MessagePack, &encoded)
        .await
        .map_err(ClientError::from)?;

    let (msg_type, _, body) =
        match timeout(DEFAULT_TIMEOUT, read_frame(reader, MAX_FRAME_SIZE)).await {
            Ok(read_result) => read_result.map_err(ClientError::from)?,
            Err(_elapsed) => return Err(ClientError::Timeout),
        };

    if msg_type != MSG_TYPE_RESPONSE {
        let detail = format!("Expected RESPONSE, got message type {msg_type}");
        return Err(ClientError::ProtocolError(detail));
    }

    let reply: IpcSubscriptionResponse = rmp_serde::from_slice(&body)?;
    if reply.success {
        Ok(())
    } else {
        let reason = reply
            .error
            .unwrap_or_else(|| "Unknown error".to_string());
        Err(ClientError::SubscriptionFailed(reason))
    }
}
/// Writes `message` to the engine as a REQUEST frame; no reply is read.
///
/// The message is wrapped in [`IpcEmergentMessage`], converted to a JSON value, and placed in
/// an [`IpcEnvelope`] constructed with `"message_broker"` and `"EmergentMessage"`. The
/// envelope is then MessagePack-encoded and written.
///
/// # Errors
///
/// Returns converted serialization or frame-write errors.
async fn publish_impl(writer: &mut OwnedWriteHalf, message: EmergentMessage) -> Result<()> {
    let wrapped = serde_json::to_value(&IpcEmergentMessage { inner: message })?;
    let envelope = IpcEnvelope::new("message_broker", "EmergentMessage", wrapped);
    let encoded = rmp_serde::to_vec(&envelope)?;

    let written = write_frame(writer, MSG_TYPE_REQUEST, Format::MessagePack, &encoded).await;
    written.map_err(ClientError::from)?;
    Ok(())
}
/// Payload shape deserialized from a `system.response.subscriptions` message.
#[derive(Debug, Deserialize)]
struct SubscriptionsResponse {
/// Topic list returned to the caller of `get_my_subscriptions_impl`.
subscribes: Vec<String>,
}
/// One primitive entry as carried in a topology response.
///
/// NOTE(review): the values are produced by the engine; the field notes below are inferred
/// from the field names only and should be confirmed against the engine's definitions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopologyPrimitive {
/// Name of the primitive.
pub name: String,
/// Kind of the primitive (presumably e.g. source / handler / sink — confirm).
pub kind: String,
/// State string reported for the primitive (set of possible values not visible here).
pub state: String,
/// Presumably the message types this primitive publishes — confirm.
pub publishes: Vec<String>,
/// Presumably the message types this primitive subscribes to — confirm.
pub subscribes: Vec<String>,
/// Optional process id, when one is reported.
pub pid: Option<u32>,
/// Optional error text, when one is reported.
pub error: Option<String>,
}
/// Payload shape deserialized from a `system.response.topology` message.
#[derive(Debug, Deserialize)]
struct TopologyResponse {
/// Primitive entries, handed to the caller inside a [`TopologyState`].
primitives: Vec<TopologyPrimitive>,
}
/// Topology snapshot returned by `get_topology_impl`.
#[derive(Debug, Clone)]
pub struct TopologyState {
/// Primitive entries copied from the engine's topology response.
pub primitives: Vec<TopologyPrimitive>,
}
/// Asks the engine which topics are configured for the primitive called `name`.
///
/// Steps:
/// 1. subscribe to `system.response.subscriptions`;
/// 2. publish a `system.request.subscriptions` message carrying a fresh correlation id and
///    the payload `{ "name": name }`;
/// 3. read frames until a push on the response topic carries the same correlation id, then
///    return its `subscribes` list.
///
/// Frames that are not pushes, pushes on other topics, and responses with a different
/// correlation id are skipped.
///
/// # Errors
///
/// * [`ClientError::Timeout`] if step 3 does not finish within `DEFAULT_TIMEOUT`.
/// * Any error from subscribing, publishing, frame I/O, or decoding. A push that fails to
///   decode aborts the wait rather than being skipped.
async fn get_my_subscriptions_impl(
    reader: &mut OwnedReadHalf,
    writer: &mut OwnedWriteHalf,
    name: &str,
) -> Result<Vec<String>> {
    const RESPONSE_TOPIC: &str = "system.response.subscriptions";

    let correlation_id = CorrelationId::new();
    subscribe_impl(reader, writer, &[RESPONSE_TOPIC]).await?;

    let request = EmergentMessage::new("system.request.subscriptions")
        .with_source(name)
        .with_correlation_id(correlation_id.clone())
        .with_payload(json!({ "name": name }));
    publish_impl(writer, request).await?;

    // Rendered once; every candidate response is compared against this text.
    let expected = correlation_id.to_string();

    timeout(DEFAULT_TIMEOUT, async {
        loop {
            let (msg_type, _, body) = read_frame(reader, MAX_FRAME_SIZE)
                .await
                .map_err(ClientError::from)?;
            if msg_type != MSG_TYPE_PUSH {
                continue;
            }

            let push: IpcPushNotification = rmp_serde::from_slice(&body)?;
            if push.message_type != RESPONSE_TOPIC {
                continue;
            }

            let msg: EmergentMessage = serde_json::from_value(push.payload)?;
            let received = msg.correlation_id.as_ref().map(|c| c.to_string());
            if received.as_deref() != Some(expected.as_str()) {
                continue;
            }

            let parsed: SubscriptionsResponse = serde_json::from_value(msg.payload)?;
            return Ok(parsed.subscribes);
        }
    })
    .await
    .map_err(|_| ClientError::Timeout)?
}
/// Requests the engine's current topology on behalf of the primitive called `name`.
///
/// Steps:
/// 1. subscribe to `system.response.topology`;
/// 2. publish a `system.request.topology` message carrying a fresh correlation id and an
///    empty JSON object as payload;
/// 3. read frames until a push on the response topic carries the same correlation id, then
///    return its primitives wrapped in a [`TopologyState`].
///
/// Frames that are not pushes, pushes on other topics, and responses with a different
/// correlation id are skipped.
///
/// # Errors
///
/// * [`ClientError::Timeout`] if step 3 does not finish within `DEFAULT_TIMEOUT`.
/// * Any error from subscribing, publishing, frame I/O, or decoding. A push that fails to
///   decode aborts the wait rather than being skipped.
async fn get_topology_impl(
    reader: &mut OwnedReadHalf,
    writer: &mut OwnedWriteHalf,
    name: &str,
) -> Result<TopologyState> {
    const RESPONSE_TOPIC: &str = "system.response.topology";

    let correlation_id = CorrelationId::new();
    subscribe_impl(reader, writer, &[RESPONSE_TOPIC]).await?;

    let request = EmergentMessage::new("system.request.topology")
        .with_source(name)
        .with_correlation_id(correlation_id.clone())
        .with_payload(json!({}));
    publish_impl(writer, request).await?;

    // Rendered once; every candidate response is compared against this text.
    let expected = correlation_id.to_string();

    timeout(DEFAULT_TIMEOUT, async {
        loop {
            let (msg_type, _, body) = read_frame(reader, MAX_FRAME_SIZE)
                .await
                .map_err(ClientError::from)?;
            if msg_type != MSG_TYPE_PUSH {
                continue;
            }

            let push: IpcPushNotification = rmp_serde::from_slice(&body)?;
            if push.message_type != RESPONSE_TOPIC {
                continue;
            }

            let msg: EmergentMessage = serde_json::from_value(push.payload)?;
            let received = msg.correlation_id.as_ref().map(|c| c.to_string());
            if received.as_deref() != Some(expected.as_str()) {
                continue;
            }

            let parsed: TopologyResponse = serde_json::from_value(msg.payload)?;
            return Ok(TopologyState {
                primitives: parsed.primitives,
            });
        }
    })
    .await
    .map_err(|_| ClientError::Timeout)?
}
/// Client handle that publishes messages to the engine over one Unix-socket connection.
pub struct EmergentSource {
/// Name supplied at connect time; used as the source of published messages that have none.
name: String,
/// Write half of the connection, guarded by an async mutex.
writer: Arc<Mutex<OwnedWriteHalf>>,
/// Read half of the connection, guarded by an async mutex.
reader: Arc<Mutex<OwnedReadHalf>>,
}
impl EmergentSource {
    /// Connects to the engine and keeps both halves of the connection for later calls.
    ///
    /// # Errors
    ///
    /// Returns whatever `connect_to_engine` reports.
    pub async fn connect(name: &str) -> Result<Self> {
        let (read_half, write_half) = connect_to_engine(name).await?.into_split();
        Ok(Self {
            name: name.to_owned(),
            writer: Arc::new(Mutex::new(write_half)),
            reader: Arc::new(Mutex::new(read_half)),
        })
    }

    /// Publishes `message` on this source's connection.
    ///
    /// When the message's source is still the default, it is replaced by this source's name.
    ///
    /// # Errors
    ///
    /// * [`ClientError::ConnectionFailed`] if this source's name is rejected by
    ///   [`PrimitiveName::new`].
    /// * Any error from writing the message.
    pub async fn publish(&self, mut message: EmergentMessage) -> Result<()> {
        if message.source.is_default() {
            message.source = match PrimitiveName::new(&self.name) {
                Ok(source) => source,
                Err(e) => {
                    return Err(ClientError::ConnectionFailed(format!(
                        "invalid primitive name '{}': {}",
                        self.name, e
                    )));
                }
            };
        }

        let mut write_guard = self.writer.lock().await;
        publish_impl(&mut write_guard, message).await
    }

    /// Runs a discovery exchange on this source's connection.
    ///
    /// Both halves stay locked (reader first, then writer) for the whole exchange.
    ///
    /// # Errors
    ///
    /// Returns whatever `discover_impl` reports.
    pub async fn discover(&self) -> Result<DiscoveryInfo> {
        let mut read_guard = self.reader.lock().await;
        let mut write_guard = self.writer.lock().await;
        discover_impl(&mut read_guard, &mut write_guard).await
    }

    /// Returns the name this source connected with.
    #[must_use]
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Best-effort disconnect.
    ///
    /// Sends an "unsubscribe all" frame, waits up to one second for any frame in reply, then
    /// shuts down the write half. Failures of the write, the wait, and the shutdown are all
    /// ignored. If the unsubscribe frame cannot be written, the wait and shutdown are skipped.
    ///
    /// # Errors
    ///
    /// Only fails if the unsubscribe request cannot be serialized.
    pub async fn disconnect(&self) -> Result<()> {
        use tokio::io::AsyncWriteExt;

        let mut read_guard = self.reader.lock().await;
        let mut write_guard = self.writer.lock().await;

        let farewell = rmp_serde::to_vec(&IpcUnsubscribeRequest::unsubscribe_all())?;
        let sent = write_frame(
            &mut *write_guard,
            MSG_TYPE_UNSUBSCRIBE,
            Format::MessagePack,
            &farewell,
        )
        .await;

        if sent.is_ok() {
            // The acknowledgement is not inspected; the read only gives the engine up to a
            // second to answer before the write half is closed.
            let grace = Duration::from_secs(1);
            let _ = timeout(grace, read_frame(&mut *read_guard, MAX_FRAME_SIZE)).await;
            let _ = write_guard.shutdown().await;
        }
        Ok(())
    }
}
/// Client handle that can both subscribe to engine messages and publish its own.
pub struct EmergentHandler {
/// Name supplied at connect time; used as the source of published messages that have none.
name: String,
/// Write half of the connection opened by `connect`; `publish` writes through it.
writer: Arc<Mutex<OwnedWriteHalf>>,
/// Topics recorded by the most recent `subscribe` call, without `system.shutdown`.
subscribed_types: Arc<Mutex<Vec<String>>>,
}
impl EmergentHandler {
pub async fn connect(name: &str) -> Result<Self> {
let stream = connect_to_engine(name).await?;
let (_reader, writer) = stream.into_split();
Ok(Self {
name: name.to_string(),
writer: Arc::new(Mutex::new(writer)),
subscribed_types: Arc::new(Mutex::new(Vec::new())),
})
}
pub async fn subscribe(&self, types: impl IntoSubscription) -> Result<MessageStream> {
let topics = types.into_topics();
let stream = connect_to_engine(&self.name).await?;
let (mut reader, mut writer) = stream.into_split();
let mut all_types: Vec<&str> = topics.iter().map(String::as_str).collect();
if !all_types.contains(&"system.shutdown") {
all_types.push("system.shutdown");
}
let subscribed = subscribe_impl(&mut reader, &mut writer, &all_types).await?;
{
let mut subs = self.subscribed_types.lock().await;
*subs = subscribed
.into_iter()
.filter(|s| s != "system.shutdown")
.collect();
}
let (tx, rx) = mpsc::channel(256);
tokio::spawn(async move {
let _writer = writer;
loop {
match read_frame(&mut reader, MAX_FRAME_SIZE).await {
Ok((msg_type, _format, payload)) => {
if msg_type == MSG_TYPE_PUSH {
match rmp_serde::from_slice::<IpcPushNotification>(&payload) {
Ok(notification) => {
if notification.message_type == "system.shutdown" {
if let Some(kind) = notification
.payload
.get("kind")
.and_then(|v| v.as_str())
&& kind == "handler"
{
break;
}
continue; }
if let Ok(msg) = serde_json::from_value::<EmergentMessage>(
notification.payload.clone(),
) {
if tx.send(msg).await.is_err() {
break; }
} else {
let msg = EmergentMessage::new(¬ification.message_type)
.with_source(
notification
.source_actor
.as_deref()
.unwrap_or("unknown"),
)
.with_payload(notification.payload);
if tx.send(msg).await.is_err() {
break;
}
}
}
Err(e) => {
tracing::warn!("Failed to parse push notification: {}", e);
}
}
}
}
Err(e) => {
tracing::debug!("Connection closed: {}", e);
break;
}
}
}
});
Ok(MessageStream::new(rx))
}
pub async fn unsubscribe(&self, types: &[&str]) -> Result<()> {
let stream = connect_to_engine(&self.name).await?;
let (mut reader, mut writer) = stream.into_split();
unsubscribe_impl(&mut reader, &mut writer, types).await?;
{
let mut subs = self.subscribed_types.lock().await;
for t in types {
subs.retain(|s| s != *t);
}
}
Ok(())
}
pub async fn publish(&self, mut message: EmergentMessage) -> Result<()> {
if message.source.is_default() {
message.source = PrimitiveName::new(&self.name).map_err(|e| {
ClientError::ConnectionFailed(format!(
"invalid primitive name '{}': {}",
self.name, e
))
})?;
}
let mut writer = self.writer.lock().await;
publish_impl(&mut writer, message).await
}
pub async fn discover(&self) -> Result<DiscoveryInfo> {
let stream = connect_to_engine(&self.name).await?;
let (mut reader, mut writer) = stream.into_split();
discover_impl(&mut reader, &mut writer).await
}
#[must_use]
pub fn name(&self) -> &str {
&self.name
}
pub async fn subscribed_types(&self) -> Vec<String> {
self.subscribed_types.lock().await.clone()
}
pub async fn disconnect(&self) -> Result<()> {
self.unsubscribe(&[]).await?;
Ok(())
}
}
/// Client handle that only consumes messages from the engine.
pub struct EmergentSink {
/// Name supplied at connect time; passed to every later connection attempt.
name: String,
/// Topics recorded by the most recent `subscribe` call, without `system.shutdown`.
subscribed_types: Arc<Mutex<Vec<String>>>,
}
impl EmergentSink {
    /// Checks that the engine is reachable and creates the sink handle.
    ///
    /// The connection opened here is dropped immediately; every other method opens its own.
    ///
    /// # Errors
    ///
    /// Returns whatever `connect_to_engine` reports.
    pub async fn connect(name: &str) -> Result<Self> {
        let _ = connect_to_engine(name).await?;
        Ok(Self {
            name: name.to_string(),
            subscribed_types: Arc::new(Mutex::new(Vec::new())),
        })
    }

    /// Connects as `name` and returns a stream of the messages configured for that sink.
    ///
    /// `_types` is ignored. The topics are obtained from the engine through
    /// [`get_my_subscriptions`](Self::get_my_subscriptions).
    ///
    /// # Errors
    ///
    /// Returns errors from connecting, from the subscription query, or from subscribing.
    pub async fn messages(
        name: impl Into<String>,
        _types: impl IntoSubscription,
    ) -> Result<MessageStream> {
        let name = name.into();
        let sink = Self::connect(&name).await?;
        let topics = sink.get_my_subscriptions().await?;
        sink.subscribe(topics).await
    }

    /// Subscribes to `types` on a new connection and returns a stream of incoming messages.
    ///
    /// `system.shutdown` is always added to the request. The topics confirmed by the engine
    /// (minus `system.shutdown`) replace the cached list returned by
    /// [`subscribed_types`](Self::subscribed_types).
    ///
    /// A background task forwards every push notification into the returned stream. The task
    /// ends when:
    /// * the connection can no longer be read,
    /// * the stream's receiving side has been dropped, or
    /// * a `system.shutdown` push whose payload has `"kind": "sink"` arrives.
    ///
    /// Other `system.shutdown` pushes are ignored. A push whose payload is not itself an
    /// [`EmergentMessage`] is wrapped in a new message built from the push's type, source
    /// actor (or `"unknown"`), and raw payload.
    ///
    /// # Errors
    ///
    /// Returns connection errors and whatever `subscribe_impl` reports.
    pub async fn subscribe(&self, types: impl IntoSubscription) -> Result<MessageStream> {
        let topics = types.into_topics();
        let stream = connect_to_engine(&self.name).await?;
        let (mut reader, mut writer) = stream.into_split();

        let mut all_types: Vec<&str> = topics.iter().map(String::as_str).collect();
        if !all_types.contains(&"system.shutdown") {
            all_types.push("system.shutdown");
        }
        let subscribed = subscribe_impl(&mut reader, &mut writer, &all_types).await?;
        {
            let mut subs = self.subscribed_types.lock().await;
            *subs = subscribed
                .into_iter()
                .filter(|s| s != "system.shutdown")
                .collect();
        }

        let (tx, rx) = mpsc::channel(256);
        tokio::spawn(async move {
            // Own the write half for the task's lifetime so it is not dropped while the read
            // half is still in use.
            let _writer = writer;
            loop {
                let (msg_type, _format, payload) =
                    match read_frame(&mut reader, MAX_FRAME_SIZE).await {
                        Ok(frame) => frame,
                        Err(e) => {
                            tracing::debug!("Connection closed: {}", e);
                            break;
                        }
                    };
                if msg_type != MSG_TYPE_PUSH {
                    continue;
                }

                let notification = match rmp_serde::from_slice::<IpcPushNotification>(&payload) {
                    Ok(notification) => notification,
                    Err(e) => {
                        tracing::warn!("Failed to parse push notification: {}", e);
                        continue;
                    }
                };

                if notification.message_type == "system.shutdown" {
                    let kind = notification.payload.get("kind").and_then(|v| v.as_str());
                    if kind == Some("sink") {
                        break;
                    }
                    continue;
                }

                // The payload is cloned because the fallback below still needs the original.
                let msg = match serde_json::from_value::<EmergentMessage>(
                    notification.payload.clone(),
                ) {
                    Ok(msg) => msg,
                    Err(_) => EmergentMessage::new(&notification.message_type)
                        .with_source(notification.source_actor.as_deref().unwrap_or("unknown"))
                        .with_payload(notification.payload),
                };
                if tx.send(msg).await.is_err() {
                    break;
                }
            }
        });
        Ok(MessageStream::new(rx))
    }

    /// Sends an unsubscribe request for `types` and updates the cached topic list.
    ///
    /// An empty `types` slice means "unsubscribe from everything"; in that case the cached
    /// list is cleared. Otherwise only the named topics are removed from it.
    ///
    /// NOTE(review): the request is sent on a freshly opened connection; whether the engine
    /// applies it to connections opened by [`subscribe`](Self::subscribe) is not visible here.
    ///
    /// # Errors
    ///
    /// Returns connection errors and whatever `unsubscribe_impl` reports. The cache is left
    /// untouched on error.
    pub async fn unsubscribe(&self, types: &[&str]) -> Result<()> {
        let stream = connect_to_engine(&self.name).await?;
        let (mut reader, mut writer) = stream.into_split();
        unsubscribe_impl(&mut reader, &mut writer, types).await?;

        let mut subs = self.subscribed_types.lock().await;
        if types.is_empty() {
            // Mirrors the "unsubscribe all" request that an empty slice produces.
            subs.clear();
        } else {
            subs.retain(|s| !types.contains(&s.as_str()));
        }
        Ok(())
    }

    /// Runs a discovery exchange on a freshly opened connection.
    ///
    /// # Errors
    ///
    /// Returns connection errors and whatever `discover_impl` reports.
    pub async fn discover(&self) -> Result<DiscoveryInfo> {
        let stream = connect_to_engine(&self.name).await?;
        let (mut reader, mut writer) = stream.into_split();
        discover_impl(&mut reader, &mut writer).await
    }

    /// Asks the engine, over a freshly opened connection, which topics are configured for
    /// this sink's name.
    ///
    /// # Errors
    ///
    /// Returns connection errors and whatever `get_my_subscriptions_impl` reports.
    pub async fn get_my_subscriptions(&self) -> Result<Vec<String>> {
        let stream = connect_to_engine(&self.name).await?;
        let (mut reader, mut writer) = stream.into_split();
        get_my_subscriptions_impl(&mut reader, &mut writer, &self.name).await
    }

    /// Requests the engine's topology over a freshly opened connection.
    ///
    /// # Errors
    ///
    /// Returns connection errors and whatever `get_topology_impl` reports.
    pub async fn get_topology(&self) -> Result<TopologyState> {
        let stream = connect_to_engine(&self.name).await?;
        let (mut reader, mut writer) = stream.into_split();
        get_topology_impl(&mut reader, &mut writer, &self.name).await
    }

    /// Returns the name this sink connected with.
    #[must_use]
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Returns a copy of the cached topic list.
    pub async fn subscribed_types(&self) -> Vec<String> {
        self.subscribed_types.lock().await.clone()
    }

    /// Unsubscribes from everything by calling [`unsubscribe`](Self::unsubscribe) with an
    /// empty slice.
    ///
    /// # Errors
    ///
    /// Returns whatever `unsubscribe` reports.
    pub async fn disconnect(&self) -> Result<()> {
        self.unsubscribe(&[]).await?;
        Ok(())
    }
}