objectiveai_sdk/cli/output/handle.rs
1//! Destinations for [`super::Output::emit`].
2//!
3//! - [`Handle::Stdout`] — this process's stdout (with fatal-error
4//! mirror to stderr).
5//! - [`Handle::Stdin`] — a child process's stdin, for programmatic
6//! embedders that spawn a subprocess to consume the cli's output.
7//! - [`Handle::Collect`] — an in-memory shared `Vec`, for tests or
8//! in-process embedders that want the events without a subprocess.
9//! - [`Handle::Stream`] — push each emitted output through an mpsc
10//! channel. Used by hosts that embed the cli in-process and want
11//! each [`Output`] line as a typed value (e.g. the viewer bridging
12//! cli output into Tauri events).
13
14use std::sync::Arc;
15
16use serde::Serialize;
17use tokio::process::ChildStdin;
18use tokio::sync::{Mutex, mpsc};
19
20use super::{Notification, Output};
21
/// Destination for [`super::Output::emit`].
///
/// The `Stdin` and `Collect` variants wrap their payload in
/// `Arc<tokio::sync::Mutex<_>>` so a handle can be cloned cheaply and
/// concurrent emits through clones are serialized by the lock. The
/// `Stream` variant needs no such wrapper: it holds the channel sender
/// directly.
///
/// `tokio::sync::Mutex` is used (rather than `std::sync::Mutex`)
/// because the `Stdin` path keeps the guard alive across `.await`
/// points while the asynchronous write is in flight.
#[derive(Clone)]
pub enum Handle {
    /// Write each line to this process's stdout; fatal
    /// `Output::Error` lines are additionally mirrored to stderr.
    Stdout,
    /// Write each line, newline-terminated, to a child process's stdin.
    Stdin(Arc<Mutex<ChildStdin>>),
    /// Push each emitted `Output<T>` (rebuilt as `Output<Value>` so
    /// the storage type is uniform) into a shared vector.
    Collect(Arc<Mutex<Vec<Output<serde_json::Value>>>>),
    /// Forward each emitted `Output<T>` through an unbounded mpsc
    /// channel (rebuilt as `Output<Value>`, matching `Collect`'s
    /// storage type). The intended consumer loops on `rx.recv()`;
    /// that loop is expected to end after the cli's `run()` completes
    /// and the `Handle` is dropped, closing the sender side.
    /// NOTE(review): `Handle` is `Clone`, so presumably every clone
    /// must be dropped before the receiver observes the close — confirm
    /// against the call sites.
    Stream(mpsc::UnboundedSender<Output<serde_json::Value>>),
}
46
47impl Default for Handle {
48 fn default() -> Self {
49 Handle::Stdout
50 }
51}
52
53impl Handle {
54 /// Emit `output` to this destination. Panics on write failure to
55 /// match `println!` semantics.
56 pub async fn emit<T: Serialize>(&self, output: &Output<T>) {
57 match self {
58 Handle::Stdout => {
59 let json = serde_json::to_string(output)
60 .expect("Output<T> serializes when T: Serialize");
61 println!("{json}");
62 if matches!(output, Output::Error(e) if e.fatal) {
63 eprintln!("{json}");
64 }
65 }
66 Handle::Stdin(stdin) => {
67 let json = serde_json::to_string(output)
68 .expect("Output<T> serializes when T: Serialize");
69 use tokio::io::AsyncWriteExt;
70 let mut guard = stdin.lock().await;
71 guard
72 .write_all(json.as_bytes())
73 .await
74 .expect("emit to child stdin failed");
75 guard
76 .write_all(b"\n")
77 .await
78 .expect("emit to child stdin failed");
79 }
80 Handle::Collect(vec) => {
81 vec.lock().await.push(rebuild_as_value(output));
82 }
83 Handle::Stream(tx) => {
84 // Best-effort send: if the receiver dropped (consumer
85 // gone), just drop the message — same semantics as
86 // Stdout where a closed pipe would crash.
87 let _ = tx.send(rebuild_as_value(output));
88 }
89 }
90 }
91}
92
93/// Rebuild `Output<T>` as `Output<Value>` variant-wise, avoiding a
94/// full serialize→deserialize roundtrip. Used by `Collect` and `Stream`.
95fn rebuild_as_value<T: Serialize>(output: &Output<T>) -> Output<serde_json::Value> {
96 match output {
97 Output::Error(e) => Output::Error(e.clone()),
98 Output::Notification(n) => Output::Notification(Notification {
99 value: serde_json::to_value(&n.value)
100 .expect("T serializes when T: Serialize"),
101 }),
102 Output::Begin => Output::Begin,
103 Output::End => Output::End,
104 }
105}