zeph_bench/channel.rs
1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Headless [`zeph_core::channel::Channel`] implementation for benchmark runs.
5//!
6//! [`BenchmarkChannel`] feeds a pre-loaded prompt queue into the agent loop and captures
7//! each response without requiring a terminal, Telegram bot, or any other real I/O channel.
8//!
9//! Tool output events are captured via [`BenchmarkChannel::tool_outputs`] for Phase 2 scoring
10//! (see [`ToolOutputEvent`] and #4237). They are not added to [`responses`][BenchmarkChannel::responses]
11//! so that tool intermediaries do not corrupt response metrics.
12
13use std::collections::VecDeque;
14use std::time::Instant;
15
16use zeph_core::channel::{ChannelError, ChannelMessage, ToolOutputEvent};
17
/// A single captured agent response corresponding to one benchmark prompt.
///
/// Produced by [`BenchmarkChannel`] after the agent calls [`send`][zeph_core::channel::Channel::send] or
/// [`flush_chunks`][zeph_core::channel::Channel::flush_chunks] for a given prompt, or pre-seeded
/// from assistant turns by [`BenchmarkChannel::from_turns`].
///
/// # Examples
///
/// ```
/// use zeph_bench::channel::CapturedResponse;
/// use std::time::Duration;
///
/// let r = CapturedResponse {
///     prompt_index: 0,
///     text: "42".into(),
///     elapsed: Duration::from_millis(312),
///     input_tokens: 120,
///     output_tokens: 3,
///     context_window: 128_000,
/// };
/// assert_eq!(r.text, "42");
/// assert_eq!(r.clone(), r);
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CapturedResponse {
    /// Zero-based index of the prompt this response corresponds to.
    pub prompt_index: usize,
    /// Full text of the agent response (or concatenated streaming chunks).
    pub text: String,
    /// Wall-clock time from the first streaming chunk to `flush_chunks`, or
    /// [`std::time::Duration::ZERO`] for non-streaming `send` calls and seeded turns.
    pub elapsed: std::time::Duration,
    /// Input token count from the most recent usage report before this response was
    /// captured, or `0` if none was reported (always `0` for seeded turns).
    pub input_tokens: u64,
    /// Output token count from the most recent usage report before this response was
    /// captured, or `0` if none was reported (always `0` for seeded turns).
    pub output_tokens: u64,
    /// Context window size from the most recent usage report before this response was
    /// captured, or `0` if none was reported (always `0` for seeded turns).
    pub context_window: u64,
}
55
56/// Headless channel that feeds pre-loaded prompts and captures agent responses.
57///
58/// Used by the bench runner to drive the agent loop without a real terminal or
59/// network connection. [`recv`][zeph_core::channel::Channel::recv] drains the prompt
60/// queue; [`send`][zeph_core::channel::Channel::send] and
61/// [`flush_chunks`][zeph_core::channel::Channel::flush_chunks] accumulate responses
62/// into an internal list. Tool outputs are captured separately via [`tool_outputs()`][BenchmarkChannel::tool_outputs].
63///
64/// # Usage
65///
66/// ```no_run
67/// use zeph_bench::BenchmarkChannel;
68///
69/// let prompts = vec!["What year did WWII end?".into()];
70/// let channel = BenchmarkChannel::new(prompts);
71/// assert_eq!(channel.total(), 1);
72/// ```
73///
74/// After the agent loop completes, call [`into_responses`] to consume the channel
75/// and retrieve all captured responses:
76///
77/// ```no_run
78/// # use zeph_bench::BenchmarkChannel;
79/// let channel = BenchmarkChannel::new(vec!["question".into()]);
80/// // ... run agent loop ...
81/// let responses = channel.into_responses();
82/// ```
83///
84/// [`into_responses`]: BenchmarkChannel::into_responses
85pub struct BenchmarkChannel {
86 prompts: VecDeque<String>,
87 responses: Vec<CapturedResponse>,
88 tool_outputs: Vec<ToolOutputEvent>,
89 current_index: usize,
90 total: usize,
91 // Streaming chunk accumulation
92 chunk_buffer: String,
93 chunk_start: Option<Instant>,
94 // Token usage for the current prompt (updated by send_usage)
95 pending_input_tokens: u64,
96 pending_output_tokens: u64,
97 pending_context_window: u64,
98}
99
100impl BenchmarkChannel {
101 /// Create a new channel pre-loaded with `prompts`.
102 ///
103 /// Prompts are fed to the agent one at a time in order via
104 /// [`recv`][zeph_core::channel::Channel::recv]. The channel returns `Ok(None)` once
105 /// all prompts have been drained.
106 ///
107 /// # Examples
108 ///
109 /// ```
110 /// use zeph_bench::BenchmarkChannel;
111 ///
112 /// let ch = BenchmarkChannel::new(vec!["hello".into(), "world".into()]);
113 /// assert_eq!(ch.total(), 2);
114 /// ```
115 #[must_use]
116 pub fn new(prompts: Vec<String>) -> Self {
117 let total = prompts.len();
118 Self {
119 prompts: VecDeque::from(prompts),
120 responses: Vec::new(),
121 tool_outputs: Vec::new(),
122 current_index: 0,
123 total,
124 chunk_buffer: String::new(),
125 chunk_start: None,
126 pending_input_tokens: 0,
127 pending_output_tokens: 0,
128 pending_context_window: 0,
129 }
130 }
131
132 /// Create a channel from a multi-turn scenario history.
133 ///
134 /// User turns are fed to the agent in order via [`recv`][zeph_core::channel::Channel::recv].
135 /// Assistant turns are pre-seeded into [`responses`][BenchmarkChannel::responses] so that
136 /// evaluators and Phase 2 scoring have access to the captured prior context.
137 ///
138 /// # Note
139 ///
140 /// If `turns` contains no [`crate::scenario::Role::User`] turns, [`total`][BenchmarkChannel::total] returns
141 /// `0` and the channel cannot serve as a prompt source. The bench runner rejects this with
142 /// [`BenchError::InvalidFormat`][crate::BenchError] — callers must ensure at least one user
143 /// turn is present.
144 ///
145 /// # Examples
146 ///
147 /// ```
148 /// use zeph_bench::BenchmarkChannel;
149 /// use zeph_bench::scenario::{Role, Turn};
150 ///
151 /// let turns = vec![
152 /// Turn { role: Role::User, content: "Hello".into() },
153 /// Turn { role: Role::Assistant, content: "Hi there".into() },
154 /// Turn { role: Role::User, content: "What year?".into() },
155 /// ];
156 /// let ch = BenchmarkChannel::from_turns(turns);
157 /// assert_eq!(ch.total(), 2); // two user turns
158 /// assert_eq!(ch.responses().len(), 1); // one seeded assistant turn
159 /// ```
160 #[must_use]
161 pub fn from_turns(turns: Vec<crate::scenario::Turn>) -> Self {
162 use crate::scenario::Role;
163
164 let mut prompts = VecDeque::new();
165 let mut seeded_responses = Vec::new();
166 let mut prompt_index: usize = 0;
167
168 for turn in turns {
169 match turn.role {
170 Role::User => {
171 prompts.push_back(turn.content);
172 prompt_index += 1;
173 }
174 Role::Assistant => {
175 seeded_responses.push(CapturedResponse {
176 prompt_index: prompt_index.saturating_sub(1),
177 text: turn.content,
178 elapsed: std::time::Duration::ZERO,
179 input_tokens: 0,
180 output_tokens: 0,
181 context_window: 0,
182 });
183 }
184 }
185 }
186
187 let total = prompts.len();
188 Self {
189 prompts,
190 responses: seeded_responses,
191 tool_outputs: Vec::new(),
192 current_index: 0,
193 total,
194 chunk_buffer: String::new(),
195 chunk_start: None,
196 pending_input_tokens: 0,
197 pending_output_tokens: 0,
198 pending_context_window: 0,
199 }
200 }
201
202 /// Total number of prompts this channel was initialised with.
203 ///
204 /// # Examples
205 ///
206 /// ```
207 /// use zeph_bench::BenchmarkChannel;
208 ///
209 /// let ch = BenchmarkChannel::new(vec!["a".into(), "b".into(), "c".into()]);
210 /// assert_eq!(ch.total(), 3);
211 /// ```
212 #[must_use]
213 pub fn total(&self) -> usize {
214 self.total
215 }
216
217 /// Consume the channel and return all [`CapturedResponse`]s collected so far.
218 ///
219 /// Call this after the agent loop exits to retrieve every response in prompt order.
220 ///
221 /// # Examples
222 ///
223 /// ```no_run
224 /// use zeph_bench::BenchmarkChannel;
225 ///
226 /// let ch = BenchmarkChannel::new(vec!["question".into()]);
227 /// // ... run agent ...
228 /// let responses = ch.into_responses();
229 /// ```
230 #[must_use]
231 pub fn into_responses(self) -> Vec<CapturedResponse> {
232 self.responses
233 }
234
235 /// Borrow the captured responses without consuming the channel.
236 ///
237 /// # Examples
238 ///
239 /// ```
240 /// use zeph_bench::BenchmarkChannel;
241 ///
242 /// let ch = BenchmarkChannel::new(vec![]);
243 /// assert!(ch.responses().is_empty());
244 /// ```
245 #[must_use]
246 pub fn responses(&self) -> &[CapturedResponse] {
247 &self.responses
248 }
249
250 /// Borrow the tool output events captured during the agent run.
251 ///
252 /// Events are appended by [`send_tool_output`][zeph_core::channel::Channel::send_tool_output]
253 /// and are available for Phase 2 evaluation (#4234) after the agent loop exits.
254 ///
255 /// # Examples
256 ///
257 /// ```
258 /// use zeph_bench::BenchmarkChannel;
259 ///
260 /// let ch = BenchmarkChannel::new(vec![]);
261 /// assert!(ch.tool_outputs().is_empty());
262 /// ```
263 #[must_use]
264 pub fn tool_outputs(&self) -> &[zeph_core::channel::ToolOutputEvent] {
265 &self.tool_outputs
266 }
267
268 fn flush_chunk_buffer(&mut self) {
269 if self.chunk_buffer.is_empty() {
270 return;
271 }
272 let elapsed = self
273 .chunk_start
274 .map_or(std::time::Duration::ZERO, |s| s.elapsed());
275 self.responses.push(CapturedResponse {
276 prompt_index: self.current_index.saturating_sub(1),
277 text: std::mem::take(&mut self.chunk_buffer),
278 elapsed,
279 input_tokens: self.pending_input_tokens,
280 output_tokens: self.pending_output_tokens,
281 context_window: self.pending_context_window,
282 });
283 self.chunk_start = None;
284 self.pending_input_tokens = 0;
285 self.pending_output_tokens = 0;
286 self.pending_context_window = 0;
287 }
288}
289
290impl zeph_core::channel::Channel for BenchmarkChannel {
291 async fn recv(&mut self) -> Result<Option<ChannelMessage>, ChannelError> {
292 match self.prompts.pop_front() {
293 Some(text) => {
294 self.current_index += 1;
295 Ok(Some(ChannelMessage {
296 text,
297 attachments: vec![],
298 is_guest_context: false,
299 is_from_bot: false,
300 }))
301 }
302 None => Ok(None),
303 }
304 }
305
306 fn supports_exit(&self) -> bool {
307 false
308 }
309
310 async fn send(&mut self, text: &str) -> Result<(), ChannelError> {
311 self.responses.push(CapturedResponse {
312 prompt_index: self.current_index.saturating_sub(1),
313 text: text.to_owned(),
314 elapsed: std::time::Duration::ZERO,
315 input_tokens: self.pending_input_tokens,
316 output_tokens: self.pending_output_tokens,
317 context_window: self.pending_context_window,
318 });
319 self.pending_input_tokens = 0;
320 self.pending_output_tokens = 0;
321 self.pending_context_window = 0;
322 Ok(())
323 }
324
325 async fn send_chunk(&mut self, chunk: &str) -> Result<(), ChannelError> {
326 if self.chunk_start.is_none() {
327 self.chunk_start = Some(Instant::now());
328 }
329 self.chunk_buffer.push_str(chunk);
330 Ok(())
331 }
332
333 async fn flush_chunks(&mut self) -> Result<(), ChannelError> {
334 self.flush_chunk_buffer();
335 Ok(())
336 }
337
338 async fn send_usage(
339 &mut self,
340 input_tokens: u64,
341 output_tokens: u64,
342 context_window: u64,
343 ) -> Result<(), ChannelError> {
344 self.pending_input_tokens = input_tokens;
345 self.pending_output_tokens = output_tokens;
346 self.pending_context_window = context_window;
347 Ok(())
348 }
349
350 async fn send_tool_output(&mut self, event: ToolOutputEvent) -> Result<(), ChannelError> {
351 self.tool_outputs.push(event);
352 Ok(())
353 }
354}
355
#[cfg(test)]
mod tests {
    use zeph_core::channel::{
        Channel, ElicitationField, ElicitationFieldType, ElicitationRequest, ElicitationResponse,
        ToolOutputEvent,
    };

    use super::*;

    #[tokio::test]
    async fn recv_drains_queue_and_returns_none_when_empty() {
        let mut ch = BenchmarkChannel::new(vec!["hello".into(), "world".into()]);
        let msg1 = ch.recv().await.unwrap().unwrap();
        assert_eq!(msg1.text, "hello");
        let msg2 = ch.recv().await.unwrap().unwrap();
        assert_eq!(msg2.text, "world");
        let msg3 = ch.recv().await.unwrap();
        assert!(msg3.is_none());
    }

    #[tokio::test]
    async fn send_accumulates_response() {
        let mut ch = BenchmarkChannel::new(vec!["prompt".into()]);
        let _ = ch.recv().await.unwrap();
        ch.send("response text").await.unwrap();
        assert_eq!(ch.responses().len(), 1);
        assert_eq!(ch.responses()[0].text, "response text");
    }

    #[tokio::test]
    async fn confirm_returns_true() {
        let mut ch = BenchmarkChannel::new(vec![]);
        let result = ch.confirm("delete?").await.unwrap();
        assert!(result);
    }

    #[tokio::test]
    async fn elicit_returns_declined() {
        let mut ch = BenchmarkChannel::new(vec![]);
        let req = ElicitationRequest {
            server_name: "test-server".into(),
            message: "provide input".into(),
            fields: vec![ElicitationField {
                name: "field".into(),
                description: None,
                field_type: ElicitationFieldType::String,
                required: true,
            }],
        };
        let result = ch.elicit(req).await.unwrap();
        assert!(matches!(result, ElicitationResponse::Declined));
    }

    #[tokio::test]
    async fn send_chunk_and_flush_captures_response() {
        let mut ch = BenchmarkChannel::new(vec!["p".into()]);
        let _ = ch.recv().await.unwrap();
        ch.send_chunk("part1").await.unwrap();
        ch.send_chunk(" part2").await.unwrap();
        ch.flush_chunks().await.unwrap();
        assert_eq!(ch.responses().len(), 1);
        assert_eq!(ch.responses()[0].text, "part1 part2");
    }

    #[tokio::test]
    async fn flush_without_chunks_records_nothing() {
        let mut ch = BenchmarkChannel::new(vec!["p".into()]);
        let _ = ch.recv().await.unwrap();
        ch.flush_chunks().await.unwrap();
        assert!(ch.responses().is_empty());
    }

    #[tokio::test]
    async fn supports_exit_returns_false() {
        let ch = BenchmarkChannel::new(vec![]);
        assert!(!ch.supports_exit());
    }

    #[tokio::test]
    async fn send_usage_captured_on_send() {
        let mut ch = BenchmarkChannel::new(vec!["p".into()]);
        let _ = ch.recv().await.unwrap();
        ch.send_usage(10, 20, 128_000).await.unwrap();
        ch.send("answer").await.unwrap();
        let r = &ch.responses()[0];
        assert_eq!(r.input_tokens, 10);
        assert_eq!(r.output_tokens, 20);
        assert_eq!(r.context_window, 128_000);
    }

    #[tokio::test]
    async fn usage_is_reset_after_each_response() {
        let mut ch = BenchmarkChannel::new(vec!["p".into()]);
        let _ = ch.recv().await.unwrap();
        ch.send_usage(10, 20, 128_000).await.unwrap();
        ch.send("first").await.unwrap();
        ch.send("second").await.unwrap();
        let second = &ch.responses()[1];
        assert_eq!(second.input_tokens, 0);
        assert_eq!(second.output_tokens, 0);
        assert_eq!(second.context_window, 0);
    }

    #[tokio::test]
    async fn responses_are_attributed_to_current_prompt() {
        let mut ch = BenchmarkChannel::new(vec!["a".into(), "b".into()]);
        let _ = ch.recv().await.unwrap();
        ch.send("reply to a").await.unwrap();
        let _ = ch.recv().await.unwrap();
        ch.send_chunk("reply to b").await.unwrap();
        ch.flush_chunks().await.unwrap();
        let indices: Vec<usize> = ch.responses().iter().map(|r| r.prompt_index).collect();
        assert_eq!(indices, vec![0, 1]);
    }

    #[tokio::test]
    async fn send_tool_output_captured_separately_from_responses() {
        let mut ch = BenchmarkChannel::new(vec!["p".into()]);
        let _ = ch.recv().await.unwrap();
        ch.send_tool_output(ToolOutputEvent {
            tool_name: "bash".into(),
            display: "some tool output".into(),
            diff: None,
            filter_stats: None,
            kept_lines: None,
            locations: None,
            tool_call_id: "tc-1".into(),
            terminal_id: None,
            is_error: false,
            parent_tool_use_id: None,
            raw_response: None,
            started_at: None,
        })
        .await
        .unwrap();
        // Tool output must not pollute benchmark responses.
        assert_eq!(ch.responses().len(), 0);
        // Tool output must be accessible for Phase 2 scoring.
        assert_eq!(ch.tool_outputs().len(), 1);
        assert_eq!(ch.tool_outputs()[0].tool_name, "bash");
    }

    #[test]
    fn from_turns_splits_user_and_assistant() {
        use crate::scenario::{Role, Turn};

        let turns = vec![
            Turn {
                role: Role::User,
                content: "Q1".into(),
            },
            Turn {
                role: Role::Assistant,
                content: "A1".into(),
            },
            Turn {
                role: Role::User,
                content: "Q2".into(),
            },
        ];
        let ch = BenchmarkChannel::from_turns(turns);
        assert_eq!(ch.total(), 2);
        assert_eq!(ch.responses().len(), 1);
        assert_eq!(ch.responses()[0].text, "A1");
    }

    #[test]
    fn from_turns_attributes_seeded_turns_to_preceding_prompt() {
        use crate::scenario::{Role, Turn};

        let turns = vec![
            Turn {
                role: Role::User,
                content: "Q1".into(),
            },
            Turn {
                role: Role::Assistant,
                content: "A1".into(),
            },
            Turn {
                role: Role::User,
                content: "Q2".into(),
            },
            Turn {
                role: Role::Assistant,
                content: "A2".into(),
            },
        ];
        let ch = BenchmarkChannel::from_turns(turns);
        let indices: Vec<usize> = ch.responses().iter().map(|r| r.prompt_index).collect();
        assert_eq!(indices, vec![0, 1]);
    }

    #[test]
    fn from_turns_user_only() {
        use crate::scenario::{Role, Turn};

        let turns = vec![Turn {
            role: Role::User,
            content: "Q".into(),
        }];
        let ch = BenchmarkChannel::from_turns(turns);
        assert_eq!(ch.total(), 1);
        assert!(ch.responses().is_empty());
    }
}