use crate::error::ClientError;
use crate::message::EmergentMessage;
use crate::stream::MessageStream;
use crate::subscribe::IntoSubscription;
use crate::types::{CorrelationId, PrimitiveName};
use crate::{DiscoveryInfo, PrimitiveInfo, Result};
use tracing::{debug, error, info, warn};
use acton_reactive::ipc::{
IpcClient, IpcClientConfig, IpcConfig, IpcEnvelope, IpcPushNotification, socket_exists,
socket_is_alive,
};
use acton_reactive::ipc::protocol::Format;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::mpsc;
/// Serializable wrapper that nests an [`EmergentMessage`] under an `inner`
/// field. `build_publish_envelope` serializes this wrapper to produce the
/// payload of the envelope it sends.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct IpcEmergentMessage {
// The wrapped message.
inner: EmergentMessage,
}
/// Works out where the engine's IPC socket lives.
///
/// When the `EMERGENT_SOCKET` environment variable can be read, its value is
/// used verbatim. Otherwise the path comes from the loaded [`IpcConfig`] with
/// its socket app name overridden to `"emergent"`.
///
/// The `_name` argument is accepted but not consulted.
///
/// # Errors
/// None of the visible code paths produce an error; the `Result` wrapper is
/// kept for the callers' `?` usage.
fn resolve_socket_path(_name: &str) -> Result<PathBuf> {
    match std::env::var("EMERGENT_SOCKET") {
        Ok(explicit) => Ok(PathBuf::from(explicit)),
        Err(_) => {
            let mut ipc_config = IpcConfig::load();
            ipc_config.socket.app_name = Some(String::from("emergent"));
            Ok(ipc_config.socket_path())
        }
    }
}
/// Installs a global `tracing` subscriber for the primitive called `name`
/// (best effort).
///
/// Behaviour is selected by the `EMERGENT_LOG` environment variable:
///
/// * `EMERGENT_LOG=stderr` (case-insensitive): log to **stderr** with a fixed
///   `info` filter.
/// * otherwise: append to `primitive.log` inside a per-primitive directory
///   under the project data dir (falling back to the current directory when
///   no project dir can be determined). The filter is taken from
///   `EMERGENT_LOG`, then from the default env filter variable, then `info`.
/// * if that log file cannot be opened, a subscriber with filter `off` is
///   installed so that nothing is emitted.
///
/// Every `try_init` result is deliberately discarded, as is the result of
/// creating the log directory: logging setup never fails the caller.
fn init_tracing(name: &str) {
    use tracing_subscriber::EnvFilter;

    let wants_stderr = std::env::var("EMERGENT_LOG")
        .map(|v| v.eq_ignore_ascii_case("stderr"))
        .unwrap_or(false);

    if wants_stderr {
        // `fmt()` writes to stdout unless told otherwise, so the writer must
        // be set explicitly for the "stderr" mode to actually use stderr.
        let _ = tracing_subscriber::fmt()
            .with_env_filter(EnvFilter::new("info"))
            .with_writer(std::io::stderr)
            .try_init();
        return;
    }

    let log_dir = directories::ProjectDirs::from("ai", "govcraft", "emergent")
        .map(|dirs| dirs.data_dir().join(name))
        .unwrap_or_else(|| std::path::PathBuf::from("."));
    // Best effort: a failure here surfaces as a failed open just below.
    let _ = std::fs::create_dir_all(&log_dir);

    let opened = std::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(log_dir.join("primitive.log"));

    match opened {
        Ok(log_file) => {
            // Only the file-backed subscriber uses the env-derived filter.
            let filter = EnvFilter::try_from_env("EMERGENT_LOG")
                .or_else(|_| EnvFilter::try_from_default_env())
                .unwrap_or_else(|_| EnvFilter::new("info"));
            let _ = tracing_subscriber::fmt()
                .with_env_filter(filter)
                .with_writer(std::sync::Mutex::new(log_file))
                .with_ansi(false)
                .try_init();
        }
        Err(_) => {
            // No usable log file: install a silent subscriber.
            let _ = tracing_subscriber::fmt()
                .with_env_filter(EnvFilter::new("off"))
                .try_init();
        }
    }
}
/// Opens an IPC connection to the engine on behalf of primitive `name`.
///
/// Steps, in order: initialise tracing, resolve the socket path, verify the
/// socket exists, verify it is alive, then connect using the MessagePack
/// wire format.
///
/// # Errors
/// * [`ClientError::SocketNotFound`] when the socket does not exist.
/// * [`ClientError::ConnectionFailed`] when the socket exists but does not
///   respond, or when the connection attempt itself fails.
/// * Whatever `resolve_socket_path` returns on failure.
async fn connect_to_engine(name: &str) -> Result<IpcClient> {
    init_tracing(name);

    let socket_path = resolve_socket_path(name)?;
    debug!(path = %socket_path.display(), "resolved socket path");
    info!(primitive.name = %name, path = %socket_path.display(), "connecting to engine");

    let present = socket_exists(&socket_path);
    if !present {
        error!(path = %socket_path.display(), "engine socket not found");
        let shown = socket_path.display().to_string();
        return Err(ClientError::SocketNotFound(shown));
    }

    let alive = socket_is_alive(&socket_path).await;
    if !alive {
        error!(path = %socket_path.display(), "engine socket not responding");
        let reason = "Engine socket exists but is not responding".to_string();
        return Err(ClientError::ConnectionFailed(reason));
    }

    let client_config = IpcClientConfig {
        format: Format::MessagePack,
        ..IpcClientConfig::default()
    };

    match IpcClient::connect_with_config(&socket_path, client_config).await {
        Ok(client) => Ok(client),
        Err(e) => {
            error!(error = %e, "failed to connect to engine");
            Err(ClientError::ConnectionFailed(e.to_string()))
        }
    }
}
/// Wraps `message` in an [`IpcEnvelope`] addressed to the `"message_broker"`
/// target with message type `"EmergentMessage"`.
///
/// The message is nested inside an `IpcEmergentMessage` and converted to a
/// JSON value, which becomes the envelope payload.
///
/// # Errors
/// Propagates the serialization error if the message cannot be converted to
/// a JSON value.
fn build_publish_envelope(message: EmergentMessage) -> Result<IpcEnvelope> {
    let wrapped = IpcEmergentMessage { inner: message };
    let payload = serde_json::to_value(&wrapped)?;
    let envelope = IpcEnvelope::new("message_broker", "EmergentMessage", payload);
    Ok(envelope)
}
/// Forwards push notifications from the IPC client into a message channel.
///
/// Runs until one of the following happens:
/// * the push channel `push_rx` is closed,
/// * a `system.shutdown` notification arrives whose payload `"kind"` equals
///   `shutdown_kind`,
/// * sending on `tx` fails because the receiving side was dropped.
///
/// Shutdown notifications for a different kind are logged and skipped; they
/// are never forwarded to `tx`.
///
/// Every other notification is forwarded as an [`EmergentMessage`]: if the
/// payload itself deserializes as an `EmergentMessage` that value is used,
/// otherwise a new message is built from the notification's message type,
/// source actor (or `"unknown"`) and raw payload.
///
/// `name` is used only for log fields.
async fn push_to_message_stream(
    mut push_rx: mpsc::Receiver<IpcPushNotification>,
    tx: mpsc::Sender<EmergentMessage>,
    name: String,
    shutdown_kind: &str,
) {
    debug!(primitive.name = %name, "push bridge started");

    while let Some(notification) = push_rx.recv().await {
        if notification.message_type == "system.shutdown" {
            // A missing or non-string "kind" is reported as "unknown".
            let kind = notification
                .payload
                .get("kind")
                .and_then(|v| v.as_str())
                .unwrap_or("unknown");
            info!(
                primitive.name = %name,
                shutdown_kind = %kind,
                "received shutdown signal"
            );
            if kind == shutdown_kind {
                info!(
                    primitive.name = %name,
                    "shutting down (engine requested)"
                );
                break;
            }
            debug!(
                primitive.name = %name,
                "ignoring shutdown for different primitive kind"
            );
            continue;
        }

        // The payload is cloned for the decode attempt because the fallback
        // branch still needs to move the original payload into the message.
        let msg = match serde_json::from_value::<EmergentMessage>(notification.payload.clone()) {
            Ok(msg) => msg,
            Err(_) => EmergentMessage::new(&notification.message_type)
                .with_source(
                    notification
                        .source_actor
                        .as_deref()
                        .unwrap_or("unknown"),
                )
                .with_payload(notification.payload),
        };

        debug!(
            primitive.name = %name,
            message_type = %msg.message_type,
            message_id = %msg.id,
            "received message"
        );

        if tx.send(msg).await.is_err() {
            warn!(
                primitive.name = %name,
                "message stream send failed, receiver dropped"
            );
            break;
        }
    }

    debug!(primitive.name = %name, "push bridge stopped");
}
/// Subscribes `client` to `topics` and returns a stream of incoming messages.
///
/// `system.shutdown` is appended to the requested topics when absent. After
/// a successful subscription the client's push receiver is taken and handed
/// to a spawned `push_to_message_stream` task, which feeds a channel of
/// capacity 256 backing the returned [`MessageStream`].
///
/// The second tuple element is the list of subscribed types reported by the
/// engine with `system.shutdown` filtered out.
///
/// # Errors
/// [`ClientError::SubscriptionFailed`] when the subscribe call fails, when
/// the engine reports an unsuccessful subscription, or when the push receiver
/// has already been taken from `client`.
async fn subscribe_and_stream(
    client: &IpcClient,
    topics: Vec<String>,
    name: &str,
    shutdown_kind: &str,
) -> Result<(MessageStream, Vec<String>)> {
    let mut requested = topics;
    let has_shutdown = requested.iter().any(|topic| topic == "system.shutdown");
    if !has_shutdown {
        requested.push("system.shutdown".to_string());
    }

    let response = match client.subscribe(requested).await {
        Ok(response) => response,
        Err(e) => {
            return Err(ClientError::SubscriptionFailed(format!(
                "subscribe failed: {e}"
            )));
        }
    };
    if !response.success {
        let reason = response
            .error
            .unwrap_or_else(|| "unknown error".to_string());
        return Err(ClientError::SubscriptionFailed(reason));
    }

    let push_rx = match client.take_push_receiver() {
        Some(receiver) => receiver,
        None => {
            return Err(ClientError::SubscriptionFailed(
                "push receiver already taken (subscribe called more than once?)".to_string(),
            ));
        }
    };

    // The bridge task needs owned copies because it is spawned independently
    // of the caller's borrows.
    let (tx, rx) = mpsc::channel(256);
    let owned_name = name.to_string();
    let owned_kind = shutdown_kind.to_string();
    tokio::spawn(async move {
        push_to_message_stream(push_rx, tx, owned_name, &owned_kind).await;
    });

    let mut user_subs: Vec<String> = Vec::new();
    for subscribed in response.subscribed_types {
        if subscribed != "system.shutdown" {
            user_subs.push(subscribed);
        }
    }

    Ok((MessageStream::new(rx), user_subs))
}
/// Asks the engine which message types primitive `name` is configured to
/// subscribe to.
///
/// Opens its own connection, subscribes to `system.response.subscriptions`,
/// publishes a `system.request.subscriptions` message carrying a fresh
/// correlation id and `{ "name": name }` as payload, then waits up to 30
/// seconds for the response with the matching correlation id.
///
/// # Errors
/// * [`ClientError::SubscriptionFailed`] when subscribing fails, when the
///   engine reports the subscription as unsuccessful, or when the push
///   receiver is unavailable.
/// * [`ClientError::ConnectionFailed`] when publishing the request fails or
///   the push channel closes before a matching response arrives.
/// * [`ClientError::Timeout`] when no matching response arrives in 30 s.
/// * A deserialization error when a response notification cannot be decoded.
/// * Any error from establishing the connection.
async fn get_my_subscriptions_via_pubsub(name: &str) -> Result<Vec<String>> {
    debug!("querying configured subscriptions");
    let client = connect_to_engine(name).await?;
    let correlation_id = CorrelationId::new();

    let sub_response = client
        .subscribe(vec!["system.response.subscriptions".to_string()])
        .await
        .map_err(|e| ClientError::SubscriptionFailed(format!("subscribe failed: {e}")))?;
    // Fail fast: without this subscription the response can never arrive and
    // the caller would only find out through the 30 s timeout below.
    if !sub_response.success {
        return Err(ClientError::SubscriptionFailed(
            sub_response.error.unwrap_or_else(|| "unknown error".to_string()),
        ));
    }

    let mut push_rx = client.take_push_receiver().ok_or_else(|| {
        ClientError::SubscriptionFailed("push receiver already taken".to_string())
    })?;

    let request = EmergentMessage::new("system.request.subscriptions")
        .with_source(name)
        .with_correlation_id(correlation_id.clone())
        .with_payload(json!({ "name": name }));
    let envelope = build_publish_envelope(request)?;
    client.send(envelope).await.map_err(|e| {
        ClientError::ConnectionFailed(format!("publish failed: {e}"))
    })?;

    let subs = tokio::time::timeout(std::time::Duration::from_secs(30), async {
        while let Some(notification) = push_rx.recv().await {
            if notification.message_type == "system.response.subscriptions" {
                let msg: EmergentMessage = serde_json::from_value(notification.payload)?;
                // Responses with a different correlation id are skipped.
                if msg.correlation_id.as_ref().map(|c| c.to_string())
                    == Some(correlation_id.to_string())
                {
                    let subs_response: SubscriptionsResponse =
                        serde_json::from_value(msg.payload)?;
                    return Ok(subs_response.subscribes);
                }
            }
        }
        Err(ClientError::ConnectionFailed(
            "push channel closed before response".to_string(),
        ))
    })
    .await
    // Outer `?` handles the timeout, inner `?` the result of the wait loop.
    .map_err(|_| ClientError::Timeout)??;

    info!(types = ?subs, "received configured subscriptions");
    Ok(subs)
}
/// Asks the engine for its current topology on behalf of primitive `name`.
///
/// Opens its own connection, subscribes to `system.response.topology`,
/// publishes a `system.request.topology` message carrying a fresh correlation
/// id and an empty JSON object as payload, then waits up to 30 seconds for
/// the response with the matching correlation id.
///
/// # Errors
/// * [`ClientError::SubscriptionFailed`] when subscribing fails, when the
///   engine reports the subscription as unsuccessful, or when the push
///   receiver is unavailable.
/// * [`ClientError::ConnectionFailed`] when publishing the request fails or
///   the push channel closes before a matching response arrives.
/// * [`ClientError::Timeout`] when no matching response arrives in 30 s.
/// * A deserialization error when a response notification cannot be decoded.
/// * Any error from establishing the connection.
async fn get_topology_via_pubsub(name: &str) -> Result<TopologyState> {
    debug!("querying topology");
    let client = connect_to_engine(name).await?;
    let correlation_id = CorrelationId::new();

    let sub_response = client
        .subscribe(vec!["system.response.topology".to_string()])
        .await
        .map_err(|e| ClientError::SubscriptionFailed(format!("subscribe failed: {e}")))?;
    // Fail fast: without this subscription the response can never arrive and
    // the caller would only find out through the 30 s timeout below.
    if !sub_response.success {
        return Err(ClientError::SubscriptionFailed(
            sub_response.error.unwrap_or_else(|| "unknown error".to_string()),
        ));
    }

    let mut push_rx = client.take_push_receiver().ok_or_else(|| {
        ClientError::SubscriptionFailed("push receiver already taken".to_string())
    })?;

    let request = EmergentMessage::new("system.request.topology")
        .with_source(name)
        .with_correlation_id(correlation_id.clone())
        .with_payload(json!({}));
    let envelope = build_publish_envelope(request)?;
    client.send(envelope).await.map_err(|e| {
        ClientError::ConnectionFailed(format!("publish failed: {e}"))
    })?;

    let state = tokio::time::timeout(std::time::Duration::from_secs(30), async {
        while let Some(notification) = push_rx.recv().await {
            if notification.message_type == "system.response.topology" {
                let msg: EmergentMessage = serde_json::from_value(notification.payload)?;
                // Responses with a different correlation id are skipped.
                if msg.correlation_id.as_ref().map(|c| c.to_string())
                    == Some(correlation_id.to_string())
                {
                    let topo_response: TopologyResponse = serde_json::from_value(msg.payload)?;
                    return Ok(TopologyState {
                        primitives: topo_response.primitives,
                    });
                }
            }
        }
        Err(ClientError::ConnectionFailed(
            "push channel closed before response".to_string(),
        ))
    })
    .await
    // Outer `?` handles the timeout, inner `?` the result of the wait loop.
    .map_err(|_| ClientError::Timeout)??;

    debug!(
        primitive_count = state.primitives.len(),
        "received topology"
    );
    Ok(state)
}
/// Payload of a `system.response.subscriptions` message, as decoded by
/// `get_my_subscriptions_via_pubsub`.
#[derive(Debug, Deserialize)]
struct SubscriptionsResponse {
// The list returned to the caller as the configured subscriptions.
subscribes: Vec<String>,
}
/// One entry of the topology reported by the engine in a
/// `system.response.topology` payload.
///
/// NOTE(review): the field descriptions below are inferred from the field
/// names; the producing side is not visible in this file. Confirm them
/// against the engine.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopologyPrimitive {
/// Name of the primitive.
pub name: String,
/// Kind of the primitive (presumably source/handler/sink; verify).
pub kind: String,
/// State reported for the primitive, as a free-form string.
pub state: String,
/// Presumably the message types the primitive publishes.
pub publishes: Vec<String>,
/// Presumably the message types the primitive subscribes to.
pub subscribes: Vec<String>,
/// Presumably the primitive's process id, when known.
pub pid: Option<u32>,
/// Presumably an error description, when the primitive has one.
pub error: Option<String>,
}
/// Payload of a `system.response.topology` message, as decoded by
/// `get_topology_via_pubsub`.
#[derive(Debug, Deserialize)]
struct TopologyResponse {
// Copied into `TopologyState::primitives` for the caller.
primitives: Vec<TopologyPrimitive>,
}
/// Topology snapshot returned to callers of the topology query.
#[derive(Debug, Clone)]
pub struct TopologyState {
/// The primitives listed in the engine's topology response.
pub primitives: Vec<TopologyPrimitive>,
}
/// A primitive that connects to the engine and publishes messages. It offers
/// no subscribe method.
pub struct EmergentSource {
// Primitive name; used for log fields and as the default message source.
name: String,
// Connection to the engine.
client: IpcClient,
}
impl EmergentSource {
    /// Connects to the engine and returns a source named `name`.
    ///
    /// # Errors
    /// Propagates any error from establishing the engine connection.
    pub async fn connect(name: &str) -> Result<Self> {
        let client = connect_to_engine(name).await?;
        info!(primitive.name = %name, primitive.kind = "source", "connected to engine");
        Ok(Self {
            name: name.to_string(),
            client,
        })
    }

    /// Publishes `message` to the engine.
    ///
    /// When the message's source is still the default value, it is replaced
    /// with this primitive's name before sending.
    ///
    /// # Errors
    /// [`ClientError::ConnectionFailed`] when this primitive's name is not a
    /// valid [`PrimitiveName`] or when sending fails; a serialization error
    /// when the message cannot be turned into an envelope.
    pub async fn publish(&self, mut message: EmergentMessage) -> Result<()> {
        if message.source.is_default() {
            message.source = PrimitiveName::new(&self.name).map_err(|e| {
                ClientError::ConnectionFailed(format!(
                    "invalid primitive name '{}': {}",
                    self.name, e
                ))
            })?;
        }
        let envelope = build_publish_envelope(message)?;
        self.client.send(envelope).await.map_err(|e| {
            error!(primitive.name = %self.name, error = %e, "failed to publish message");
            ClientError::ConnectionFailed(format!("publish failed: {e}"))
        })
    }

    /// Queries the engine for known actors and message types.
    ///
    /// Each reported actor becomes a [`PrimitiveInfo`] whose `kind` is left
    /// empty. Missing actor or message-type lists are treated as empty.
    ///
    /// # Errors
    /// [`ClientError::ConnectionFailed`] when the request fails, and
    /// [`ClientError::DiscoveryFailed`] when the engine reports failure.
    pub async fn discover(&self) -> Result<DiscoveryInfo> {
        // Use the connection this source already holds instead of opening
        // (and never closing) a new one for every discovery request.
        let response = self.client.discover().await.map_err(|e| {
            ClientError::ConnectionFailed(format!("discover failed: {e}"))
        })?;
        if !response.success {
            return Err(ClientError::DiscoveryFailed(
                response.error.unwrap_or_else(|| "unknown error".to_string()),
            ));
        }
        let primitives = response
            .actors
            .unwrap_or_default()
            .into_iter()
            .map(|actor| PrimitiveInfo {
                name: actor.name,
                kind: String::new(),
            })
            .collect();
        let message_types = response.message_types.unwrap_or_default();
        Ok(DiscoveryInfo {
            message_types,
            primitives,
        })
    }

    /// Returns the name this source was connected with.
    #[must_use]
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Disconnects from the engine.
    ///
    /// # Errors
    /// [`ClientError::ConnectionFailed`] when the disconnect call fails.
    pub async fn disconnect(&self) -> Result<()> {
        info!(primitive.name = %self.name, "disconnecting from engine");
        self.client.disconnect().await.map_err(|e| {
            ClientError::ConnectionFailed(format!("disconnect failed: {e}"))
        })?;
        info!(primitive.name = %self.name, "disconnected from engine");
        Ok(())
    }
}
/// A primitive that can both subscribe to messages and publish messages.
pub struct EmergentHandler {
// Primitive name; used for log fields and as the default message source.
name: String,
// Connection to the engine, held behind an `Arc`.
client: Arc<IpcClient>,
// Types recorded by the last successful `subscribe` call; starts empty.
subscribed_types: Vec<String>,
}
impl EmergentHandler {
    /// Connects to the engine and returns a handler named `name` with no
    /// subscriptions yet.
    ///
    /// # Errors
    /// Propagates any error from establishing the engine connection.
    pub async fn connect(name: &str) -> Result<Self> {
        let client = connect_to_engine(name).await?;
        info!(primitive.name = %name, primitive.kind = "handler", "connected to engine");
        Ok(Self {
            name: name.to_string(),
            client: Arc::new(client),
            subscribed_types: Vec::new(),
        })
    }

    /// Subscribes to `types` and returns the stream of incoming messages.
    ///
    /// The stream ends when the engine sends a `system.shutdown` whose kind
    /// is `"handler"`. On success the recorded subscribed types are replaced
    /// with those the engine reported (excluding `system.shutdown`).
    ///
    /// # Errors
    /// [`ClientError::SubscriptionFailed`] when subscribing fails or the push
    /// receiver was already taken (for example by an earlier `subscribe`).
    pub async fn subscribe(&mut self, types: impl IntoSubscription) -> Result<MessageStream> {
        let topics = types.into_topics();
        let (stream, user_subs) =
            subscribe_and_stream(&self.client, topics, &self.name, "handler").await?;
        self.subscribed_types = user_subs;
        Ok(stream)
    }

    /// Connects as `name`, asks the engine for this primitive's configured
    /// subscriptions, and subscribes to exactly those.
    ///
    /// The `_types` argument is accepted but not consulted; the topics come
    /// from the engine.
    ///
    /// # Errors
    /// Any error from connecting, from the subscription query, or from
    /// subscribing.
    pub async fn messages(
        name: impl Into<String>,
        _types: impl IntoSubscription,
    ) -> Result<(Self, MessageStream)> {
        let name = name.into();
        let mut handler = Self::connect(&name).await?;
        let topics = get_my_subscriptions_via_pubsub(&name).await?;
        let stream = handler.subscribe(topics).await?;
        Ok((handler, stream))
    }

    /// Publishes `message` to the engine.
    ///
    /// When the message's source is still the default value, it is replaced
    /// with this primitive's name before sending.
    ///
    /// # Errors
    /// [`ClientError::ConnectionFailed`] when this primitive's name is not a
    /// valid [`PrimitiveName`] or when sending fails; a serialization error
    /// when the message cannot be turned into an envelope.
    pub async fn publish(&self, mut message: EmergentMessage) -> Result<()> {
        if message.source.is_default() {
            message.source = PrimitiveName::new(&self.name).map_err(|e| {
                ClientError::ConnectionFailed(format!(
                    "invalid primitive name '{}': {}",
                    self.name, e
                ))
            })?;
        }
        let envelope = build_publish_envelope(message)?;
        self.client.send(envelope).await.map_err(|e| {
            error!(primitive.name = %self.name, error = %e, "failed to publish message");
            ClientError::ConnectionFailed(format!("publish failed: {e}"))
        })
    }

    /// Queries the engine for known actors and message types.
    ///
    /// Each reported actor becomes a [`PrimitiveInfo`] whose `kind` is left
    /// empty. Missing actor or message-type lists are treated as empty.
    ///
    /// # Errors
    /// [`ClientError::ConnectionFailed`] when the request fails, and
    /// [`ClientError::DiscoveryFailed`] when the engine reports failure.
    pub async fn discover(&self) -> Result<DiscoveryInfo> {
        // Use the connection this handler already holds instead of opening
        // (and never closing) a new one for every discovery request.
        let response = self.client.discover().await.map_err(|e| {
            ClientError::ConnectionFailed(format!("discover failed: {e}"))
        })?;
        if !response.success {
            return Err(ClientError::DiscoveryFailed(
                response.error.unwrap_or_else(|| "unknown error".to_string()),
            ));
        }
        let primitives = response
            .actors
            .unwrap_or_default()
            .into_iter()
            .map(|actor| PrimitiveInfo {
                name: actor.name,
                kind: String::new(),
            })
            .collect();
        let message_types = response.message_types.unwrap_or_default();
        Ok(DiscoveryInfo {
            message_types,
            primitives,
        })
    }

    /// Asks the engine which message types this primitive is configured to
    /// subscribe to. The query runs over a separate, newly opened connection.
    ///
    /// # Errors
    /// Any error from the subscription query (connection, subscription,
    /// timeout, or decoding).
    pub async fn get_my_subscriptions(&self) -> Result<Vec<String>> {
        get_my_subscriptions_via_pubsub(&self.name).await
    }

    /// Returns the name this handler was connected with.
    #[must_use]
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Returns the types recorded by the last successful `subscribe` call
    /// (empty before any subscription).
    #[must_use]
    pub fn subscribed_types(&self) -> &[String] {
        &self.subscribed_types
    }

    /// Disconnects from the engine.
    ///
    /// # Errors
    /// [`ClientError::ConnectionFailed`] when the disconnect call fails.
    pub async fn disconnect(&self) -> Result<()> {
        info!(primitive.name = %self.name, "disconnecting from engine");
        self.client.disconnect().await.map_err(|e| {
            ClientError::ConnectionFailed(format!("disconnect failed: {e}"))
        })?;
        info!(primitive.name = %self.name, "disconnected from engine");
        Ok(())
    }
}
/// A primitive that subscribes to messages. It offers no publish method.
pub struct EmergentSink {
// Primitive name; used for log fields and engine queries.
name: String,
// Connection to the engine, held behind an `Arc`.
client: Arc<IpcClient>,
// Types recorded by the last successful `subscribe` call; starts empty.
subscribed_types: Vec<String>,
}
impl EmergentSink {
    /// Connects to the engine and returns a sink named `name` with no
    /// subscriptions yet.
    ///
    /// # Errors
    /// Propagates any error from establishing the engine connection.
    pub async fn connect(name: &str) -> Result<Self> {
        let client = connect_to_engine(name).await?;
        info!(primitive.name = %name, primitive.kind = "sink", "connected to engine");
        Ok(Self {
            name: name.to_string(),
            client: Arc::new(client),
            subscribed_types: Vec::new(),
        })
    }

    /// Connects as `name`, asks the engine for this primitive's configured
    /// subscriptions, subscribes to exactly those, and returns the stream.
    ///
    /// The `_types` argument is accepted but not consulted; the topics come
    /// from the engine. Only the stream is returned; the sink value created
    /// here is not handed back to the caller.
    ///
    /// # Errors
    /// Any error from connecting, from the subscription query, or from
    /// subscribing.
    pub async fn messages(
        name: impl Into<String>,
        _types: impl IntoSubscription,
    ) -> Result<MessageStream> {
        let name = name.into();
        let mut sink = Self::connect(&name).await?;
        let topics = sink.get_my_subscriptions().await?;
        sink.subscribe(topics).await
    }

    /// Subscribes to `types` and returns the stream of incoming messages.
    ///
    /// The stream ends when the engine sends a `system.shutdown` whose kind
    /// is `"sink"`. On success the recorded subscribed types are replaced
    /// with those the engine reported (excluding `system.shutdown`).
    ///
    /// # Errors
    /// [`ClientError::SubscriptionFailed`] when subscribing fails or the push
    /// receiver was already taken (for example by an earlier `subscribe`).
    pub async fn subscribe(&mut self, types: impl IntoSubscription) -> Result<MessageStream> {
        let topics = types.into_topics();
        let (stream, user_subs) =
            subscribe_and_stream(&self.client, topics, &self.name, "sink").await?;
        self.subscribed_types = user_subs;
        Ok(stream)
    }

    /// Queries the engine for known actors and message types.
    ///
    /// Each reported actor becomes a [`PrimitiveInfo`] whose `kind` is left
    /// empty. Missing actor or message-type lists are treated as empty.
    ///
    /// # Errors
    /// [`ClientError::ConnectionFailed`] when the request fails, and
    /// [`ClientError::DiscoveryFailed`] when the engine reports failure.
    pub async fn discover(&self) -> Result<DiscoveryInfo> {
        // Use the connection this sink already holds instead of opening
        // (and never closing) a new one for every discovery request.
        let response = self.client.discover().await.map_err(|e| {
            ClientError::ConnectionFailed(format!("discover failed: {e}"))
        })?;
        if !response.success {
            return Err(ClientError::DiscoveryFailed(
                response.error.unwrap_or_else(|| "unknown error".to_string()),
            ));
        }
        let primitives = response
            .actors
            .unwrap_or_default()
            .into_iter()
            .map(|actor| PrimitiveInfo {
                name: actor.name,
                kind: String::new(),
            })
            .collect();
        let message_types = response.message_types.unwrap_or_default();
        Ok(DiscoveryInfo {
            message_types,
            primitives,
        })
    }

    /// Asks the engine which message types this primitive is configured to
    /// subscribe to. The query runs over a separate, newly opened connection.
    ///
    /// # Errors
    /// Any error from the subscription query (connection, subscription,
    /// timeout, or decoding).
    pub async fn get_my_subscriptions(&self) -> Result<Vec<String>> {
        get_my_subscriptions_via_pubsub(&self.name).await
    }

    /// Asks the engine for its current topology. The query runs over a
    /// separate, newly opened connection.
    ///
    /// # Errors
    /// Any error from the topology query (connection, subscription, timeout,
    /// or decoding).
    pub async fn get_topology(&self) -> Result<TopologyState> {
        get_topology_via_pubsub(&self.name).await
    }

    /// Returns the name this sink was connected with.
    #[must_use]
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Returns the types recorded by the last successful `subscribe` call
    /// (empty before any subscription).
    #[must_use]
    pub fn subscribed_types(&self) -> &[String] {
        &self.subscribed_types
    }

    /// Disconnects from the engine.
    ///
    /// # Errors
    /// [`ClientError::ConnectionFailed`] when the disconnect call fails.
    pub async fn disconnect(&self) -> Result<()> {
        info!(primitive.name = %self.name, "disconnecting from engine");
        self.client.disconnect().await.map_err(|e| {
            ClientError::ConnectionFailed(format!("disconnect failed: {e}"))
        })?;
        info!(primitive.name = %self.name, "disconnected from engine");
        Ok(())
    }
}