Skip to main content

objectiveai_sdk/cli/output/
handle.rs

1//! Destinations for [`super::Output::emit`].
2//!
3//! - [`HandleDestination::Stdout`] — this process's stdout (with
4//!   fatal-error mirror to stderr).
5//! - [`HandleDestination::Stdin`] — a child process's stdin, for
6//!   programmatic embedders that spawn a subprocess to consume the
7//!   cli's output.
8//! - [`HandleDestination::Collect`] — an in-memory shared `Vec`, for
9//!   tests or in-process embedders that want the events without a
10//!   subprocess.
11//! - [`HandleDestination::Stream`] — push each emitted output through
12//!   an mpsc channel. Used by hosts that embed the cli in-process and
13//!   want each [`Output`] line as a typed value (e.g. the viewer
14//!   bridging cli output into Tauri events).
15//!
16//! `Handle` is a thin pair of (`destination`, `agent_id`). The cli's
17//! top-level `run()` stamps `agent_id` once at startup; from then on
18//! every emit picks up the same value without any per-call site
19//! threading.
20
21use std::sync::Arc;
22
23use serde::Serialize;
24use tokio::process::ChildStdin;
25use tokio::sync::{Mutex, mpsc};
26
27use super::{Notification, Output};
28
29/// Destination for [`super::Output::emit`].
30///
31/// `Arc<tokio::sync::Mutex<_>>` on the non-unit variants lets the
32/// handle be cloned cheaply across the command tree's call chain and
33/// guarantees concurrent emits serialize correctly. `tokio::sync::Mutex`
34/// (not `std::sync::Mutex`) because we hold the guard across `.await`
35/// boundaries during async writes, which would deadlock with std's.
36#[derive(Clone)]
37pub enum HandleDestination {
38    /// Write each line to this process's stdout; mirror fatal
39    /// `Output::Error` lines to stderr.
40    Stdout,
41    /// Write each line to a child process's stdin.
42    Stdin(Arc<Mutex<ChildStdin>>),
43    /// Push each emitted `Output<T>` (reserialized as `Output<Value>`
44    /// for a uniform storage type) into a shared vector.
45    Collect(Arc<Mutex<Vec<Output<serde_json::Value>>>>),
46    /// Forward each emitted `Output<T>` through an mpsc channel
47    /// (reserialized as `Output<Value>` to match `Collect`'s storage
48    /// type). The receiver loops on `rx.recv()` until the cli's
49    /// `run()` completes and the `Handle` is dropped — at which point
50    /// the sender side closes and the receiver's loop exits.
51    Stream(mpsc::UnboundedSender<Output<serde_json::Value>>),
52}
53
54impl Default for HandleDestination {
55    fn default() -> Self {
56        HandleDestination::Stdout
57    }
58}
59
60/// Output handle: a destination plus an optional `agent_id` that gets
61/// stamped on every emitted `Notification` and `Error` line.
62///
63/// The `agent_id` field is set once by the cli's `run()` from
64/// `Config.agent_id` (env `OBJECTIVEAI_AGENT_ID`). All emit sites
65/// stay verbatim — `Notification` and `Error` payloads carry their
66/// own `agent_id: Option<String>` field that defaults to `None`, and
67/// `emit` overwrites it with the handle's value before writing.
68#[derive(Clone, Default)]
69pub struct Handle {
70    pub destination: HandleDestination,
71    pub agent_id: Option<String>,
72}
73
74impl From<HandleDestination> for Handle {
75    fn from(destination: HandleDestination) -> Self {
76        Handle { destination, agent_id: None }
77    }
78}
79
80impl Handle {
81    /// Sugar for the most common destination — write to this process's
82    /// stdout. Equivalent to `Handle::default()`.
83    pub fn stdout() -> Self {
84        Handle::default()
85    }
86
87    /// Emit `output` to this destination. Panics on write failure to
88    /// match `println!` semantics.
89    pub async fn emit<T: Serialize>(&self, output: &Output<T>) {
90        // Single Value round-trip when we have an agent_id to stamp.
91        // Only `Notification` and `Error` get tagged; `Begin` / `End`
92        // are unit-shaped framing markers and stay as-is.
93        let json = if let Some(id) = self.agent_id.as_deref() {
94            let mut v = serde_json::to_value(output)
95                .expect("Output<T> serializes when T: Serialize");
96            if matches!(output, Output::Notification(_) | Output::Error(_)) {
97                if let Some(obj) = v.as_object_mut() {
98                    obj.insert(
99                        "agent_id".to_string(),
100                        serde_json::Value::String(id.to_string()),
101                    );
102                }
103            }
104            serde_json::to_string(&v)
105                .expect("agent-id-stamped Output<T> reserializes")
106        } else {
107            serde_json::to_string(output)
108                .expect("Output<T> serializes when T: Serialize")
109        };
110        match &self.destination {
111            HandleDestination::Stdout => {
112                println!("{json}");
113                if matches!(output, Output::Error(e) if e.fatal) {
114                    eprintln!("{json}");
115                }
116            }
117            HandleDestination::Stdin(stdin) => {
118                use tokio::io::AsyncWriteExt;
119                let mut guard = stdin.lock().await;
120                guard
121                    .write_all(json.as_bytes())
122                    .await
123                    .expect("emit to child stdin failed");
124                guard
125                    .write_all(b"\n")
126                    .await
127                    .expect("emit to child stdin failed");
128            }
129            HandleDestination::Collect(vec) => {
130                vec.lock().await.push(self.rebuild_as_value(output));
131            }
132            HandleDestination::Stream(tx) => {
133                // Best-effort send: if the receiver dropped (consumer
134                // gone), just drop the message — same semantics as
135                // Stdout where a closed pipe would crash.
136                let _ = tx.send(self.rebuild_as_value(output));
137            }
138        }
139    }
140
141    /// Rebuild `Output<T>` as `Output<Value>` variant-wise, avoiding a
142    /// full serialize→deserialize roundtrip. Used by `Collect` and
143    /// `Stream`. Picks up the handle's `agent_id` on `Notification`
144    /// and `Error` so in-memory consumers see the same stamped shape
145    /// the wire output carries.
146    fn rebuild_as_value<T: Serialize>(
147        &self,
148        output: &Output<T>,
149    ) -> Output<serde_json::Value> {
150        match output {
151            Output::Error(e) => {
152                let mut e = e.clone();
153                if e.agent_id.is_none() {
154                    e.agent_id = self.agent_id.clone();
155                }
156                Output::Error(e)
157            }
158            Output::Notification(n) => Output::Notification(Notification {
159                value: serde_json::to_value(&n.value)
160                    .expect("T serializes when T: Serialize"),
161                agent_id: n.agent_id.clone().or_else(|| self.agent_id.clone()),
162            }),
163            Output::Begin => Output::Begin,
164            Output::End => Output::End,
165        }
166    }
167}
168
169/// Reverse of [`Handle::emit`]'s `agent_id` stamping: walks each
170/// newline-delimited line in `text`, tries to parse it as a JSON
171/// object, and removes the top-level `"agent_id"` key. Lines that
172/// aren't valid JSON pass through unchanged. Preserves the trailing
173/// newline if the original had one (the emit envelope always ends
174/// with `\n`, so matching that keeps re-stripped bodies stable).
175///
176/// Used by test-mode response collectors (e.g. the MCP server's
177/// `TEST_MODE` strip) and snapshot normalizers to keep the racy
178/// `agent_id` counter out of test artefacts. **Never use this on
179/// production wire output** — callers depend on the agent_id stamp
180/// for cross-process correlation.
181pub fn strip_agent_id_lines(text: &str) -> String {
182    let mut out: String = text
183        .lines()
184        .map(|line| {
185            // Only rewrite lines that parse as a JSON *object* AND
186            // actually contain an `agent_id` top-level key. Other
187            // JSON-valid lines (strings, numbers, arrays, or
188            // objects without agent_id) pass through verbatim —
189            // otherwise we'd re-serialize an indented JSON-string
190            // line like `    "object"` (a schema's enum value) as
191            // `"object"`, destroying the surrounding pretty-printed
192            // indentation that downstream consumers depend on.
193            let Ok(serde_json::Value::Object(mut obj)) =
194                serde_json::from_str::<serde_json::Value>(line)
195            else {
196                return line.to_string();
197            };
198            if obj.remove("agent_id").is_none() {
199                return line.to_string();
200            }
201            serde_json::to_string(&serde_json::Value::Object(obj))
202                .unwrap_or_else(|_| line.to_string())
203        })
204        .collect::<Vec<_>>()
205        .join("\n");
206    if text.ends_with('\n') {
207        out.push('\n');
208    }
209    out
210}