use std::sync::Arc;
use serde::Serialize;
use tokio::process::ChildStdin;
use tokio::sync::{Mutex, mpsc};
use super::{Notification, Output};
/// Sink that a [`Handle`] delivers emitted outputs to.
///
/// The shared variants hold an `Arc` or a channel sender, so a cloned
/// destination delivers to the same underlying sink as the original.
#[derive(Clone, Default)]
pub enum HandleDestination {
    /// Print each output as one JSON line on this process's stdout.
    /// Fatal errors are additionally echoed to stderr. This is the default.
    #[default]
    Stdout,
    /// Write each output as one newline-terminated JSON line to a child
    /// process's stdin; the mutex serializes concurrent writers.
    Stdin(Arc<Mutex<ChildStdin>>),
    /// Append each output, converted to `Output<serde_json::Value>`, to a
    /// shared in-memory vector.
    Collect(Arc<Mutex<Vec<Output<serde_json::Value>>>>),
    /// Send each output, converted to `Output<serde_json::Value>`, over an
    /// unbounded channel. Send failures are ignored by `Handle::emit`.
    Stream(mpsc::UnboundedSender<Output<serde_json::Value>>),
}
/// An output emitter: a destination plus an optional agent id.
///
/// The default value targets `HandleDestination::Stdout` with no agent id.
#[derive(Clone, Default)]
pub struct Handle {
/// Where emitted outputs are delivered.
pub destination: HandleDestination,
/// Optional agent id. When set, `emit` attaches it to notification and
/// error outputs; `Begin`/`End` outputs are never stamped.
pub agent_id: Option<String>,
}
impl From<HandleDestination> for Handle {
fn from(destination: HandleDestination) -> Self {
Handle { destination, agent_id: None }
}
}
impl Handle {
    /// Returns a handle that prints to stdout and carries no agent id.
    pub fn stdout() -> Self {
        Handle::default()
    }

    /// Delivers `output` to this handle's destination.
    ///
    /// * `Stdout` — prints one JSON line; fatal errors are also echoed to
    ///   stderr.
    /// * `Stdin` — writes one newline-terminated JSON line to the child's
    ///   stdin while holding the mutex.
    /// * `Collect` — pushes the output (as `Output<serde_json::Value>`) onto
    ///   the shared vector.
    /// * `Stream` — sends the output (as `Output<serde_json::Value>`) over
    ///   the channel; a send failure is deliberately ignored.
    ///
    /// When the handle has an agent id, notification and error outputs are
    /// stamped with it unless they already carry one.
    ///
    /// # Panics
    ///
    /// Panics if `output` cannot be serialized to JSON, or if writing to the
    /// child's stdin fails.
    pub async fn emit<T: Serialize>(&self, output: &Output<T>) {
        match &self.destination {
            HandleDestination::Stdout => {
                let json = self.render_line(output);
                println!("{json}");
                if matches!(output, Output::Error(e) if e.fatal) {
                    eprintln!("{json}");
                }
            }
            HandleDestination::Stdin(stdin) => {
                use tokio::io::AsyncWriteExt;
                // Append the terminator up front so the whole line goes out
                // in a single `write_all`; a failure or cancellation can then
                // never leave the payload written without its newline.
                let mut line = self.render_line(output);
                line.push('\n');
                let mut guard = stdin.lock().await;
                guard
                    .write_all(line.as_bytes())
                    .await
                    .expect("emit to child stdin failed");
            }
            // The value-based destinations never need the JSON text, so it is
            // not rendered for them at all.
            HandleDestination::Collect(vec) => {
                vec.lock().await.push(self.rebuild_as_value(output));
            }
            HandleDestination::Stream(tx) => {
                // Best effort: ignore a send failure.
                let _ = tx.send(self.rebuild_as_value(output));
            }
        }
    }

    /// Serializes `output` to a single-line JSON string for the text
    /// destinations, stamping this handle's agent id where applicable.
    fn render_line<T: Serialize>(&self, output: &Output<T>) -> String {
        let Some(id) = self.agent_id.as_deref() else {
            return serde_json::to_string(output)
                .expect("Output<T> serializes when T: Serialize");
        };

        let mut v = serde_json::to_value(output)
            .expect("Output<T> serializes when T: Serialize");
        if matches!(output, Output::Notification(_) | Output::Error(_)) {
            if let Some(obj) = v.as_object_mut() {
                // Mirror `rebuild_as_value`: an id already present on the
                // output wins; the handle's id only fills a missing/null one.
                let unset = matches!(
                    obj.get("agent_id"),
                    None | Some(serde_json::Value::Null)
                );
                if unset {
                    obj.insert(
                        "agent_id".to_string(),
                        serde_json::Value::String(id.to_string()),
                    );
                }
            }
        }
        serde_json::to_string(&v).expect("agent-id-stamped Output<T> reserializes")
    }

    /// Converts `output` into an `Output<serde_json::Value>` for the
    /// `Collect` and `Stream` destinations.
    ///
    /// Errors and notifications keep their own agent id when they have one
    /// and otherwise inherit this handle's. `Begin` and `End` map to
    /// themselves.
    ///
    /// # Panics
    ///
    /// Panics if a notification's value cannot be converted to a
    /// `serde_json::Value`.
    fn rebuild_as_value<T: Serialize>(
        &self,
        output: &Output<T>,
    ) -> Output<serde_json::Value> {
        match output {
            Output::Error(e) => {
                let mut e = e.clone();
                if e.agent_id.is_none() {
                    e.agent_id = self.agent_id.clone();
                }
                Output::Error(e)
            }
            Output::Notification(n) => Output::Notification(Notification {
                value: serde_json::to_value(&n.value)
                    .expect("T serializes when T: Serialize"),
                agent_id: n.agent_id.clone().or_else(|| self.agent_id.clone()),
            }),
            Output::Begin => Output::Begin,
            Output::End => Output::End,
        }
    }
}
/// Removes the top-level `"agent_id"` key from every line of `text` that is
/// a JSON object containing it.
///
/// Lines that are not JSON objects, or that have no `"agent_id"` key, are
/// copied through verbatim. Rewritten lines are re-serialized by
/// `serde_json`, so their whitespace (and possibly key order) may differ
/// from the input. Every line keeps its original terminator (`"\n"`,
/// `"\r\n"`, or none for an unterminated final line).
pub fn strip_agent_id_lines(text: &str) -> String {
    let mut out = String::with_capacity(text.len());
    for chunk in text.split_inclusive('\n') {
        // Separate the line's content from its terminator so the terminator
        // can be copied back unchanged.
        let body = match chunk.strip_suffix('\n') {
            Some(rest) => rest.strip_suffix('\r').unwrap_or(rest),
            None => chunk,
        };
        let terminator = &chunk[body.len()..];

        // `Some(json)` only when the line was an object that had the key and
        // re-serialized successfully; anything else passes through as-is.
        let rewritten = match serde_json::from_str::<serde_json::Value>(body) {
            Ok(serde_json::Value::Object(mut obj)) => {
                obj.remove("agent_id").and_then(|_| {
                    serde_json::to_string(&serde_json::Value::Object(obj)).ok()
                })
            }
            _ => None,
        };

        out.push_str(rewritten.as_deref().unwrap_or(body));
        out.push_str(terminator);
    }
    out
}