Skip to main content

zeph_core/
channel.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4/// A single field in an elicitation form request.
5#[derive(Debug, Clone)]
6pub struct ElicitationField {
7    pub name: String,
8    pub description: Option<String>,
9    pub field_type: ElicitationFieldType,
10    pub required: bool,
11}
12
13/// Type of an elicitation form field.
14#[derive(Debug, Clone)]
15pub enum ElicitationFieldType {
16    String,
17    Integer,
18    Number,
19    Boolean,
20    /// Enum with allowed values.
21    Enum(Vec<String>),
22}
23
24/// An elicitation request from an MCP server.
25#[derive(Debug, Clone)]
26pub struct ElicitationRequest {
27    /// Name of the MCP server making the request (shown for phishing prevention).
28    pub server_name: String,
29    /// Human-readable message from the server.
30    pub message: String,
31    /// Form fields to collect from the user.
32    pub fields: Vec<ElicitationField>,
33}
34
35/// User's response to an elicitation request.
36#[derive(Debug, Clone)]
37pub enum ElicitationResponse {
38    /// User filled in the form and submitted.
39    Accepted(serde_json::Value),
40    /// User actively declined to provide input.
41    Declined,
42    /// User cancelled (e.g. Escape, timeout).
43    Cancelled,
44}
45
46/// Typed error for channel operations.
47#[derive(Debug, thiserror::Error)]
48pub enum ChannelError {
49    /// Underlying I/O failure.
50    #[error("I/O error: {0}")]
51    Io(#[from] std::io::Error),
52
53    /// Channel closed (mpsc send/recv failure).
54    #[error("channel closed")]
55    ChannelClosed,
56
57    /// Confirmation dialog cancelled.
58    #[error("confirmation cancelled")]
59    ConfirmCancelled,
60
61    /// Catch-all for provider-specific errors.
62    #[error("{0}")]
63    Other(String),
64}
65
66impl ChannelError {
67    /// Create a catch-all error from any displayable error.
68    ///
69    /// Converts the error message to a string and wraps it in the `Other` variant.
70    /// Useful for wrapping provider-specific errors from third-party libraries.
71    pub fn other(e: impl std::fmt::Display) -> Self {
72        Self::Other(e.to_string())
73    }
74}
75
76/// Kind of binary attachment on an incoming message.
77#[derive(Debug, Clone, Copy, PartialEq, Eq)]
78pub enum AttachmentKind {
79    Audio,
80    Image,
81    Video,
82    File,
83}
84
85/// Binary attachment carried by a [`ChannelMessage`].
86#[derive(Debug, Clone)]
87pub struct Attachment {
88    pub kind: AttachmentKind,
89    pub data: Vec<u8>,
90    pub filename: Option<String>,
91}
92
93/// Incoming message from a channel.
94#[derive(Debug, Clone)]
95pub struct ChannelMessage {
96    pub text: String,
97    pub attachments: Vec<Attachment>,
98}
99
100/// Bidirectional communication channel for the agent.
101pub trait Channel: Send {
102    /// Receive the next message. Returns `None` on EOF or shutdown.
103    ///
104    /// # Errors
105    ///
106    /// Returns an error if the underlying I/O fails.
107    fn recv(&mut self)
108    -> impl Future<Output = Result<Option<ChannelMessage>, ChannelError>> + Send;
109
110    /// Non-blocking receive. Returns `None` if no message is immediately available.
111    fn try_recv(&mut self) -> Option<ChannelMessage> {
112        None
113    }
114
115    /// Whether `/exit` and `/quit` commands should terminate the agent loop.
116    ///
117    /// Returns `false` for persistent server-side channels (e.g. Telegram) where
118    /// breaking the loop would not meaningfully exit from the user's perspective.
119    fn supports_exit(&self) -> bool {
120        true
121    }
122
123    /// Send a text response.
124    ///
125    /// # Errors
126    ///
127    /// Returns an error if the underlying I/O fails.
128    fn send(&mut self, text: &str) -> impl Future<Output = Result<(), ChannelError>> + Send;
129
130    /// Send a partial chunk of streaming response.
131    ///
132    /// # Errors
133    ///
134    /// Returns an error if the underlying I/O fails.
135    fn send_chunk(&mut self, chunk: &str) -> impl Future<Output = Result<(), ChannelError>> + Send;
136
137    /// Flush any buffered chunks.
138    ///
139    /// # Errors
140    ///
141    /// Returns an error if the underlying I/O fails.
142    fn flush_chunks(&mut self) -> impl Future<Output = Result<(), ChannelError>> + Send;
143
144    /// Send a typing indicator. No-op by default.
145    ///
146    /// # Errors
147    ///
148    /// Returns an error if the underlying I/O fails.
149    fn send_typing(&mut self) -> impl Future<Output = Result<(), ChannelError>> + Send {
150        async { Ok(()) }
151    }
152
153    /// Send a status label (shown as spinner text in TUI). No-op by default.
154    ///
155    /// # Errors
156    ///
157    /// Returns an error if the underlying I/O fails.
158    fn send_status(
159        &mut self,
160        _text: &str,
161    ) -> impl Future<Output = Result<(), ChannelError>> + Send {
162        async { Ok(()) }
163    }
164
165    /// Send a thinking/reasoning token chunk. No-op by default.
166    ///
167    /// # Errors
168    ///
169    /// Returns an error if the underlying I/O fails.
170    fn send_thinking_chunk(
171        &mut self,
172        _chunk: &str,
173    ) -> impl Future<Output = Result<(), ChannelError>> + Send {
174        async { Ok(()) }
175    }
176
177    /// Notify channel of queued message count. No-op by default.
178    ///
179    /// # Errors
180    ///
181    /// Returns an error if the underlying I/O fails.
182    fn send_queue_count(
183        &mut self,
184        _count: usize,
185    ) -> impl Future<Output = Result<(), ChannelError>> + Send {
186        async { Ok(()) }
187    }
188
189    /// Send token usage after an LLM call. No-op by default.
190    ///
191    /// # Errors
192    ///
193    /// Returns an error if the underlying I/O fails.
194    fn send_usage(
195        &mut self,
196        _input_tokens: u64,
197        _output_tokens: u64,
198        _context_window: u64,
199    ) -> impl Future<Output = Result<(), ChannelError>> + Send {
200        async { Ok(()) }
201    }
202
203    /// Send diff data for a tool result. No-op by default (TUI overrides).
204    ///
205    /// # Errors
206    ///
207    /// Returns an error if the underlying I/O fails.
208    fn send_diff(
209        &mut self,
210        _diff: crate::DiffData,
211    ) -> impl Future<Output = Result<(), ChannelError>> + Send {
212        async { Ok(()) }
213    }
214
215    /// Announce that a tool call is starting.
216    ///
217    /// Emitted before execution begins so the transport layer can send an
218    /// `InProgress` status to the peer before the result arrives.
219    /// No-op by default.
220    ///
221    /// # Errors
222    ///
223    /// Returns an error if the underlying I/O fails.
224    fn send_tool_start(
225        &mut self,
226        _event: ToolStartEvent,
227    ) -> impl Future<Output = Result<(), ChannelError>> + Send {
228        async { Ok(()) }
229    }
230
231    /// Send a complete tool output with optional diff and filter stats atomically.
232    ///
233    /// `display` is the formatted tool output. The default implementation forwards to
234    /// [`Channel::send`]. Structured channels (e.g. `LoopbackChannel`) override this to
235    /// emit a typed event so consumers can access `tool_name` and `display` as separate fields.
236    ///
237    /// # Errors
238    ///
239    /// Returns an error if the underlying I/O fails.
240    fn send_tool_output(
241        &mut self,
242        event: ToolOutputEvent,
243    ) -> impl Future<Output = Result<(), ChannelError>> + Send {
244        let formatted = crate::agent::format_tool_output(event.tool_name.as_str(), &event.display);
245        async move { self.send(&formatted).await }
246    }
247
248    /// Request user confirmation for a destructive action. Returns `true` if confirmed.
249    /// Default: auto-confirm (for headless/test scenarios).
250    ///
251    /// # Errors
252    ///
253    /// Returns an error if the underlying I/O fails.
254    fn confirm(
255        &mut self,
256        _prompt: &str,
257    ) -> impl Future<Output = Result<bool, ChannelError>> + Send {
258        async { Ok(true) }
259    }
260
261    /// Request structured input from the user for an MCP elicitation.
262    ///
263    /// Always displays `request.server_name` to prevent phishing by malicious servers.
264    /// Default: auto-decline (for headless/daemon/non-interactive scenarios).
265    ///
266    /// # Errors
267    ///
268    /// Returns an error if the underlying I/O fails.
269    fn elicit(
270        &mut self,
271        _request: ElicitationRequest,
272    ) -> impl Future<Output = Result<ElicitationResponse, ChannelError>> + Send {
273        async { Ok(ElicitationResponse::Declined) }
274    }
275
276    /// Signal the non-default stop reason to the consumer before flushing.
277    ///
278    /// Called by the agent loop immediately before `flush_chunks()` when a
279    /// truncation or turn-limit condition is detected. No-op by default.
280    ///
281    /// # Errors
282    ///
283    /// Returns an error if the underlying I/O fails.
284    fn send_stop_hint(
285        &mut self,
286        _hint: StopHint,
287    ) -> impl Future<Output = Result<(), ChannelError>> + Send {
288        async { Ok(()) }
289    }
290}
291
292/// Reason why the agent turn ended — carried by [`LoopbackEvent::Stop`].
293///
294/// Emitted by the agent loop immediately before `Flush` when a non-default
295/// terminal condition is detected. Consumers (e.g. the ACP layer) map this to
296/// the protocol-level `StopReason`.
297#[derive(Debug, Clone, Copy, PartialEq, Eq)]
298pub enum StopHint {
299    /// The LLM response was cut off by the token limit.
300    MaxTokens,
301    /// The turn loop exhausted `max_turns` without a final text response.
302    MaxTurnRequests,
303}
304
305/// Event carrying data for a tool call start, emitted before execution begins.
306///
307/// Passed by value to [`Channel::send_tool_start`] and carried by
308/// [`LoopbackEvent::ToolStart`]. All fields are owned — no lifetime parameters.
309#[derive(Debug, Clone)]
310pub struct ToolStartEvent {
311    /// Name of the tool being invoked.
312    pub tool_name: zeph_common::ToolName,
313    /// Opaque tool call ID assigned by the LLM.
314    pub tool_call_id: String,
315    /// Raw input parameters passed to the tool (e.g. `{"command": "..."}` for bash).
316    pub params: Option<serde_json::Value>,
317    /// Set when this tool call is made by a subagent; identifies the parent's `tool_call_id`.
318    pub parent_tool_use_id: Option<String>,
319    /// Wall-clock instant when the tool call was initiated; used to compute elapsed time.
320    pub started_at: std::time::Instant,
321    /// True when this tool call was speculatively dispatched before LLM finished decoding.
322    ///
323    /// TUI renders a `[spec]` prefix; Telegram suppresses unless `chat_visibility = verbose`.
324    pub speculative: bool,
325    /// OS sandbox profile applied to this tool call, if any.
326    ///
327    /// `None` means no sandbox was applied (not configured or not a subprocess executor).
328    pub sandbox_profile: Option<zeph_tools::SandboxProfile>,
329}
330
331/// Event carrying data for a completed tool output, emitted after execution.
332///
333/// Passed by value to [`Channel::send_tool_output`] and carried by
334/// [`LoopbackEvent::ToolOutput`]. All fields are owned — no lifetime parameters.
335#[derive(Debug, Clone)]
336pub struct ToolOutputEvent {
337    /// Name of the tool that produced this output.
338    pub tool_name: zeph_common::ToolName,
339    /// Human-readable output text.
340    pub display: String,
341    /// Optional diff for file-editing tools.
342    pub diff: Option<crate::DiffData>,
343    /// Optional filter statistics from output filtering.
344    pub filter_stats: Option<String>,
345    /// Kept line indices after filtering (for display).
346    pub kept_lines: Option<Vec<usize>>,
347    /// Source locations for code search results.
348    pub locations: Option<Vec<String>>,
349    /// Opaque tool call ID matching the corresponding `ToolStartEvent`.
350    pub tool_call_id: String,
351    /// Whether this output represents an error.
352    pub is_error: bool,
353    /// Terminal ID for shell tool calls routed through the IDE terminal.
354    pub terminal_id: Option<String>,
355    /// Set when this tool output belongs to a subagent; identifies the parent's `tool_call_id`.
356    pub parent_tool_use_id: Option<String>,
357    /// Structured tool response payload for ACP intermediate `tool_call_update` notifications.
358    pub raw_response: Option<serde_json::Value>,
359    /// Wall-clock instant when the corresponding `ToolStartEvent` was emitted.
360    pub started_at: Option<std::time::Instant>,
361}
362
363/// Backward-compatible alias for [`ToolStartEvent`].
364///
365/// Kept for use in the ACP layer. Prefer [`ToolStartEvent`] in new code.
366pub type ToolStartData = ToolStartEvent;
367
368/// Backward-compatible alias for [`ToolOutputEvent`].
369///
370/// Kept for use in the ACP layer. Prefer [`ToolOutputEvent`] in new code.
371pub type ToolOutputData = ToolOutputEvent;
372
373/// Events emitted by the agent side toward the A2A caller.
374#[derive(Debug, Clone)]
375pub enum LoopbackEvent {
376    Chunk(String),
377    Flush,
378    FullMessage(String),
379    Status(String),
380    /// Emitted immediately before tool execution begins.
381    ToolStart(Box<ToolStartEvent>),
382    ToolOutput(Box<ToolOutputEvent>),
383    /// Token usage from the last LLM turn.
384    Usage {
385        input_tokens: u64,
386        output_tokens: u64,
387        context_window: u64,
388    },
389    /// Generated session title (emitted after the first agent response).
390    SessionTitle(String),
391    /// Execution plan update.
392    Plan(Vec<(String, PlanItemStatus)>),
393    /// Thinking/reasoning token chunk from the LLM.
394    ThinkingChunk(String),
395    /// Non-default stop condition detected by the agent loop.
396    ///
397    /// Emitted immediately before `Flush`. When absent, the stop reason is `EndTurn`.
398    Stop(StopHint),
399}
400
401/// Status of a plan item, mirroring `acp::PlanEntryStatus`.
402#[derive(Debug, Clone)]
403pub enum PlanItemStatus {
404    Pending,
405    InProgress,
406    Completed,
407}
408
409/// Caller-side handle for sending input and receiving agent output.
410pub struct LoopbackHandle {
411    pub input_tx: tokio::sync::mpsc::Sender<ChannelMessage>,
412    pub output_rx: tokio::sync::mpsc::Receiver<LoopbackEvent>,
413    /// Shared cancel signal: notify to interrupt the agent's current operation.
414    pub cancel_signal: std::sync::Arc<tokio::sync::Notify>,
415}
416
417/// Headless channel bridging an A2A `TaskProcessor` with the agent loop.
418pub struct LoopbackChannel {
419    input_rx: tokio::sync::mpsc::Receiver<ChannelMessage>,
420    output_tx: tokio::sync::mpsc::Sender<LoopbackEvent>,
421}
422
423impl LoopbackChannel {
424    /// Create a linked `(LoopbackChannel, LoopbackHandle)` pair.
425    #[must_use]
426    pub fn pair(buffer: usize) -> (Self, LoopbackHandle) {
427        let (input_tx, input_rx) = tokio::sync::mpsc::channel(buffer);
428        let (output_tx, output_rx) = tokio::sync::mpsc::channel(buffer);
429        let cancel_signal = std::sync::Arc::new(tokio::sync::Notify::new());
430        (
431            Self {
432                input_rx,
433                output_tx,
434            },
435            LoopbackHandle {
436                input_tx,
437                output_rx,
438                cancel_signal,
439            },
440        )
441    }
442}
443
444impl Channel for LoopbackChannel {
445    fn supports_exit(&self) -> bool {
446        false
447    }
448
449    async fn recv(&mut self) -> Result<Option<ChannelMessage>, ChannelError> {
450        Ok(self.input_rx.recv().await)
451    }
452
453    async fn send(&mut self, text: &str) -> Result<(), ChannelError> {
454        self.output_tx
455            .send(LoopbackEvent::FullMessage(text.to_owned()))
456            .await
457            .map_err(|_| ChannelError::ChannelClosed)
458    }
459
460    async fn send_chunk(&mut self, chunk: &str) -> Result<(), ChannelError> {
461        self.output_tx
462            .send(LoopbackEvent::Chunk(chunk.to_owned()))
463            .await
464            .map_err(|_| ChannelError::ChannelClosed)
465    }
466
467    async fn flush_chunks(&mut self) -> Result<(), ChannelError> {
468        self.output_tx
469            .send(LoopbackEvent::Flush)
470            .await
471            .map_err(|_| ChannelError::ChannelClosed)
472    }
473
474    async fn send_status(&mut self, text: &str) -> Result<(), ChannelError> {
475        self.output_tx
476            .send(LoopbackEvent::Status(text.to_owned()))
477            .await
478            .map_err(|_| ChannelError::ChannelClosed)
479    }
480
481    async fn send_thinking_chunk(&mut self, chunk: &str) -> Result<(), ChannelError> {
482        self.output_tx
483            .send(LoopbackEvent::ThinkingChunk(chunk.to_owned()))
484            .await
485            .map_err(|_| ChannelError::ChannelClosed)
486    }
487
488    async fn send_tool_start(&mut self, event: ToolStartEvent) -> Result<(), ChannelError> {
489        self.output_tx
490            .send(LoopbackEvent::ToolStart(Box::new(event)))
491            .await
492            .map_err(|_| ChannelError::ChannelClosed)
493    }
494
495    async fn send_tool_output(&mut self, event: ToolOutputEvent) -> Result<(), ChannelError> {
496        self.output_tx
497            .send(LoopbackEvent::ToolOutput(Box::new(event)))
498            .await
499            .map_err(|_| ChannelError::ChannelClosed)
500    }
501
502    async fn confirm(&mut self, _prompt: &str) -> Result<bool, ChannelError> {
503        Ok(true)
504    }
505
506    async fn send_stop_hint(&mut self, hint: StopHint) -> Result<(), ChannelError> {
507        self.output_tx
508            .send(LoopbackEvent::Stop(hint))
509            .await
510            .map_err(|_| ChannelError::ChannelClosed)
511    }
512
513    async fn send_usage(
514        &mut self,
515        input_tokens: u64,
516        output_tokens: u64,
517        context_window: u64,
518    ) -> Result<(), ChannelError> {
519        self.output_tx
520            .send(LoopbackEvent::Usage {
521                input_tokens,
522                output_tokens,
523                context_window,
524            })
525            .await
526            .map_err(|_| ChannelError::ChannelClosed)
527    }
528}
529
530/// Adapter that wraps a [`Channel`] reference and implements [`zeph_commands::ChannelSink`].
531///
532/// Used at command dispatch time to coerce `&mut C` into `&mut dyn ChannelSink` without
533/// a blanket impl (which would violate Rust's orphan rules).
534pub(crate) struct ChannelSinkAdapter<'a, C: Channel>(pub &'a mut C);
535
536impl<C: Channel> zeph_commands::ChannelSink for ChannelSinkAdapter<'_, C> {
537    fn send<'a>(
538        &'a mut self,
539        msg: &'a str,
540    ) -> std::pin::Pin<
541        Box<dyn std::future::Future<Output = Result<(), zeph_commands::CommandError>> + Send + 'a>,
542    > {
543        Box::pin(async move {
544            self.0
545                .send(msg)
546                .await
547                .map_err(zeph_commands::CommandError::new)
548        })
549    }
550
551    fn flush_chunks<'a>(
552        &'a mut self,
553    ) -> std::pin::Pin<
554        Box<dyn std::future::Future<Output = Result<(), zeph_commands::CommandError>> + Send + 'a>,
555    > {
556        Box::pin(async move {
557            self.0
558                .flush_chunks()
559                .await
560                .map_err(zeph_commands::CommandError::new)
561        })
562    }
563
564    fn send_queue_count<'a>(
565        &'a mut self,
566        count: usize,
567    ) -> std::pin::Pin<
568        Box<dyn std::future::Future<Output = Result<(), zeph_commands::CommandError>> + Send + 'a>,
569    > {
570        Box::pin(async move {
571            self.0
572                .send_queue_count(count)
573                .await
574                .map_err(zeph_commands::CommandError::new)
575        })
576    }
577
578    fn supports_exit(&self) -> bool {
579        self.0.supports_exit()
580    }
581}
582
583#[cfg(test)]
584mod tests {
585    use super::*;
586
587    #[test]
588    fn channel_message_creation() {
589        let msg = ChannelMessage {
590            text: "hello".to_string(),
591            attachments: vec![],
592        };
593        assert_eq!(msg.text, "hello");
594        assert!(msg.attachments.is_empty());
595    }
596
597    struct StubChannel;
598
599    impl Channel for StubChannel {
600        async fn recv(&mut self) -> Result<Option<ChannelMessage>, ChannelError> {
601            Ok(None)
602        }
603
604        async fn send(&mut self, _text: &str) -> Result<(), ChannelError> {
605            Ok(())
606        }
607
608        async fn send_chunk(&mut self, _chunk: &str) -> Result<(), ChannelError> {
609            Ok(())
610        }
611
612        async fn flush_chunks(&mut self) -> Result<(), ChannelError> {
613            Ok(())
614        }
615    }
616
617    #[tokio::test]
618    async fn send_chunk_default_is_noop() {
619        let mut ch = StubChannel;
620        ch.send_chunk("partial").await.unwrap();
621    }
622
623    #[tokio::test]
624    async fn flush_chunks_default_is_noop() {
625        let mut ch = StubChannel;
626        ch.flush_chunks().await.unwrap();
627    }
628
629    #[tokio::test]
630    async fn stub_channel_confirm_auto_approves() {
631        let mut ch = StubChannel;
632        let result = ch.confirm("Delete everything?").await.unwrap();
633        assert!(result);
634    }
635
636    #[tokio::test]
637    async fn stub_channel_send_typing_default() {
638        let mut ch = StubChannel;
639        ch.send_typing().await.unwrap();
640    }
641
642    #[tokio::test]
643    async fn stub_channel_recv_returns_none() {
644        let mut ch = StubChannel;
645        let msg = ch.recv().await.unwrap();
646        assert!(msg.is_none());
647    }
648
649    #[tokio::test]
650    async fn stub_channel_send_ok() {
651        let mut ch = StubChannel;
652        ch.send("hello").await.unwrap();
653    }
654
655    #[test]
656    fn channel_message_clone() {
657        let msg = ChannelMessage {
658            text: "test".to_string(),
659            attachments: vec![],
660        };
661        let cloned = msg.clone();
662        assert_eq!(cloned.text, "test");
663    }
664
665    #[test]
666    fn channel_message_debug() {
667        let msg = ChannelMessage {
668            text: "debug".to_string(),
669            attachments: vec![],
670        };
671        let debug = format!("{msg:?}");
672        assert!(debug.contains("debug"));
673    }
674
675    #[test]
676    fn attachment_kind_equality() {
677        assert_eq!(AttachmentKind::Audio, AttachmentKind::Audio);
678        assert_ne!(AttachmentKind::Audio, AttachmentKind::Image);
679    }
680
681    #[test]
682    fn attachment_construction() {
683        let a = Attachment {
684            kind: AttachmentKind::Audio,
685            data: vec![0, 1, 2],
686            filename: Some("test.wav".into()),
687        };
688        assert_eq!(a.kind, AttachmentKind::Audio);
689        assert_eq!(a.data.len(), 3);
690        assert_eq!(a.filename.as_deref(), Some("test.wav"));
691    }
692
693    #[test]
694    fn channel_message_with_attachments() {
695        let msg = ChannelMessage {
696            text: String::new(),
697            attachments: vec![Attachment {
698                kind: AttachmentKind::Audio,
699                data: vec![42],
700                filename: None,
701            }],
702        };
703        assert_eq!(msg.attachments.len(), 1);
704        assert_eq!(msg.attachments[0].kind, AttachmentKind::Audio);
705    }
706
707    #[test]
708    fn stub_channel_try_recv_returns_none() {
709        let mut ch = StubChannel;
710        assert!(ch.try_recv().is_none());
711    }
712
713    #[tokio::test]
714    async fn stub_channel_send_queue_count_noop() {
715        let mut ch = StubChannel;
716        ch.send_queue_count(5).await.unwrap();
717    }
718
719    // LoopbackChannel tests
720
721    #[test]
722    fn loopback_pair_returns_linked_handles() {
723        let (channel, handle) = LoopbackChannel::pair(8);
724        // Both sides exist and channels are connected via their sender capacity
725        drop(channel);
726        drop(handle);
727    }
728
729    #[tokio::test]
730    async fn loopback_cancel_signal_can_be_notified_and_awaited() {
731        let (_channel, handle) = LoopbackChannel::pair(8);
732        let signal = std::sync::Arc::clone(&handle.cancel_signal);
733        // Notify from one side, await on the other.
734        let notified = signal.notified();
735        handle.cancel_signal.notify_one();
736        notified.await; // resolves immediately after notify_one()
737    }
738
739    #[tokio::test]
740    async fn loopback_cancel_signal_shared_across_clones() {
741        let (_channel, handle) = LoopbackChannel::pair(8);
742        let signal_a = std::sync::Arc::clone(&handle.cancel_signal);
743        let signal_b = std::sync::Arc::clone(&handle.cancel_signal);
744        let notified = signal_b.notified();
745        signal_a.notify_one();
746        notified.await;
747    }
748
749    #[tokio::test]
750    async fn loopback_send_recv_round_trip() {
751        let (mut channel, handle) = LoopbackChannel::pair(8);
752        handle
753            .input_tx
754            .send(ChannelMessage {
755                text: "hello".to_owned(),
756                attachments: vec![],
757            })
758            .await
759            .unwrap();
760        let msg = channel.recv().await.unwrap().unwrap();
761        assert_eq!(msg.text, "hello");
762    }
763
764    #[tokio::test]
765    async fn loopback_recv_returns_none_when_handle_dropped() {
766        let (mut channel, handle) = LoopbackChannel::pair(8);
767        drop(handle);
768        let result = channel.recv().await.unwrap();
769        assert!(result.is_none());
770    }
771
772    #[tokio::test]
773    async fn loopback_send_produces_full_message_event() {
774        let (mut channel, mut handle) = LoopbackChannel::pair(8);
775        channel.send("world").await.unwrap();
776        let event = handle.output_rx.recv().await.unwrap();
777        assert!(matches!(event, LoopbackEvent::FullMessage(t) if t == "world"));
778    }
779
780    #[tokio::test]
781    async fn loopback_send_chunk_then_flush() {
782        let (mut channel, mut handle) = LoopbackChannel::pair(8);
783        channel.send_chunk("part1").await.unwrap();
784        channel.flush_chunks().await.unwrap();
785        let ev1 = handle.output_rx.recv().await.unwrap();
786        let ev2 = handle.output_rx.recv().await.unwrap();
787        assert!(matches!(ev1, LoopbackEvent::Chunk(t) if t == "part1"));
788        assert!(matches!(ev2, LoopbackEvent::Flush));
789    }
790
791    #[tokio::test]
792    async fn loopback_send_tool_output() {
793        let (mut channel, mut handle) = LoopbackChannel::pair(8);
794        channel
795            .send_tool_output(ToolOutputEvent {
796                tool_name: "bash".into(),
797                display: "exit 0".into(),
798                diff: None,
799                filter_stats: None,
800                kept_lines: None,
801                locations: None,
802                tool_call_id: String::new(),
803                terminal_id: None,
804                is_error: false,
805                parent_tool_use_id: None,
806                raw_response: None,
807                started_at: None,
808            })
809            .await
810            .unwrap();
811        let event = handle.output_rx.recv().await.unwrap();
812        match event {
813            LoopbackEvent::ToolOutput(data) => {
814                assert_eq!(data.tool_name, "bash");
815                assert_eq!(data.display, "exit 0");
816                assert!(data.diff.is_none());
817                assert!(data.filter_stats.is_none());
818                assert!(data.kept_lines.is_none());
819                assert!(data.locations.is_none());
820                assert_eq!(data.tool_call_id, "");
821                assert!(!data.is_error);
822                assert!(data.terminal_id.is_none());
823                assert!(data.parent_tool_use_id.is_none());
824                assert!(data.raw_response.is_none());
825            }
826            _ => panic!("expected ToolOutput event"),
827        }
828    }
829
830    #[tokio::test]
831    async fn loopback_confirm_auto_approves() {
832        let (mut channel, _handle) = LoopbackChannel::pair(8);
833        let result = channel.confirm("are you sure?").await.unwrap();
834        assert!(result);
835    }
836
837    #[tokio::test]
838    async fn loopback_send_error_when_output_closed() {
839        let (mut channel, handle) = LoopbackChannel::pair(8);
840        // Drop only the output_rx side by dropping the handle
841        drop(handle);
842        let result = channel.send("too late").await;
843        assert!(matches!(result, Err(ChannelError::ChannelClosed)));
844    }
845
846    #[tokio::test]
847    async fn loopback_send_chunk_error_when_output_closed() {
848        let (mut channel, handle) = LoopbackChannel::pair(8);
849        drop(handle);
850        let result = channel.send_chunk("chunk").await;
851        assert!(matches!(result, Err(ChannelError::ChannelClosed)));
852    }
853
854    #[tokio::test]
855    async fn loopback_flush_error_when_output_closed() {
856        let (mut channel, handle) = LoopbackChannel::pair(8);
857        drop(handle);
858        let result = channel.flush_chunks().await;
859        assert!(matches!(result, Err(ChannelError::ChannelClosed)));
860    }
861
862    #[tokio::test]
863    async fn loopback_send_status_event() {
864        let (mut channel, mut handle) = LoopbackChannel::pair(8);
865        channel.send_status("working...").await.unwrap();
866        let event = handle.output_rx.recv().await.unwrap();
867        assert!(matches!(event, LoopbackEvent::Status(s) if s == "working..."));
868    }
869
870    #[tokio::test]
871    async fn loopback_send_usage_produces_usage_event() {
872        let (mut channel, mut handle) = LoopbackChannel::pair(8);
873        channel.send_usage(100, 50, 200_000).await.unwrap();
874        let event = handle.output_rx.recv().await.unwrap();
875        match event {
876            LoopbackEvent::Usage {
877                input_tokens,
878                output_tokens,
879                context_window,
880            } => {
881                assert_eq!(input_tokens, 100);
882                assert_eq!(output_tokens, 50);
883                assert_eq!(context_window, 200_000);
884            }
885            _ => panic!("expected Usage event"),
886        }
887    }
888
889    #[tokio::test]
890    async fn loopback_send_usage_error_when_closed() {
891        let (mut channel, handle) = LoopbackChannel::pair(8);
892        drop(handle);
893        let result = channel.send_usage(1, 2, 3).await;
894        assert!(matches!(result, Err(ChannelError::ChannelClosed)));
895    }
896
897    #[test]
898    fn plan_item_status_variants_are_distinct() {
899        assert!(!matches!(
900            PlanItemStatus::Pending,
901            PlanItemStatus::InProgress
902        ));
903        assert!(!matches!(
904            PlanItemStatus::InProgress,
905            PlanItemStatus::Completed
906        ));
907        assert!(!matches!(
908            PlanItemStatus::Completed,
909            PlanItemStatus::Pending
910        ));
911    }
912
913    #[test]
914    fn loopback_event_session_title_carries_string() {
915        let event = LoopbackEvent::SessionTitle("hello".to_owned());
916        assert!(matches!(event, LoopbackEvent::SessionTitle(s) if s == "hello"));
917    }
918
919    #[test]
920    fn loopback_event_plan_carries_entries() {
921        let entries = vec![
922            ("step 1".to_owned(), PlanItemStatus::Pending),
923            ("step 2".to_owned(), PlanItemStatus::InProgress),
924        ];
925        let event = LoopbackEvent::Plan(entries);
926        match event {
927            LoopbackEvent::Plan(e) => {
928                assert_eq!(e.len(), 2);
929                assert!(matches!(e[0].1, PlanItemStatus::Pending));
930                assert!(matches!(e[1].1, PlanItemStatus::InProgress));
931            }
932            _ => panic!("expected Plan event"),
933        }
934    }
935
936    #[tokio::test]
937    async fn loopback_send_tool_start_produces_tool_start_event() {
938        let (mut channel, mut handle) = LoopbackChannel::pair(8);
939        channel
940            .send_tool_start(ToolStartEvent {
941                tool_name: "shell".into(),
942                tool_call_id: "tc-001".into(),
943                params: Some(serde_json::json!({"command": "ls"})),
944                parent_tool_use_id: None,
945                started_at: std::time::Instant::now(),
946                speculative: false,
947                sandbox_profile: None,
948            })
949            .await
950            .unwrap();
951        let event = handle.output_rx.recv().await.unwrap();
952        match event {
953            LoopbackEvent::ToolStart(data) => {
954                assert_eq!(data.tool_name.as_str(), "shell");
955                assert_eq!(data.tool_call_id.as_str(), "tc-001");
956                assert!(data.params.is_some());
957                assert!(data.parent_tool_use_id.is_none());
958            }
959            _ => panic!("expected ToolStart event"),
960        }
961    }
962
963    #[tokio::test]
964    async fn loopback_send_tool_start_with_parent_id() {
965        let (mut channel, mut handle) = LoopbackChannel::pair(8);
966        channel
967            .send_tool_start(ToolStartEvent {
968                tool_name: "web".into(),
969                tool_call_id: "tc-002".into(),
970                params: None,
971                parent_tool_use_id: Some("parent-123".into()),
972                started_at: std::time::Instant::now(),
973                speculative: false,
974                sandbox_profile: None,
975            })
976            .await
977            .unwrap();
978        let event = handle.output_rx.recv().await.unwrap();
979        assert!(matches!(
980            event,
981            LoopbackEvent::ToolStart(ref data) if data.parent_tool_use_id.as_deref() == Some("parent-123")
982        ));
983    }
984
985    #[tokio::test]
986    async fn loopback_send_tool_start_error_when_output_closed() {
987        let (mut channel, handle) = LoopbackChannel::pair(8);
988        drop(handle);
989        let result = channel
990            .send_tool_start(ToolStartEvent {
991                tool_name: "shell".into(),
992                tool_call_id: "tc-003".into(),
993                params: None,
994                parent_tool_use_id: None,
995                started_at: std::time::Instant::now(),
996                speculative: false,
997                sandbox_profile: None,
998            })
999            .await;
1000        assert!(matches!(result, Err(ChannelError::ChannelClosed)));
1001    }
1002
1003    #[tokio::test]
1004    async fn default_send_tool_output_formats_message() {
1005        let mut ch = StubChannel;
1006        // Default impl calls self.send() which is a no-op in StubChannel — just verify it doesn't panic.
1007        ch.send_tool_output(ToolOutputEvent {
1008            tool_name: "bash".into(),
1009            display: "hello".into(),
1010            diff: None,
1011            filter_stats: None,
1012            kept_lines: None,
1013            locations: None,
1014            tool_call_id: "id".into(),
1015            terminal_id: None,
1016            is_error: false,
1017            parent_tool_use_id: None,
1018            raw_response: None,
1019            started_at: None,
1020        })
1021        .await
1022        .unwrap();
1023    }
1024}