objectiveai_sdk/cli/output/handle.rs
1//! Destinations for [`super::Output::emit`].
2//!
3//! - [`HandleDestination::Stdout`] — this process's stdout (with
4//! fatal-error mirror to stderr).
5//! - [`HandleDestination::Stdin`] — a child process's stdin, for
6//! programmatic embedders that spawn a subprocess to consume the
7//! cli's output.
8//! - [`HandleDestination::Collect`] — an in-memory shared `Vec`, for
9//! tests or in-process embedders that want the events without a
10//! subprocess.
11//! - [`HandleDestination::Stream`] — push each emitted output through
12//! an mpsc channel. Used by hosts that embed the cli in-process and
13//! want each [`Output`] line as a typed value (e.g. the viewer
14//! bridging cli output into Tauri events).
15//!
16//! `Handle` is a thin pair of (`destination`, `agent_id`). The cli's
17//! top-level `run()` stamps `agent_id` once at startup; from then on
18//! every emit picks up the same value without any per-call site
19//! threading.
20
21use std::sync::Arc;
22
23use serde::Serialize;
24use tokio::process::ChildStdin;
25use tokio::sync::{Mutex, mpsc};
26
27use super::{Notification, Output};
28
/// Destination for [`super::Output::emit`].
///
/// Every variant is cheap to clone, so a handle can be passed by value
/// down the command tree's call chain. `Stdin` and `Collect` keep their
/// state behind `Arc<tokio::sync::Mutex<_>>` so concurrent emits
/// serialize correctly. `Stream` needs no lock of its own: it holds an
/// mpsc sender and `emit` sends through a shared reference.
///
/// `tokio::sync::Mutex` (not `std::sync::Mutex`) is used because the
/// guard is held across `.await` boundaries during async writes, which
/// a `std` guard must not be.
#[derive(Clone)]
pub enum HandleDestination {
    /// Write each line to this process's stdout; mirror fatal
    /// `Output::Error` lines to stderr.
    Stdout,
    /// Write each line, newline-terminated, to a child process's stdin.
    Stdin(Arc<Mutex<ChildStdin>>),
    /// Push each emitted `Output<T>` (rebuilt as `Output<Value>` for a
    /// uniform storage type) into a shared vector.
    Collect(Arc<Mutex<Vec<Output<serde_json::Value>>>>),
    /// Forward each emitted `Output<T>` through an mpsc channel
    /// (rebuilt as `Output<Value>` to match `Collect`'s storage type).
    /// The receiver loops on `rx.recv()` until the cli's `run()`
    /// completes and the `Handle` is dropped — at which point the
    /// sender side closes and the receiver's loop exits. Sends are
    /// best-effort: if the receiver is already gone the value is
    /// discarded.
    Stream(mpsc::UnboundedSender<Output<serde_json::Value>>),
}
53
54impl Default for HandleDestination {
55 fn default() -> Self {
56 HandleDestination::Stdout
57 }
58}
59
/// Output handle: a destination plus an optional `agent_id` that gets
/// stamped on every emitted `Notification` and `Error` line.
///
/// The `agent_id` field is set once by the cli's `run()` from
/// `Config.agent_id` (env `OBJECTIVEAI_AGENT_ID`). All emit sites
/// stay verbatim — `Notification` and `Error` payloads carry their
/// own `agent_id: Option<String>` field that defaults to `None`, and
/// `emit` stamps the handle's value onto them when writing. `Begin`
/// and `End` are never stamped.
///
/// NOTE(review): the two emit paths resolve a conflict differently.
/// The serialized path (stdout / child stdin) overwrites any
/// `agent_id` already present on the payload, while the in-memory
/// path (`Collect` / `Stream`) only fills it in when the payload's is
/// `None`. Confirm which precedence is intended.
#[derive(Clone, Default)]
pub struct Handle {
    /// Where emitted output is sent.
    pub destination: HandleDestination,
    /// Identifier stamped onto emitted `Notification` / `Error`
    /// output. When `None`, payloads are emitted with whatever
    /// `agent_id` they already carry.
    pub agent_id: Option<String>,
}
73
74impl From<HandleDestination> for Handle {
75 fn from(destination: HandleDestination) -> Self {
76 Handle { destination, agent_id: None }
77 }
78}
79
80impl Handle {
81 /// Sugar for the most common destination — write to this process's
82 /// stdout. Equivalent to `Handle::default()`.
83 pub fn stdout() -> Self {
84 Handle::default()
85 }
86
87 /// Emit `output` to this destination. Panics on write failure to
88 /// match `println!` semantics.
89 pub async fn emit<T: Serialize>(&self, output: &Output<T>) {
90 // Single Value round-trip when we have an agent_id to stamp.
91 // Only `Notification` and `Error` get tagged; `Begin` / `End`
92 // are unit-shaped framing markers and stay as-is.
93 let json = if let Some(id) = self.agent_id.as_deref() {
94 let mut v = serde_json::to_value(output)
95 .expect("Output<T> serializes when T: Serialize");
96 if matches!(output, Output::Notification(_) | Output::Error(_)) {
97 if let Some(obj) = v.as_object_mut() {
98 obj.insert(
99 "agent_id".to_string(),
100 serde_json::Value::String(id.to_string()),
101 );
102 }
103 }
104 serde_json::to_string(&v)
105 .expect("agent-id-stamped Output<T> reserializes")
106 } else {
107 serde_json::to_string(output)
108 .expect("Output<T> serializes when T: Serialize")
109 };
110 match &self.destination {
111 HandleDestination::Stdout => {
112 println!("{json}");
113 if matches!(output, Output::Error(e) if e.fatal) {
114 eprintln!("{json}");
115 }
116 }
117 HandleDestination::Stdin(stdin) => {
118 use tokio::io::AsyncWriteExt;
119 let mut guard = stdin.lock().await;
120 guard
121 .write_all(json.as_bytes())
122 .await
123 .expect("emit to child stdin failed");
124 guard
125 .write_all(b"\n")
126 .await
127 .expect("emit to child stdin failed");
128 }
129 HandleDestination::Collect(vec) => {
130 vec.lock().await.push(self.rebuild_as_value(output));
131 }
132 HandleDestination::Stream(tx) => {
133 // Best-effort send: if the receiver dropped (consumer
134 // gone), just drop the message — same semantics as
135 // Stdout where a closed pipe would crash.
136 let _ = tx.send(self.rebuild_as_value(output));
137 }
138 }
139 }
140
141 /// Rebuild `Output<T>` as `Output<Value>` variant-wise, avoiding a
142 /// full serialize→deserialize roundtrip. Used by `Collect` and
143 /// `Stream`. Picks up the handle's `agent_id` on `Notification`
144 /// and `Error` so in-memory consumers see the same stamped shape
145 /// the wire output carries.
146 fn rebuild_as_value<T: Serialize>(
147 &self,
148 output: &Output<T>,
149 ) -> Output<serde_json::Value> {
150 match output {
151 Output::Error(e) => {
152 let mut e = e.clone();
153 if e.agent_id.is_none() {
154 e.agent_id = self.agent_id.clone();
155 }
156 Output::Error(e)
157 }
158 Output::Notification(n) => Output::Notification(Notification {
159 value: serde_json::to_value(&n.value)
160 .expect("T serializes when T: Serialize"),
161 agent_id: n.agent_id.clone().or_else(|| self.agent_id.clone()),
162 }),
163 Output::Begin => Output::Begin,
164 Output::End => Output::End,
165 }
166 }
167}
168
169/// Reverse of [`Handle::emit`]'s `agent_id` stamping: walks each
170/// newline-delimited line in `text`, tries to parse it as a JSON
171/// object, and removes the top-level `"agent_id"` key. Lines that
172/// aren't valid JSON pass through unchanged. Preserves the trailing
173/// newline if the original had one (the emit envelope always ends
174/// with `\n`, so matching that keeps re-stripped bodies stable).
175///
176/// Used by test-mode response collectors (e.g. the MCP server's
177/// `TEST_MODE` strip) and snapshot normalizers to keep the racy
178/// `agent_id` counter out of test artefacts. **Never use this on
179/// production wire output** — callers depend on the agent_id stamp
180/// for cross-process correlation.
181pub fn strip_agent_id_lines(text: &str) -> String {
182 let mut out: String = text
183 .lines()
184 .map(|line| {
185 // Only rewrite lines that parse as a JSON *object* AND
186 // actually contain an `agent_id` top-level key. Other
187 // JSON-valid lines (strings, numbers, arrays, or
188 // objects without agent_id) pass through verbatim —
189 // otherwise we'd re-serialize an indented JSON-string
190 // line like ` "object"` (a schema's enum value) as
191 // `"object"`, destroying the surrounding pretty-printed
192 // indentation that downstream consumers depend on.
193 let Ok(serde_json::Value::Object(mut obj)) =
194 serde_json::from_str::<serde_json::Value>(line)
195 else {
196 return line.to_string();
197 };
198 if obj.remove("agent_id").is_none() {
199 return line.to_string();
200 }
201 serde_json::to_string(&serde_json::Value::Object(obj))
202 .unwrap_or_else(|_| line.to_string())
203 })
204 .collect::<Vec<_>>()
205 .join("\n");
206 if text.ends_with('\n') {
207 out.push('\n');
208 }
209 out
210}