Skip to main content

emergent_client/
connection.rs

1//! Connection types for Emergent primitives.
2//!
3//! This module provides the three primitive types that connect to the Emergent engine:
4//! - [`EmergentSource`] - publish only
5//! - [`EmergentHandler`] - subscribe and publish
6//! - [`EmergentSink`] - subscribe only
7
8use crate::error::ClientError;
9use crate::message::EmergentMessage;
10use crate::stream::MessageStream;
11use crate::subscribe::IntoSubscription;
12use crate::types::{CorrelationId, PrimitiveName};
13use crate::{DiscoveryInfo, PrimitiveInfo, Result};
14
15use tracing::{debug, error, info, warn};
16
17use acton_reactive::ipc::protocol::Format;
18use acton_reactive::ipc::{
19    IpcClient, IpcClientConfig, IpcConfig, IpcEnvelope, IpcPushNotification, socket_exists,
20    socket_is_alive,
21};
22use serde::{Deserialize, Serialize};
23use serde_json::json;
24use std::path::PathBuf;
25use std::sync::Arc;
26use tokio::sync::mpsc;
27
28/// IPC wrapper message for `EmergentMessage`.
29#[derive(Clone, Debug, Serialize, Deserialize)]
30struct IpcEmergentMessage {
31    inner: EmergentMessage,
32}
33
34// ============================================================================
35// Helper Functions
36// ============================================================================
37
38/// Resolve the socket path from environment or config.
39fn resolve_socket_path(_name: &str) -> Result<PathBuf> {
40    // First check EMERGENT_SOCKET environment variable
41    if let Ok(path) = std::env::var("EMERGENT_SOCKET") {
42        return Ok(PathBuf::from(path));
43    }
44
45    // Fall back to XDG-compliant default using IpcConfig
46    let mut config = IpcConfig::load();
47    config.socket.app_name = Some("emergent".to_string());
48    Ok(config.socket_path())
49}
50
51/// Initialize a default tracing subscriber if one hasn't been set.
52///
53/// Logs to `~/.local/share/emergent/<name>/primitive.log` by default.
54/// Set `EMERGENT_LOG=stderr` to log to stderr instead (for debugging).
55/// No-op if the primitive already installed a subscriber.
56fn init_tracing(name: &str) {
57    use tracing_subscriber::EnvFilter;
58
59    let filter = EnvFilter::try_from_env("EMERGENT_LOG")
60        .or_else(|_| EnvFilter::try_from_default_env())
61        .unwrap_or_else(|_| EnvFilter::new("info"));
62
63    // Check if user explicitly wants stderr output
64    let wants_stderr = std::env::var("EMERGENT_LOG")
65        .map(|v| v.eq_ignore_ascii_case("stderr"))
66        .unwrap_or(false);
67
68    if wants_stderr {
69        let stderr_filter = EnvFilter::new("info");
70        let _ = tracing_subscriber::fmt()
71            .with_env_filter(stderr_filter)
72            .try_init();
73    } else {
74        // Log to file in XDG data directory, keyed by primitive name
75        let log_dir = directories::ProjectDirs::from("ai", "govcraft", "emergent")
76            .map(|dirs| dirs.data_dir().join(name))
77            .unwrap_or_else(|| std::path::PathBuf::from("."));
78        let _ = std::fs::create_dir_all(&log_dir);
79
80        if let Ok(log_file) = std::fs::OpenOptions::new()
81            .create(true)
82            .append(true)
83            .open(log_dir.join("primitive.log"))
84        {
85            let _ = tracing_subscriber::fmt()
86                .with_env_filter(filter)
87                .with_writer(std::sync::Mutex::new(log_file))
88                .with_ansi(false)
89                .try_init();
90        } else {
91            // If we can't open the file, fall back to silent
92            let _ = tracing_subscriber::fmt()
93                .with_env_filter(EnvFilter::new("off"))
94                .try_init();
95        }
96    }
97}
98
99/// Connect to the engine socket with health checks, returning an `IpcClient`.
100///
101/// If `socket_override` is `Some`, uses that path directly. Otherwise resolves
102/// the socket path from `EMERGENT_SOCKET` env var or XDG default.
103async fn connect_to_engine(
104    name: &str,
105    socket_override: Option<&std::path::Path>,
106) -> Result<IpcClient> {
107    init_tracing(name);
108    let socket_path = match socket_override {
109        Some(path) => path.to_path_buf(),
110        None => resolve_socket_path(name)?,
111    };
112    debug!(path = %socket_path.display(), "resolved socket path");
113
114    info!(primitive.name = %name, path = %socket_path.display(), "connecting to engine");
115
116    if !socket_exists(&socket_path) {
117        error!(path = %socket_path.display(), "engine socket not found");
118        return Err(ClientError::SocketNotFound(
119            socket_path.display().to_string(),
120        ));
121    }
122
123    if !socket_is_alive(&socket_path).await {
124        error!(path = %socket_path.display(), "engine socket not responding");
125        return Err(ClientError::ConnectionFailed(
126            "Engine socket exists but is not responding".to_string(),
127        ));
128    }
129
130    let config = IpcClientConfig {
131        format: Format::MessagePack,
132        ..IpcClientConfig::default()
133    };
134
135    IpcClient::connect_with_config(&socket_path, config)
136        .await
137        .map_err(|e| {
138            error!(error = %e, "failed to connect to engine");
139            ClientError::ConnectionFailed(e.to_string())
140        })
141}
142
143/// Build an `IpcEnvelope` for publishing an `EmergentMessage` (fire-and-forget).
144fn build_publish_envelope(message: EmergentMessage) -> Result<IpcEnvelope> {
145    let ipc_message = IpcEmergentMessage { inner: message };
146    let payload = serde_json::to_value(&ipc_message)?;
147    Ok(IpcEnvelope::new(
148        "message_broker",
149        "EmergentMessage",
150        payload,
151    ))
152}
153
154/// Build an `IpcEnvelope` for acknowledged publish (request-response).
155///
156/// The broker processes the message and returns a reply, providing backpressure.
157fn build_publish_request_envelope(message: EmergentMessage) -> Result<IpcEnvelope> {
158    let ipc_message = IpcEmergentMessage { inner: message };
159    let payload = serde_json::to_value(&ipc_message)?;
160    Ok(IpcEnvelope::new_request(
161        "message_broker",
162        "EmergentMessage",
163        payload,
164    ))
165}
166
167/// Bridge push notifications from an `IpcClient` to a `MessageStream`.
168///
169/// Handles `system.shutdown` detection and `EmergentMessage` extraction.
170/// This replaces the duplicated ~100-line read loops that were previously
171/// copy-pasted between Handler and Sink.
172async fn push_to_message_stream(
173    mut push_rx: mpsc::Receiver<IpcPushNotification>,
174    tx: mpsc::Sender<EmergentMessage>,
175    name: String,
176    shutdown_kind: &str,
177) {
178    debug!(primitive.name = %name, "push bridge started");
179
180    let auto_unwrap = std::env::var("EMERGENT_UNWRAP_STDOUT")
181        .is_ok_and(|v| v == "true" || v == "1");
182
183    while let Some(notification) = push_rx.recv().await {
184        // Check for shutdown signal
185        if notification.message_type == "system.shutdown" {
186            let kind = notification
187                .payload
188                .get("kind")
189                .and_then(|v| v.as_str())
190                .unwrap_or("unknown");
191            info!(
192                primitive.name = %name,
193                shutdown_kind = %kind,
194                "received shutdown signal"
195            );
196            if kind == shutdown_kind {
197                info!(
198                    primitive.name = %name,
199                    "shutting down (engine requested)"
200                );
201                break;
202            }
203            debug!(
204                primitive.name = %name,
205                "ignoring shutdown for different primitive kind"
206            );
207            continue; // Don't forward system.shutdown to user
208        }
209
210        // Try to extract EmergentMessage from payload
211        let msg = if let Ok(msg) =
212            serde_json::from_value::<EmergentMessage>(notification.payload.clone())
213        {
214            msg
215        } else {
216            // Fallback: create EmergentMessage from push notification fields
217            EmergentMessage::new(&notification.message_type)
218                .with_source(notification.source_actor.as_deref().unwrap_or("unknown"))
219                .with_payload(notification.payload)
220        };
221
222        // Auto-unwrap exec-source stdout payloads when configured
223        let msg = if auto_unwrap && !msg.message_type.as_str().starts_with("system.") {
224            msg.unwrap_stdout()
225        } else {
226            msg
227        };
228
229        debug!(
230            primitive.name = %name,
231            message_type = %msg.message_type,
232            message_id = %msg.id,
233            "received message"
234        );
235
236        if tx.send(msg).await.is_err() {
237            warn!(
238                primitive.name = %name,
239                "message stream send failed, receiver dropped"
240            );
241            break;
242        }
243    }
244
245    debug!(primitive.name = %name, "push bridge stopped");
246}
247
248/// Subscribe on an `IpcClient` and return a `MessageStream`.
249///
250/// Shared implementation used by both Handler and Sink.
251async fn subscribe_and_stream(
252    client: &IpcClient,
253    topics: Vec<String>,
254    name: &str,
255    shutdown_kind: &str,
256) -> Result<(MessageStream, Vec<String>)> {
257    // Add system.shutdown to subscriptions (SDK handles it internally)
258    let mut all_types = topics;
259    if !all_types.iter().any(|t| t == "system.shutdown") {
260        all_types.push("system.shutdown".to_string());
261    }
262
263    // Subscribe via IpcClient (single connection, no new socket)
264    let sub_response = client
265        .subscribe(all_types)
266        .await
267        .map_err(|e| ClientError::SubscriptionFailed(format!("subscribe failed: {e}")))?;
268
269    if !sub_response.success {
270        return Err(ClientError::SubscriptionFailed(
271            sub_response
272                .error
273                .unwrap_or_else(|| "unknown error".to_string()),
274        ));
275    }
276
277    // Take the push receiver and bridge to MessageStream
278    let push_rx = client.take_push_receiver().ok_or_else(|| {
279        ClientError::SubscriptionFailed(
280            "push receiver already taken (subscribe called more than once?)".to_string(),
281        )
282    })?;
283
284    let (tx, rx) = mpsc::channel(256);
285    let bridge_name = name.to_string();
286    let bridge_kind = shutdown_kind.to_string();
287
288    tokio::spawn(async move {
289        push_to_message_stream(push_rx, tx, bridge_name, &bridge_kind).await;
290    });
291
292    // Filter out system.shutdown from the reported subscriptions
293    let user_subs: Vec<String> = sub_response
294        .subscribed_types
295        .into_iter()
296        .filter(|s| s != "system.shutdown")
297        .collect();
298
299    Ok((MessageStream::new(rx), user_subs))
300}
301
302/// Query the engine for configured subscriptions via pub/sub pattern.
303///
304/// Uses `system.request.subscriptions` / `system.response.subscriptions`.
305/// Requires a dedicated `IpcClient` connection since it temporarily subscribes
306/// to the response topic.
307async fn get_my_subscriptions_via_pubsub(name: &str) -> Result<Vec<String>> {
308    debug!("querying configured subscriptions");
309
310    let client = connect_to_engine(name, None).await?;
311    let correlation_id = CorrelationId::new();
312
313    // Subscribe to response type
314    client
315        .subscribe(vec!["system.response.subscriptions".to_string()])
316        .await
317        .map_err(|e| ClientError::SubscriptionFailed(format!("subscribe failed: {e}")))?;
318
319    // Take push receiver before publishing
320    let mut push_rx = client.take_push_receiver().ok_or_else(|| {
321        ClientError::SubscriptionFailed("push receiver already taken".to_string())
322    })?;
323
324    // Publish request
325    let request = EmergentMessage::new("system.request.subscriptions")
326        .with_source(name)
327        .with_correlation_id(correlation_id.clone())
328        .with_payload(json!({ "name": name }));
329    let envelope = build_publish_envelope(request)?;
330    client
331        .send(envelope)
332        .await
333        .map_err(|e| ClientError::ConnectionFailed(format!("publish failed: {e}")))?;
334
335    // Wait for response with matching correlation_id
336    let subs = tokio::time::timeout(std::time::Duration::from_secs(30), async {
337        while let Some(notification) = push_rx.recv().await {
338            if notification.message_type == "system.response.subscriptions" {
339                let msg: EmergentMessage = serde_json::from_value(notification.payload)?;
340                if msg.correlation_id.as_ref().map(|c| c.to_string())
341                    == Some(correlation_id.to_string())
342                {
343                    let subs_response: SubscriptionsResponse = serde_json::from_value(msg.payload)?;
344                    return Ok(subs_response.subscribes);
345                }
346            }
347        }
348        Err(ClientError::ConnectionFailed(
349            "push channel closed before response".to_string(),
350        ))
351    })
352    .await
353    .map_err(|_| ClientError::Timeout)??;
354
355    info!(types = ?subs, "received configured subscriptions");
356    Ok(subs)
357}
358
359/// Query the engine for topology via pub/sub pattern.
360///
361/// Uses `system.request.topology` / `system.response.topology`.
362async fn get_topology_via_pubsub(name: &str) -> Result<TopologyState> {
363    debug!("querying topology");
364
365    let client = connect_to_engine(name, None).await?;
366    let correlation_id = CorrelationId::new();
367
368    client
369        .subscribe(vec!["system.response.topology".to_string()])
370        .await
371        .map_err(|e| ClientError::SubscriptionFailed(format!("subscribe failed: {e}")))?;
372
373    let mut push_rx = client.take_push_receiver().ok_or_else(|| {
374        ClientError::SubscriptionFailed("push receiver already taken".to_string())
375    })?;
376
377    let request = EmergentMessage::new("system.request.topology")
378        .with_source(name)
379        .with_correlation_id(correlation_id.clone())
380        .with_payload(json!({}));
381    let envelope = build_publish_envelope(request)?;
382    client
383        .send(envelope)
384        .await
385        .map_err(|e| ClientError::ConnectionFailed(format!("publish failed: {e}")))?;
386
387    let state = tokio::time::timeout(std::time::Duration::from_secs(30), async {
388        while let Some(notification) = push_rx.recv().await {
389            if notification.message_type == "system.response.topology" {
390                let msg: EmergentMessage = serde_json::from_value(notification.payload)?;
391                if msg.correlation_id.as_ref().map(|c| c.to_string())
392                    == Some(correlation_id.to_string())
393                {
394                    let topo_response: TopologyResponse = serde_json::from_value(msg.payload)?;
395                    return Ok(TopologyState {
396                        primitives: topo_response.primitives,
397                    });
398                }
399            }
400        }
401        Err(ClientError::ConnectionFailed(
402            "push channel closed before response".to_string(),
403        ))
404    })
405    .await
406    .map_err(|_| ClientError::Timeout)??;
407
408    debug!(
409        primitive_count = state.primitives.len(),
410        "received topology"
411    );
412    Ok(state)
413}
414
415// ============================================================================
416// Data Types
417// ============================================================================
418
419/// Response from `GetSubscriptions` request.
420#[derive(Debug, Deserialize)]
421struct SubscriptionsResponse {
422    subscribes: Vec<String>,
423}
424
425/// Information about a primitive in the topology.
426#[derive(Debug, Clone, Serialize, Deserialize)]
427pub struct TopologyPrimitive {
428    /// Unique name of the primitive.
429    pub name: String,
430    /// Kind of primitive (source, handler, sink).
431    pub kind: String,
432    /// Current lifecycle state.
433    pub state: String,
434    /// Message types this primitive publishes.
435    pub publishes: Vec<String>,
436    /// Message types this primitive subscribes to.
437    pub subscribes: Vec<String>,
438    /// Process ID if running.
439    pub pid: Option<u32>,
440    /// Error message if failed.
441    pub error: Option<String>,
442}
443
444/// Response from `GetTopology` request.
445#[derive(Debug, Deserialize)]
446struct TopologyResponse {
447    primitives: Vec<TopologyPrimitive>,
448}
449
450/// Current topology state (all primitives).
451#[derive(Debug, Clone)]
452pub struct TopologyState {
453    /// All primitives in the system.
454    pub primitives: Vec<TopologyPrimitive>,
455}
456
457// ============================================================================
458// EmergentSource - Publish Only
459// ============================================================================
460
461/// A Source primitive that publishes messages to the Emergent engine.
462///
463/// Sources are the ingress point for data entering the workflow. They can only
464/// publish messages (fire-and-forget) and cannot subscribe to receive messages.
465///
466/// # Example
467///
468/// ```rust,ignore
469/// use emergent_client::{EmergentSource, EmergentMessage};
470/// use serde_json::json;
471///
472/// let source = EmergentSource::connect("my_source").await?;
473///
474/// loop {
475///     let message = EmergentMessage::new("sensor.reading")
476///         .with_payload(json!({"temperature": 72.5}));
477///     source.publish(message).await?;
478///     tokio::time::sleep(Duration::from_secs(1)).await;
479/// }
480/// ```
481pub struct EmergentSource {
482    /// Name of this source.
483    name: String,
484    /// Channel-based IPC client (no mutex, no drain task).
485    client: IpcClient,
486}
487
488impl EmergentSource {
489    /// Connect to the Emergent engine as a Source.
490    ///
491    /// The `name` parameter identifies this source in logs and tracing.
492    ///
493    /// # Errors
494    ///
495    /// Returns an error if the connection fails or the engine is not running.
496    pub async fn connect(name: &str) -> Result<Self> {
497        let client = connect_to_engine(name, None).await?;
498
499        info!(primitive.name = %name, primitive.kind = "source", "connected to engine");
500
501        Ok(Self {
502            name: name.to_string(),
503            client,
504        })
505    }
506
507    /// Connect to the Emergent engine at a specific socket path.
508    ///
509    /// This is useful for testing or when connecting to a non-default socket.
510    ///
511    /// # Errors
512    ///
513    /// Returns an error if the connection fails or the engine is not running.
514    pub async fn connect_to(name: &str, socket_path: &std::path::Path) -> Result<Self> {
515        let client = connect_to_engine(name, Some(socket_path)).await?;
516
517        info!(primitive.name = %name, primitive.kind = "source", "connected to engine");
518
519        Ok(Self {
520            name: name.to_string(),
521            client,
522        })
523    }
524
525    /// Publish a message to the engine (fire-and-forget).
526    ///
527    /// The message will be routed to any Handlers or Sinks subscribed to its type.
528    ///
529    /// # Errors
530    ///
531    /// Returns an error if the message cannot be sent.
532    pub async fn publish(&self, mut message: EmergentMessage) -> Result<()> {
533        if message.source.is_default() {
534            message.source = PrimitiveName::new(&self.name).map_err(|e| {
535                ClientError::ConnectionFailed(format!(
536                    "invalid primitive name '{}': {}",
537                    self.name, e
538                ))
539            })?;
540        }
541
542        let envelope = build_publish_envelope(message)?;
543        self.client.send(envelope).await.map_err(|e| {
544            error!(primitive.name = %self.name, error = %e, "failed to publish message");
545            ClientError::ConnectionFailed(format!("publish failed: {e}"))
546        })
547    }
548
549    /// Publish a message with broker acknowledgment (backpressure).
550    ///
551    /// Unlike [`publish`], this waits for the engine's message broker to confirm
552    /// it has processed and forwarded the message before returning. This provides
553    /// natural backpressure — the caller cannot outpace the broker.
554    ///
555    /// Used internally by [`publish_all`] and [`publish_stream`].
556    ///
557    /// # Errors
558    ///
559    /// Returns an error if the broker rejects the message or times out.
560    pub async fn publish_ack(&self, mut message: EmergentMessage) -> Result<()> {
561        if message.source.is_default() {
562            message.source = PrimitiveName::new(&self.name).map_err(|e| {
563                ClientError::ConnectionFailed(format!(
564                    "invalid primitive name '{}': {}",
565                    self.name, e
566                ))
567            })?;
568        }
569
570        let envelope = build_publish_request_envelope(message)?;
571        let response = self.client.request(envelope).await.map_err(|e| {
572            error!(primitive.name = %self.name, error = %e, "publish_ack failed");
573            ClientError::ConnectionFailed(format!("publish_ack failed: {e}"))
574        })?;
575        if !response.success {
576            return Err(ClientError::PublishFailed(
577                response.error.unwrap_or_else(|| "broker error".to_string()),
578            ));
579        }
580        Ok(())
581    }
582
583    /// Publish all messages from an iterator.
584    ///
585    /// Sends each message individually with broker acknowledgment so subscribers
586    /// begin consuming immediately. Stops on the first error.
587    ///
588    /// Returns the number of messages successfully published.
589    ///
590    /// # Errors
591    ///
592    /// Returns the first publish error encountered.
593    pub async fn publish_all(
594        &self,
595        messages: impl IntoIterator<Item = EmergentMessage>,
596    ) -> Result<usize> {
597        let mut count = 0;
598        for message in messages {
599            self.publish_ack(message).await?;
600            count += 1;
601        }
602        Ok(count)
603    }
604
605    /// Publish messages from an async stream.
606    ///
607    /// Consumes the stream, publishing each message individually with broker
608    /// acknowledgment so subscribers begin consuming immediately. Stops on the
609    /// first publish error or when the stream ends.
610    ///
611    /// Returns the number of messages successfully published.
612    ///
613    /// # Errors
614    ///
615    /// Returns the first publish error encountered.
616    pub async fn publish_stream<S>(&self, mut stream: S) -> Result<usize>
617    where
618        S: futures::Stream<Item = EmergentMessage> + Unpin,
619    {
620        use futures::StreamExt;
621        let mut count = 0;
622        while let Some(message) = stream.next().await {
623            self.publish_ack(message).await?;
624            count += 1;
625        }
626        Ok(count)
627    }
628
629    /// Discover available message types and primitives.
630    ///
631    /// # Errors
632    ///
633    /// Returns an error if the discovery request fails.
634    pub async fn discover(&self) -> Result<DiscoveryInfo> {
635        // Discovery uses a separate connection (request-response pattern)
636        let client = connect_to_engine(&self.name, None).await?;
637        let response = client
638            .discover()
639            .await
640            .map_err(|e| ClientError::ConnectionFailed(format!("discover failed: {e}")))?;
641
642        if !response.success {
643            return Err(ClientError::DiscoveryFailed(
644                response
645                    .error
646                    .unwrap_or_else(|| "unknown error".to_string()),
647            ));
648        }
649
650        let primitives = response
651            .actors
652            .unwrap_or_default()
653            .into_iter()
654            .map(|actor| PrimitiveInfo {
655                name: actor.name,
656                kind: String::new(),
657            })
658            .collect();
659
660        let message_types = response.message_types.unwrap_or_default();
661
662        Ok(DiscoveryInfo {
663            message_types,
664            primitives,
665        })
666    }
667
668    /// Get the name of this source.
669    #[must_use]
670    pub fn name(&self) -> &str {
671        &self.name
672    }
673
674    /// Gracefully disconnect from the engine.
675    ///
676    /// # Errors
677    ///
678    /// Returns an error if the disconnection fails.
679    pub async fn disconnect(&self) -> Result<()> {
680        info!(primitive.name = %self.name, "disconnecting from engine");
681        self.client
682            .disconnect()
683            .await
684            .map_err(|e| ClientError::ConnectionFailed(format!("disconnect failed: {e}")))?;
685        info!(primitive.name = %self.name, "disconnected from engine");
686        Ok(())
687    }
688}
689
690// ============================================================================
691// EmergentHandler - Subscribe + Publish
692// ============================================================================
693
694/// A Handler primitive that subscribes to and publishes messages.
695///
696/// Handlers are the transformation layer in a workflow. They receive messages
697/// from Sources or other Handlers, process them, and emit new messages.
698///
699/// # Example
700///
701/// ```rust,ignore
702/// use emergent_client::{EmergentHandler, EmergentMessage};
703/// use futures::StreamExt;
704///
705/// let handler = EmergentHandler::connect("my_filter").await?;
706/// let mut stream = handler.subscribe("timer.tick").await?;
707///
708/// while let Some(msg) = stream.next().await {
709///     let output = EmergentMessage::new("timer.filtered")
710///         .with_causation_id(msg.id());
711///     handler.publish(output).await?;
712/// }
713/// ```
714pub struct EmergentHandler {
715    /// Name of this handler.
716    name: String,
717    /// Channel-based IPC client (no mutex, no drain task).
718    client: Arc<IpcClient>,
719    /// Currently subscribed message types.
720    subscribed_types: Vec<String>,
721}
722
723impl EmergentHandler {
724    /// Connect to the Emergent engine as a Handler.
725    ///
726    /// # Errors
727    ///
728    /// Returns an error if the connection fails.
729    pub async fn connect(name: &str) -> Result<Self> {
730        let client = connect_to_engine(name, None).await?;
731
732        info!(primitive.name = %name, primitive.kind = "handler", "connected to engine");
733
734        Ok(Self {
735            name: name.to_string(),
736            client: Arc::new(client),
737            subscribed_types: Vec::new(),
738        })
739    }
740
741    /// Connect to the Emergent engine at a specific socket path.
742    ///
743    /// This is useful for testing or when connecting to a non-default socket.
744    ///
745    /// # Errors
746    ///
747    /// Returns an error if the connection fails.
748    pub async fn connect_to(name: &str, socket_path: &std::path::Path) -> Result<Self> {
749        let client = connect_to_engine(name, Some(socket_path)).await?;
750
751        info!(primitive.name = %name, primitive.kind = "handler", "connected to engine");
752
753        Ok(Self {
754            name: name.to_string(),
755            client: Arc::new(client),
756            subscribed_types: Vec::new(),
757        })
758    }
759
760    /// Subscribe to message types and return a stream of incoming messages.
761    ///
762    /// The SDK automatically handles `system.shutdown` messages - when the engine
763    /// signals shutdown for handlers, the stream will close gracefully.
764    ///
765    /// # Examples
766    ///
767    /// ```rust,ignore
768    /// // Single topic
769    /// let stream = handler.subscribe("timer.tick").await?;
770    ///
771    /// // Multiple topics with array
772    /// let stream = handler.subscribe(["timer.tick", "timer.filtered"]).await?;
773    ///
774    /// // From a Vec
775    /// let topics = vec!["timer.tick".to_string()];
776    /// let stream = handler.subscribe(topics).await?;
777    /// ```
778    ///
779    /// # Errors
780    ///
781    /// Returns an error if the subscription fails.
782    pub async fn subscribe(&mut self, types: impl IntoSubscription) -> Result<MessageStream> {
783        let topics = types.into_topics();
784        let (stream, user_subs) =
785            subscribe_and_stream(&self.client, topics, &self.name, "handler").await?;
786        self.subscribed_types = user_subs;
787        Ok(stream)
788    }
789
790    /// Convenience method that connects, gets configured subscriptions, and returns
791    /// (handler, stream) for the common one-liner pattern.
792    ///
793    /// # Errors
794    ///
795    /// Returns an error if connection or subscription fails.
796    pub async fn messages(
797        name: impl Into<String>,
798        _types: impl IntoSubscription,
799    ) -> Result<(Self, MessageStream)> {
800        let name = name.into();
801        let mut handler = Self::connect(&name).await?;
802        let topics = get_my_subscriptions_via_pubsub(&name).await?;
803        let stream = handler.subscribe(topics).await?;
804        Ok((handler, stream))
805    }
806
807    /// Publish a message to the engine (fire-and-forget).
808    ///
809    /// # Errors
810    ///
811    /// Returns an error if the message cannot be sent.
812    pub async fn publish(&self, mut message: EmergentMessage) -> Result<()> {
813        if message.source.is_default() {
814            message.source = PrimitiveName::new(&self.name).map_err(|e| {
815                ClientError::ConnectionFailed(format!(
816                    "invalid primitive name '{}': {}",
817                    self.name, e
818                ))
819            })?;
820        }
821
822        let envelope = build_publish_envelope(message)?;
823        self.client.send(envelope).await.map_err(|e| {
824            error!(primitive.name = %self.name, error = %e, "failed to publish message");
825            ClientError::ConnectionFailed(format!("publish failed: {e}"))
826        })
827    }
828
829    /// Publish a message with broker acknowledgment (backpressure).
830    ///
831    /// Unlike [`publish`], this waits for the engine's message broker to confirm
832    /// it has processed and forwarded the message before returning. This provides
833    /// natural backpressure — the caller cannot outpace the broker.
834    ///
835    /// Used internally by [`publish_all`] and [`publish_stream`].
836    ///
837    /// # Errors
838    ///
839    /// Returns an error if the broker rejects the message or times out.
840    pub async fn publish_ack(&self, mut message: EmergentMessage) -> Result<()> {
841        if message.source.is_default() {
842            message.source = PrimitiveName::new(&self.name).map_err(|e| {
843                ClientError::ConnectionFailed(format!(
844                    "invalid primitive name '{}': {}",
845                    self.name, e
846                ))
847            })?;
848        }
849
850        let envelope = build_publish_request_envelope(message)?;
851        let response = self.client.request(envelope).await.map_err(|e| {
852            error!(primitive.name = %self.name, error = %e, "publish_ack failed");
853            ClientError::ConnectionFailed(format!("publish_ack failed: {e}"))
854        })?;
855        if !response.success {
856            return Err(ClientError::PublishFailed(
857                response.error.unwrap_or_else(|| "broker error".to_string()),
858            ));
859        }
860        Ok(())
861    }
862
863    /// Publish all messages from an iterator.
864    ///
865    /// Sends each message individually with broker acknowledgment so subscribers
866    /// begin consuming immediately. Stops on the first error.
867    ///
868    /// Returns the number of messages successfully published.
869    ///
870    /// # Errors
871    ///
872    /// Returns the first publish error encountered.
873    pub async fn publish_all(
874        &self,
875        messages: impl IntoIterator<Item = EmergentMessage>,
876    ) -> Result<usize> {
877        let mut count = 0;
878        for message in messages {
879            self.publish_ack(message).await?;
880            count += 1;
881        }
882        Ok(count)
883    }
884
885    /// Publish messages from an async stream.
886    ///
887    /// Consumes the stream, publishing each message individually with broker
888    /// acknowledgment so subscribers begin consuming immediately. Stops on the
889    /// first publish error or when the stream ends.
890    ///
891    /// Returns the number of messages successfully published.
892    ///
893    /// # Errors
894    ///
895    /// Returns the first publish error encountered.
896    pub async fn publish_stream<S>(&self, mut stream: S) -> Result<usize>
897    where
898        S: futures::Stream<Item = EmergentMessage> + Unpin,
899    {
900        use futures::StreamExt;
901        let mut count = 0;
902        while let Some(message) = stream.next().await {
903            self.publish_ack(message).await?;
904            count += 1;
905        }
906        Ok(count)
907    }
908
909    /// Offer items as a pull-based stream with consumer-driven backpressure.
910    ///
911    /// Publishes `stream.ready`, then serves items one at a time as the
912    /// consumer sends `stream.pull` requests. Publishes `stream.end`
913    /// when exhausted.
914    ///
915    /// # Errors
916    ///
917    /// Returns an error if the stream times out or the pull stream closes.
918    pub async fn stream_offer(
919        &self,
920        message_type: &str,
921        items: impl IntoIterator<Item = serde_json::Value>,
922        pull_stream: &mut MessageStream,
923        timeout: std::time::Duration,
924    ) -> Result<usize> {
925        let stream_id = CorrelationId::new().to_string();
926        let mut items = items.into_iter();
927
928        self.publish(
929            EmergentMessage::new("stream.ready").with_payload(serde_json::json!({
930                "stream_id": stream_id,
931                "message_type": message_type,
932            })),
933        )
934        .await?;
935
936        let mut published = 0usize;
937
938        loop {
939            let msg = tokio::time::timeout(timeout, pull_stream.next())
940                .await
941                .map_err(|_| ClientError::Timeout)?
942                .ok_or_else(|| {
943                    ClientError::ConnectionFailed(
944                        "pull stream closed during stream_offer".to_string(),
945                    )
946                })?;
947
948            let is_pull = msg.message_type.as_str() == "stream.pull"
949                && msg.payload.get("stream_id").and_then(|v| v.as_str())
950                    == Some(stream_id.as_str());
951
952            if is_pull {
953                if let Some(item) = items.next() {
954                    self.publish(
955                        EmergentMessage::new(message_type)
956                            .with_payload(item)
957                            .with_metadata(serde_json::json!({"stream_id": stream_id})),
958                    )
959                    .await?;
960                    published += 1;
961                } else {
962                    self.publish(
963                        EmergentMessage::new("stream.end")
964                            .with_payload(serde_json::json!({"stream_id": stream_id})),
965                    )
966                    .await?;
967                    break;
968                }
969            }
970        }
971
972        Ok(published)
973    }
974
975    /// Consume a pull-based stream, yielding items via callback.
976    ///
977    /// Waits for `stream.ready`, then sends `stream.pull` requests
978    /// automatically after each item is consumed. Stops on `stream.end`.
979    ///
980    /// # Errors
981    ///
982    /// Returns an error if the stream times out or the source stream closes.
983    pub async fn stream_consume(
984        &self,
985        message_type: &str,
986        source_stream: &mut MessageStream,
987        timeout: std::time::Duration,
988        mut on_item: impl FnMut(EmergentMessage),
989    ) -> Result<usize> {
990        let stream_id = loop {
991            let msg = tokio::time::timeout(timeout, source_stream.next())
992                .await
993                .map_err(|_| ClientError::Timeout)?
994                .ok_or_else(|| {
995                    ClientError::ConnectionFailed(
996                        "source stream closed before stream.ready".to_string(),
997                    )
998                })?;
999
1000            if msg.message_type.as_str() == "stream.ready"
1001                && let (Some(mt), Some(sid)) = (
1002                    msg.payload.get("message_type").and_then(|v| v.as_str()),
1003                    msg.payload.get("stream_id").and_then(|v| v.as_str()),
1004                )
1005                && mt == message_type
1006            {
1007                break sid.to_string();
1008            }
1009        };
1010
1011        self.publish(
1012            EmergentMessage::new("stream.pull")
1013                .with_payload(serde_json::json!({"stream_id": stream_id})),
1014        )
1015        .await?;
1016
1017        let mut count = 0usize;
1018
1019        loop {
1020            let msg = tokio::time::timeout(timeout, source_stream.next())
1021                .await
1022                .map_err(|_| ClientError::Timeout)?
1023                .ok_or_else(|| {
1024                    ClientError::ConnectionFailed(
1025                        "source stream closed during stream_consume".to_string(),
1026                    )
1027                })?;
1028
1029            if msg.message_type.as_str() == "stream.end"
1030                && msg.payload.get("stream_id").and_then(|v| v.as_str()) == Some(stream_id.as_str())
1031            {
1032                break;
1033            }
1034
1035            let is_item = msg.message_type.as_str() == message_type
1036                && msg
1037                    .metadata
1038                    .as_ref()
1039                    .and_then(|m| m.get("stream_id"))
1040                    .and_then(|v| v.as_str())
1041                    == Some(stream_id.as_str());
1042
1043            if is_item {
1044                on_item(msg);
1045                count += 1;
1046                self.publish(
1047                    EmergentMessage::new("stream.pull")
1048                        .with_payload(serde_json::json!({"stream_id": stream_id})),
1049                )
1050                .await?;
1051            }
1052        }
1053
1054        Ok(count)
1055    }
1056
1057    /// Discover available message types and primitives.
1058    ///
1059    /// # Errors
1060    ///
1061    /// Returns an error if the discovery request fails.
1062    pub async fn discover(&self) -> Result<DiscoveryInfo> {
1063        let client = connect_to_engine(&self.name, None).await?;
1064        let response = client
1065            .discover()
1066            .await
1067            .map_err(|e| ClientError::ConnectionFailed(format!("discover failed: {e}")))?;
1068
1069        if !response.success {
1070            return Err(ClientError::DiscoveryFailed(
1071                response
1072                    .error
1073                    .unwrap_or_else(|| "unknown error".to_string()),
1074            ));
1075        }
1076
1077        let primitives = response
1078            .actors
1079            .unwrap_or_default()
1080            .into_iter()
1081            .map(|actor| PrimitiveInfo {
1082                name: actor.name,
1083                kind: String::new(),
1084            })
1085            .collect();
1086
1087        let message_types = response.message_types.unwrap_or_default();
1088
1089        Ok(DiscoveryInfo {
1090            message_types,
1091            primitives,
1092        })
1093    }
1094
1095    /// Get the configured subscription types for this primitive.
1096    ///
1097    /// # Errors
1098    ///
1099    /// Returns an error if the request fails.
1100    pub async fn get_my_subscriptions(&self) -> Result<Vec<String>> {
1101        get_my_subscriptions_via_pubsub(&self.name).await
1102    }
1103
1104    /// Get the name of this handler.
1105    #[must_use]
1106    pub fn name(&self) -> &str {
1107        &self.name
1108    }
1109
1110    /// Get currently subscribed message types.
1111    pub fn subscribed_types(&self) -> &[String] {
1112        &self.subscribed_types
1113    }
1114
1115    /// Gracefully disconnect from the engine.
1116    ///
1117    /// # Errors
1118    ///
1119    /// Returns an error if the disconnection fails.
1120    pub async fn disconnect(&self) -> Result<()> {
1121        info!(primitive.name = %self.name, "disconnecting from engine");
1122        self.client
1123            .disconnect()
1124            .await
1125            .map_err(|e| ClientError::ConnectionFailed(format!("disconnect failed: {e}")))?;
1126        info!(primitive.name = %self.name, "disconnected from engine");
1127        Ok(())
1128    }
1129}
1130
1131// ============================================================================
1132// EmergentSink - Subscribe Only
1133// ============================================================================
1134
1135/// A Sink primitive that subscribes to messages from the Emergent engine.
1136///
1137/// Sinks are the egress point for data leaving the workflow. They receive messages
1138/// but cannot publish new messages to the bus.
1139///
1140/// # Example
1141///
1142/// ```rust,ignore
1143/// use emergent_client::EmergentSink;
1144///
1145/// let sink = EmergentSink::connect("my_sink").await?;
1146/// let mut stream = sink.subscribe(&["alert.high_temp"]).await?;
1147///
1148/// while let Some(msg) = stream.next().await {
1149///     println!("[ALERT] Temperature: {}", msg.payload["temperature"]);
1150///     // Log to file, send notification, etc.
1151/// }
1152/// ```
1153pub struct EmergentSink {
1154    /// Name of this sink.
1155    name: String,
1156    /// Channel-based IPC client (no mutex, no drain task).
1157    client: Arc<IpcClient>,
1158    /// Currently subscribed message types.
1159    subscribed_types: Vec<String>,
1160}
1161
1162impl EmergentSink {
1163    /// Connect to the Emergent engine as a Sink.
1164    ///
1165    /// # Errors
1166    ///
1167    /// Returns an error if the connection fails.
1168    pub async fn connect(name: &str) -> Result<Self> {
1169        let client = connect_to_engine(name, None).await?;
1170
1171        info!(primitive.name = %name, primitive.kind = "sink", "connected to engine");
1172
1173        Ok(Self {
1174            name: name.to_string(),
1175            client: Arc::new(client),
1176            subscribed_types: Vec::new(),
1177        })
1178    }
1179
1180    /// Connect to the Emergent engine at a specific socket path.
1181    ///
1182    /// This is useful for testing or when connecting to a non-default socket.
1183    ///
1184    /// # Errors
1185    ///
1186    /// Returns an error if the connection fails.
1187    pub async fn connect_to(name: &str, socket_path: &std::path::Path) -> Result<Self> {
1188        let client = connect_to_engine(name, Some(socket_path)).await?;
1189
1190        info!(primitive.name = %name, primitive.kind = "sink", "connected to engine");
1191
1192        Ok(Self {
1193            name: name.to_string(),
1194            client: Arc::new(client),
1195            subscribed_types: Vec::new(),
1196        })
1197    }
1198
1199    /// Convenience method that connects, gets configured subscriptions, and returns a stream.
1200    ///
1201    /// This is a one-liner for the common pattern of:
1202    /// 1. Connect to the engine
1203    /// 2. Query configured subscriptions from the engine's config
1204    /// 3. Subscribe to those topics
1205    /// 4. Return the message stream
1206    ///
1207    /// The `types` parameter is for API consistency but is ignored - the engine's
1208    /// configuration is the source of truth for what this sink should subscribe to.
1209    ///
1210    /// # Example
1211    ///
1212    /// ```rust,ignore
1213    /// use futures::StreamExt;
1214    ///
1215    /// let mut stream = EmergentSink::messages("console", ["timer.tick"]).await?;
1216    /// while let Some(msg) = stream.next().await {
1217    ///     println!("{}", msg.payload);
1218    /// }
1219    /// ```
1220    ///
1221    /// # Errors
1222    ///
1223    /// Returns an error if connection or subscription fails.
1224    pub async fn messages(
1225        name: impl Into<String>,
1226        _types: impl IntoSubscription,
1227    ) -> Result<MessageStream> {
1228        let name = name.into();
1229        let mut sink = Self::connect(&name).await?;
1230        let topics = sink.get_my_subscriptions().await?;
1231        sink.subscribe(topics).await
1232    }
1233
1234    /// Subscribe to message types and return a stream of incoming messages.
1235    ///
1236    /// The SDK automatically handles `system.shutdown` messages - when the engine
1237    /// signals shutdown for sinks, the stream will close gracefully.
1238    ///
1239    /// # Examples
1240    ///
1241    /// ```rust,ignore
1242    /// // Single topic
1243    /// let stream = sink.subscribe("timer.tick").await?;
1244    ///
1245    /// // Multiple topics with array
1246    /// let stream = sink.subscribe(["timer.tick", "timer.filtered"]).await?;
1247    ///
1248    /// // From a Vec
1249    /// let topics = vec!["timer.tick".to_string()];
1250    /// let stream = sink.subscribe(topics).await?;
1251    /// ```
1252    ///
1253    /// # Errors
1254    ///
1255    /// Returns an error if the subscription fails.
1256    pub async fn subscribe(&mut self, types: impl IntoSubscription) -> Result<MessageStream> {
1257        let topics = types.into_topics();
1258        let (stream, user_subs) =
1259            subscribe_and_stream(&self.client, topics, &self.name, "sink").await?;
1260        self.subscribed_types = user_subs;
1261        Ok(stream)
1262    }
1263
1264    /// Discover available message types and primitives.
1265    ///
1266    /// # Errors
1267    ///
1268    /// Returns an error if the discovery request fails.
1269    pub async fn discover(&self) -> Result<DiscoveryInfo> {
1270        let client = connect_to_engine(&self.name, None).await?;
1271        let response = client
1272            .discover()
1273            .await
1274            .map_err(|e| ClientError::ConnectionFailed(format!("discover failed: {e}")))?;
1275
1276        if !response.success {
1277            return Err(ClientError::DiscoveryFailed(
1278                response
1279                    .error
1280                    .unwrap_or_else(|| "unknown error".to_string()),
1281            ));
1282        }
1283
1284        let primitives = response
1285            .actors
1286            .unwrap_or_default()
1287            .into_iter()
1288            .map(|actor| PrimitiveInfo {
1289                name: actor.name,
1290                kind: String::new(),
1291            })
1292            .collect();
1293
1294        let message_types = response.message_types.unwrap_or_default();
1295
1296        Ok(DiscoveryInfo {
1297            message_types,
1298            primitives,
1299        })
1300    }
1301
1302    /// Get the configured subscription types for this primitive.
1303    ///
1304    /// # Errors
1305    ///
1306    /// Returns an error if the request fails.
1307    pub async fn get_my_subscriptions(&self) -> Result<Vec<String>> {
1308        get_my_subscriptions_via_pubsub(&self.name).await
1309    }
1310
1311    /// Get the current topology (all primitives and their state).
1312    ///
1313    /// # Errors
1314    ///
1315    /// Returns an error if the request fails.
1316    pub async fn get_topology(&self) -> Result<TopologyState> {
1317        get_topology_via_pubsub(&self.name).await
1318    }
1319
1320    /// Get the name of this sink.
1321    #[must_use]
1322    pub fn name(&self) -> &str {
1323        &self.name
1324    }
1325
1326    /// Get currently subscribed message types.
1327    pub fn subscribed_types(&self) -> &[String] {
1328        &self.subscribed_types
1329    }
1330
1331    /// Gracefully disconnect from the engine.
1332    ///
1333    /// # Errors
1334    ///
1335    /// Returns an error if the disconnection fails.
1336    pub async fn disconnect(&self) -> Result<()> {
1337        info!(primitive.name = %self.name, "disconnecting from engine");
1338        self.client
1339            .disconnect()
1340            .await
1341            .map_err(|e| ClientError::ConnectionFailed(format!("disconnect failed: {e}")))?;
1342        info!(primitive.name = %self.name, "disconnected from engine");
1343        Ok(())
1344    }
1345}