use std::io::{BufRead, BufReader, IsTerminal};
use std::sync::Arc;
use zeph_core::DiffData;
use zeph_core::channel::{
Channel, ChannelError, ChannelMessage, ElicitationRequest, ElicitationResponse, StopHint,
ToolOutputEvent, ToolStartEvent,
};
use zeph_core::json_event_sink::{JsonEvent, JsonEventSink};
/// CLI channel that emits structured [`JsonEvent`]s through a shared sink
/// and receives user input as plain-text lines read from stdin by a
/// background thread.
#[derive(Debug)]
pub struct JsonCliChannel {
/// Shared sink that serializes events as JSON lines.
sink: Arc<JsonEventSink>,
/// Lines forwarded from the stdin reader thread; `None` signals EOF or a
/// read error.
rx: tokio::sync::mpsc::Receiver<Option<String>>,
/// When true, `confirm` approves prompts without asking the user.
auto: bool,
/// True once a response chunk was emitted and no `ResponseEnd` has been
/// flushed yet.
pending_chunks: bool,
}
impl JsonCliChannel {
    /// Creates a channel over `sink`, spawning a background thread that
    /// forwards stdin lines into an internal bounded queue.
    ///
    /// `auto` makes [`Channel::confirm`] approve prompts without asking.
    #[must_use]
    pub fn new(sink: Arc<JsonEventSink>, auto: bool) -> Self {
        let (tx, rx) = tokio::sync::mpsc::channel(32);
        // NOTE(review): the terminal probe's result is discarded — it looks
        // like it was meant to gate interactive behavior; confirm intent
        // before removing the call entirely.
        let _ = std::io::stdin().is_terminal();
        std::thread::spawn(move || {
            let reader = BufReader::new(std::io::stdin().lock());
            for line in reader.lines() {
                match line {
                    // Stop on a read error; the trailing sentinel below
                    // tells the receiver the stream is done.
                    Err(_) => break,
                    Ok(l) => {
                        // Receiver dropped: nothing left to deliver to.
                        if tx.blocking_send(Some(l)).is_err() {
                            break;
                        }
                    }
                }
            }
            // EOF/error sentinel; ignore failure if the receiver is gone.
            let _ = tx.blocking_send(None);
        });
        Self {
            sink,
            rx,
            auto,
            pending_chunks: false,
        }
    }
}
impl JsonCliChannel {
    /// Emits a `Query` event for `text` and wraps it into a message.
    ///
    /// Shared by `recv` and `try_recv` so both ingestion paths emit the
    /// same event shape.
    fn query_message(&self, text: String) -> ChannelMessage {
        self.sink.emit(&JsonEvent::Query {
            text: &text,
            queue_len: 0,
        });
        ChannelMessage {
            text,
            attachments: Vec::new(),
        }
    }
}

impl Channel for JsonCliChannel {
    /// Waits for the next non-empty stdin line.
    ///
    /// Returns `Ok(None)` on EOF, channel closure, or one of the exit
    /// commands (`exit`, `quit`, `/exit`, `/quit`); blank lines are skipped.
    async fn recv(&mut self) -> Result<Option<ChannelMessage>, ChannelError> {
        loop {
            match self.rx.recv().await {
                Some(Some(line)) => {
                    let trimmed = line.trim();
                    match trimmed {
                        "" => continue,
                        "exit" | "quit" | "/exit" | "/quit" => return Ok(None),
                        _ => {}
                    }
                    return Ok(Some(self.query_message(trimmed.to_owned())));
                }
                // `Some(None)` is the reader thread's EOF sentinel; `None`
                // means the reader thread exited and dropped its sender.
                Some(None) | None => return Ok(None),
            }
        }
    }

    /// Non-blocking poll for a queued stdin line; `None` when the queue is
    /// empty, at EOF, or the line is blank.
    ///
    /// NOTE(review): unlike `recv`, exit commands are not intercepted here
    /// and would be forwarded as ordinary messages — confirm this is
    /// intentional.
    fn try_recv(&mut self) -> Option<ChannelMessage> {
        match self.rx.try_recv() {
            Ok(Some(line)) => {
                let trimmed = line.trim();
                if trimmed.is_empty() {
                    return None;
                }
                Some(self.query_message(trimmed.to_owned()))
            }
            _ => None,
        }
    }

    /// The JSON CLI accepts exit commands (handled in `recv`).
    fn supports_exit(&self) -> bool {
        true
    }

    /// Emits the full text as a single response chunk; the caller must
    /// invoke `flush_chunks` to terminate the response.
    async fn send(&mut self, text: &str) -> Result<(), ChannelError> {
        self.sink.emit(&JsonEvent::ResponseChunk { text });
        self.pending_chunks = true;
        Ok(())
    }

    /// Emits one streaming response chunk and marks the response as open.
    async fn send_chunk(&mut self, chunk: &str) -> Result<(), ChannelError> {
        self.sink.emit(&JsonEvent::ResponseChunk { text: chunk });
        self.pending_chunks = true;
        Ok(())
    }

    /// Emits `ResponseEnd` exactly once if any chunks were sent since the
    /// last flush; otherwise does nothing.
    async fn flush_chunks(&mut self) -> Result<(), ChannelError> {
        if self.pending_chunks {
            self.sink.emit(&JsonEvent::ResponseEnd);
            self.pending_chunks = false;
        }
        Ok(())
    }

    /// Typing indicators have no JSON representation; intentionally a no-op.
    async fn send_typing(&mut self) -> Result<(), ChannelError> {
        Ok(())
    }

    /// Asks the user to confirm `prompt` with a y/n line from stdin.
    ///
    /// In `auto` mode this approves immediately without emitting anything.
    /// Any reply other than `y`/`yes` (case-insensitive), EOF, or channel
    /// closure counts as a refusal.
    async fn confirm(&mut self, prompt: &str) -> Result<bool, ChannelError> {
        if self.auto {
            return Ok(true);
        }
        self.sink.emit(&JsonEvent::Status {
            message: &format!("{prompt} (y/n)"),
        });
        match self.rx.recv().await {
            Some(Some(line)) => Ok(matches!(line.trim().to_lowercase().as_str(), "y" | "yes")),
            _ => Ok(false),
        }
    }

    /// Elicitation is unsupported in JSON CLI mode; always declines.
    async fn elicit(
        &mut self,
        _request: ElicitationRequest,
    ) -> Result<ElicitationResponse, ChannelError> {
        Ok(ElicitationResponse::Declined)
    }

    /// Forwards a status message as a `Status` event.
    async fn send_status(&mut self, text: &str) -> Result<(), ChannelError> {
        self.sink.emit(&JsonEvent::Status { message: text });
        Ok(())
    }

    /// Reports the pending-queue length as a `Status` event.
    async fn send_queue_count(&mut self, count: usize) -> Result<(), ChannelError> {
        self.sink.emit(&JsonEvent::Status {
            message: &format!("queue: {count}"),
        });
        Ok(())
    }

    /// Diffs are not rendered in JSON CLI mode; intentionally a no-op.
    async fn send_diff(&mut self, _diff: DiffData) -> Result<(), ChannelError> {
        Ok(())
    }

    /// Tool output is not rendered in JSON CLI mode; intentionally a no-op.
    async fn send_tool_output(&mut self, _event: ToolOutputEvent) -> Result<(), ChannelError> {
        Ok(())
    }

    /// Thinking chunks are not rendered in JSON CLI mode; intentionally a
    /// no-op.
    async fn send_thinking_chunk(&mut self, _chunk: &str) -> Result<(), ChannelError> {
        Ok(())
    }

    /// Surfaces a stop hint as a debug-formatted `Status` event.
    async fn send_stop_hint(&mut self, hint: StopHint) -> Result<(), ChannelError> {
        self.sink.emit(&JsonEvent::Status {
            message: &format!("stop_hint: {hint:?}"),
        });
        Ok(())
    }

    /// Token usage is not reported in JSON CLI mode; intentionally a no-op.
    async fn send_usage(
        &mut self,
        _input_tokens: u64,
        _output_tokens: u64,
        _context_window: u64,
    ) -> Result<(), ChannelError> {
        Ok(())
    }

    /// Tool-start events are not rendered in JSON CLI mode; intentionally a
    /// no-op.
    async fn send_tool_start(&mut self, _event: ToolStartEvent) -> Result<(), ChannelError> {
        Ok(())
    }
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex};
use zeph_core::json_event_sink::JsonEventSink;
use super::*;
// In-memory `Write` target so tests can inspect what the sink emitted.
struct BufWriter(Arc<Mutex<Vec<u8>>>);
impl std::io::Write for BufWriter {
fn write(&mut self, b: &[u8]) -> std::io::Result<usize> {
self.0.lock().unwrap().extend_from_slice(b);
Ok(b.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
// Builds a sink that writes into a shared buffer, plus a closure that
// returns the emitted output split into non-empty lines (the sink emits
// one JSON event per line).
fn make_test_sink() -> (Arc<JsonEventSink>, impl Fn() -> Vec<String>) {
let buf: Arc<Mutex<Vec<u8>>> = Arc::new(Mutex::new(Vec::new()));
let buf_read = Arc::clone(&buf);
let sink = Arc::new(JsonEventSink::with_writer(BufWriter(buf)));
let read = move || {
let data = buf_read.lock().unwrap();
String::from_utf8(data.clone())
.unwrap_or_default()
.lines()
.filter(|l| !l.is_empty())
.map(str::to_owned)
.collect::<Vec<_>>()
};
(sink, read)
}
// Extracts the string value of `key` from one flat JSON line without a
// JSON parser (looks for `"key":"` and reads up to the next quote).
// Returns "" when the key is absent. Sufficient for these tests because
// the inspected values contain no escaped quotes.
fn event_field<'a>(line: &'a str, key: &str) -> &'a str {
let needle = format!("\"{key}\":\"");
line.find(&needle).map_or("", |i| {
let rest = &line[i + needle.len()..];
&rest[..rest.find('"').unwrap_or(rest.len())]
})
}
#[tokio::test]
async fn flush_chunks_is_noop_without_chunks() {
let (sink, read) = make_test_sink();
let mut ch = JsonCliChannel::new(Arc::clone(&sink), false);
ch.flush_chunks().await.unwrap();
assert!(
read().is_empty(),
"flush_chunks must not emit when no chunks were sent"
);
}
#[tokio::test]
async fn flush_chunks_emits_end_after_chunk() {
let (sink, read) = make_test_sink();
let mut ch = JsonCliChannel::new(Arc::clone(&sink), false);
ch.send_chunk("hello").await.unwrap();
ch.flush_chunks().await.unwrap();
let lines = read();
assert_eq!(lines.len(), 2);
assert_eq!(event_field(&lines[0], "event"), "response_chunk");
assert_eq!(event_field(&lines[1], "event"), "response_end");
}
// `send` must only mark the response as pending; the terminating
// `response_end` comes from `flush_chunks` alone.
#[tokio::test]
async fn send_sets_pending_and_flush_emits_end() {
let (sink, read) = make_test_sink();
let mut ch = JsonCliChannel::new(Arc::clone(&sink), false);
ch.send_chunk("a").await.unwrap();
ch.send("b").await.unwrap();
assert!(
!read()
.iter()
.any(|l| event_field(l, "event") == "response_end"),
"send() must not emit ResponseEnd"
);
ch.flush_chunks().await.unwrap();
let lines = read();
assert_eq!(
lines
.iter()
.filter(|l| event_field(l, "event") == "response_end")
.count(),
1,
"flush_chunks must emit exactly one ResponseEnd; got: {lines:?}"
);
}
#[tokio::test]
async fn send_after_send_chunk_then_flush_emits_single_end() {
let (sink, read) = make_test_sink();
let mut ch = JsonCliChannel::new(Arc::clone(&sink), false);
ch.send_chunk("a").await.unwrap();
ch.send("b").await.unwrap();
ch.flush_chunks().await.unwrap();
let lines = read();
assert_eq!(
lines.len(),
3,
"expected chunk(a), chunk(b), response_end; got: {lines:?}"
);
assert_eq!(event_field(&lines[0], "event"), "response_chunk");
assert_eq!(event_field(&lines[1], "event"), "response_chunk");
assert_eq!(event_field(&lines[2], "event"), "response_end");
}
// The pending flag must reset after each flush so consecutive responses
// each get their own terminator.
#[tokio::test]
async fn two_sequential_sends_with_flush_emit_two_ends() {
let (sink, read) = make_test_sink();
let mut ch = JsonCliChannel::new(Arc::clone(&sink), false);
ch.send("first").await.unwrap();
ch.flush_chunks().await.unwrap();
ch.send("second").await.unwrap();
ch.flush_chunks().await.unwrap();
let lines = read();
assert_eq!(lines.len(), 4);
assert_eq!(event_field(&lines[1], "event"), "response_end");
assert_eq!(event_field(&lines[3], "event"), "response_end");
}
#[tokio::test]
async fn send_status_ok() {
let (sink, _read) = make_test_sink();
let mut ch = JsonCliChannel::new(Arc::clone(&sink), false);
assert!(ch.send_status("working…").await.is_ok());
}
// All the intentionally no-op channel methods must still return Ok.
#[tokio::test]
async fn no_ops_do_not_error() {
use zeph_core::channel::{ToolOutputEvent, ToolStartEvent};
let (sink, _read) = make_test_sink();
let mut ch = JsonCliChannel::new(Arc::clone(&sink), false);
assert!(ch.send_typing().await.is_ok());
assert!(ch.send_thinking_chunk("...").await.is_ok());
assert!(
ch.send_tool_start(ToolStartEvent {
tool_name: "shell".into(),
tool_call_id: "x".into(),
params: None,
parent_tool_use_id: None,
started_at: std::time::Instant::now(),
speculative: false,
sandbox_profile: None,
})
.await
.is_ok()
);
assert!(
ch.send_tool_output(ToolOutputEvent {
tool_name: "shell".into(),
display: "ok".into(),
diff: None,
filter_stats: None,
kept_lines: None,
locations: None,
tool_call_id: "x".into(),
is_error: false,
terminal_id: None,
parent_tool_use_id: None,
raw_response: None,
started_at: None,
})
.await
.is_ok()
);
assert!(ch.send_usage(100, 50, 200_000).await.is_ok());
assert!(ch.send_stop_hint(StopHint::MaxTokens).await.is_ok());
}
#[test]
fn supports_exit_is_true() {
let (sink, _) = make_test_sink();
let ch = JsonCliChannel::new(sink, false);
assert!(ch.supports_exit());
}
// A marker chunk appended after `send` must not produce an extra end event.
#[tokio::test]
async fn send_then_marker_chunk_then_flush_emits_single_end() {
let (sink, read) = make_test_sink();
let mut ch = JsonCliChannel::new(Arc::clone(&sink), false);
ch.send("The answer is 42.").await.unwrap();
ch.send_chunk(" [flag]").await.unwrap();
ch.flush_chunks().await.unwrap();
let lines = read();
let end_count = lines
.iter()
.filter(|l| event_field(l, "event") == "response_end")
.count();
assert_eq!(
end_count, 1,
"expected exactly one response_end; got {end_count} in: {lines:?}"
);
}
#[tokio::test]
async fn flag_marker_appended_via_send_chunk_has_single_end() {
let (sink, read) = make_test_sink();
let mut ch = JsonCliChannel::new(Arc::clone(&sink), false);
ch.send_chunk("The answer is 42.").await.unwrap();
ch.send_chunk(" [verify]").await.unwrap();
ch.flush_chunks().await.unwrap();
let lines = read();
assert_eq!(lines.len(), 3, "expected 2 chunks + 1 end; got: {lines:?}");
assert_eq!(event_field(&lines[0], "event"), "response_chunk");
assert_eq!(event_field(&lines[1], "event"), "response_chunk");
assert_eq!(event_field(&lines[2], "event"), "response_end");
}
// NOTE(review): relies on no input arriving on the internal queue right
// after construction; the stdin reader thread has nothing queued yet.
#[test]
fn try_recv_returns_none_when_no_input() {
let (sink, _) = make_test_sink();
let mut ch = JsonCliChannel::new(sink, false);
assert!(ch.try_recv().is_none());
}
}