use crate::output::Event;
use anyhow::{Result, bail};
use serde_json;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines};
use tokio::process::{Child, ChildStdin, ChildStdout};
/// A line-oriented streaming session with a spawned child process.
///
/// Messages are written to the child's stdin one line at a time, and each
/// line read from the child's stdout is parsed and translated into zero or
/// more [`Event`]s.
pub struct StreamingSession {
/// Handle to the spawned child process.
child: Child,
/// Write end of the child's stdin; `None` if it was never piped or has
/// already been closed.
stdin: Option<ChildStdin>,
/// Line reader over the child's stdout.
lines: Lines<BufReader<ChildStdout>>,
/// Translates parsed provider events into [`Event`]s.
translator: crate::providers::claude::ClaudeEventTranslator,
/// Translated events that have not yet been handed out by `next_event`.
pending: std::collections::VecDeque<Event>,
}
impl StreamingSession {
    /// Wraps an already-spawned child process in a streaming session.
    ///
    /// Takes ownership of the child's stdout (required) and stdin (optional).
    ///
    /// # Errors
    ///
    /// Returns an error if the child's stdout was not piped.
    pub(crate) fn new(mut child: Child) -> Result<Self> {
        let stdout = child
            .stdout
            .take()
            .ok_or_else(|| anyhow::anyhow!("Child process stdout not piped"))?;
        let stdin = child.stdin.take();
        let lines = BufReader::new(stdout).lines();
        Ok(Self {
            child,
            stdin,
            lines,
            translator: crate::providers::claude::ClaudeEventTranslator::new(),
            pending: std::collections::VecDeque::new(),
        })
    }

    /// Writes `message` followed by a newline to the child's stdin and flushes.
    ///
    /// # Errors
    ///
    /// Returns an error if stdin has already been closed (or was never
    /// piped), or if writing or flushing fails.
    pub async fn send(&mut self, message: &str) -> Result<()> {
        let stdin = self
            .stdin
            .as_mut()
            .ok_or_else(|| anyhow::anyhow!("stdin already closed"))?;
        stdin.write_all(message.as_bytes()).await?;
        stdin.write_all(b"\n").await?;
        stdin.flush().await?;
        Ok(())
    }

    /// Sends `content` to the child as a single-line JSON object of the form
    /// `{"type": "user_message", "content": <content>}`.
    ///
    /// # Errors
    ///
    /// Returns an error if serialization fails or if [`Self::send`] fails.
    pub async fn send_user_message(&mut self, content: &str) -> Result<()> {
        let msg = serde_json::json!({
            "type": "user_message",
            "content": content,
        });
        self.send(&serde_json::to_string(&msg)?).await
    }

    /// Returns the next translated event, or `Ok(None)` once the child's
    /// stdout reaches end-of-file and no queued events remain.
    ///
    /// Blank lines are ignored. Lines that fail to parse are logged at debug
    /// level and skipped rather than treated as errors.
    ///
    /// # Errors
    ///
    /// Returns an error if reading a line from the child's stdout fails.
    pub async fn next_event(&mut self) -> Result<Option<Event>> {
        use crate::providers::claude::models::ClaudeEvent;
        loop {
            // Hand out already-translated events before reading more input;
            // a single input line may translate into several events.
            if let Some(event) = self.pending.pop_front() {
                return Ok(Some(event));
            }
            let line = match self.lines.next_line().await? {
                Some(line) => line,
                None => return Ok(None),
            };
            let trimmed = line.trim();
            if trimmed.is_empty() {
                continue;
            }
            match serde_json::from_str::<ClaudeEvent>(trimmed) {
                Ok(claude_event) => self
                    .pending
                    .extend(self.translator.translate(&claude_event)),
                Err(e) => log::debug!(
                    "Skipping unparseable streaming event: {}. Line: {}",
                    e,
                    crate::truncate_str(trimmed, 200)
                ),
            }
        }
    }

    /// Closes the child's stdin by dropping our handle to it.
    ///
    /// Any later call to [`Self::send`] fails with "stdin already closed".
    pub fn close_input(&mut self) {
        self.stdin.take();
    }

    /// Closes stdin, collects the child's stderr, and waits for it to exit.
    ///
    /// The collected stderr text is passed to `crate::process::log_stderr_text`.
    ///
    /// NOTE(review): stdout is not drained here; callers presumably read
    /// `next_event` until it returns `None` before calling this — confirm.
    ///
    /// # Errors
    ///
    /// Returns an error if waiting on the child fails, or if the child exits
    /// unsuccessfully. In the latter case the error message is the child's
    /// stderr text when non-empty, otherwise a message with the exit status.
    pub async fn wait(mut self) -> Result<()> {
        self.close_input();

        // Drain stderr to EOF *before* waiting on the process. Waiting first
        // can hang forever: a child that writes more than the OS pipe buffer
        // to stderr blocks on the write and never exits while nobody reads.
        let stderr_text = match self.child.stderr.take() {
            Some(mut stderr) => {
                let mut buf = Vec::new();
                // Best effort: a read failure only costs us diagnostics, so
                // whatever was captured before the error is still used.
                let _ = tokio::io::AsyncReadExt::read_to_end(&mut stderr, &mut buf).await;
                String::from_utf8_lossy(&buf).trim().to_string()
            }
            None => String::new(),
        };

        let status = self.child.wait().await?;
        crate::process::log_stderr_text(&stderr_text);

        if !status.success() {
            if stderr_text.is_empty() {
                bail!("Agent process failed with status: {}", status);
            } else {
                bail!("{}", stderr_text);
            }
        }
        Ok(())
    }
}
// Unit tests live out-of-line in `streaming_tests.rs` and are compiled only
// for test builds.
#[cfg(test)]
#[path = "streaming_tests.rs"]
mod tests;