1use crate::error::ClientError;
9use crate::message::EmergentMessage;
10use crate::stream::MessageStream;
11use crate::subscribe::IntoSubscription;
12use crate::types::{CorrelationId, PrimitiveName};
13use crate::{DiscoveryInfo, PrimitiveInfo, Result};
14
15use tracing::{debug, error, info, warn};
16
17use acton_reactive::ipc::protocol::Format;
18use acton_reactive::ipc::{
19 IpcClient, IpcClientConfig, IpcConfig, IpcEnvelope, IpcPushNotification, socket_exists,
20 socket_is_alive,
21};
22use serde::{Deserialize, Serialize};
23use serde_json::json;
24use std::path::PathBuf;
25use std::sync::Arc;
26use tokio::sync::mpsc;
27
28#[derive(Clone, Debug, Serialize, Deserialize)]
30struct IpcEmergentMessage {
31 inner: EmergentMessage,
32}
33
34fn resolve_socket_path(_name: &str) -> Result<PathBuf> {
40 if let Ok(path) = std::env::var("EMERGENT_SOCKET") {
42 return Ok(PathBuf::from(path));
43 }
44
45 let mut config = IpcConfig::load();
47 config.socket.app_name = Some("emergent".to_string());
48 Ok(config.socket_path())
49}
50
51fn init_tracing(name: &str) {
57 use tracing_subscriber::EnvFilter;
58
59 let filter = EnvFilter::try_from_env("EMERGENT_LOG")
60 .or_else(|_| EnvFilter::try_from_default_env())
61 .unwrap_or_else(|_| EnvFilter::new("info"));
62
63 let wants_stderr = std::env::var("EMERGENT_LOG")
65 .map(|v| v.eq_ignore_ascii_case("stderr"))
66 .unwrap_or(false);
67
68 if wants_stderr {
69 let stderr_filter = EnvFilter::new("info");
70 let _ = tracing_subscriber::fmt()
71 .with_env_filter(stderr_filter)
72 .try_init();
73 } else {
74 let log_dir = directories::ProjectDirs::from("ai", "govcraft", "emergent")
76 .map(|dirs| dirs.data_dir().join(name))
77 .unwrap_or_else(|| std::path::PathBuf::from("."));
78 let _ = std::fs::create_dir_all(&log_dir);
79
80 if let Ok(log_file) = std::fs::OpenOptions::new()
81 .create(true)
82 .append(true)
83 .open(log_dir.join("primitive.log"))
84 {
85 let _ = tracing_subscriber::fmt()
86 .with_env_filter(filter)
87 .with_writer(std::sync::Mutex::new(log_file))
88 .with_ansi(false)
89 .try_init();
90 } else {
91 let _ = tracing_subscriber::fmt()
93 .with_env_filter(EnvFilter::new("off"))
94 .try_init();
95 }
96 }
97}
98
99async fn connect_to_engine(
104 name: &str,
105 socket_override: Option<&std::path::Path>,
106) -> Result<IpcClient> {
107 init_tracing(name);
108 let socket_path = match socket_override {
109 Some(path) => path.to_path_buf(),
110 None => resolve_socket_path(name)?,
111 };
112 debug!(path = %socket_path.display(), "resolved socket path");
113
114 info!(primitive.name = %name, path = %socket_path.display(), "connecting to engine");
115
116 if !socket_exists(&socket_path) {
117 error!(path = %socket_path.display(), "engine socket not found");
118 return Err(ClientError::SocketNotFound(
119 socket_path.display().to_string(),
120 ));
121 }
122
123 if !socket_is_alive(&socket_path).await {
124 error!(path = %socket_path.display(), "engine socket not responding");
125 return Err(ClientError::ConnectionFailed(
126 "Engine socket exists but is not responding".to_string(),
127 ));
128 }
129
130 let config = IpcClientConfig {
131 format: Format::MessagePack,
132 ..IpcClientConfig::default()
133 };
134
135 IpcClient::connect_with_config(&socket_path, config)
136 .await
137 .map_err(|e| {
138 error!(error = %e, "failed to connect to engine");
139 ClientError::ConnectionFailed(e.to_string())
140 })
141}
142
143fn build_publish_envelope(message: EmergentMessage) -> Result<IpcEnvelope> {
145 let ipc_message = IpcEmergentMessage { inner: message };
146 let payload = serde_json::to_value(&ipc_message)?;
147 Ok(IpcEnvelope::new(
148 "message_broker",
149 "EmergentMessage",
150 payload,
151 ))
152}
153
154fn build_publish_request_envelope(message: EmergentMessage) -> Result<IpcEnvelope> {
158 let ipc_message = IpcEmergentMessage { inner: message };
159 let payload = serde_json::to_value(&ipc_message)?;
160 Ok(IpcEnvelope::new_request(
161 "message_broker",
162 "EmergentMessage",
163 payload,
164 ))
165}
166
167async fn push_to_message_stream(
173 mut push_rx: mpsc::Receiver<IpcPushNotification>,
174 tx: mpsc::Sender<EmergentMessage>,
175 name: String,
176 shutdown_kind: &str,
177) {
178 debug!(primitive.name = %name, "push bridge started");
179
180 let auto_unwrap = std::env::var("EMERGENT_UNWRAP_STDOUT")
181 .is_ok_and(|v| v == "true" || v == "1");
182
183 while let Some(notification) = push_rx.recv().await {
184 if notification.message_type == "system.shutdown" {
186 let kind = notification
187 .payload
188 .get("kind")
189 .and_then(|v| v.as_str())
190 .unwrap_or("unknown");
191 info!(
192 primitive.name = %name,
193 shutdown_kind = %kind,
194 "received shutdown signal"
195 );
196 if kind == shutdown_kind {
197 info!(
198 primitive.name = %name,
199 "shutting down (engine requested)"
200 );
201 break;
202 }
203 debug!(
204 primitive.name = %name,
205 "ignoring shutdown for different primitive kind"
206 );
207 continue; }
209
210 let msg = if let Ok(msg) =
212 serde_json::from_value::<EmergentMessage>(notification.payload.clone())
213 {
214 msg
215 } else {
216 EmergentMessage::new(¬ification.message_type)
218 .with_source(notification.source_actor.as_deref().unwrap_or("unknown"))
219 .with_payload(notification.payload)
220 };
221
222 let msg = if auto_unwrap && !msg.message_type.as_str().starts_with("system.") {
224 msg.unwrap_stdout()
225 } else {
226 msg
227 };
228
229 debug!(
230 primitive.name = %name,
231 message_type = %msg.message_type,
232 message_id = %msg.id,
233 "received message"
234 );
235
236 if tx.send(msg).await.is_err() {
237 warn!(
238 primitive.name = %name,
239 "message stream send failed, receiver dropped"
240 );
241 break;
242 }
243 }
244
245 debug!(primitive.name = %name, "push bridge stopped");
246}
247
248async fn subscribe_and_stream(
252 client: &IpcClient,
253 topics: Vec<String>,
254 name: &str,
255 shutdown_kind: &str,
256) -> Result<(MessageStream, Vec<String>)> {
257 let mut all_types = topics;
259 if !all_types.iter().any(|t| t == "system.shutdown") {
260 all_types.push("system.shutdown".to_string());
261 }
262
263 let sub_response = client
265 .subscribe(all_types)
266 .await
267 .map_err(|e| ClientError::SubscriptionFailed(format!("subscribe failed: {e}")))?;
268
269 if !sub_response.success {
270 return Err(ClientError::SubscriptionFailed(
271 sub_response
272 .error
273 .unwrap_or_else(|| "unknown error".to_string()),
274 ));
275 }
276
277 let push_rx = client.take_push_receiver().ok_or_else(|| {
279 ClientError::SubscriptionFailed(
280 "push receiver already taken (subscribe called more than once?)".to_string(),
281 )
282 })?;
283
284 let (tx, rx) = mpsc::channel(256);
285 let bridge_name = name.to_string();
286 let bridge_kind = shutdown_kind.to_string();
287
288 tokio::spawn(async move {
289 push_to_message_stream(push_rx, tx, bridge_name, &bridge_kind).await;
290 });
291
292 let user_subs: Vec<String> = sub_response
294 .subscribed_types
295 .into_iter()
296 .filter(|s| s != "system.shutdown")
297 .collect();
298
299 Ok((MessageStream::new(rx), user_subs))
300}
301
302async fn get_my_subscriptions_via_pubsub(name: &str) -> Result<Vec<String>> {
308 debug!("querying configured subscriptions");
309
310 let client = connect_to_engine(name, None).await?;
311 let correlation_id = CorrelationId::new();
312
313 client
315 .subscribe(vec!["system.response.subscriptions".to_string()])
316 .await
317 .map_err(|e| ClientError::SubscriptionFailed(format!("subscribe failed: {e}")))?;
318
319 let mut push_rx = client.take_push_receiver().ok_or_else(|| {
321 ClientError::SubscriptionFailed("push receiver already taken".to_string())
322 })?;
323
324 let request = EmergentMessage::new("system.request.subscriptions")
326 .with_source(name)
327 .with_correlation_id(correlation_id.clone())
328 .with_payload(json!({ "name": name }));
329 let envelope = build_publish_envelope(request)?;
330 client
331 .send(envelope)
332 .await
333 .map_err(|e| ClientError::ConnectionFailed(format!("publish failed: {e}")))?;
334
335 let subs = tokio::time::timeout(std::time::Duration::from_secs(30), async {
337 while let Some(notification) = push_rx.recv().await {
338 if notification.message_type == "system.response.subscriptions" {
339 let msg: EmergentMessage = serde_json::from_value(notification.payload)?;
340 if msg.correlation_id.as_ref().map(|c| c.to_string())
341 == Some(correlation_id.to_string())
342 {
343 let subs_response: SubscriptionsResponse = serde_json::from_value(msg.payload)?;
344 return Ok(subs_response.subscribes);
345 }
346 }
347 }
348 Err(ClientError::ConnectionFailed(
349 "push channel closed before response".to_string(),
350 ))
351 })
352 .await
353 .map_err(|_| ClientError::Timeout)??;
354
355 info!(types = ?subs, "received configured subscriptions");
356 Ok(subs)
357}
358
359async fn get_topology_via_pubsub(name: &str) -> Result<TopologyState> {
363 debug!("querying topology");
364
365 let client = connect_to_engine(name, None).await?;
366 let correlation_id = CorrelationId::new();
367
368 client
369 .subscribe(vec!["system.response.topology".to_string()])
370 .await
371 .map_err(|e| ClientError::SubscriptionFailed(format!("subscribe failed: {e}")))?;
372
373 let mut push_rx = client.take_push_receiver().ok_or_else(|| {
374 ClientError::SubscriptionFailed("push receiver already taken".to_string())
375 })?;
376
377 let request = EmergentMessage::new("system.request.topology")
378 .with_source(name)
379 .with_correlation_id(correlation_id.clone())
380 .with_payload(json!({}));
381 let envelope = build_publish_envelope(request)?;
382 client
383 .send(envelope)
384 .await
385 .map_err(|e| ClientError::ConnectionFailed(format!("publish failed: {e}")))?;
386
387 let state = tokio::time::timeout(std::time::Duration::from_secs(30), async {
388 while let Some(notification) = push_rx.recv().await {
389 if notification.message_type == "system.response.topology" {
390 let msg: EmergentMessage = serde_json::from_value(notification.payload)?;
391 if msg.correlation_id.as_ref().map(|c| c.to_string())
392 == Some(correlation_id.to_string())
393 {
394 let topo_response: TopologyResponse = serde_json::from_value(msg.payload)?;
395 return Ok(TopologyState {
396 primitives: topo_response.primitives,
397 });
398 }
399 }
400 }
401 Err(ClientError::ConnectionFailed(
402 "push channel closed before response".to_string(),
403 ))
404 })
405 .await
406 .map_err(|_| ClientError::Timeout)??;
407
408 debug!(
409 primitive_count = state.primitives.len(),
410 "received topology"
411 );
412 Ok(state)
413}
414
415#[derive(Debug, Deserialize)]
421struct SubscriptionsResponse {
422 subscribes: Vec<String>,
423}
424
425#[derive(Debug, Clone, Serialize, Deserialize)]
427pub struct TopologyPrimitive {
428 pub name: String,
430 pub kind: String,
432 pub state: String,
434 pub publishes: Vec<String>,
436 pub subscribes: Vec<String>,
438 pub pid: Option<u32>,
440 pub error: Option<String>,
442}
443
444#[derive(Debug, Deserialize)]
446struct TopologyResponse {
447 primitives: Vec<TopologyPrimitive>,
448}
449
450#[derive(Debug, Clone)]
452pub struct TopologyState {
453 pub primitives: Vec<TopologyPrimitive>,
455}
456
457pub struct EmergentSource {
482 name: String,
484 client: IpcClient,
486}
487
488impl EmergentSource {
489 pub async fn connect(name: &str) -> Result<Self> {
497 let client = connect_to_engine(name, None).await?;
498
499 info!(primitive.name = %name, primitive.kind = "source", "connected to engine");
500
501 Ok(Self {
502 name: name.to_string(),
503 client,
504 })
505 }
506
507 pub async fn connect_to(name: &str, socket_path: &std::path::Path) -> Result<Self> {
515 let client = connect_to_engine(name, Some(socket_path)).await?;
516
517 info!(primitive.name = %name, primitive.kind = "source", "connected to engine");
518
519 Ok(Self {
520 name: name.to_string(),
521 client,
522 })
523 }
524
525 pub async fn publish(&self, mut message: EmergentMessage) -> Result<()> {
533 if message.source.is_default() {
534 message.source = PrimitiveName::new(&self.name).map_err(|e| {
535 ClientError::ConnectionFailed(format!(
536 "invalid primitive name '{}': {}",
537 self.name, e
538 ))
539 })?;
540 }
541
542 let envelope = build_publish_envelope(message)?;
543 self.client.send(envelope).await.map_err(|e| {
544 error!(primitive.name = %self.name, error = %e, "failed to publish message");
545 ClientError::ConnectionFailed(format!("publish failed: {e}"))
546 })
547 }
548
549 pub async fn publish_ack(&self, mut message: EmergentMessage) -> Result<()> {
561 if message.source.is_default() {
562 message.source = PrimitiveName::new(&self.name).map_err(|e| {
563 ClientError::ConnectionFailed(format!(
564 "invalid primitive name '{}': {}",
565 self.name, e
566 ))
567 })?;
568 }
569
570 let envelope = build_publish_request_envelope(message)?;
571 let response = self.client.request(envelope).await.map_err(|e| {
572 error!(primitive.name = %self.name, error = %e, "publish_ack failed");
573 ClientError::ConnectionFailed(format!("publish_ack failed: {e}"))
574 })?;
575 if !response.success {
576 return Err(ClientError::PublishFailed(
577 response.error.unwrap_or_else(|| "broker error".to_string()),
578 ));
579 }
580 Ok(())
581 }
582
583 pub async fn publish_all(
594 &self,
595 messages: impl IntoIterator<Item = EmergentMessage>,
596 ) -> Result<usize> {
597 let mut count = 0;
598 for message in messages {
599 self.publish_ack(message).await?;
600 count += 1;
601 }
602 Ok(count)
603 }
604
605 pub async fn publish_stream<S>(&self, mut stream: S) -> Result<usize>
617 where
618 S: futures::Stream<Item = EmergentMessage> + Unpin,
619 {
620 use futures::StreamExt;
621 let mut count = 0;
622 while let Some(message) = stream.next().await {
623 self.publish_ack(message).await?;
624 count += 1;
625 }
626 Ok(count)
627 }
628
629 pub async fn discover(&self) -> Result<DiscoveryInfo> {
635 let client = connect_to_engine(&self.name, None).await?;
637 let response = client
638 .discover()
639 .await
640 .map_err(|e| ClientError::ConnectionFailed(format!("discover failed: {e}")))?;
641
642 if !response.success {
643 return Err(ClientError::DiscoveryFailed(
644 response
645 .error
646 .unwrap_or_else(|| "unknown error".to_string()),
647 ));
648 }
649
650 let primitives = response
651 .actors
652 .unwrap_or_default()
653 .into_iter()
654 .map(|actor| PrimitiveInfo {
655 name: actor.name,
656 kind: String::new(),
657 })
658 .collect();
659
660 let message_types = response.message_types.unwrap_or_default();
661
662 Ok(DiscoveryInfo {
663 message_types,
664 primitives,
665 })
666 }
667
668 #[must_use]
670 pub fn name(&self) -> &str {
671 &self.name
672 }
673
674 pub async fn disconnect(&self) -> Result<()> {
680 info!(primitive.name = %self.name, "disconnecting from engine");
681 self.client
682 .disconnect()
683 .await
684 .map_err(|e| ClientError::ConnectionFailed(format!("disconnect failed: {e}")))?;
685 info!(primitive.name = %self.name, "disconnected from engine");
686 Ok(())
687 }
688}
689
690pub struct EmergentHandler {
715 name: String,
717 client: Arc<IpcClient>,
719 subscribed_types: Vec<String>,
721}
722
723impl EmergentHandler {
724 pub async fn connect(name: &str) -> Result<Self> {
730 let client = connect_to_engine(name, None).await?;
731
732 info!(primitive.name = %name, primitive.kind = "handler", "connected to engine");
733
734 Ok(Self {
735 name: name.to_string(),
736 client: Arc::new(client),
737 subscribed_types: Vec::new(),
738 })
739 }
740
741 pub async fn connect_to(name: &str, socket_path: &std::path::Path) -> Result<Self> {
749 let client = connect_to_engine(name, Some(socket_path)).await?;
750
751 info!(primitive.name = %name, primitive.kind = "handler", "connected to engine");
752
753 Ok(Self {
754 name: name.to_string(),
755 client: Arc::new(client),
756 subscribed_types: Vec::new(),
757 })
758 }
759
760 pub async fn subscribe(&mut self, types: impl IntoSubscription) -> Result<MessageStream> {
783 let topics = types.into_topics();
784 let (stream, user_subs) =
785 subscribe_and_stream(&self.client, topics, &self.name, "handler").await?;
786 self.subscribed_types = user_subs;
787 Ok(stream)
788 }
789
790 pub async fn messages(
797 name: impl Into<String>,
798 _types: impl IntoSubscription,
799 ) -> Result<(Self, MessageStream)> {
800 let name = name.into();
801 let mut handler = Self::connect(&name).await?;
802 let topics = get_my_subscriptions_via_pubsub(&name).await?;
803 let stream = handler.subscribe(topics).await?;
804 Ok((handler, stream))
805 }
806
807 pub async fn publish(&self, mut message: EmergentMessage) -> Result<()> {
813 if message.source.is_default() {
814 message.source = PrimitiveName::new(&self.name).map_err(|e| {
815 ClientError::ConnectionFailed(format!(
816 "invalid primitive name '{}': {}",
817 self.name, e
818 ))
819 })?;
820 }
821
822 let envelope = build_publish_envelope(message)?;
823 self.client.send(envelope).await.map_err(|e| {
824 error!(primitive.name = %self.name, error = %e, "failed to publish message");
825 ClientError::ConnectionFailed(format!("publish failed: {e}"))
826 })
827 }
828
829 pub async fn publish_ack(&self, mut message: EmergentMessage) -> Result<()> {
841 if message.source.is_default() {
842 message.source = PrimitiveName::new(&self.name).map_err(|e| {
843 ClientError::ConnectionFailed(format!(
844 "invalid primitive name '{}': {}",
845 self.name, e
846 ))
847 })?;
848 }
849
850 let envelope = build_publish_request_envelope(message)?;
851 let response = self.client.request(envelope).await.map_err(|e| {
852 error!(primitive.name = %self.name, error = %e, "publish_ack failed");
853 ClientError::ConnectionFailed(format!("publish_ack failed: {e}"))
854 })?;
855 if !response.success {
856 return Err(ClientError::PublishFailed(
857 response.error.unwrap_or_else(|| "broker error".to_string()),
858 ));
859 }
860 Ok(())
861 }
862
863 pub async fn publish_all(
874 &self,
875 messages: impl IntoIterator<Item = EmergentMessage>,
876 ) -> Result<usize> {
877 let mut count = 0;
878 for message in messages {
879 self.publish_ack(message).await?;
880 count += 1;
881 }
882 Ok(count)
883 }
884
885 pub async fn publish_stream<S>(&self, mut stream: S) -> Result<usize>
897 where
898 S: futures::Stream<Item = EmergentMessage> + Unpin,
899 {
900 use futures::StreamExt;
901 let mut count = 0;
902 while let Some(message) = stream.next().await {
903 self.publish_ack(message).await?;
904 count += 1;
905 }
906 Ok(count)
907 }
908
909 pub async fn stream_offer(
919 &self,
920 message_type: &str,
921 items: impl IntoIterator<Item = serde_json::Value>,
922 pull_stream: &mut MessageStream,
923 timeout: std::time::Duration,
924 ) -> Result<usize> {
925 let stream_id = CorrelationId::new().to_string();
926 let mut items = items.into_iter();
927
928 self.publish(
929 EmergentMessage::new("stream.ready").with_payload(serde_json::json!({
930 "stream_id": stream_id,
931 "message_type": message_type,
932 })),
933 )
934 .await?;
935
936 let mut published = 0usize;
937
938 loop {
939 let msg = tokio::time::timeout(timeout, pull_stream.next())
940 .await
941 .map_err(|_| ClientError::Timeout)?
942 .ok_or_else(|| {
943 ClientError::ConnectionFailed(
944 "pull stream closed during stream_offer".to_string(),
945 )
946 })?;
947
948 let is_pull = msg.message_type.as_str() == "stream.pull"
949 && msg.payload.get("stream_id").and_then(|v| v.as_str())
950 == Some(stream_id.as_str());
951
952 if is_pull {
953 if let Some(item) = items.next() {
954 self.publish(
955 EmergentMessage::new(message_type)
956 .with_payload(item)
957 .with_metadata(serde_json::json!({"stream_id": stream_id})),
958 )
959 .await?;
960 published += 1;
961 } else {
962 self.publish(
963 EmergentMessage::new("stream.end")
964 .with_payload(serde_json::json!({"stream_id": stream_id})),
965 )
966 .await?;
967 break;
968 }
969 }
970 }
971
972 Ok(published)
973 }
974
975 pub async fn stream_consume(
984 &self,
985 message_type: &str,
986 source_stream: &mut MessageStream,
987 timeout: std::time::Duration,
988 mut on_item: impl FnMut(EmergentMessage),
989 ) -> Result<usize> {
990 let stream_id = loop {
991 let msg = tokio::time::timeout(timeout, source_stream.next())
992 .await
993 .map_err(|_| ClientError::Timeout)?
994 .ok_or_else(|| {
995 ClientError::ConnectionFailed(
996 "source stream closed before stream.ready".to_string(),
997 )
998 })?;
999
1000 if msg.message_type.as_str() == "stream.ready"
1001 && let (Some(mt), Some(sid)) = (
1002 msg.payload.get("message_type").and_then(|v| v.as_str()),
1003 msg.payload.get("stream_id").and_then(|v| v.as_str()),
1004 )
1005 && mt == message_type
1006 {
1007 break sid.to_string();
1008 }
1009 };
1010
1011 self.publish(
1012 EmergentMessage::new("stream.pull")
1013 .with_payload(serde_json::json!({"stream_id": stream_id})),
1014 )
1015 .await?;
1016
1017 let mut count = 0usize;
1018
1019 loop {
1020 let msg = tokio::time::timeout(timeout, source_stream.next())
1021 .await
1022 .map_err(|_| ClientError::Timeout)?
1023 .ok_or_else(|| {
1024 ClientError::ConnectionFailed(
1025 "source stream closed during stream_consume".to_string(),
1026 )
1027 })?;
1028
1029 if msg.message_type.as_str() == "stream.end"
1030 && msg.payload.get("stream_id").and_then(|v| v.as_str()) == Some(stream_id.as_str())
1031 {
1032 break;
1033 }
1034
1035 let is_item = msg.message_type.as_str() == message_type
1036 && msg
1037 .metadata
1038 .as_ref()
1039 .and_then(|m| m.get("stream_id"))
1040 .and_then(|v| v.as_str())
1041 == Some(stream_id.as_str());
1042
1043 if is_item {
1044 on_item(msg);
1045 count += 1;
1046 self.publish(
1047 EmergentMessage::new("stream.pull")
1048 .with_payload(serde_json::json!({"stream_id": stream_id})),
1049 )
1050 .await?;
1051 }
1052 }
1053
1054 Ok(count)
1055 }
1056
1057 pub async fn discover(&self) -> Result<DiscoveryInfo> {
1063 let client = connect_to_engine(&self.name, None).await?;
1064 let response = client
1065 .discover()
1066 .await
1067 .map_err(|e| ClientError::ConnectionFailed(format!("discover failed: {e}")))?;
1068
1069 if !response.success {
1070 return Err(ClientError::DiscoveryFailed(
1071 response
1072 .error
1073 .unwrap_or_else(|| "unknown error".to_string()),
1074 ));
1075 }
1076
1077 let primitives = response
1078 .actors
1079 .unwrap_or_default()
1080 .into_iter()
1081 .map(|actor| PrimitiveInfo {
1082 name: actor.name,
1083 kind: String::new(),
1084 })
1085 .collect();
1086
1087 let message_types = response.message_types.unwrap_or_default();
1088
1089 Ok(DiscoveryInfo {
1090 message_types,
1091 primitives,
1092 })
1093 }
1094
1095 pub async fn get_my_subscriptions(&self) -> Result<Vec<String>> {
1101 get_my_subscriptions_via_pubsub(&self.name).await
1102 }
1103
1104 #[must_use]
1106 pub fn name(&self) -> &str {
1107 &self.name
1108 }
1109
1110 pub fn subscribed_types(&self) -> &[String] {
1112 &self.subscribed_types
1113 }
1114
1115 pub async fn disconnect(&self) -> Result<()> {
1121 info!(primitive.name = %self.name, "disconnecting from engine");
1122 self.client
1123 .disconnect()
1124 .await
1125 .map_err(|e| ClientError::ConnectionFailed(format!("disconnect failed: {e}")))?;
1126 info!(primitive.name = %self.name, "disconnected from engine");
1127 Ok(())
1128 }
1129}
1130
1131pub struct EmergentSink {
1154 name: String,
1156 client: Arc<IpcClient>,
1158 subscribed_types: Vec<String>,
1160}
1161
1162impl EmergentSink {
1163 pub async fn connect(name: &str) -> Result<Self> {
1169 let client = connect_to_engine(name, None).await?;
1170
1171 info!(primitive.name = %name, primitive.kind = "sink", "connected to engine");
1172
1173 Ok(Self {
1174 name: name.to_string(),
1175 client: Arc::new(client),
1176 subscribed_types: Vec::new(),
1177 })
1178 }
1179
1180 pub async fn connect_to(name: &str, socket_path: &std::path::Path) -> Result<Self> {
1188 let client = connect_to_engine(name, Some(socket_path)).await?;
1189
1190 info!(primitive.name = %name, primitive.kind = "sink", "connected to engine");
1191
1192 Ok(Self {
1193 name: name.to_string(),
1194 client: Arc::new(client),
1195 subscribed_types: Vec::new(),
1196 })
1197 }
1198
1199 pub async fn messages(
1225 name: impl Into<String>,
1226 _types: impl IntoSubscription,
1227 ) -> Result<MessageStream> {
1228 let name = name.into();
1229 let mut sink = Self::connect(&name).await?;
1230 let topics = sink.get_my_subscriptions().await?;
1231 sink.subscribe(topics).await
1232 }
1233
1234 pub async fn subscribe(&mut self, types: impl IntoSubscription) -> Result<MessageStream> {
1257 let topics = types.into_topics();
1258 let (stream, user_subs) =
1259 subscribe_and_stream(&self.client, topics, &self.name, "sink").await?;
1260 self.subscribed_types = user_subs;
1261 Ok(stream)
1262 }
1263
1264 pub async fn discover(&self) -> Result<DiscoveryInfo> {
1270 let client = connect_to_engine(&self.name, None).await?;
1271 let response = client
1272 .discover()
1273 .await
1274 .map_err(|e| ClientError::ConnectionFailed(format!("discover failed: {e}")))?;
1275
1276 if !response.success {
1277 return Err(ClientError::DiscoveryFailed(
1278 response
1279 .error
1280 .unwrap_or_else(|| "unknown error".to_string()),
1281 ));
1282 }
1283
1284 let primitives = response
1285 .actors
1286 .unwrap_or_default()
1287 .into_iter()
1288 .map(|actor| PrimitiveInfo {
1289 name: actor.name,
1290 kind: String::new(),
1291 })
1292 .collect();
1293
1294 let message_types = response.message_types.unwrap_or_default();
1295
1296 Ok(DiscoveryInfo {
1297 message_types,
1298 primitives,
1299 })
1300 }
1301
1302 pub async fn get_my_subscriptions(&self) -> Result<Vec<String>> {
1308 get_my_subscriptions_via_pubsub(&self.name).await
1309 }
1310
1311 pub async fn get_topology(&self) -> Result<TopologyState> {
1317 get_topology_via_pubsub(&self.name).await
1318 }
1319
1320 #[must_use]
1322 pub fn name(&self) -> &str {
1323 &self.name
1324 }
1325
1326 pub fn subscribed_types(&self) -> &[String] {
1328 &self.subscribed_types
1329 }
1330
1331 pub async fn disconnect(&self) -> Result<()> {
1337 info!(primitive.name = %self.name, "disconnecting from engine");
1338 self.client
1339 .disconnect()
1340 .await
1341 .map_err(|e| ClientError::ConnectionFailed(format!("disconnect failed: {e}")))?;
1342 info!(primitive.name = %self.name, "disconnected from engine");
1343 Ok(())
1344 }
1345}