Skip to main content

objectiveai_sdk/cli/output/
handle.rs

1//! Destinations for [`super::Output::emit`].
2//!
3//! - [`Handle::Stdout`] — this process's stdout (with fatal-error
4//!   mirror to stderr).
5//! - [`Handle::Stdin`] — a child process's stdin, for programmatic
6//!   embedders that spawn a subprocess to consume the cli's output.
7//! - [`Handle::Collect`] — an in-memory shared `Vec`, for tests or
8//!   in-process embedders that want the events without a subprocess.
9//! - [`Handle::Stream`] — push each emitted output through an mpsc
10//!   channel. Used by hosts that embed the cli in-process and want
11//!   each [`Output`] line as a typed value (e.g. the viewer bridging
12//!   cli output into Tauri events).
13
14use std::sync::Arc;
15
16use serde::Serialize;
17use tokio::process::ChildStdin;
18use tokio::sync::{Mutex, mpsc};
19
20use super::{Notification, Output};
21
22/// Destination for [`super::Output::emit`].
23///
24/// `Arc<tokio::sync::Mutex<_>>` on the non-unit variants lets the
25/// handle be cloned cheaply across the command tree's call chain and
26/// guarantees concurrent emits serialize correctly. `tokio::sync::Mutex`
27/// (not `std::sync::Mutex`) because we hold the guard across `.await`
28/// boundaries during async writes, which would deadlock with std's.
29#[derive(Clone)]
30pub enum Handle {
31    /// Write each line to this process's stdout; mirror fatal
32    /// `Output::Error` lines to stderr.
33    Stdout,
34    /// Write each line to a child process's stdin.
35    Stdin(Arc<Mutex<ChildStdin>>),
36    /// Push each emitted `Output<T>` (reserialized as `Output<Value>`
37    /// for a uniform storage type) into a shared vector.
38    Collect(Arc<Mutex<Vec<Output<serde_json::Value>>>>),
39    /// Forward each emitted `Output<T>` through an mpsc channel
40    /// (reserialized as `Output<Value>` to match `Collect`'s storage
41    /// type). The receiver loops on `rx.recv()` until the cli's
42    /// `run()` completes and the `Handle` is dropped — at which point
43    /// the sender side closes and the receiver's loop exits.
44    Stream(mpsc::UnboundedSender<Output<serde_json::Value>>),
45}
46
47impl Default for Handle {
48    fn default() -> Self {
49        Handle::Stdout
50    }
51}
52
53impl Handle {
54    /// Emit `output` to this destination. Panics on write failure to
55    /// match `println!` semantics.
56    pub async fn emit<T: Serialize>(&self, output: &Output<T>) {
57        match self {
58            Handle::Stdout => {
59                let json = serde_json::to_string(output)
60                    .expect("Output<T> serializes when T: Serialize");
61                println!("{json}");
62                if matches!(output, Output::Error(e) if e.fatal) {
63                    eprintln!("{json}");
64                }
65            }
66            Handle::Stdin(stdin) => {
67                let json = serde_json::to_string(output)
68                    .expect("Output<T> serializes when T: Serialize");
69                use tokio::io::AsyncWriteExt;
70                let mut guard = stdin.lock().await;
71                guard
72                    .write_all(json.as_bytes())
73                    .await
74                    .expect("emit to child stdin failed");
75                guard
76                    .write_all(b"\n")
77                    .await
78                    .expect("emit to child stdin failed");
79            }
80            Handle::Collect(vec) => {
81                vec.lock().await.push(rebuild_as_value(output));
82            }
83            Handle::Stream(tx) => {
84                // Best-effort send: if the receiver dropped (consumer
85                // gone), just drop the message — same semantics as
86                // Stdout where a closed pipe would crash.
87                let _ = tx.send(rebuild_as_value(output));
88            }
89        }
90    }
91}
92
93/// Rebuild `Output<T>` as `Output<Value>` variant-wise, avoiding a
94/// full serialize→deserialize roundtrip. Used by `Collect` and `Stream`.
95fn rebuild_as_value<T: Serialize>(output: &Output<T>) -> Output<serde_json::Value> {
96    match output {
97        Output::Error(e) => Output::Error(e.clone()),
98        Output::Notification(n) => Output::Notification(Notification {
99            value: serde_json::to_value(&n.value)
100                .expect("T serializes when T: Serialize"),
101        }),
102        Output::Begin => Output::Begin,
103        Output::End => Output::End,
104    }
105}