use crate::error::ClientError;
use crate::message::EmergentMessage;
use crate::stream::MessageStream;
use crate::subscribe::IntoSubscription;
use crate::types::{CorrelationId, PrimitiveName};
use crate::{DiscoveryInfo, PrimitiveInfo, Result};
use tracing::{debug, error, info, warn};
use acton_reactive::ipc::protocol::Format;
use acton_reactive::ipc::{
IpcClient, IpcClientConfig, IpcConfig, IpcEnvelope, IpcPushNotification, socket_exists,
socket_is_alive,
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::mpsc;
#[derive(Clone, Debug, Serialize, Deserialize)]
struct IpcEmergentMessage {
inner: EmergentMessage,
}
fn resolve_socket_path(_name: &str) -> Result<PathBuf> {
if let Ok(path) = std::env::var("EMERGENT_SOCKET") {
return Ok(PathBuf::from(path));
}
let mut config = IpcConfig::load();
config.socket.app_name = Some("emergent".to_string());
Ok(config.socket_path())
}
fn init_tracing(name: &str) {
use tracing_subscriber::EnvFilter;
let filter = EnvFilter::try_from_env("EMERGENT_LOG")
.or_else(|_| EnvFilter::try_from_default_env())
.unwrap_or_else(|_| EnvFilter::new("info"));
let wants_stderr = std::env::var("EMERGENT_LOG")
.map(|v| v.eq_ignore_ascii_case("stderr"))
.unwrap_or(false);
if wants_stderr {
let stderr_filter = EnvFilter::new("info");
let _ = tracing_subscriber::fmt()
.with_env_filter(stderr_filter)
.try_init();
} else {
let log_dir = directories::ProjectDirs::from("ai", "govcraft", "emergent")
.map(|dirs| dirs.data_dir().join(name))
.unwrap_or_else(|| std::path::PathBuf::from("."));
let _ = std::fs::create_dir_all(&log_dir);
if let Ok(log_file) = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(log_dir.join("primitive.log"))
{
let _ = tracing_subscriber::fmt()
.with_env_filter(filter)
.with_writer(std::sync::Mutex::new(log_file))
.with_ansi(false)
.try_init();
} else {
let _ = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::new("off"))
.try_init();
}
}
}
async fn connect_to_engine(
name: &str,
socket_override: Option<&std::path::Path>,
) -> Result<IpcClient> {
init_tracing(name);
let socket_path = match socket_override {
Some(path) => path.to_path_buf(),
None => resolve_socket_path(name)?,
};
debug!(path = %socket_path.display(), "resolved socket path");
info!(primitive.name = %name, path = %socket_path.display(), "connecting to engine");
if !socket_exists(&socket_path) {
error!(path = %socket_path.display(), "engine socket not found");
return Err(ClientError::SocketNotFound(
socket_path.display().to_string(),
));
}
if !socket_is_alive(&socket_path).await {
error!(path = %socket_path.display(), "engine socket not responding");
return Err(ClientError::ConnectionFailed(
"Engine socket exists but is not responding".to_string(),
));
}
let config = IpcClientConfig {
format: Format::MessagePack,
..IpcClientConfig::default()
};
IpcClient::connect_with_config(&socket_path, config)
.await
.map_err(|e| {
error!(error = %e, "failed to connect to engine");
ClientError::ConnectionFailed(e.to_string())
})
}
fn build_publish_envelope(message: EmergentMessage) -> Result<IpcEnvelope> {
let ipc_message = IpcEmergentMessage { inner: message };
let payload = serde_json::to_value(&ipc_message)?;
Ok(IpcEnvelope::new(
"message_broker",
"EmergentMessage",
payload,
))
}
fn build_publish_request_envelope(message: EmergentMessage) -> Result<IpcEnvelope> {
let ipc_message = IpcEmergentMessage { inner: message };
let payload = serde_json::to_value(&ipc_message)?;
Ok(IpcEnvelope::new_request(
"message_broker",
"EmergentMessage",
payload,
))
}
async fn push_to_message_stream(
mut push_rx: mpsc::Receiver<IpcPushNotification>,
tx: mpsc::Sender<EmergentMessage>,
name: String,
shutdown_kind: &str,
) {
debug!(primitive.name = %name, "push bridge started");
let auto_unwrap = std::env::var("EMERGENT_UNWRAP_STDOUT")
.is_ok_and(|v| v == "true" || v == "1");
while let Some(notification) = push_rx.recv().await {
if notification.message_type == "system.shutdown" {
let kind = notification
.payload
.get("kind")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
info!(
primitive.name = %name,
shutdown_kind = %kind,
"received shutdown signal"
);
if kind == shutdown_kind {
info!(
primitive.name = %name,
"shutting down (engine requested)"
);
break;
}
debug!(
primitive.name = %name,
"ignoring shutdown for different primitive kind"
);
continue; }
let msg = if let Ok(msg) =
serde_json::from_value::<EmergentMessage>(notification.payload.clone())
{
msg
} else {
EmergentMessage::new(¬ification.message_type)
.with_source(notification.source_actor.as_deref().unwrap_or("unknown"))
.with_payload(notification.payload)
};
let msg = if auto_unwrap && !msg.message_type.as_str().starts_with("system.") {
msg.unwrap_stdout()
} else {
msg
};
debug!(
primitive.name = %name,
message_type = %msg.message_type,
message_id = %msg.id,
"received message"
);
if tx.send(msg).await.is_err() {
warn!(
primitive.name = %name,
"message stream send failed, receiver dropped"
);
break;
}
}
debug!(primitive.name = %name, "push bridge stopped");
}
async fn subscribe_and_stream(
client: &IpcClient,
topics: Vec<String>,
name: &str,
shutdown_kind: &str,
) -> Result<(MessageStream, Vec<String>)> {
let mut all_types = topics;
if !all_types.iter().any(|t| t == "system.shutdown") {
all_types.push("system.shutdown".to_string());
}
let sub_response = client
.subscribe(all_types)
.await
.map_err(|e| ClientError::SubscriptionFailed(format!("subscribe failed: {e}")))?;
if !sub_response.success {
return Err(ClientError::SubscriptionFailed(
sub_response
.error
.unwrap_or_else(|| "unknown error".to_string()),
));
}
let push_rx = client.take_push_receiver().ok_or_else(|| {
ClientError::SubscriptionFailed(
"push receiver already taken (subscribe called more than once?)".to_string(),
)
})?;
let (tx, rx) = mpsc::channel(256);
let bridge_name = name.to_string();
let bridge_kind = shutdown_kind.to_string();
tokio::spawn(async move {
push_to_message_stream(push_rx, tx, bridge_name, &bridge_kind).await;
});
let user_subs: Vec<String> = sub_response
.subscribed_types
.into_iter()
.filter(|s| s != "system.shutdown")
.collect();
Ok((MessageStream::new(rx), user_subs))
}
async fn get_my_subscriptions_via_pubsub(name: &str) -> Result<Vec<String>> {
debug!("querying configured subscriptions");
let client = connect_to_engine(name, None).await?;
let correlation_id = CorrelationId::new();
client
.subscribe(vec!["system.response.subscriptions".to_string()])
.await
.map_err(|e| ClientError::SubscriptionFailed(format!("subscribe failed: {e}")))?;
let mut push_rx = client.take_push_receiver().ok_or_else(|| {
ClientError::SubscriptionFailed("push receiver already taken".to_string())
})?;
let request = EmergentMessage::new("system.request.subscriptions")
.with_source(name)
.with_correlation_id(correlation_id.clone())
.with_payload(json!({ "name": name }));
let envelope = build_publish_envelope(request)?;
client
.send(envelope)
.await
.map_err(|e| ClientError::ConnectionFailed(format!("publish failed: {e}")))?;
let subs = tokio::time::timeout(std::time::Duration::from_secs(30), async {
while let Some(notification) = push_rx.recv().await {
if notification.message_type == "system.response.subscriptions" {
let msg: EmergentMessage = serde_json::from_value(notification.payload)?;
if msg.correlation_id.as_ref().map(|c| c.to_string())
== Some(correlation_id.to_string())
{
let subs_response: SubscriptionsResponse = serde_json::from_value(msg.payload)?;
return Ok(subs_response.subscribes);
}
}
}
Err(ClientError::ConnectionFailed(
"push channel closed before response".to_string(),
))
})
.await
.map_err(|_| ClientError::Timeout)??;
info!(types = ?subs, "received configured subscriptions");
Ok(subs)
}
async fn get_topology_via_pubsub(name: &str) -> Result<TopologyState> {
debug!("querying topology");
let client = connect_to_engine(name, None).await?;
let correlation_id = CorrelationId::new();
client
.subscribe(vec!["system.response.topology".to_string()])
.await
.map_err(|e| ClientError::SubscriptionFailed(format!("subscribe failed: {e}")))?;
let mut push_rx = client.take_push_receiver().ok_or_else(|| {
ClientError::SubscriptionFailed("push receiver already taken".to_string())
})?;
let request = EmergentMessage::new("system.request.topology")
.with_source(name)
.with_correlation_id(correlation_id.clone())
.with_payload(json!({}));
let envelope = build_publish_envelope(request)?;
client
.send(envelope)
.await
.map_err(|e| ClientError::ConnectionFailed(format!("publish failed: {e}")))?;
let state = tokio::time::timeout(std::time::Duration::from_secs(30), async {
while let Some(notification) = push_rx.recv().await {
if notification.message_type == "system.response.topology" {
let msg: EmergentMessage = serde_json::from_value(notification.payload)?;
if msg.correlation_id.as_ref().map(|c| c.to_string())
== Some(correlation_id.to_string())
{
let topo_response: TopologyResponse = serde_json::from_value(msg.payload)?;
return Ok(TopologyState {
primitives: topo_response.primitives,
});
}
}
}
Err(ClientError::ConnectionFailed(
"push channel closed before response".to_string(),
))
})
.await
.map_err(|_| ClientError::Timeout)??;
debug!(
primitive_count = state.primitives.len(),
"received topology"
);
Ok(state)
}
#[derive(Debug, Deserialize)]
struct SubscriptionsResponse {
subscribes: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopologyPrimitive {
pub name: String,
pub kind: String,
pub state: String,
pub publishes: Vec<String>,
pub subscribes: Vec<String>,
pub pid: Option<u32>,
pub error: Option<String>,
}
#[derive(Debug, Deserialize)]
struct TopologyResponse {
primitives: Vec<TopologyPrimitive>,
}
#[derive(Debug, Clone)]
pub struct TopologyState {
pub primitives: Vec<TopologyPrimitive>,
}
pub struct EmergentSource {
name: String,
client: IpcClient,
}
impl EmergentSource {
pub async fn connect(name: &str) -> Result<Self> {
let client = connect_to_engine(name, None).await?;
info!(primitive.name = %name, primitive.kind = "source", "connected to engine");
Ok(Self {
name: name.to_string(),
client,
})
}
pub async fn connect_to(name: &str, socket_path: &std::path::Path) -> Result<Self> {
let client = connect_to_engine(name, Some(socket_path)).await?;
info!(primitive.name = %name, primitive.kind = "source", "connected to engine");
Ok(Self {
name: name.to_string(),
client,
})
}
pub async fn publish(&self, mut message: EmergentMessage) -> Result<()> {
if message.source.is_default() {
message.source = PrimitiveName::new(&self.name).map_err(|e| {
ClientError::ConnectionFailed(format!(
"invalid primitive name '{}': {}",
self.name, e
))
})?;
}
let envelope = build_publish_envelope(message)?;
self.client.send(envelope).await.map_err(|e| {
error!(primitive.name = %self.name, error = %e, "failed to publish message");
ClientError::ConnectionFailed(format!("publish failed: {e}"))
})
}
pub async fn publish_ack(&self, mut message: EmergentMessage) -> Result<()> {
if message.source.is_default() {
message.source = PrimitiveName::new(&self.name).map_err(|e| {
ClientError::ConnectionFailed(format!(
"invalid primitive name '{}': {}",
self.name, e
))
})?;
}
let envelope = build_publish_request_envelope(message)?;
let response = self.client.request(envelope).await.map_err(|e| {
error!(primitive.name = %self.name, error = %e, "publish_ack failed");
ClientError::ConnectionFailed(format!("publish_ack failed: {e}"))
})?;
if !response.success {
return Err(ClientError::PublishFailed(
response.error.unwrap_or_else(|| "broker error".to_string()),
));
}
Ok(())
}
pub async fn publish_all(
&self,
messages: impl IntoIterator<Item = EmergentMessage>,
) -> Result<usize> {
let mut count = 0;
for message in messages {
self.publish_ack(message).await?;
count += 1;
}
Ok(count)
}
pub async fn publish_stream<S>(&self, mut stream: S) -> Result<usize>
where
S: futures::Stream<Item = EmergentMessage> + Unpin,
{
use futures::StreamExt;
let mut count = 0;
while let Some(message) = stream.next().await {
self.publish_ack(message).await?;
count += 1;
}
Ok(count)
}
pub async fn discover(&self) -> Result<DiscoveryInfo> {
let client = connect_to_engine(&self.name, None).await?;
let response = client
.discover()
.await
.map_err(|e| ClientError::ConnectionFailed(format!("discover failed: {e}")))?;
if !response.success {
return Err(ClientError::DiscoveryFailed(
response
.error
.unwrap_or_else(|| "unknown error".to_string()),
));
}
let primitives = response
.actors
.unwrap_or_default()
.into_iter()
.map(|actor| PrimitiveInfo {
name: actor.name,
kind: String::new(),
})
.collect();
let message_types = response.message_types.unwrap_or_default();
Ok(DiscoveryInfo {
message_types,
primitives,
})
}
#[must_use]
pub fn name(&self) -> &str {
&self.name
}
pub async fn disconnect(&self) -> Result<()> {
info!(primitive.name = %self.name, "disconnecting from engine");
self.client
.disconnect()
.await
.map_err(|e| ClientError::ConnectionFailed(format!("disconnect failed: {e}")))?;
info!(primitive.name = %self.name, "disconnected from engine");
Ok(())
}
}
pub struct EmergentHandler {
name: String,
client: Arc<IpcClient>,
subscribed_types: Vec<String>,
}
impl EmergentHandler {
pub async fn connect(name: &str) -> Result<Self> {
let client = connect_to_engine(name, None).await?;
info!(primitive.name = %name, primitive.kind = "handler", "connected to engine");
Ok(Self {
name: name.to_string(),
client: Arc::new(client),
subscribed_types: Vec::new(),
})
}
pub async fn connect_to(name: &str, socket_path: &std::path::Path) -> Result<Self> {
let client = connect_to_engine(name, Some(socket_path)).await?;
info!(primitive.name = %name, primitive.kind = "handler", "connected to engine");
Ok(Self {
name: name.to_string(),
client: Arc::new(client),
subscribed_types: Vec::new(),
})
}
pub async fn subscribe(&mut self, types: impl IntoSubscription) -> Result<MessageStream> {
let topics = types.into_topics();
let (stream, user_subs) =
subscribe_and_stream(&self.client, topics, &self.name, "handler").await?;
self.subscribed_types = user_subs;
Ok(stream)
}
pub async fn messages(
name: impl Into<String>,
_types: impl IntoSubscription,
) -> Result<(Self, MessageStream)> {
let name = name.into();
let mut handler = Self::connect(&name).await?;
let topics = get_my_subscriptions_via_pubsub(&name).await?;
let stream = handler.subscribe(topics).await?;
Ok((handler, stream))
}
pub async fn publish(&self, mut message: EmergentMessage) -> Result<()> {
if message.source.is_default() {
message.source = PrimitiveName::new(&self.name).map_err(|e| {
ClientError::ConnectionFailed(format!(
"invalid primitive name '{}': {}",
self.name, e
))
})?;
}
let envelope = build_publish_envelope(message)?;
self.client.send(envelope).await.map_err(|e| {
error!(primitive.name = %self.name, error = %e, "failed to publish message");
ClientError::ConnectionFailed(format!("publish failed: {e}"))
})
}
pub async fn publish_ack(&self, mut message: EmergentMessage) -> Result<()> {
if message.source.is_default() {
message.source = PrimitiveName::new(&self.name).map_err(|e| {
ClientError::ConnectionFailed(format!(
"invalid primitive name '{}': {}",
self.name, e
))
})?;
}
let envelope = build_publish_request_envelope(message)?;
let response = self.client.request(envelope).await.map_err(|e| {
error!(primitive.name = %self.name, error = %e, "publish_ack failed");
ClientError::ConnectionFailed(format!("publish_ack failed: {e}"))
})?;
if !response.success {
return Err(ClientError::PublishFailed(
response.error.unwrap_or_else(|| "broker error".to_string()),
));
}
Ok(())
}
pub async fn publish_all(
&self,
messages: impl IntoIterator<Item = EmergentMessage>,
) -> Result<usize> {
let mut count = 0;
for message in messages {
self.publish_ack(message).await?;
count += 1;
}
Ok(count)
}
pub async fn publish_stream<S>(&self, mut stream: S) -> Result<usize>
where
S: futures::Stream<Item = EmergentMessage> + Unpin,
{
use futures::StreamExt;
let mut count = 0;
while let Some(message) = stream.next().await {
self.publish_ack(message).await?;
count += 1;
}
Ok(count)
}
pub async fn stream_offer(
&self,
message_type: &str,
items: impl IntoIterator<Item = serde_json::Value>,
pull_stream: &mut MessageStream,
timeout: std::time::Duration,
) -> Result<usize> {
let stream_id = CorrelationId::new().to_string();
let mut items = items.into_iter();
self.publish(
EmergentMessage::new("stream.ready").with_payload(serde_json::json!({
"stream_id": stream_id,
"message_type": message_type,
})),
)
.await?;
let mut published = 0usize;
loop {
let msg = tokio::time::timeout(timeout, pull_stream.next())
.await
.map_err(|_| ClientError::Timeout)?
.ok_or_else(|| {
ClientError::ConnectionFailed(
"pull stream closed during stream_offer".to_string(),
)
})?;
let is_pull = msg.message_type.as_str() == "stream.pull"
&& msg.payload.get("stream_id").and_then(|v| v.as_str())
== Some(stream_id.as_str());
if is_pull {
if let Some(item) = items.next() {
self.publish(
EmergentMessage::new(message_type)
.with_payload(item)
.with_metadata(serde_json::json!({"stream_id": stream_id})),
)
.await?;
published += 1;
} else {
self.publish(
EmergentMessage::new("stream.end")
.with_payload(serde_json::json!({"stream_id": stream_id})),
)
.await?;
break;
}
}
}
Ok(published)
}
pub async fn stream_consume(
&self,
message_type: &str,
source_stream: &mut MessageStream,
timeout: std::time::Duration,
mut on_item: impl FnMut(EmergentMessage),
) -> Result<usize> {
let stream_id = loop {
let msg = tokio::time::timeout(timeout, source_stream.next())
.await
.map_err(|_| ClientError::Timeout)?
.ok_or_else(|| {
ClientError::ConnectionFailed(
"source stream closed before stream.ready".to_string(),
)
})?;
if msg.message_type.as_str() == "stream.ready"
&& let (Some(mt), Some(sid)) = (
msg.payload.get("message_type").and_then(|v| v.as_str()),
msg.payload.get("stream_id").and_then(|v| v.as_str()),
)
&& mt == message_type
{
break sid.to_string();
}
};
self.publish(
EmergentMessage::new("stream.pull")
.with_payload(serde_json::json!({"stream_id": stream_id})),
)
.await?;
let mut count = 0usize;
loop {
let msg = tokio::time::timeout(timeout, source_stream.next())
.await
.map_err(|_| ClientError::Timeout)?
.ok_or_else(|| {
ClientError::ConnectionFailed(
"source stream closed during stream_consume".to_string(),
)
})?;
if msg.message_type.as_str() == "stream.end"
&& msg.payload.get("stream_id").and_then(|v| v.as_str()) == Some(stream_id.as_str())
{
break;
}
let is_item = msg.message_type.as_str() == message_type
&& msg
.metadata
.as_ref()
.and_then(|m| m.get("stream_id"))
.and_then(|v| v.as_str())
== Some(stream_id.as_str());
if is_item {
on_item(msg);
count += 1;
self.publish(
EmergentMessage::new("stream.pull")
.with_payload(serde_json::json!({"stream_id": stream_id})),
)
.await?;
}
}
Ok(count)
}
pub async fn discover(&self) -> Result<DiscoveryInfo> {
let client = connect_to_engine(&self.name, None).await?;
let response = client
.discover()
.await
.map_err(|e| ClientError::ConnectionFailed(format!("discover failed: {e}")))?;
if !response.success {
return Err(ClientError::DiscoveryFailed(
response
.error
.unwrap_or_else(|| "unknown error".to_string()),
));
}
let primitives = response
.actors
.unwrap_or_default()
.into_iter()
.map(|actor| PrimitiveInfo {
name: actor.name,
kind: String::new(),
})
.collect();
let message_types = response.message_types.unwrap_or_default();
Ok(DiscoveryInfo {
message_types,
primitives,
})
}
pub async fn get_my_subscriptions(&self) -> Result<Vec<String>> {
get_my_subscriptions_via_pubsub(&self.name).await
}
#[must_use]
pub fn name(&self) -> &str {
&self.name
}
pub fn subscribed_types(&self) -> &[String] {
&self.subscribed_types
}
pub async fn disconnect(&self) -> Result<()> {
info!(primitive.name = %self.name, "disconnecting from engine");
self.client
.disconnect()
.await
.map_err(|e| ClientError::ConnectionFailed(format!("disconnect failed: {e}")))?;
info!(primitive.name = %self.name, "disconnected from engine");
Ok(())
}
}
pub struct EmergentSink {
name: String,
client: Arc<IpcClient>,
subscribed_types: Vec<String>,
}
impl EmergentSink {
pub async fn connect(name: &str) -> Result<Self> {
let client = connect_to_engine(name, None).await?;
info!(primitive.name = %name, primitive.kind = "sink", "connected to engine");
Ok(Self {
name: name.to_string(),
client: Arc::new(client),
subscribed_types: Vec::new(),
})
}
pub async fn connect_to(name: &str, socket_path: &std::path::Path) -> Result<Self> {
let client = connect_to_engine(name, Some(socket_path)).await?;
info!(primitive.name = %name, primitive.kind = "sink", "connected to engine");
Ok(Self {
name: name.to_string(),
client: Arc::new(client),
subscribed_types: Vec::new(),
})
}
pub async fn messages(
name: impl Into<String>,
_types: impl IntoSubscription,
) -> Result<MessageStream> {
let name = name.into();
let mut sink = Self::connect(&name).await?;
let topics = sink.get_my_subscriptions().await?;
sink.subscribe(topics).await
}
pub async fn subscribe(&mut self, types: impl IntoSubscription) -> Result<MessageStream> {
let topics = types.into_topics();
let (stream, user_subs) =
subscribe_and_stream(&self.client, topics, &self.name, "sink").await?;
self.subscribed_types = user_subs;
Ok(stream)
}
pub async fn discover(&self) -> Result<DiscoveryInfo> {
let client = connect_to_engine(&self.name, None).await?;
let response = client
.discover()
.await
.map_err(|e| ClientError::ConnectionFailed(format!("discover failed: {e}")))?;
if !response.success {
return Err(ClientError::DiscoveryFailed(
response
.error
.unwrap_or_else(|| "unknown error".to_string()),
));
}
let primitives = response
.actors
.unwrap_or_default()
.into_iter()
.map(|actor| PrimitiveInfo {
name: actor.name,
kind: String::new(),
})
.collect();
let message_types = response.message_types.unwrap_or_default();
Ok(DiscoveryInfo {
message_types,
primitives,
})
}
pub async fn get_my_subscriptions(&self) -> Result<Vec<String>> {
get_my_subscriptions_via_pubsub(&self.name).await
}
pub async fn get_topology(&self) -> Result<TopologyState> {
get_topology_via_pubsub(&self.name).await
}
#[must_use]
pub fn name(&self) -> &str {
&self.name
}
pub fn subscribed_types(&self) -> &[String] {
&self.subscribed_types
}
pub async fn disconnect(&self) -> Result<()> {
info!(primitive.name = %self.name, "disconnecting from engine");
self.client
.disconnect()
.await
.map_err(|e| ClientError::ConnectionFailed(format!("disconnect failed: {e}")))?;
info!(primitive.name = %self.name, "disconnected from engine");
Ok(())
}
}