Skip to main content

zeph_core/
channel.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4/// Typed error for channel operations.
5#[derive(Debug, thiserror::Error)]
6pub enum ChannelError {
7    /// Underlying I/O failure.
8    #[error("I/O error: {0}")]
9    Io(#[from] std::io::Error),
10
11    /// Channel closed (mpsc send/recv failure).
12    #[error("channel closed")]
13    ChannelClosed,
14
15    /// Confirmation dialog cancelled.
16    #[error("confirmation cancelled")]
17    ConfirmCancelled,
18
19    /// Catch-all for provider-specific errors.
20    #[error("{0}")]
21    Other(String),
22}
23
24/// All fields that describe a tool-start event sent to a channel.
25#[derive(Debug)]
26pub struct ToolStartEvent<'a> {
27    pub tool_name: &'a str,
28    pub tool_call_id: &'a str,
29    pub params: Option<serde_json::Value>,
30    pub parent_tool_use_id: Option<String>,
31}
32
33/// All fields that describe a completed tool-output event sent to a channel.
34#[derive(Debug)]
35pub struct ToolOutputEvent<'a> {
36    pub tool_name: &'a str,
37    pub body: &'a str,
38    pub diff: Option<crate::DiffData>,
39    pub filter_stats: Option<String>,
40    pub kept_lines: Option<Vec<usize>>,
41    pub locations: Option<Vec<String>>,
42    pub tool_call_id: &'a str,
43    pub is_error: bool,
44    pub parent_tool_use_id: Option<String>,
45    pub raw_response: Option<serde_json::Value>,
46    pub started_at: Option<std::time::Instant>,
47}
48
49/// Kind of binary attachment on an incoming message.
50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
51pub enum AttachmentKind {
52    Audio,
53    Image,
54    Video,
55    File,
56}
57
58/// Binary attachment carried by a [`ChannelMessage`].
59#[derive(Debug, Clone)]
60pub struct Attachment {
61    pub kind: AttachmentKind,
62    pub data: Vec<u8>,
63    pub filename: Option<String>,
64}
65
66/// Incoming message from a channel.
67#[derive(Debug, Clone)]
68pub struct ChannelMessage {
69    pub text: String,
70    pub attachments: Vec<Attachment>,
71}
72
73/// Bidirectional communication channel for the agent.
74pub trait Channel: Send {
75    /// Receive the next message. Returns `None` on EOF or shutdown.
76    ///
77    /// # Errors
78    ///
79    /// Returns an error if the underlying I/O fails.
80    fn recv(&mut self)
81    -> impl Future<Output = Result<Option<ChannelMessage>, ChannelError>> + Send;
82
83    /// Non-blocking receive. Returns `None` if no message is immediately available.
84    fn try_recv(&mut self) -> Option<ChannelMessage> {
85        None
86    }
87
88    /// Whether `/exit` and `/quit` commands should terminate the agent loop.
89    ///
90    /// Returns `false` for persistent server-side channels (e.g. Telegram) where
91    /// breaking the loop would not meaningfully exit from the user's perspective.
92    fn supports_exit(&self) -> bool {
93        true
94    }
95
96    /// Send a text response.
97    ///
98    /// # Errors
99    ///
100    /// Returns an error if the underlying I/O fails.
101    fn send(&mut self, text: &str) -> impl Future<Output = Result<(), ChannelError>> + Send;
102
103    /// Send a partial chunk of streaming response.
104    ///
105    /// # Errors
106    ///
107    /// Returns an error if the underlying I/O fails.
108    fn send_chunk(&mut self, chunk: &str) -> impl Future<Output = Result<(), ChannelError>> + Send;
109
110    /// Flush any buffered chunks.
111    ///
112    /// # Errors
113    ///
114    /// Returns an error if the underlying I/O fails.
115    fn flush_chunks(&mut self) -> impl Future<Output = Result<(), ChannelError>> + Send;
116
117    /// Send a typing indicator. No-op by default.
118    ///
119    /// # Errors
120    ///
121    /// Returns an error if the underlying I/O fails.
122    fn send_typing(&mut self) -> impl Future<Output = Result<(), ChannelError>> + Send {
123        async { Ok(()) }
124    }
125
126    /// Send a status label (shown as spinner text in TUI). No-op by default.
127    ///
128    /// # Errors
129    ///
130    /// Returns an error if the underlying I/O fails.
131    fn send_status(
132        &mut self,
133        _text: &str,
134    ) -> impl Future<Output = Result<(), ChannelError>> + Send {
135        async { Ok(()) }
136    }
137
138    /// Send a thinking/reasoning token chunk. No-op by default.
139    ///
140    /// # Errors
141    ///
142    /// Returns an error if the underlying I/O fails.
143    fn send_thinking_chunk(
144        &mut self,
145        _chunk: &str,
146    ) -> impl Future<Output = Result<(), ChannelError>> + Send {
147        async { Ok(()) }
148    }
149
150    /// Notify channel of queued message count. No-op by default.
151    ///
152    /// # Errors
153    ///
154    /// Returns an error if the underlying I/O fails.
155    fn send_queue_count(
156        &mut self,
157        _count: usize,
158    ) -> impl Future<Output = Result<(), ChannelError>> + Send {
159        async { Ok(()) }
160    }
161
162    /// Send token usage after an LLM call. No-op by default.
163    ///
164    /// # Errors
165    ///
166    /// Returns an error if the underlying I/O fails.
167    fn send_usage(
168        &mut self,
169        _input_tokens: u64,
170        _output_tokens: u64,
171        _context_window: u64,
172    ) -> impl Future<Output = Result<(), ChannelError>> + Send {
173        async { Ok(()) }
174    }
175
176    /// Send diff data for a tool result. No-op by default (TUI overrides).
177    ///
178    /// # Errors
179    ///
180    /// Returns an error if the underlying I/O fails.
181    fn send_diff(
182        &mut self,
183        _diff: crate::DiffData,
184    ) -> impl Future<Output = Result<(), ChannelError>> + Send {
185        async { Ok(()) }
186    }
187
188    /// Announce that a tool call is starting.
189    ///
190    /// Emitted before execution begins so the transport layer can send an
191    /// `InProgress` status to the peer before the result arrives.
192    /// No-op by default.
193    ///
194    /// # Errors
195    ///
196    /// Returns an error if the underlying I/O fails.
197    fn send_tool_start(
198        &mut self,
199        _event: ToolStartEvent<'_>,
200    ) -> impl Future<Output = Result<(), ChannelError>> + Send {
201        async { Ok(()) }
202    }
203
204    /// Send a complete tool output with optional diff and filter stats atomically.
205    ///
206    /// `body` is the raw tool output content (no header). The default implementation
207    /// formats it with `[tool output: <name>]` prefix for human-readable channels.
208    /// Structured channels (e.g. `LoopbackChannel`) override this to emit a typed event
209    /// so consumers can access `tool_name` and `body` as separate fields.
210    ///
211    /// # Errors
212    ///
213    /// Returns an error if the underlying I/O fails.
214    fn send_tool_output(
215        &mut self,
216        event: ToolOutputEvent<'_>,
217    ) -> impl Future<Output = Result<(), ChannelError>> + Send {
218        let formatted = crate::agent::format_tool_output(event.tool_name, event.body);
219        async move { self.send(&formatted).await }
220    }
221
222    /// Request user confirmation for a destructive action. Returns `true` if confirmed.
223    /// Default: auto-confirm (for headless/test scenarios).
224    ///
225    /// # Errors
226    ///
227    /// Returns an error if the underlying I/O fails.
228    fn confirm(
229        &mut self,
230        _prompt: &str,
231    ) -> impl Future<Output = Result<bool, ChannelError>> + Send {
232        async { Ok(true) }
233    }
234
235    /// Signal the non-default stop reason to the consumer before flushing.
236    ///
237    /// Called by the agent loop immediately before `flush_chunks()` when a
238    /// truncation or turn-limit condition is detected. No-op by default.
239    ///
240    /// # Errors
241    ///
242    /// Returns an error if the underlying I/O fails.
243    fn send_stop_hint(
244        &mut self,
245        _hint: StopHint,
246    ) -> impl Future<Output = Result<(), ChannelError>> + Send {
247        async { Ok(()) }
248    }
249}
250
251/// Reason why the agent turn ended — carried by [`LoopbackEvent::Stop`].
252///
253/// Emitted by the agent loop immediately before `Flush` when a non-default
254/// terminal condition is detected. Consumers (e.g. the ACP layer) map this to
255/// the protocol-level `StopReason`.
256#[derive(Debug, Clone, Copy, PartialEq, Eq)]
257pub enum StopHint {
258    /// The LLM response was cut off by the token limit.
259    MaxTokens,
260    /// The turn loop exhausted `max_turns` without a final text response.
261    MaxTurnRequests,
262}
263
264/// Events emitted by the agent side toward the A2A caller.
265#[derive(Debug, Clone)]
266pub enum LoopbackEvent {
267    Chunk(String),
268    Flush,
269    FullMessage(String),
270    Status(String),
271    /// Emitted immediately before tool execution begins.
272    ToolStart {
273        tool_name: String,
274        tool_call_id: String,
275        /// Raw input parameters passed to the tool (e.g. `{"command": "..."}` for bash).
276        params: Option<serde_json::Value>,
277        /// Set when this tool call is made by a subagent; identifies the parent's `tool_call_id`.
278        parent_tool_use_id: Option<String>,
279        /// Wall-clock instant when the tool call was initiated; used to compute elapsed time.
280        started_at: std::time::Instant,
281    },
282    ToolOutput {
283        tool_name: String,
284        display: String,
285        diff: Option<crate::DiffData>,
286        filter_stats: Option<String>,
287        kept_lines: Option<Vec<usize>>,
288        locations: Option<Vec<String>>,
289        tool_call_id: String,
290        is_error: bool,
291        /// Terminal ID for shell tool calls routed through the IDE terminal.
292        terminal_id: Option<String>,
293        /// Set when this tool output belongs to a subagent; identifies the parent's `tool_call_id`.
294        parent_tool_use_id: Option<String>,
295        /// Structured tool response payload for ACP intermediate `tool_call_update` notifications.
296        raw_response: Option<serde_json::Value>,
297        /// Wall-clock instant when the corresponding `ToolStart` was emitted; used for elapsed time.
298        started_at: Option<std::time::Instant>,
299    },
300    /// Token usage from the last LLM turn.
301    Usage {
302        input_tokens: u64,
303        output_tokens: u64,
304        context_window: u64,
305    },
306    /// Generated session title (emitted after the first agent response).
307    SessionTitle(String),
308    /// Execution plan update.
309    Plan(Vec<(String, PlanItemStatus)>),
310    /// Thinking/reasoning token chunk from the LLM.
311    ThinkingChunk(String),
312    /// Non-default stop condition detected by the agent loop.
313    ///
314    /// Emitted immediately before `Flush`. When absent, the stop reason is `EndTurn`.
315    Stop(StopHint),
316}
317
318/// Status of a plan item, mirroring `acp::PlanEntryStatus`.
319#[derive(Debug, Clone)]
320pub enum PlanItemStatus {
321    Pending,
322    InProgress,
323    Completed,
324}
325
326/// Caller-side handle for sending input and receiving agent output.
327pub struct LoopbackHandle {
328    pub input_tx: tokio::sync::mpsc::Sender<ChannelMessage>,
329    pub output_rx: tokio::sync::mpsc::Receiver<LoopbackEvent>,
330    /// Shared cancel signal: notify to interrupt the agent's current operation.
331    pub cancel_signal: std::sync::Arc<tokio::sync::Notify>,
332}
333
334/// Headless channel bridging an A2A `TaskProcessor` with the agent loop.
335pub struct LoopbackChannel {
336    input_rx: tokio::sync::mpsc::Receiver<ChannelMessage>,
337    output_tx: tokio::sync::mpsc::Sender<LoopbackEvent>,
338}
339
340impl LoopbackChannel {
341    /// Create a linked `(LoopbackChannel, LoopbackHandle)` pair.
342    #[must_use]
343    pub fn pair(buffer: usize) -> (Self, LoopbackHandle) {
344        let (input_tx, input_rx) = tokio::sync::mpsc::channel(buffer);
345        let (output_tx, output_rx) = tokio::sync::mpsc::channel(buffer);
346        let cancel_signal = std::sync::Arc::new(tokio::sync::Notify::new());
347        (
348            Self {
349                input_rx,
350                output_tx,
351            },
352            LoopbackHandle {
353                input_tx,
354                output_rx,
355                cancel_signal,
356            },
357        )
358    }
359}
360
361impl Channel for LoopbackChannel {
362    fn supports_exit(&self) -> bool {
363        false
364    }
365
366    async fn recv(&mut self) -> Result<Option<ChannelMessage>, ChannelError> {
367        Ok(self.input_rx.recv().await)
368    }
369
370    async fn send(&mut self, text: &str) -> Result<(), ChannelError> {
371        self.output_tx
372            .send(LoopbackEvent::FullMessage(text.to_owned()))
373            .await
374            .map_err(|_| ChannelError::ChannelClosed)
375    }
376
377    async fn send_chunk(&mut self, chunk: &str) -> Result<(), ChannelError> {
378        self.output_tx
379            .send(LoopbackEvent::Chunk(chunk.to_owned()))
380            .await
381            .map_err(|_| ChannelError::ChannelClosed)
382    }
383
384    async fn flush_chunks(&mut self) -> Result<(), ChannelError> {
385        self.output_tx
386            .send(LoopbackEvent::Flush)
387            .await
388            .map_err(|_| ChannelError::ChannelClosed)
389    }
390
391    async fn send_status(&mut self, text: &str) -> Result<(), ChannelError> {
392        self.output_tx
393            .send(LoopbackEvent::Status(text.to_owned()))
394            .await
395            .map_err(|_| ChannelError::ChannelClosed)
396    }
397
398    async fn send_thinking_chunk(&mut self, chunk: &str) -> Result<(), ChannelError> {
399        self.output_tx
400            .send(LoopbackEvent::ThinkingChunk(chunk.to_owned()))
401            .await
402            .map_err(|_| ChannelError::ChannelClosed)
403    }
404
405    async fn send_tool_start(&mut self, event: ToolStartEvent<'_>) -> Result<(), ChannelError> {
406        self.output_tx
407            .send(LoopbackEvent::ToolStart {
408                tool_name: event.tool_name.to_owned(),
409                tool_call_id: event.tool_call_id.to_owned(),
410                params: event.params,
411                parent_tool_use_id: event.parent_tool_use_id,
412                started_at: std::time::Instant::now(),
413            })
414            .await
415            .map_err(|_| ChannelError::ChannelClosed)
416    }
417
418    async fn send_tool_output(&mut self, event: ToolOutputEvent<'_>) -> Result<(), ChannelError> {
419        self.output_tx
420            .send(LoopbackEvent::ToolOutput {
421                tool_name: event.tool_name.to_owned(),
422                display: event.body.to_owned(),
423                diff: event.diff,
424                filter_stats: event.filter_stats,
425                kept_lines: event.kept_lines,
426                locations: event.locations,
427                tool_call_id: event.tool_call_id.to_owned(),
428                is_error: event.is_error,
429                terminal_id: None,
430                parent_tool_use_id: event.parent_tool_use_id,
431                raw_response: event.raw_response,
432                started_at: event.started_at,
433            })
434            .await
435            .map_err(|_| ChannelError::ChannelClosed)
436    }
437
438    async fn confirm(&mut self, _prompt: &str) -> Result<bool, ChannelError> {
439        Ok(true)
440    }
441
442    async fn send_stop_hint(&mut self, hint: StopHint) -> Result<(), ChannelError> {
443        self.output_tx
444            .send(LoopbackEvent::Stop(hint))
445            .await
446            .map_err(|_| ChannelError::ChannelClosed)
447    }
448
449    async fn send_usage(
450        &mut self,
451        input_tokens: u64,
452        output_tokens: u64,
453        context_window: u64,
454    ) -> Result<(), ChannelError> {
455        self.output_tx
456            .send(LoopbackEvent::Usage {
457                input_tokens,
458                output_tokens,
459                context_window,
460            })
461            .await
462            .map_err(|_| ChannelError::ChannelClosed)
463    }
464}
465
466#[cfg(test)]
467mod tests {
468    use super::*;
469
470    #[test]
471    fn channel_message_creation() {
472        let msg = ChannelMessage {
473            text: "hello".to_string(),
474            attachments: vec![],
475        };
476        assert_eq!(msg.text, "hello");
477        assert!(msg.attachments.is_empty());
478    }
479
480    struct StubChannel;
481
482    impl Channel for StubChannel {
483        async fn recv(&mut self) -> Result<Option<ChannelMessage>, ChannelError> {
484            Ok(None)
485        }
486
487        async fn send(&mut self, _text: &str) -> Result<(), ChannelError> {
488            Ok(())
489        }
490
491        async fn send_chunk(&mut self, _chunk: &str) -> Result<(), ChannelError> {
492            Ok(())
493        }
494
495        async fn flush_chunks(&mut self) -> Result<(), ChannelError> {
496            Ok(())
497        }
498    }
499
500    #[tokio::test]
501    async fn send_chunk_default_is_noop() {
502        let mut ch = StubChannel;
503        ch.send_chunk("partial").await.unwrap();
504    }
505
506    #[tokio::test]
507    async fn flush_chunks_default_is_noop() {
508        let mut ch = StubChannel;
509        ch.flush_chunks().await.unwrap();
510    }
511
512    #[tokio::test]
513    async fn stub_channel_confirm_auto_approves() {
514        let mut ch = StubChannel;
515        let result = ch.confirm("Delete everything?").await.unwrap();
516        assert!(result);
517    }
518
519    #[tokio::test]
520    async fn stub_channel_send_typing_default() {
521        let mut ch = StubChannel;
522        ch.send_typing().await.unwrap();
523    }
524
525    #[tokio::test]
526    async fn stub_channel_recv_returns_none() {
527        let mut ch = StubChannel;
528        let msg = ch.recv().await.unwrap();
529        assert!(msg.is_none());
530    }
531
532    #[tokio::test]
533    async fn stub_channel_send_ok() {
534        let mut ch = StubChannel;
535        ch.send("hello").await.unwrap();
536    }
537
538    #[test]
539    fn channel_message_clone() {
540        let msg = ChannelMessage {
541            text: "test".to_string(),
542            attachments: vec![],
543        };
544        let cloned = msg.clone();
545        assert_eq!(cloned.text, "test");
546    }
547
548    #[test]
549    fn channel_message_debug() {
550        let msg = ChannelMessage {
551            text: "debug".to_string(),
552            attachments: vec![],
553        };
554        let debug = format!("{msg:?}");
555        assert!(debug.contains("debug"));
556    }
557
558    #[test]
559    fn attachment_kind_equality() {
560        assert_eq!(AttachmentKind::Audio, AttachmentKind::Audio);
561        assert_ne!(AttachmentKind::Audio, AttachmentKind::Image);
562    }
563
564    #[test]
565    fn attachment_construction() {
566        let a = Attachment {
567            kind: AttachmentKind::Audio,
568            data: vec![0, 1, 2],
569            filename: Some("test.wav".into()),
570        };
571        assert_eq!(a.kind, AttachmentKind::Audio);
572        assert_eq!(a.data.len(), 3);
573        assert_eq!(a.filename.as_deref(), Some("test.wav"));
574    }
575
576    #[test]
577    fn channel_message_with_attachments() {
578        let msg = ChannelMessage {
579            text: String::new(),
580            attachments: vec![Attachment {
581                kind: AttachmentKind::Audio,
582                data: vec![42],
583                filename: None,
584            }],
585        };
586        assert_eq!(msg.attachments.len(), 1);
587        assert_eq!(msg.attachments[0].kind, AttachmentKind::Audio);
588    }
589
590    #[test]
591    fn stub_channel_try_recv_returns_none() {
592        let mut ch = StubChannel;
593        assert!(ch.try_recv().is_none());
594    }
595
596    #[tokio::test]
597    async fn stub_channel_send_queue_count_noop() {
598        let mut ch = StubChannel;
599        ch.send_queue_count(5).await.unwrap();
600    }
601
602    // LoopbackChannel tests
603
604    #[test]
605    fn loopback_pair_returns_linked_handles() {
606        let (channel, handle) = LoopbackChannel::pair(8);
607        // Both sides exist and channels are connected via their sender capacity
608        drop(channel);
609        drop(handle);
610    }
611
612    #[tokio::test]
613    async fn loopback_cancel_signal_can_be_notified_and_awaited() {
614        let (_channel, handle) = LoopbackChannel::pair(8);
615        let signal = std::sync::Arc::clone(&handle.cancel_signal);
616        // Notify from one side, await on the other.
617        let notified = signal.notified();
618        handle.cancel_signal.notify_one();
619        notified.await; // resolves immediately after notify_one()
620    }
621
622    #[tokio::test]
623    async fn loopback_cancel_signal_shared_across_clones() {
624        let (_channel, handle) = LoopbackChannel::pair(8);
625        let signal_a = std::sync::Arc::clone(&handle.cancel_signal);
626        let signal_b = std::sync::Arc::clone(&handle.cancel_signal);
627        let notified = signal_b.notified();
628        signal_a.notify_one();
629        notified.await;
630    }
631
632    #[tokio::test]
633    async fn loopback_send_recv_round_trip() {
634        let (mut channel, handle) = LoopbackChannel::pair(8);
635        handle
636            .input_tx
637            .send(ChannelMessage {
638                text: "hello".to_owned(),
639                attachments: vec![],
640            })
641            .await
642            .unwrap();
643        let msg = channel.recv().await.unwrap().unwrap();
644        assert_eq!(msg.text, "hello");
645    }
646
647    #[tokio::test]
648    async fn loopback_recv_returns_none_when_handle_dropped() {
649        let (mut channel, handle) = LoopbackChannel::pair(8);
650        drop(handle);
651        let result = channel.recv().await.unwrap();
652        assert!(result.is_none());
653    }
654
655    #[tokio::test]
656    async fn loopback_send_produces_full_message_event() {
657        let (mut channel, mut handle) = LoopbackChannel::pair(8);
658        channel.send("world").await.unwrap();
659        let event = handle.output_rx.recv().await.unwrap();
660        assert!(matches!(event, LoopbackEvent::FullMessage(t) if t == "world"));
661    }
662
663    #[tokio::test]
664    async fn loopback_send_chunk_then_flush() {
665        let (mut channel, mut handle) = LoopbackChannel::pair(8);
666        channel.send_chunk("part1").await.unwrap();
667        channel.flush_chunks().await.unwrap();
668        let ev1 = handle.output_rx.recv().await.unwrap();
669        let ev2 = handle.output_rx.recv().await.unwrap();
670        assert!(matches!(ev1, LoopbackEvent::Chunk(t) if t == "part1"));
671        assert!(matches!(ev2, LoopbackEvent::Flush));
672    }
673
674    #[tokio::test]
675    async fn loopback_send_tool_output() {
676        let (mut channel, mut handle) = LoopbackChannel::pair(8);
677        channel
678            .send_tool_output(ToolOutputEvent {
679                tool_name: "bash",
680                body: "exit 0",
681                diff: None,
682                filter_stats: None,
683                kept_lines: None,
684                locations: None,
685                tool_call_id: "",
686                is_error: false,
687                parent_tool_use_id: None,
688                raw_response: None,
689                started_at: None,
690            })
691            .await
692            .unwrap();
693        let event = handle.output_rx.recv().await.unwrap();
694        match event {
695            LoopbackEvent::ToolOutput {
696                tool_name,
697                display,
698                diff,
699                filter_stats,
700                kept_lines,
701                locations,
702                tool_call_id,
703                is_error,
704                terminal_id,
705                parent_tool_use_id,
706                raw_response,
707                ..
708            } => {
709                assert_eq!(tool_name, "bash");
710                assert_eq!(display, "exit 0");
711                assert!(diff.is_none());
712                assert!(filter_stats.is_none());
713                assert!(kept_lines.is_none());
714                assert!(locations.is_none());
715                assert_eq!(tool_call_id, "");
716                assert!(!is_error);
717                assert!(terminal_id.is_none());
718                assert!(parent_tool_use_id.is_none());
719                assert!(raw_response.is_none());
720            }
721            _ => panic!("expected ToolOutput event"),
722        }
723    }
724
725    #[tokio::test]
726    async fn loopback_confirm_auto_approves() {
727        let (mut channel, _handle) = LoopbackChannel::pair(8);
728        let result = channel.confirm("are you sure?").await.unwrap();
729        assert!(result);
730    }
731
732    #[tokio::test]
733    async fn loopback_send_error_when_output_closed() {
734        let (mut channel, handle) = LoopbackChannel::pair(8);
735        // Drop only the output_rx side by dropping the handle
736        drop(handle);
737        let result = channel.send("too late").await;
738        assert!(matches!(result, Err(ChannelError::ChannelClosed)));
739    }
740
741    #[tokio::test]
742    async fn loopback_send_chunk_error_when_output_closed() {
743        let (mut channel, handle) = LoopbackChannel::pair(8);
744        drop(handle);
745        let result = channel.send_chunk("chunk").await;
746        assert!(matches!(result, Err(ChannelError::ChannelClosed)));
747    }
748
749    #[tokio::test]
750    async fn loopback_flush_error_when_output_closed() {
751        let (mut channel, handle) = LoopbackChannel::pair(8);
752        drop(handle);
753        let result = channel.flush_chunks().await;
754        assert!(matches!(result, Err(ChannelError::ChannelClosed)));
755    }
756
757    #[tokio::test]
758    async fn loopback_send_status_event() {
759        let (mut channel, mut handle) = LoopbackChannel::pair(8);
760        channel.send_status("working...").await.unwrap();
761        let event = handle.output_rx.recv().await.unwrap();
762        assert!(matches!(event, LoopbackEvent::Status(s) if s == "working..."));
763    }
764
765    #[tokio::test]
766    async fn loopback_send_usage_produces_usage_event() {
767        let (mut channel, mut handle) = LoopbackChannel::pair(8);
768        channel.send_usage(100, 50, 200_000).await.unwrap();
769        let event = handle.output_rx.recv().await.unwrap();
770        match event {
771            LoopbackEvent::Usage {
772                input_tokens,
773                output_tokens,
774                context_window,
775            } => {
776                assert_eq!(input_tokens, 100);
777                assert_eq!(output_tokens, 50);
778                assert_eq!(context_window, 200_000);
779            }
780            _ => panic!("expected Usage event"),
781        }
782    }
783
784    #[tokio::test]
785    async fn loopback_send_usage_error_when_closed() {
786        let (mut channel, handle) = LoopbackChannel::pair(8);
787        drop(handle);
788        let result = channel.send_usage(1, 2, 3).await;
789        assert!(matches!(result, Err(ChannelError::ChannelClosed)));
790    }
791
792    #[test]
793    fn plan_item_status_variants_are_distinct() {
794        assert!(!matches!(
795            PlanItemStatus::Pending,
796            PlanItemStatus::InProgress
797        ));
798        assert!(!matches!(
799            PlanItemStatus::InProgress,
800            PlanItemStatus::Completed
801        ));
802        assert!(!matches!(
803            PlanItemStatus::Completed,
804            PlanItemStatus::Pending
805        ));
806    }
807
808    #[test]
809    fn loopback_event_session_title_carries_string() {
810        let event = LoopbackEvent::SessionTitle("hello".to_owned());
811        assert!(matches!(event, LoopbackEvent::SessionTitle(s) if s == "hello"));
812    }
813
814    #[test]
815    fn loopback_event_plan_carries_entries() {
816        let entries = vec![
817            ("step 1".to_owned(), PlanItemStatus::Pending),
818            ("step 2".to_owned(), PlanItemStatus::InProgress),
819        ];
820        let event = LoopbackEvent::Plan(entries);
821        match event {
822            LoopbackEvent::Plan(e) => {
823                assert_eq!(e.len(), 2);
824                assert!(matches!(e[0].1, PlanItemStatus::Pending));
825                assert!(matches!(e[1].1, PlanItemStatus::InProgress));
826            }
827            _ => panic!("expected Plan event"),
828        }
829    }
830
831    #[tokio::test]
832    async fn loopback_send_tool_start_produces_tool_start_event() {
833        let (mut channel, mut handle) = LoopbackChannel::pair(8);
834        channel
835            .send_tool_start(ToolStartEvent {
836                tool_name: "shell",
837                tool_call_id: "tc-001",
838                params: Some(serde_json::json!({"command": "ls"})),
839                parent_tool_use_id: None,
840            })
841            .await
842            .unwrap();
843        let event = handle.output_rx.recv().await.unwrap();
844        match event {
845            LoopbackEvent::ToolStart {
846                tool_name,
847                tool_call_id,
848                params,
849                parent_tool_use_id,
850                ..
851            } => {
852                assert_eq!(tool_name, "shell");
853                assert_eq!(tool_call_id, "tc-001");
854                assert!(params.is_some());
855                assert!(parent_tool_use_id.is_none());
856            }
857            _ => panic!("expected ToolStart event"),
858        }
859    }
860
861    #[tokio::test]
862    async fn loopback_send_tool_start_with_parent_id() {
863        let (mut channel, mut handle) = LoopbackChannel::pair(8);
864        channel
865            .send_tool_start(ToolStartEvent {
866                tool_name: "web",
867                tool_call_id: "tc-002",
868                params: None,
869                parent_tool_use_id: Some("parent-123".into()),
870            })
871            .await
872            .unwrap();
873        let event = handle.output_rx.recv().await.unwrap();
874        assert!(matches!(
875            event,
876            LoopbackEvent::ToolStart { parent_tool_use_id: Some(ref id), .. } if id == "parent-123"
877        ));
878    }
879
880    #[tokio::test]
881    async fn loopback_send_tool_start_error_when_output_closed() {
882        let (mut channel, handle) = LoopbackChannel::pair(8);
883        drop(handle);
884        let result = channel
885            .send_tool_start(ToolStartEvent {
886                tool_name: "shell",
887                tool_call_id: "tc-003",
888                params: None,
889                parent_tool_use_id: None,
890            })
891            .await;
892        assert!(matches!(result, Err(ChannelError::ChannelClosed)));
893    }
894
895    #[tokio::test]
896    async fn default_send_tool_output_formats_message() {
897        let mut ch = StubChannel;
898        // Default impl calls self.send() which is a no-op in StubChannel — just verify it doesn't panic.
899        ch.send_tool_output(ToolOutputEvent {
900            tool_name: "bash",
901            body: "hello",
902            diff: None,
903            filter_stats: None,
904            kept_lines: None,
905            locations: None,
906            tool_call_id: "id",
907            is_error: false,
908            parent_tool_use_id: None,
909            raw_response: None,
910            started_at: None,
911        })
912        .await
913        .unwrap();
914    }
915}