use std::collections::VecDeque;
use std::time::Instant;
use zeph_core::channel::{ChannelError, ChannelMessage, ToolOutputEvent};
/// A single agent reply recorded by the benchmark channel.
///
/// Each record carries the text of the reply together with the prompt it is
/// attributed to, how long streaming took, and the token usage that was
/// pending when the reply was recorded.
#[derive(Clone, Debug)]
pub struct CapturedResponse {
    /// Zero-based index of the prompt this response is attributed to.
    pub prompt_index: usize,
    /// Complete response text (all streamed chunks concatenated, if streamed).
    pub text: String,
    /// Time from the first buffered chunk until the flush; zero for responses
    /// that were delivered in one piece or seeded from a scenario.
    pub elapsed: std::time::Duration,
    /// Input token count taken from the pending usage report, or 0 if none.
    pub input_tokens: u64,
    /// Output token count taken from the pending usage report, or 0 if none.
    pub output_tokens: u64,
    /// Context window size taken from the pending usage report, or 0 if none.
    pub context_window: u64,
}
/// Non-interactive channel that replays a fixed list of prompts and records
/// everything sent back through it.
pub struct BenchmarkChannel {
    /// Prompts that have not yet been handed out by `recv`, in delivery order.
    prompts: VecDeque<String>,
    /// Every response recorded so far (including any seeded at construction).
    responses: Vec<CapturedResponse>,
    /// Tool output events, stored separately from `responses`.
    tool_outputs: Vec<ToolOutputEvent>,
    /// Number of prompts handed out so far; the active prompt is
    /// `current_index - 1`.
    current_index: usize,
    /// Number of prompts the channel was constructed with.
    total: usize,
    /// Streamed text accumulated since the last flush.
    chunk_buffer: String,
    /// Moment the first chunk of the current buffer arrived, if any.
    chunk_start: Option<Instant>,
    /// Input token count from the latest usage report, awaiting a response.
    pending_input_tokens: u64,
    /// Output token count from the latest usage report, awaiting a response.
    pending_output_tokens: u64,
    /// Context window size from the latest usage report, awaiting a response.
    pending_context_window: u64,
}
impl BenchmarkChannel {
#[must_use]
pub fn new(prompts: Vec<String>) -> Self {
let total = prompts.len();
Self {
prompts: VecDeque::from(prompts),
responses: Vec::new(),
tool_outputs: Vec::new(),
current_index: 0,
total,
chunk_buffer: String::new(),
chunk_start: None,
pending_input_tokens: 0,
pending_output_tokens: 0,
pending_context_window: 0,
}
}
#[must_use]
pub fn from_turns(turns: Vec<crate::scenario::Turn>) -> Self {
use crate::scenario::Role;
let mut prompts = VecDeque::new();
let mut seeded_responses = Vec::new();
let mut prompt_index: usize = 0;
for turn in turns {
match turn.role {
Role::User => {
prompts.push_back(turn.content);
prompt_index += 1;
}
Role::Assistant => {
seeded_responses.push(CapturedResponse {
prompt_index: prompt_index.saturating_sub(1),
text: turn.content,
elapsed: std::time::Duration::ZERO,
input_tokens: 0,
output_tokens: 0,
context_window: 0,
});
}
}
}
let total = prompts.len();
Self {
prompts,
responses: seeded_responses,
tool_outputs: Vec::new(),
current_index: 0,
total,
chunk_buffer: String::new(),
chunk_start: None,
pending_input_tokens: 0,
pending_output_tokens: 0,
pending_context_window: 0,
}
}
#[must_use]
pub fn total(&self) -> usize {
self.total
}
#[must_use]
pub fn into_responses(self) -> Vec<CapturedResponse> {
self.responses
}
#[must_use]
pub fn responses(&self) -> &[CapturedResponse] {
&self.responses
}
#[must_use]
pub fn tool_outputs(&self) -> &[zeph_core::channel::ToolOutputEvent] {
&self.tool_outputs
}
fn flush_chunk_buffer(&mut self) {
if self.chunk_buffer.is_empty() {
return;
}
let elapsed = self
.chunk_start
.map_or(std::time::Duration::ZERO, |s| s.elapsed());
self.responses.push(CapturedResponse {
prompt_index: self.current_index.saturating_sub(1),
text: std::mem::take(&mut self.chunk_buffer),
elapsed,
input_tokens: self.pending_input_tokens,
output_tokens: self.pending_output_tokens,
context_window: self.pending_context_window,
});
self.chunk_start = None;
self.pending_input_tokens = 0;
self.pending_output_tokens = 0;
self.pending_context_window = 0;
}
}
impl zeph_core::channel::Channel for BenchmarkChannel {
    /// Hands out the next queued prompt, or `None` once the queue is empty.
    async fn recv(&mut self) -> Result<Option<ChannelMessage>, ChannelError> {
        let Some(text) = self.prompts.pop_front() else {
            return Ok(None);
        };
        self.current_index += 1;
        let message = ChannelMessage {
            text,
            attachments: Vec::new(),
            is_guest_context: false,
            is_from_bot: false,
        };
        Ok(Some(message))
    }

    /// This channel never reports exit support.
    fn supports_exit(&self) -> bool {
        false
    }

    /// Records `text` as a complete response with zero elapsed time,
    /// attaching and then clearing the pending usage values.
    async fn send(&mut self, text: &str) -> Result<(), ChannelError> {
        let captured = CapturedResponse {
            prompt_index: self.current_index.saturating_sub(1),
            text: text.to_owned(),
            elapsed: std::time::Duration::ZERO,
            // `take` reads each pending value and resets it to 0 in one step.
            input_tokens: std::mem::take(&mut self.pending_input_tokens),
            output_tokens: std::mem::take(&mut self.pending_output_tokens),
            context_window: std::mem::take(&mut self.pending_context_window),
        };
        self.responses.push(captured);
        Ok(())
    }

    /// Appends `chunk` to the streaming buffer, starting the timer on the
    /// first chunk after a flush.
    async fn send_chunk(&mut self, chunk: &str) -> Result<(), ChannelError> {
        // Only sets the start time when none is recorded yet.
        self.chunk_start.get_or_insert_with(Instant::now);
        self.chunk_buffer.push_str(chunk);
        Ok(())
    }

    /// Converts the buffered chunks into a captured response.
    async fn flush_chunks(&mut self) -> Result<(), ChannelError> {
        self.flush_chunk_buffer();
        Ok(())
    }

    /// Remembers the usage figures so they can be attached to the next
    /// recorded response; any previously pending figures are overwritten.
    async fn send_usage(
        &mut self,
        input_tokens: u64,
        output_tokens: u64,
        context_window: u64,
    ) -> Result<(), ChannelError> {
        (
            self.pending_input_tokens,
            self.pending_output_tokens,
            self.pending_context_window,
        ) = (input_tokens, output_tokens, context_window);
        Ok(())
    }

    /// Stores the tool output event; it is kept apart from the responses.
    async fn send_tool_output(&mut self, event: ToolOutputEvent) -> Result<(), ChannelError> {
        self.tool_outputs.push(event);
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use zeph_core::channel::{
        Channel, ElicitationField, ElicitationFieldType, ElicitationRequest, ElicitationResponse,
        ToolOutputEvent,
    };

    // Prompts come back in insertion order, then the channel reports `None`.
    #[tokio::test]
    async fn recv_drains_queue_and_returns_none_when_empty() {
        let mut channel = BenchmarkChannel::new(vec!["hello".into(), "world".into()]);
        for expected in ["hello", "world"] {
            let message = channel.recv().await.unwrap().unwrap();
            assert_eq!(message.text, expected);
        }
        let exhausted = channel.recv().await.unwrap();
        assert!(exhausted.is_none());
    }

    // A plain `send` is stored as one captured response.
    #[tokio::test]
    async fn send_accumulates_response() {
        let mut channel = BenchmarkChannel::new(vec!["prompt".into()]);
        let _ = channel.recv().await.unwrap();
        channel.send("response text").await.unwrap();
        let captured = channel.responses();
        assert_eq!(captured.len(), 1);
        assert_eq!(captured[0].text, "response text");
    }

    // `confirm` is not overridden by the channel; this pins that it yields `true`.
    #[tokio::test]
    async fn confirm_returns_true() {
        let mut channel = BenchmarkChannel::new(vec![]);
        let confirmed = channel.confirm("delete?").await.unwrap();
        assert!(confirmed);
    }

    // `elicit` is not overridden by the channel; this pins that it is declined.
    #[tokio::test]
    async fn elicit_returns_declined() {
        let mut channel = BenchmarkChannel::new(vec![]);
        let field = ElicitationField {
            name: "field".into(),
            description: None,
            field_type: ElicitationFieldType::String,
            required: true,
        };
        let request = ElicitationRequest {
            server_name: "test-server".into(),
            message: "provide input".into(),
            fields: vec![field],
        };
        let outcome = channel.elicit(request).await.unwrap();
        assert!(matches!(outcome, ElicitationResponse::Declined));
    }

    // Streamed chunks are concatenated into a single response on flush.
    #[tokio::test]
    async fn send_chunk_and_flush_captures_response() {
        let mut channel = BenchmarkChannel::new(vec!["p".into()]);
        let _ = channel.recv().await.unwrap();
        for piece in ["part1", " part2"] {
            channel.send_chunk(piece).await.unwrap();
        }
        channel.flush_chunks().await.unwrap();
        let captured = channel.responses();
        assert_eq!(captured.len(), 1);
        assert_eq!(captured[0].text, "part1 part2");
    }

    #[tokio::test]
    async fn supports_exit_returns_false() {
        let channel = BenchmarkChannel::new(vec![]);
        assert!(!channel.supports_exit());
    }

    // Usage reported before `send` ends up on the captured response.
    #[tokio::test]
    async fn send_usage_captured_on_send() {
        let mut channel = BenchmarkChannel::new(vec!["p".into()]);
        let _ = channel.recv().await.unwrap();
        channel.send_usage(10, 20, 128_000).await.unwrap();
        channel.send("answer").await.unwrap();
        let first = &channel.responses()[0];
        assert_eq!(first.input_tokens, 10);
        assert_eq!(first.output_tokens, 20);
        assert_eq!(first.context_window, 128_000);
    }

    // Tool output goes to its own list and never shows up as a response.
    #[tokio::test]
    async fn send_tool_output_captured_separately_from_responses() {
        let mut channel = BenchmarkChannel::new(vec!["p".into()]);
        let _ = channel.recv().await.unwrap();
        let event = ToolOutputEvent {
            tool_name: "bash".into(),
            display: "some tool output".into(),
            diff: None,
            filter_stats: None,
            kept_lines: None,
            locations: None,
            tool_call_id: "tc-1".into(),
            terminal_id: None,
            is_error: false,
            parent_tool_use_id: None,
            raw_response: None,
            started_at: None,
        };
        channel.send_tool_output(event).await.unwrap();
        assert_eq!(channel.responses().len(), 0);
        let outputs = channel.tool_outputs();
        assert_eq!(outputs.len(), 1);
        assert_eq!(outputs[0].tool_name, "bash");
    }

    // User turns become prompts; assistant turns become seeded responses.
    #[test]
    fn from_turns_splits_user_and_assistant() {
        use crate::scenario::{Role, Turn};
        let turn = |role: Role, content: &str| Turn {
            role,
            content: content.into(),
        };
        let channel = BenchmarkChannel::from_turns(vec![
            turn(Role::User, "Q1"),
            turn(Role::Assistant, "A1"),
            turn(Role::User, "Q2"),
        ]);
        assert_eq!(channel.total(), 2);
        let seeded = channel.responses();
        assert_eq!(seeded.len(), 1);
        assert_eq!(seeded[0].text, "A1");
    }

    // A scenario with only user turns seeds no responses.
    #[test]
    fn from_turns_user_only() {
        use crate::scenario::{Role, Turn};
        let only_turn = Turn {
            role: Role::User,
            content: "Q".into(),
        };
        let channel = BenchmarkChannel::from_turns(vec![only_turn]);
        assert_eq!(channel.total(), 1);
        assert!(channel.responses().is_empty());
    }
}