Skip to main content

zeph_bench/
channel.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Headless [`zeph_core::channel::Channel`] implementation for benchmark runs.
5//!
6//! [`BenchmarkChannel`] feeds a pre-loaded prompt queue into the agent loop and captures
7//! each response without requiring a terminal, Telegram bot, or any other real I/O channel.
8//!
9//! Tool output events are captured via [`BenchmarkChannel::tool_outputs`] for Phase 2 scoring
10//! (see [`ToolOutputEvent`] and #4237). They are not added to [`responses`][BenchmarkChannel::responses]
11//! so that tool intermediaries do not corrupt response metrics.
12
13use std::collections::VecDeque;
14use std::time::Instant;
15
16use zeph_core::channel::{ChannelError, ChannelMessage, ToolOutputEvent};
17
/// One agent response recorded by [`BenchmarkChannel`].
///
/// A value is pushed each time the agent calls [`send`][zeph_core::channel::Channel::send],
/// or calls [`flush_chunks`][zeph_core::channel::Channel::flush_chunks] while streamed text
/// is buffered. [`BenchmarkChannel::from_turns`] additionally creates one per scripted
/// assistant turn.
///
/// The type is plain data and implements [`PartialEq`]/[`Eq`], so whole responses can be
/// compared directly in assertions.
///
/// # Examples
///
/// ```
/// use zeph_bench::channel::CapturedResponse;
/// use std::time::Duration;
///
/// let r = CapturedResponse {
///     prompt_index: 0,
///     text: "42".into(),
///     elapsed: Duration::from_millis(312),
///     input_tokens: 120,
///     output_tokens: 3,
///     context_window: 128_000,
/// };
/// assert_eq!(r.text, "42");
/// assert_eq!(r, r.clone());
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CapturedResponse {
    /// Zero-based index of the prompt this response corresponds to.
    ///
    /// For a response seeded from a scripted assistant turn this is the index of the
    /// closest preceding user turn, or `0` when no user turn precedes it.
    pub prompt_index: usize,
    /// Full text of the agent response (or the concatenated streaming chunks).
    pub text: String,
    /// Wall-clock time from the first streaming chunk to `flush_chunks`.
    ///
    /// [`std::time::Duration::ZERO`] for non-streaming `send` calls and for responses
    /// seeded from scripted assistant turns.
    pub elapsed: std::time::Duration,
    /// Input token count most recently reported via `send_usage` before this response
    /// was captured; `0` when nothing was reported.
    pub input_tokens: u64,
    /// Output token count most recently reported via `send_usage` before this response
    /// was captured; `0` when nothing was reported.
    pub output_tokens: u64,
    /// Context window size most recently reported via `send_usage` before this response
    /// was captured; `0` when nothing was reported.
    pub context_window: u64,
}
55
56/// Headless channel that feeds pre-loaded prompts and captures agent responses.
57///
58/// Used by the bench runner to drive the agent loop without a real terminal or
59/// network connection. [`recv`][zeph_core::channel::Channel::recv] drains the prompt
60/// queue; [`send`][zeph_core::channel::Channel::send] and
61/// [`flush_chunks`][zeph_core::channel::Channel::flush_chunks] accumulate responses
62/// into an internal list. Tool outputs are captured separately via [`tool_outputs()`][BenchmarkChannel::tool_outputs].
63///
64/// # Usage
65///
66/// ```no_run
67/// use zeph_bench::BenchmarkChannel;
68///
69/// let prompts = vec!["What year did WWII end?".into()];
70/// let channel = BenchmarkChannel::new(prompts);
71/// assert_eq!(channel.total(), 1);
72/// ```
73///
74/// After the agent loop completes, call [`into_responses`] to consume the channel
75/// and retrieve all captured responses:
76///
77/// ```no_run
78/// # use zeph_bench::BenchmarkChannel;
79/// let channel = BenchmarkChannel::new(vec!["question".into()]);
80/// // ... run agent loop ...
81/// let responses = channel.into_responses();
82/// ```
83///
84/// [`into_responses`]: BenchmarkChannel::into_responses
85pub struct BenchmarkChannel {
86    prompts: VecDeque<String>,
87    responses: Vec<CapturedResponse>,
88    tool_outputs: Vec<ToolOutputEvent>,
89    current_index: usize,
90    total: usize,
91    // Streaming chunk accumulation
92    chunk_buffer: String,
93    chunk_start: Option<Instant>,
94    // Token usage for the current prompt (updated by send_usage)
95    pending_input_tokens: u64,
96    pending_output_tokens: u64,
97    pending_context_window: u64,
98}
99
100impl BenchmarkChannel {
101    /// Create a new channel pre-loaded with `prompts`.
102    ///
103    /// Prompts are fed to the agent one at a time in order via
104    /// [`recv`][zeph_core::channel::Channel::recv]. The channel returns `Ok(None)` once
105    /// all prompts have been drained.
106    ///
107    /// # Examples
108    ///
109    /// ```
110    /// use zeph_bench::BenchmarkChannel;
111    ///
112    /// let ch = BenchmarkChannel::new(vec!["hello".into(), "world".into()]);
113    /// assert_eq!(ch.total(), 2);
114    /// ```
115    #[must_use]
116    pub fn new(prompts: Vec<String>) -> Self {
117        let total = prompts.len();
118        Self {
119            prompts: VecDeque::from(prompts),
120            responses: Vec::new(),
121            tool_outputs: Vec::new(),
122            current_index: 0,
123            total,
124            chunk_buffer: String::new(),
125            chunk_start: None,
126            pending_input_tokens: 0,
127            pending_output_tokens: 0,
128            pending_context_window: 0,
129        }
130    }
131
132    /// Create a channel from a multi-turn scenario history.
133    ///
134    /// User turns are fed to the agent in order via [`recv`][zeph_core::channel::Channel::recv].
135    /// Assistant turns are pre-seeded into [`responses`][BenchmarkChannel::responses] so that
136    /// evaluators and Phase 2 scoring have access to the captured prior context.
137    ///
138    /// # Note
139    ///
140    /// If `turns` contains no [`crate::scenario::Role::User`] turns, [`total`][BenchmarkChannel::total] returns
141    /// `0` and the channel cannot serve as a prompt source. The bench runner rejects this with
142    /// [`BenchError::InvalidFormat`][crate::BenchError] — callers must ensure at least one user
143    /// turn is present.
144    ///
145    /// # Examples
146    ///
147    /// ```
148    /// use zeph_bench::BenchmarkChannel;
149    /// use zeph_bench::scenario::{Role, Turn};
150    ///
151    /// let turns = vec![
152    ///     Turn { role: Role::User, content: "Hello".into() },
153    ///     Turn { role: Role::Assistant, content: "Hi there".into() },
154    ///     Turn { role: Role::User, content: "What year?".into() },
155    /// ];
156    /// let ch = BenchmarkChannel::from_turns(turns);
157    /// assert_eq!(ch.total(), 2); // two user turns
158    /// assert_eq!(ch.responses().len(), 1); // one seeded assistant turn
159    /// ```
160    #[must_use]
161    pub fn from_turns(turns: Vec<crate::scenario::Turn>) -> Self {
162        use crate::scenario::Role;
163
164        let mut prompts = VecDeque::new();
165        let mut seeded_responses = Vec::new();
166        let mut prompt_index: usize = 0;
167
168        for turn in turns {
169            match turn.role {
170                Role::User => {
171                    prompts.push_back(turn.content);
172                    prompt_index += 1;
173                }
174                Role::Assistant => {
175                    seeded_responses.push(CapturedResponse {
176                        prompt_index: prompt_index.saturating_sub(1),
177                        text: turn.content,
178                        elapsed: std::time::Duration::ZERO,
179                        input_tokens: 0,
180                        output_tokens: 0,
181                        context_window: 0,
182                    });
183                }
184            }
185        }
186
187        let total = prompts.len();
188        Self {
189            prompts,
190            responses: seeded_responses,
191            tool_outputs: Vec::new(),
192            current_index: 0,
193            total,
194            chunk_buffer: String::new(),
195            chunk_start: None,
196            pending_input_tokens: 0,
197            pending_output_tokens: 0,
198            pending_context_window: 0,
199        }
200    }
201
202    /// Total number of prompts this channel was initialised with.
203    ///
204    /// # Examples
205    ///
206    /// ```
207    /// use zeph_bench::BenchmarkChannel;
208    ///
209    /// let ch = BenchmarkChannel::new(vec!["a".into(), "b".into(), "c".into()]);
210    /// assert_eq!(ch.total(), 3);
211    /// ```
212    #[must_use]
213    pub fn total(&self) -> usize {
214        self.total
215    }
216
217    /// Consume the channel and return all [`CapturedResponse`]s collected so far.
218    ///
219    /// Call this after the agent loop exits to retrieve every response in prompt order.
220    ///
221    /// # Examples
222    ///
223    /// ```no_run
224    /// use zeph_bench::BenchmarkChannel;
225    ///
226    /// let ch = BenchmarkChannel::new(vec!["question".into()]);
227    /// // ... run agent ...
228    /// let responses = ch.into_responses();
229    /// ```
230    #[must_use]
231    pub fn into_responses(self) -> Vec<CapturedResponse> {
232        self.responses
233    }
234
235    /// Borrow the captured responses without consuming the channel.
236    ///
237    /// # Examples
238    ///
239    /// ```
240    /// use zeph_bench::BenchmarkChannel;
241    ///
242    /// let ch = BenchmarkChannel::new(vec![]);
243    /// assert!(ch.responses().is_empty());
244    /// ```
245    #[must_use]
246    pub fn responses(&self) -> &[CapturedResponse] {
247        &self.responses
248    }
249
250    /// Borrow the tool output events captured during the agent run.
251    ///
252    /// Events are appended by [`send_tool_output`][zeph_core::channel::Channel::send_tool_output]
253    /// and are available for Phase 2 evaluation (#4234) after the agent loop exits.
254    ///
255    /// # Examples
256    ///
257    /// ```
258    /// use zeph_bench::BenchmarkChannel;
259    ///
260    /// let ch = BenchmarkChannel::new(vec![]);
261    /// assert!(ch.tool_outputs().is_empty());
262    /// ```
263    #[must_use]
264    pub fn tool_outputs(&self) -> &[zeph_core::channel::ToolOutputEvent] {
265        &self.tool_outputs
266    }
267
268    fn flush_chunk_buffer(&mut self) {
269        if self.chunk_buffer.is_empty() {
270            return;
271        }
272        let elapsed = self
273            .chunk_start
274            .map_or(std::time::Duration::ZERO, |s| s.elapsed());
275        self.responses.push(CapturedResponse {
276            prompt_index: self.current_index.saturating_sub(1),
277            text: std::mem::take(&mut self.chunk_buffer),
278            elapsed,
279            input_tokens: self.pending_input_tokens,
280            output_tokens: self.pending_output_tokens,
281            context_window: self.pending_context_window,
282        });
283        self.chunk_start = None;
284        self.pending_input_tokens = 0;
285        self.pending_output_tokens = 0;
286        self.pending_context_window = 0;
287    }
288}
289
290impl zeph_core::channel::Channel for BenchmarkChannel {
291    async fn recv(&mut self) -> Result<Option<ChannelMessage>, ChannelError> {
292        match self.prompts.pop_front() {
293            Some(text) => {
294                self.current_index += 1;
295                Ok(Some(ChannelMessage {
296                    text,
297                    attachments: vec![],
298                    is_guest_context: false,
299                    is_from_bot: false,
300                }))
301            }
302            None => Ok(None),
303        }
304    }
305
306    fn supports_exit(&self) -> bool {
307        false
308    }
309
310    async fn send(&mut self, text: &str) -> Result<(), ChannelError> {
311        self.responses.push(CapturedResponse {
312            prompt_index: self.current_index.saturating_sub(1),
313            text: text.to_owned(),
314            elapsed: std::time::Duration::ZERO,
315            input_tokens: self.pending_input_tokens,
316            output_tokens: self.pending_output_tokens,
317            context_window: self.pending_context_window,
318        });
319        self.pending_input_tokens = 0;
320        self.pending_output_tokens = 0;
321        self.pending_context_window = 0;
322        Ok(())
323    }
324
325    async fn send_chunk(&mut self, chunk: &str) -> Result<(), ChannelError> {
326        if self.chunk_start.is_none() {
327            self.chunk_start = Some(Instant::now());
328        }
329        self.chunk_buffer.push_str(chunk);
330        Ok(())
331    }
332
333    async fn flush_chunks(&mut self) -> Result<(), ChannelError> {
334        self.flush_chunk_buffer();
335        Ok(())
336    }
337
338    async fn send_usage(
339        &mut self,
340        input_tokens: u64,
341        output_tokens: u64,
342        context_window: u64,
343    ) -> Result<(), ChannelError> {
344        self.pending_input_tokens = input_tokens;
345        self.pending_output_tokens = output_tokens;
346        self.pending_context_window = context_window;
347        Ok(())
348    }
349
350    async fn send_tool_output(&mut self, event: ToolOutputEvent) -> Result<(), ChannelError> {
351        self.tool_outputs.push(event);
352        Ok(())
353    }
354}
355
#[cfg(test)]
mod tests {
    use zeph_core::channel::{
        Channel, ElicitationField, ElicitationFieldType, ElicitationRequest, ElicitationResponse,
        ToolOutputEvent,
    };

    use super::*;
    use crate::scenario::{Role, Turn};

    /// Build a single-prompt channel and pull that prompt, so later captures are
    /// attributed to prompt 0.
    async fn channel_after_recv(prompt: &str) -> BenchmarkChannel {
        let mut channel = BenchmarkChannel::new(vec![prompt.into()]);
        let _ = channel.recv().await.unwrap();
        channel
    }

    /// Shorthand for a scenario turn.
    fn turn(role: Role, content: &str) -> Turn {
        Turn {
            role,
            content: content.into(),
        }
    }

    #[tokio::test]
    async fn recv_drains_queue_and_returns_none_when_empty() {
        let mut channel = BenchmarkChannel::new(vec!["hello".into(), "world".into()]);
        for expected in ["hello", "world"] {
            let message = channel.recv().await.unwrap().unwrap();
            assert_eq!(message.text, expected);
        }
        let exhausted = channel.recv().await.unwrap();
        assert!(exhausted.is_none());
    }

    #[tokio::test]
    async fn send_accumulates_response() {
        let mut channel = channel_after_recv("prompt").await;
        channel.send("response text").await.unwrap();

        let captured = channel.responses();
        assert_eq!(captured.len(), 1);
        assert_eq!(captured[0].text, "response text");
    }

    #[tokio::test]
    async fn confirm_returns_true() {
        let mut channel = BenchmarkChannel::new(vec![]);
        let confirmed = channel.confirm("delete?").await.unwrap();
        assert!(confirmed);
    }

    #[tokio::test]
    async fn elicit_returns_declined() {
        let mut channel = BenchmarkChannel::new(vec![]);
        let only_field = ElicitationField {
            name: "field".into(),
            description: None,
            field_type: ElicitationFieldType::String,
            required: true,
        };
        let request = ElicitationRequest {
            server_name: "test-server".into(),
            message: "provide input".into(),
            fields: vec![only_field],
        };

        let outcome = channel.elicit(request).await.unwrap();
        assert!(matches!(outcome, ElicitationResponse::Declined));
    }

    #[tokio::test]
    async fn send_chunk_and_flush_captures_response() {
        let mut channel = channel_after_recv("p").await;
        for piece in ["part1", " part2"] {
            channel.send_chunk(piece).await.unwrap();
        }
        channel.flush_chunks().await.unwrap();

        let captured = channel.responses();
        assert_eq!(captured.len(), 1);
        assert_eq!(captured[0].text, "part1 part2");
    }

    #[tokio::test]
    async fn supports_exit_returns_false() {
        let channel = BenchmarkChannel::new(vec![]);
        assert!(!channel.supports_exit());
    }

    #[tokio::test]
    async fn send_usage_captured_on_send() {
        let mut channel = channel_after_recv("p").await;
        channel.send_usage(10, 20, 128_000).await.unwrap();
        channel.send("answer").await.unwrap();

        let first = &channel.responses()[0];
        assert_eq!(first.input_tokens, 10);
        assert_eq!(first.output_tokens, 20);
        assert_eq!(first.context_window, 128_000);
    }

    #[tokio::test]
    async fn send_tool_output_captured_separately_from_responses() {
        let mut channel = channel_after_recv("p").await;
        let event = ToolOutputEvent {
            tool_name: "bash".into(),
            display: "some tool output".into(),
            diff: None,
            filter_stats: None,
            kept_lines: None,
            locations: None,
            tool_call_id: "tc-1".into(),
            terminal_id: None,
            is_error: false,
            parent_tool_use_id: None,
            raw_response: None,
            started_at: None,
        };
        channel.send_tool_output(event).await.unwrap();

        // Tool output must not pollute benchmark responses.
        assert_eq!(channel.responses().len(), 0);
        // Tool output must be accessible for Phase 2 scoring.
        let events = channel.tool_outputs();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].tool_name, "bash");
    }

    #[test]
    fn from_turns_splits_user_and_assistant() {
        let history = vec![
            turn(Role::User, "Q1"),
            turn(Role::Assistant, "A1"),
            turn(Role::User, "Q2"),
        ];
        let channel = BenchmarkChannel::from_turns(history);

        assert_eq!(channel.total(), 2);
        let seeded = channel.responses();
        assert_eq!(seeded.len(), 1);
        assert_eq!(seeded[0].text, "A1");
    }

    #[test]
    fn from_turns_user_only() {
        let channel = BenchmarkChannel::from_turns(vec![turn(Role::User, "Q")]);
        assert_eq!(channel.total(), 1);
        assert!(channel.responses().is_empty());
    }
}
501}