Skip to main content

zeph_channels/
any.rs

// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
// SPDX-License-Identifier: MIT OR Apache-2.0

//! Runtime channel dispatch via [`AnyChannel`].
//!
//! The Zeph binary selects a concrete channel at startup (CLI, Telegram, …)
//! and wraps it in `AnyChannel`.  All subsequent agent-loop code operates on
//! `AnyChannel` through the `Channel` trait without knowing the transport.
//!
//! Method forwarding is implemented via a private `dispatch_channel!` macro
//! that expands each trait method into a `match` over the active variant.
12
13use zeph_core::channel::{
14    Channel, ChannelError, ChannelMessage, ElicitationRequest, ElicitationResponse, StopHint,
15    ToolOutputEvent, ToolStartEvent,
16};
17
18use crate::cli::CliChannel;
19#[cfg(feature = "discord")]
20use crate::discord::DiscordChannel;
21use crate::json_cli::JsonCliChannel;
22#[cfg(feature = "slack")]
23use crate::slack::SlackChannel;
24use crate::telegram::TelegramChannel;
25
26/// Enum dispatch for runtime channel selection.
27///
28/// `AnyChannel` implements `Channel` by forwarding every method call to the
29/// active variant through a private macro.  The binary picks a variant at
30/// startup; the rest of the codebase never needs to be generic over the
31/// channel type.
32///
33/// # Variants
34///
35/// * `Cli` — reads from stdin and writes to stdout via [`CliChannel`].
36/// * `JsonCli` — reads from stdin and emits JSONL events to stdout via [`JsonCliChannel`].
37/// * `Telegram` — Telegram Bot API adapter via [`crate::telegram::TelegramChannel`].
38/// * `Discord` *(feature `discord`)* — Discord gateway adapter.
39/// * `Slack` *(feature `slack`)* — Slack Events API adapter.
40///
41/// # Examples
42///
43/// ```rust,ignore
44/// use zeph_channels::{AnyChannel, CliChannel};
45/// use zeph_core::channel::Channel;
46///
47/// let mut ch = AnyChannel::Cli(CliChannel::new());
48/// // Send a message regardless of the underlying channel.
49/// # tokio_test::block_on(async {
50/// ch.send("Hello!").await.unwrap();
51/// # });
52/// ```
53#[derive(Debug)]
54pub enum AnyChannel {
55    Cli(CliChannel),
56    JsonCli(JsonCliChannel),
57    Telegram(TelegramChannel),
58    #[cfg(feature = "discord")]
59    Discord(DiscordChannel),
60    #[cfg(feature = "slack")]
61    Slack(SlackChannel),
62}
63
/// Forwards an async `Channel` method call to whichever variant is active.
///
/// `dispatch_channel!(target, method, args...)` expands to a `match` on
/// `target` with one arm per `AnyChannel` variant.  Each arm calls `method`
/// on the wrapped channel with the supplied arguments and `.await`s the
/// returned future, so the macro may only be invoked from an async context.
///
/// The argument expressions are repeated textually in every arm, but only the
/// arm matching the active variant is evaluated.  The feature-gated arms carry
/// the same `#[cfg]` attributes as the corresponding enum variants.
macro_rules! dispatch_channel {
    ($target:expr, $name:ident $(, $param:expr)*) => {
        match $target {
            AnyChannel::Cli(inner) => inner.$name($($param),*).await,
            AnyChannel::JsonCli(inner) => inner.$name($($param),*).await,
            AnyChannel::Telegram(inner) => inner.$name($($param),*).await,
            #[cfg(feature = "discord")]
            AnyChannel::Discord(inner) => inner.$name($($param),*).await,
            #[cfg(feature = "slack")]
            AnyChannel::Slack(inner) => inner.$name($($param),*).await,
        }
    };
}
77
78impl Channel for AnyChannel {
79    async fn recv(&mut self) -> Result<Option<ChannelMessage>, ChannelError> {
80        dispatch_channel!(self, recv)
81    }
82
83    async fn send(&mut self, text: &str) -> Result<(), ChannelError> {
84        dispatch_channel!(self, send, text)
85    }
86
87    async fn send_chunk(&mut self, chunk: &str) -> Result<(), ChannelError> {
88        dispatch_channel!(self, send_chunk, chunk)
89    }
90
91    async fn flush_chunks(&mut self) -> Result<(), ChannelError> {
92        dispatch_channel!(self, flush_chunks)
93    }
94
95    async fn send_typing(&mut self) -> Result<(), ChannelError> {
96        dispatch_channel!(self, send_typing)
97    }
98
99    async fn confirm(&mut self, prompt: &str) -> Result<bool, ChannelError> {
100        dispatch_channel!(self, confirm, prompt)
101    }
102
103    async fn elicit(
104        &mut self,
105        request: ElicitationRequest,
106    ) -> Result<ElicitationResponse, ChannelError> {
107        dispatch_channel!(self, elicit, request)
108    }
109
110    fn try_recv(&mut self) -> Option<ChannelMessage> {
111        match self {
112            Self::Cli(c) => c.try_recv(),
113            Self::JsonCli(c) => c.try_recv(),
114            Self::Telegram(c) => c.try_recv(),
115            #[cfg(feature = "discord")]
116            Self::Discord(c) => c.try_recv(),
117            #[cfg(feature = "slack")]
118            Self::Slack(c) => c.try_recv(),
119        }
120    }
121
122    fn supports_exit(&self) -> bool {
123        match self {
124            Self::Cli(c) => c.supports_exit(),
125            Self::JsonCli(c) => c.supports_exit(),
126            Self::Telegram(c) => c.supports_exit(),
127            #[cfg(feature = "discord")]
128            Self::Discord(c) => c.supports_exit(),
129            #[cfg(feature = "slack")]
130            Self::Slack(c) => c.supports_exit(),
131        }
132    }
133
134    async fn send_status(&mut self, text: &str) -> Result<(), ChannelError> {
135        dispatch_channel!(self, send_status, text)
136    }
137
138    async fn send_queue_count(&mut self, count: usize) -> Result<(), ChannelError> {
139        dispatch_channel!(self, send_queue_count, count)
140    }
141
142    async fn send_diff(&mut self, diff: zeph_core::DiffData) -> Result<(), ChannelError> {
143        dispatch_channel!(self, send_diff, diff)
144    }
145
146    async fn send_tool_output(&mut self, event: ToolOutputEvent) -> Result<(), ChannelError> {
147        dispatch_channel!(self, send_tool_output, event)
148    }
149
150    async fn send_thinking_chunk(&mut self, chunk: &str) -> Result<(), ChannelError> {
151        dispatch_channel!(self, send_thinking_chunk, chunk)
152    }
153
154    async fn send_stop_hint(&mut self, hint: StopHint) -> Result<(), ChannelError> {
155        dispatch_channel!(self, send_stop_hint, hint)
156    }
157
158    async fn send_usage(
159        &mut self,
160        input_tokens: u64,
161        output_tokens: u64,
162        context_window: u64,
163    ) -> Result<(), ChannelError> {
164        dispatch_channel!(
165            self,
166            send_usage,
167            input_tokens,
168            output_tokens,
169            context_window
170        )
171    }
172
173    async fn send_tool_start(&mut self, event: ToolStartEvent) -> Result<(), ChannelError> {
174        dispatch_channel!(self, send_tool_start, event)
175    }
176}
177
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cli::CliChannel;
    use zeph_core::channel::{Channel, StopHint, ToolOutputEvent, ToolStartEvent};

    /// Builds the CLI-backed `AnyChannel` that every test in this module drives.
    fn cli_channel() -> AnyChannel {
        AnyChannel::Cli(CliChannel::new())
    }

    #[tokio::test]
    async fn any_channel_cli_send_returns_ok() {
        let mut channel = cli_channel();
        let outcome = channel.send("hello").await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn any_channel_cli_send_chunk_returns_ok() {
        let mut channel = cli_channel();
        let outcome = channel.send_chunk("chunk").await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn any_channel_cli_flush_chunks_returns_ok() {
        let mut channel = cli_channel();
        // Queue a chunk first so the flush is called after a preceding send_chunk.
        channel.send_chunk("data").await.unwrap();
        let outcome = channel.flush_chunks().await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn any_channel_cli_send_typing_returns_ok() {
        let mut channel = cli_channel();
        let outcome = channel.send_typing().await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn any_channel_cli_send_status_returns_ok() {
        let mut channel = cli_channel();
        let outcome = channel.send_status("thinking...").await;
        assert!(outcome.is_ok());
    }

    // Not built on Windows: crossterm there uses ReadConsoleInputW, which blocks
    // indefinitely without a real console handle (headless CI), whereas Unix
    // poll() gets EOF.
    #[cfg(not(target_os = "windows"))]
    #[tokio::test]
    async fn any_channel_cli_confirm_returns_bool() {
        let mut channel = cli_channel();
        // Only checks that the call completes; the answer itself is discarded.
        let _ = channel.confirm("confirm?").await;
    }

    #[test]
    fn any_channel_cli_try_recv_returns_none() {
        let mut channel = cli_channel();
        let pending = channel.try_recv();
        assert!(pending.is_none());
    }

    #[test]
    fn any_channel_debug() {
        let channel = cli_channel();
        let rendered = format!("{channel:?}");
        assert!(rendered.contains("Cli"));
    }

    #[tokio::test]
    async fn any_channel_sends_thinking_chunk() {
        let mut channel = cli_channel();
        let outcome = channel.send_thinking_chunk("thinking...").await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn any_channel_sends_stop_hint() {
        let mut channel = cli_channel();
        let outcome = channel.send_stop_hint(StopHint::MaxTokens).await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn any_channel_sends_usage() {
        let mut channel = cli_channel();
        let outcome = channel.send_usage(100, 50, 200_000).await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn any_channel_sends_tool_start() {
        let mut channel = cli_channel();
        let event = ToolStartEvent {
            tool_name: "shell".into(),
            tool_call_id: "tc-001".into(),
            params: None,
            parent_tool_use_id: None,
            started_at: std::time::Instant::now(),
            speculative: false,
            sandbox_profile: None,
        };
        let outcome = channel.send_tool_start(event).await;
        assert!(outcome.is_ok());
    }

    /// Manual checklist of `Channel` methods forwarded by `AnyChannel`.
    ///
    /// Every method added to the `Channel` trait should get a call (or an
    /// explicit "skipped" note) below, so that a missing forwarding in
    /// `AnyChannel` is noticed during review.
    ///
    /// NOTE(review): `elicit` is forwarded by `AnyChannel` but is not exercised
    /// here — presumably because it is interactive like `confirm`; confirm and
    /// add coverage or an explicit skip entry.
    #[cfg(not(target_os = "windows"))]
    #[tokio::test]
    async fn any_channel_forwards_all_channel_methods() {
        let mut channel = cli_channel();

        // 1. recv — skipped (blocks on stdin)

        // 2. try_recv
        let _ = channel.try_recv();

        // 3. supports_exit
        let _ = channel.supports_exit();

        // 4. send
        channel.send("test").await.unwrap();

        // 5. send_chunk
        channel.send_chunk("chunk").await.unwrap();

        // 6. flush_chunks
        channel.flush_chunks().await.unwrap();

        // 7. send_typing
        channel.send_typing().await.unwrap();

        // 8. send_status
        channel.send_status("working").await.unwrap();

        // 9. send_thinking_chunk
        channel.send_thinking_chunk("...").await.unwrap();

        // 10. send_queue_count
        channel.send_queue_count(3).await.unwrap();

        // 11. send_usage
        channel.send_usage(10, 5, 8192).await.unwrap();

        // 12. send_diff
        let diff = zeph_core::DiffData {
            file_path: String::new(),
            old_content: String::new(),
            new_content: String::new(),
        };
        channel.send_diff(diff).await.unwrap();

        // 13. send_tool_start
        let start_event = ToolStartEvent {
            tool_name: "bash".into(),
            tool_call_id: "x".into(),
            params: None,
            parent_tool_use_id: None,
            started_at: std::time::Instant::now(),
            speculative: false,
            sandbox_profile: None,
        };
        channel.send_tool_start(start_event).await.unwrap();

        // 14. send_tool_output
        let output_event = ToolOutputEvent {
            tool_name: "bash".into(),
            display: "ok".into(),
            diff: None,
            filter_stats: None,
            kept_lines: None,
            locations: None,
            tool_call_id: "x".into(),
            terminal_id: None,
            is_error: false,
            parent_tool_use_id: None,
            raw_response: None,
            started_at: None,
        };
        channel.send_tool_output(output_event).await.unwrap();

        // 15. send_stop_hint
        channel
            .send_stop_hint(StopHint::MaxTurnRequests)
            .await
            .unwrap();

        // 16. confirm — skipped (reads from stdin; covered by separate test)
    }
}