use crate::types::Output;
use agent_first_data::{cli_output, OutputFormat, RedactionPolicy};
use serde_json::Value;
use std::io::Write;
use tokio::sync::mpsc;
pub async fn writer_task(mut rx: mpsc::Receiver<Output>, format: OutputFormat) {
while let Some(output) = rx.recv().await {
let rendered = match serde_json::to_value(&output) {
Ok(mut value) => {
if matches!(format, OutputFormat::Json) {
match redaction_policy_for_output(&output) {
Some(policy) => agent_first_data::output_json_with(&value, policy),
None => cli_output(&value, OutputFormat::Json),
}
} else {
protect_server_body(&mut value);
cli_output(&value, format)
}
}
Err(_) => {
let fallback = serde_json::json!({
"code": "error",
"error_code": "internal_error",
"error": "output serialization failed",
"retryable": false,
"trace": {"duration_ms": 0}
});
cli_output(&fallback, format)
}
};
let stdout = std::io::stdout();
let mut out = stdout.lock();
let _ = out.write_all(rendered.as_bytes());
if !rendered.ends_with('\n') {
let _ = out.write_all(b"\n");
}
let _ = out.flush();
}
}
fn redaction_policy_for_output(output: &Output) -> Option<RedactionPolicy> {
match output {
Output::Response { .. } => Some(RedactionPolicy::RedactionTraceOnly),
Output::ChunkData { .. } => Some(RedactionPolicy::RedactionNone),
_ => None,
}
}
fn protect_server_body(value: &mut Value) {
if let Some(obj) = value.as_object_mut() {
for key in &["body", "data"] {
if let Some(v) = obj.get(*key).cloned() {
if !v.is_null() && !v.is_string() {
if let Ok(json_str) = serde_json::to_string(&v) {
obj.insert((*key).to_string(), Value::String(json_str));
}
}
}
}
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
#[tokio::test]
async fn writer_task_drains_channel() {
let (tx, rx) = mpsc::channel(4);
tx.send(Output::Pong {
trace: crate::types::PongTrace {
uptime_s: 1,
requests_total: 2,
connections_active: 3,
},
})
.await
.expect("send");
drop(tx);
writer_task(rx, OutputFormat::Json).await;
}
#[tokio::test]
async fn writer_task_yaml_and_plain_formats() {
for format in [OutputFormat::Yaml, OutputFormat::Plain] {
let (tx, rx) = mpsc::channel(4);
tx.send(Output::Pong {
trace: crate::types::PongTrace {
uptime_s: 1,
requests_total: 2,
connections_active: 3,
},
})
.await
.expect("send");
drop(tx);
writer_task(rx, format).await;
}
}
}