Skip to main content

zeph_channels/
cli.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! CLI channel: stdin input and stdout output for interactive sessions.
5//!
6//! This module provides [`CliChannel`], the default channel used when Zeph
7//! runs in CLI mode.  It handles two stdin modes transparently:
8//!
9//! * **TTY** — uses `line_editor::read_line` for readline-style interaction.
10//! * **Piped** — reads lines from a `BufReader` in a dedicated OS thread.
11//!
12//! Input is always processed in a background task so that [`Channel::recv`] is
13//! cancel-safe: dropping the future inside `tokio::select!` never loses
14//! buffered messages.
15//!
16//! [`Channel::recv`]: zeph_core::channel::Channel::recv
17
18use std::collections::VecDeque;
19use std::io::{BufReader, IsTerminal};
20
21use tokio::sync::mpsc;
22use zeph_core::channel::{
23    Attachment, AttachmentKind, Channel, ChannelError, ChannelMessage, ElicitationField,
24    ElicitationFieldType, ElicitationRequest, ElicitationResponse,
25};
26
27use crate::line_editor::{self, ReadLineResult};
28
/// Capacity of the bounded mpsc channel that carries processed stdin messages
/// from the background reader task to `CliChannel::recv`.
const STDIN_CHANNEL_CAPACITY: usize = 32;
30
/// Callback invoked with each newly recorded history entry so the caller can
/// persist it.
type PersistFn = Box<dyn Fn(&str) + Send>;

/// Bounded in-memory input history with a persistence hook.
struct InputHistory {
    /// Entries ordered from oldest (front) to newest (back).
    entries: VecDeque<String>,
    /// Called once for every entry accepted by [`InputHistory::add`].
    persist_fn: PersistFn,
    /// Maximum number of entries kept in memory.
    max_len: usize,
}

impl InputHistory {
    /// Default upper bound on the number of entries kept in memory.
    const DEFAULT_MAX_LEN: usize = 1000;

    /// Create a history from pre-loaded `entries` (oldest first).
    ///
    /// If more than the maximum number of entries is supplied, only the most
    /// recent ones are kept so the in-memory bound holds from the start.
    /// `persist_fn` is not called for pre-loaded entries.
    fn new(entries: Vec<String>, persist_fn: PersistFn) -> Self {
        let max_len = Self::DEFAULT_MAX_LEN;
        // Drop the oldest surplus entries; without this the cap in `add`
        // would never take effect for an oversized pre-loaded history.
        let excess = entries.len().saturating_sub(max_len);
        let entries: VecDeque<String> = entries.into_iter().skip(excess).collect();
        Self {
            entries,
            persist_fn,
            max_len,
        }
    }

    /// Borrow the recorded entries, oldest first.
    fn entries(&self) -> &VecDeque<String> {
        &self.entries
    }

    /// Record `line` as the newest entry and hand it to the persistence hook.
    ///
    /// Empty lines and lines equal to the most recent entry are ignored (and
    /// not persisted). When the history is full the oldest entries are evicted.
    fn add(&mut self, line: &str) {
        if line.is_empty() {
            return;
        }
        if self.entries.back().is_some_and(|last| last == line) {
            return;
        }
        // `>=` (not `==`) so the bound is restored even if the deque somehow
        // exceeds it; the `is_none` guard prevents spinning when `max_len` is 0.
        while self.entries.len() >= self.max_len {
            if self.entries.pop_front().is_none() {
                break;
            }
        }
        self.entries.push_back(line.to_owned());
        (self.persist_fn)(line);
    }
}
66
67impl std::fmt::Debug for InputHistory {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        f.debug_struct("InputHistory")
70            .field("entries_len", &self.entries.len())
71            .finish_non_exhaustive()
72    }
73}
74
75/// Process a raw line from stdin: handle exit commands, empty-line logic,
76/// `/image` commands. Returns `None` to continue the loop, `Some(msg)` to
77/// send a message, or `Err(())` to break out of the loop.
78async fn process_line(
79    line: String,
80    is_tty: bool,
81    history: &mut Option<InputHistory>,
82    pending_attachments: &mut Vec<Attachment>,
83) -> Result<Option<ChannelMessage>, ()> {
84    let trimmed = line.trim();
85
86    match trimmed {
87        "exit" | "quit" | "/exit" | "/quit" => return Err(()),
88        "" => {
89            // TTY: empty Enter ends session. Pipe: skip formatting blank lines.
90            if is_tty {
91                return Err(());
92            }
93            return Ok(None);
94        }
95        _ => {}
96    }
97
98    if let Some(h) = history {
99        h.add(trimmed);
100    }
101
102    if let Some(path) = trimmed.strip_prefix("/image").map(str::trim) {
103        if path.is_empty() {
104            println!("Zeph: Usage: /image <path>");
105            return Ok(None);
106        }
107        let path_owned = path.to_owned();
108        let p = std::path::Path::new(&path_owned);
109        if p.is_absolute() || p.components().any(|c| c == std::path::Component::ParentDir) {
110            println!("Zeph: Invalid image path: path traversal not allowed");
111            return Ok(None);
112        }
113        match tokio::fs::read(&path_owned).await {
114            Err(e) => {
115                println!("Zeph: Cannot read image {path_owned}: {e}");
116            }
117            Ok(data) => {
118                let filename = std::path::Path::new(&path_owned)
119                    .file_name()
120                    .and_then(|n| n.to_str())
121                    .map(str::to_owned);
122                let size = data.len();
123                pending_attachments.push(Attachment {
124                    kind: AttachmentKind::Image,
125                    data,
126                    filename,
127                });
128                println!("Zeph: Image attached: {path_owned} ({size} bytes). Send your message.");
129            }
130        }
131        return Ok(None);
132    }
133
134    let attachments = std::mem::take(pending_attachments);
135    Ok(Some(ChannelMessage {
136        text: trimmed.to_string(),
137        attachments,
138    }))
139}
140
141/// Background stdin reader for TTY mode.
142///
143/// Spawns a `tokio::task::spawn_blocking` per line (using `line_editor::read_line`
144/// which manages crossterm raw mode internally).
145async fn run_tty_reader(mut history: Option<InputHistory>, tx: mpsc::Sender<ChannelMessage>) {
146    let mut pending_attachments: Vec<Attachment> = Vec::new();
147
148    loop {
149        let entries: Vec<String> = history
150            .as_ref()
151            .map(|h| h.entries().iter().cloned().collect())
152            .unwrap_or_default();
153
154        // NOTE: raw spawn_blocking is correct here — this is interactive terminal I/O (crossterm
155        // raw mode), not a CPU-bound agent task. Routing through task_supervisor's semaphore
156        // would starve the UI when 8 agent tasks are in-flight.
157        let Ok(Ok(result)) =
158            tokio::task::spawn_blocking(move || line_editor::read_line("You: ", &entries)).await
159        else {
160            break;
161        };
162
163        let line = match result {
164            ReadLineResult::Interrupted | ReadLineResult::Eof => break,
165            ReadLineResult::Line(l) => l,
166        };
167
168        match process_line(line, true, &mut history, &mut pending_attachments).await {
169            Err(()) => break,
170            Ok(None) => {}
171            Ok(Some(msg)) => {
172                if tx.send(msg).await.is_err() {
173                    break;
174                }
175            }
176        }
177    }
178}
179
180/// Background stdin reader for piped (non-TTY) mode.
181///
182/// Runs a dedicated OS thread that owns a `BufReader<Stdin>` and calls
183/// `line_editor::read_line_piped` in a loop. Results are shuttled back to an
184/// async task via a tokio mpsc channel, avoiding repeated stdin locks.
185async fn run_piped_reader(mut history: Option<InputHistory>, tx: mpsc::Sender<ChannelMessage>) {
186    tracing::debug!("stdin is not a terminal, using piped input mode");
187
188    let (line_tx, mut line_rx) = mpsc::channel::<Result<ReadLineResult, std::io::Error>>(1);
189
190    std::thread::spawn(move || {
191        let stdin = std::io::stdin();
192        let mut reader = BufReader::new(stdin);
193        loop {
194            let result = line_editor::read_line_piped(&mut reader);
195            let is_eof = matches!(result, Ok(ReadLineResult::Eof));
196            if line_tx.blocking_send(result).is_err() || is_eof {
197                break;
198            }
199        }
200    });
201
202    let mut pending_attachments: Vec<Attachment> = Vec::new();
203
204    loop {
205        let Some(Ok(result)) = line_rx.recv().await else {
206            break;
207        };
208
209        let line = match result {
210            ReadLineResult::Interrupted | ReadLineResult::Eof => break,
211            ReadLineResult::Line(l) => l,
212        };
213
214        match process_line(line, false, &mut history, &mut pending_attachments).await {
215            Err(()) => break,
216            Ok(None) => {}
217            Ok(Some(msg)) => {
218                if tx.send(msg).await.is_err() {
219                    break;
220                }
221            }
222        }
223    }
224}
225
226/// Spawn a background task that reads stdin and sends processed messages through `tx`.
227///
228/// This makes `CliChannel::recv()` cancel-safe: messages buffered in the mpsc
229/// channel are never dropped when the `recv()` future is cancelled by `tokio::select!`.
230fn spawn_stdin_reader(
231    is_tty: bool,
232    history: Option<InputHistory>,
233    tx: mpsc::Sender<ChannelMessage>,
234) {
235    tokio::spawn(async move {
236        if is_tty {
237            run_tty_reader(history, tx).await;
238        } else {
239            run_piped_reader(history, tx).await;
240        }
241    });
242}
243
244/// Pending configuration for the stdin reader background task.
245///
246/// The task is spawned lazily on the first call to `recv()`, ensuring that
247/// `CliChannel::new()` is safe to call outside of a Tokio runtime context.
248struct PendingReader {
249    history: Option<InputHistory>,
250    is_tty: bool,
251}
252
253impl std::fmt::Debug for PendingReader {
254    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
255        f.debug_struct("PendingReader")
256            .field("is_tty", &self.is_tty)
257            .finish_non_exhaustive()
258    }
259}
260
/// CLI channel that reads from stdin and writes to stdout.
///
/// Input is read in a background task (spawned lazily on the first [`Channel::recv`]
/// call), which makes `recv()` cancel-safe: dropping the future (e.g. inside a
/// `tokio::select!` branch) never discards buffered input — messages stay in the
/// internal [`mpsc`] channel and are returned on the next `recv()` call.
///
/// The channel automatically detects whether stdin is a TTY:
/// * **TTY mode** — uses `line_editor::read_line` with crossterm raw-mode for
///   readline-style editing (cursor movement, history navigation, `Ctrl-C`/`Ctrl-D`).
/// * **Piped mode** — spawns a dedicated OS thread that reads lines from a
///   [`BufReader`] and shuttles them through a tokio channel, avoiding repeated
///   stdin locks.
///
/// # Examples
///
/// ```rust,no_run
/// use zeph_channels::CliChannel;
/// use zeph_core::channel::Channel;
///
/// # #[tokio::main]
/// # async fn example() {
/// let mut ch = CliChannel::new();
/// // Send a formatted reply to stdout.
/// ch.send("Hello from Zeph!").await.unwrap();
/// # }
/// ```
///
/// [`Channel::recv`]: zeph_core::channel::Channel::recv
/// [`BufReader`]: std::io::BufReader
#[derive(Debug)]
pub struct CliChannel {
    /// Text of the chunks streamed so far for the current response: appended
    /// to by `send_chunk` and cleared by `flush_chunks`.
    accumulated: String,
    /// Lazily-initialized receiver. `None` until `recv()` is called for the first time.
    input_rx: Option<mpsc::Receiver<ChannelMessage>>,
    /// Pending configuration consumed when the background task is first spawned.
    pending: Option<PendingReader>,
}
299
300impl CliChannel {
301    /// Create a new CLI channel without persistent history.
302    ///
303    /// This is safe to call outside of a Tokio runtime; the background stdin
304    /// reader task is not spawned until the first [`Channel::recv`] call.
305    ///
306    /// [`Channel::recv`]: zeph_core::channel::Channel::recv
307    #[must_use]
308    pub fn new() -> Self {
309        let is_tty = std::io::stdin().is_terminal();
310        Self {
311            accumulated: String::new(),
312            input_rx: None,
313            pending: Some(PendingReader {
314                history: None,
315                is_tty,
316            }),
317        }
318    }
319
320    /// Create a CLI channel with persistent input history.
321    ///
322    /// `entries` is a pre-loaded history list (e.g. loaded from `SQLite` on
323    /// startup).  `persist_fn` is called for each newly submitted entry so the
324    /// caller can persist it (e.g. via `SqliteStore::save_input_entry`).
325    ///
326    /// Duplicate consecutive entries are silently ignored; empty lines are never
327    /// added to the history.
328    ///
329    /// # Examples
330    ///
331    /// ```rust,no_run
332    /// use zeph_channels::CliChannel;
333    ///
334    /// let previous: Vec<String> = vec!["ls -la".into(), "cargo build".into()];
335    /// let ch = CliChannel::with_history(previous, |entry| {
336    ///     // Persist `entry` to your storage layer.
337    ///     eprintln!("saving: {entry}");
338    /// });
339    /// ```
340    #[must_use]
341    pub fn with_history(entries: Vec<String>, persist_fn: impl Fn(&str) + Send + 'static) -> Self {
342        let is_tty = std::io::stdin().is_terminal();
343        let history = InputHistory::new(entries, Box::new(persist_fn));
344        Self {
345            accumulated: String::new(),
346            input_rx: None,
347            pending: Some(PendingReader {
348                history: Some(history),
349                is_tty,
350            }),
351        }
352    }
353
354    /// Ensure the background stdin reader is running and return a mutable
355    /// reference to the receiver. Called from within an async context only.
356    fn ensure_reader(&mut self) -> &mut mpsc::Receiver<ChannelMessage> {
357        if self.input_rx.is_none() {
358            let pending = self
359                .pending
360                .take()
361                .expect("PendingReader consumed before input_rx was set");
362            let (tx, rx) = mpsc::channel(STDIN_CHANNEL_CAPACITY);
363            spawn_stdin_reader(pending.is_tty, pending.history, tx);
364            self.input_rx = Some(rx);
365        }
366        self.input_rx.as_mut().expect("input_rx set above")
367    }
368}
369
370impl Default for CliChannel {
371    fn default() -> Self {
372        Self::new()
373    }
374}
375
376impl Channel for CliChannel {
377    /// Receive the next user message.
378    ///
379    /// This method is cancel-safe: dropping the future does not discard any
380    /// buffered input. The background stdin reader task buffers messages in an
381    /// mpsc channel; they remain available on the next `recv()` call.
382    #[cfg_attr(
383        feature = "profiling",
384        tracing::instrument(name = "channel.cli.recv", skip_all, fields(msg_len = tracing::field::Empty))
385    )]
386    async fn recv(&mut self) -> Result<Option<ChannelMessage>, ChannelError> {
387        Ok(self.ensure_reader().recv().await)
388    }
389
390    /// Write a complete agent reply to stdout.
391    ///
392    /// The message is prefixed with `"Zeph: "` and followed by a newline.
393    /// Use [`send_chunk`] / [`flush_chunks`] for streaming output instead.
394    ///
395    /// # Errors
396    ///
397    /// Always returns `Ok(())` — stdout writes do not produce recoverable
398    /// errors in this adapter.
399    ///
400    /// [`send_chunk`]: CliChannel::send_chunk
401    /// [`flush_chunks`]: CliChannel::flush_chunks
402    #[cfg_attr(
403        feature = "profiling",
404        tracing::instrument(name = "channel.cli.send", skip_all, fields(msg_len = %text.len()))
405    )]
406    async fn send(&mut self, text: &str) -> Result<(), ChannelError> {
407        println!("Zeph: {text}");
408        Ok(())
409    }
410
411    /// Write a streaming chunk to stdout and accumulate it internally.
412    ///
413    /// Chunks are printed without a trailing newline so that the response
414    /// streams character-by-character.  Call [`flush_chunks`] when the stream
415    /// is complete to emit the final newline and clear the internal buffer.
416    ///
417    /// # Errors
418    ///
419    /// Returns `Err` if the stdout flush fails.
420    ///
421    /// [`flush_chunks`]: CliChannel::flush_chunks
422    async fn send_chunk(&mut self, chunk: &str) -> Result<(), ChannelError> {
423        use std::io::{Write, stdout};
424        print!("{chunk}");
425        stdout().flush()?;
426        self.accumulated.push_str(chunk);
427        Ok(())
428    }
429
430    /// Finalise a streamed response by printing a trailing newline.
431    ///
432    /// Clears the internal accumulation buffer so the channel is ready for the
433    /// next response.
434    ///
435    /// # Errors
436    ///
437    /// Always returns `Ok(())`.
438    async fn flush_chunks(&mut self) -> Result<(), ChannelError> {
439        println!();
440        self.accumulated.clear();
441        Ok(())
442    }
443
444    /// Prompt the user for a yes/no confirmation on stdin.
445    ///
446    /// In non-interactive (piped) mode the method auto-declines and returns
447    /// `Ok(false)` without blocking.  In TTY mode it reads one line and returns
448    /// `true` only when the user types `y` or `Y`.
449    ///
450    /// # Errors
451    ///
452    /// Returns `Err` if spawning the blocking task fails or if the underlying
453    /// readline call returns an I/O error.
454    async fn confirm(&mut self, prompt: &str) -> Result<bool, ChannelError> {
455        if !std::io::stdin().is_terminal() {
456            tracing::debug!("non-interactive stdin, auto-declining confirmation");
457            return Ok(false);
458        }
459        let prompt = format!("{prompt} [y/N]: ");
460        // NOTE: raw spawn_blocking is intentional — interactive terminal readline; not an agent
461        // task, so the task_supervisor semaphore does not apply.
462        let result = tokio::task::spawn_blocking(move || line_editor::read_line(&prompt, &[]))
463            .await
464            .map_err(ChannelError::other)?
465            .map_err(ChannelError::Io)?;
466
467        match result {
468            ReadLineResult::Line(line) => Ok(line.trim().eq_ignore_ascii_case("y")),
469            ReadLineResult::Interrupted | ReadLineResult::Eof => Ok(false),
470        }
471    }
472
473    /// Collect structured input from the user on behalf of an MCP server.
474    ///
475    /// Prompts the user for each field in `request.fields` sequentially.  In
476    /// non-interactive (piped) mode the method logs a warning and auto-declines
477    /// without blocking.
478    ///
479    /// Field values are coerced to the declared [`ElicitationFieldType`].  If a
480    /// value cannot be coerced the method returns
481    /// [`ElicitationResponse::Declined`] immediately.  `Ctrl-C` or `Ctrl-D`
482    /// returns [`ElicitationResponse::Cancelled`].
483    ///
484    /// # Errors
485    ///
486    /// Returns `Err` if spawning the blocking task fails or if the underlying
487    /// readline call returns an I/O error.
488    ///
489    /// [`ElicitationFieldType`]: zeph_core::channel::ElicitationFieldType
490    /// [`ElicitationResponse::Declined`]: zeph_core::channel::ElicitationResponse::Declined
491    /// [`ElicitationResponse::Cancelled`]: zeph_core::channel::ElicitationResponse::Cancelled
492    async fn elicit(
493        &mut self,
494        request: ElicitationRequest,
495    ) -> Result<ElicitationResponse, ChannelError> {
496        if !std::io::stdin().is_terminal() {
497            tracing::warn!(
498                server = request.server_name,
499                "non-interactive stdin, auto-declining elicitation"
500            );
501            return Ok(ElicitationResponse::Declined);
502        }
503
504        println!(
505            "\n[MCP server '{}' is requesting input]",
506            request.server_name
507        );
508        println!("{}", request.message);
509
510        let mut values = serde_json::Map::new();
511        for field in &request.fields {
512            let prompt = build_field_prompt(field);
513            let field_name = field.name.clone();
514            // NOTE: raw spawn_blocking is intentional — interactive terminal readline; not an
515            // agent task, so the task_supervisor semaphore does not apply.
516            let result = tokio::task::spawn_blocking(move || line_editor::read_line(&prompt, &[]))
517                .await
518                .map_err(ChannelError::other)?
519                .map_err(ChannelError::Io)?;
520
521            match result {
522                ReadLineResult::Line(line) => {
523                    let trimmed = line.trim().to_owned();
524                    if let Some(value) = coerce_field_value(&trimmed, &field.field_type) {
525                        values.insert(field_name, value);
526                    } else {
527                        println!(
528                            "Invalid input for '{}' (expected {:?}), declining.",
529                            field_name, field.field_type
530                        );
531                        return Ok(ElicitationResponse::Declined);
532                    }
533                }
534                ReadLineResult::Interrupted | ReadLineResult::Eof => {
535                    return Ok(ElicitationResponse::Cancelled);
536                }
537            }
538        }
539
540        Ok(ElicitationResponse::Accepted(serde_json::Value::Object(
541            values,
542        )))
543    }
544}
545
546/// Build a human-readable prompt string for a single elicitation field.
547///
548/// The prompt includes the field name, an optional description in parentheses,
549/// and a type hint (e.g. `[true/false]`, `[number]`, or the list of allowed
550/// enum values separated by `/`).
551fn build_field_prompt(field: &ElicitationField) -> String {
552    let type_hint = match &field.field_type {
553        ElicitationFieldType::Boolean => " [true/false]",
554        ElicitationFieldType::Integer | ElicitationFieldType::Number => " [number]",
555        ElicitationFieldType::Enum(opts) if !opts.is_empty() => {
556            // Build hint dynamically below
557            return format!(
558                "{}{}: ",
559                field.name,
560                field
561                    .description
562                    .as_deref()
563                    .map(|d| format!(" ({d})"))
564                    .unwrap_or_default()
565            ) + &format!("[{}]: ", opts.join("/"));
566        }
567        _ => "",
568    };
569    format!(
570        "{}{}{}",
571        field.name,
572        field
573            .description
574            .as_deref()
575            .map(|d| format!(" ({d})"))
576            .unwrap_or_default(),
577        if type_hint.is_empty() {
578            ": ".to_owned()
579        } else {
580            format!("{type_hint}: ")
581        }
582    )
583}
584
585/// Coerce a raw user-input string into the JSON type required by the field.
586/// Returns `None` if the input cannot be converted to the declared type.
587fn coerce_field_value(raw: &str, field_type: &ElicitationFieldType) -> Option<serde_json::Value> {
588    match field_type {
589        ElicitationFieldType::String => Some(serde_json::Value::String(raw.to_owned())),
590        ElicitationFieldType::Boolean => match raw.to_ascii_lowercase().as_str() {
591            "true" | "yes" | "1" => Some(serde_json::Value::Bool(true)),
592            "false" | "no" | "0" => Some(serde_json::Value::Bool(false)),
593            _ => None,
594        },
595        ElicitationFieldType::Integer => raw
596            .parse::<i64>()
597            .ok()
598            .map(|n| serde_json::Value::Number(n.into())),
599        ElicitationFieldType::Number => raw
600            .parse::<f64>()
601            .ok()
602            .and_then(serde_json::Number::from_f64)
603            .map(serde_json::Value::Number),
604        ElicitationFieldType::Enum(opts) => {
605            if opts.iter().any(|o| o == raw) {
606                Some(serde_json::Value::String(raw.to_owned()))
607            } else {
608                None
609            }
610        }
611    }
612}
613
// Unit tests for the CLI channel, input history and `/image` handling.
#[cfg(test)]
mod tests {
    use super::*;

    // `Default` constructs a channel whose `Debug` output can be formatted.
    #[test]
    fn cli_channel_default() {
        let ch = CliChannel::default();
        let _ = format!("{ch:?}");
    }

    // Streamed chunks are appended to the internal buffer in order.
    #[tokio::test]
    async fn cli_channel_send_chunk_accumulates() {
        let mut ch = CliChannel::new();
        ch.send_chunk("hello").await.unwrap();
        ch.send_chunk(" ").await.unwrap();
        ch.send_chunk("world").await.unwrap();
        assert_eq!(ch.accumulated, "hello world");
    }

    // Flushing resets the internal buffer.
    #[tokio::test]
    async fn cli_channel_flush_chunks_clears_buffer() {
        let mut ch = CliChannel::new();
        ch.send_chunk("test").await.unwrap();
        ch.flush_chunks().await.unwrap();
        assert!(ch.accumulated.is_empty());
    }

    // `try_recv` is not defined in this file (presumably a `Channel` trait
    // default — verify against zeph_core); it must report no message here.
    #[test]
    fn cli_channel_try_recv_returns_none() {
        let mut ch = CliChannel::new();
        assert!(ch.try_recv().is_none());
    }

    // A fresh channel starts with an empty chunk buffer.
    #[test]
    fn cli_channel_new() {
        let ch = CliChannel::new();
        assert!(ch.accumulated.is_empty());
    }

    #[tokio::test]
    async fn cli_channel_send_returns_ok() {
        let mut ch = CliChannel::new();
        ch.send("test message").await.unwrap();
    }

    // Flushing with nothing streamed is still fine.
    #[tokio::test]
    async fn cli_channel_flush_returns_ok() {
        let mut ch = CliChannel::new();
        ch.flush_chunks().await.unwrap();
    }

    // NOTE(review): this test does not call `process_line` (an absolute temp
    // path would be rejected by its path check); it only replays the
    // read/push/take steps on a local vector.
    #[tokio::test]
    async fn image_command_valid_file_stores_in_pending() {
        use std::io::Write;

        let mut tmp = tempfile::NamedTempFile::new().unwrap();
        let image_bytes = b"\x89PNG\r\n\x1a\nfake-image-data";
        tmp.write_all(image_bytes).unwrap();
        tmp.flush().unwrap();

        let path = tmp.path().to_str().unwrap().to_owned();

        let data = tokio::fs::read(&path).await.unwrap();
        let filename = std::path::Path::new(&path)
            .file_name()
            .and_then(|n| n.to_str())
            .map(str::to_owned);

        let mut pending_attachments: Vec<Attachment> = Vec::new();
        pending_attachments.push(Attachment {
            kind: AttachmentKind::Image,
            data: data.clone(),
            filename,
        });

        assert_eq!(pending_attachments.len(), 1);
        assert_eq!(pending_attachments[0].data, image_bytes);
        assert_eq!(pending_attachments[0].kind, AttachmentKind::Image);

        let taken = std::mem::take(&mut pending_attachments);
        assert!(pending_attachments.is_empty());
        assert_eq!(taken.len(), 1);
    }

    // NOTE(review): checks only that reading a missing file yields `NotFound`;
    // `process_line` itself is not exercised.
    #[tokio::test]
    async fn image_command_missing_file_is_handled_gracefully() {
        let result = tokio::fs::read("/nonexistent/path/image.png").await;
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().kind(), std::io::ErrorKind::NotFound);
    }

    // `/image` with no argument (or only whitespace) yields an empty path.
    #[test]
    fn image_command_empty_args_detected() {
        let trimmed = "/image";
        let arg = trimmed.strip_prefix("/image").map_or("", str::trim);
        assert!(arg.is_empty());

        let trimmed_space = "/image   ";
        let arg_space = trimmed_space.strip_prefix("/image").map_or("", str::trim);
        assert!(arg_space.is_empty());
    }

    // Same assertion as `cli_channel_new`.
    #[test]
    fn cli_channel_new_has_empty_accumulated() {
        let ch = CliChannel::new();
        assert!(ch.accumulated.is_empty());
    }

    #[test]
    fn cli_channel_with_history_constructs_ok() {
        let ch = CliChannel::with_history(vec![], |_| {});
        assert!(ch.accumulated.is_empty());
    }

    // Consecutive duplicates are neither stored nor persisted.
    #[test]
    fn input_history_add_and_dedup() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};

        let persisted = Arc::new(AtomicUsize::new(0));
        let p = persisted.clone();
        let mut history = InputHistory::new(
            vec![],
            Box::new(move |_| {
                p.fetch_add(1, Ordering::Relaxed);
            }),
        );
        history.add("hello");
        history.add("hello"); // duplicate
        history.add("world");
        assert_eq!(history.entries().len(), 2);
        assert_eq!(history.entries()[0], "hello");
        assert_eq!(persisted.load(Ordering::Relaxed), 2);
    }

    // Empty lines never enter the history.
    #[test]
    fn input_history_ignores_empty() {
        let mut history = InputHistory::new(vec![], Box::new(|_| {}));
        history.add("");
        assert_eq!(history.entries().len(), 0);
    }

    /// Verify that `recv()` is cancel-safe: dropping the future does not discard
    /// buffered input. This is the regression test for the `tokio::select!` race
    /// that caused stdin input to be silently lost when a reload branch won.
    #[tokio::test]
    async fn recv_is_cancel_safe_via_mpsc_buffer() {
        // Create a direct mpsc pair to simulate the background reader.
        let (tx, rx) = mpsc::channel::<ChannelMessage>(32);
        let mut ch = CliChannel {
            accumulated: String::new(),
            input_rx: Some(rx),
            pending: None,
        };

        // Pre-fill the channel with a message (simulates background reader
        // having already buffered input before select! cancellation).
        tx.send(ChannelMessage {
            text: "hello".to_string(),
            attachments: vec![],
        })
        .await
        .unwrap();

        // Simulate select! cancellation: drop the recv() future without polling it.
        // This models the scenario where a reload branch wins the select! race.
        drop(ch.recv());

        // The buffered message must still be available on the next recv() call.
        let result = ch.recv().await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().text, "hello");
    }

    // Absolute paths are refused by `process_line` and nothing is attached.
    #[tokio::test]
    async fn image_command_absolute_path_is_rejected() {
        let mut pending: Vec<Attachment> = Vec::new();
        let mut history = Some(InputHistory::new(vec![], Box::new(|_| {})));
        let result = process_line(
            "/image /etc/passwd".to_owned(),
            false,
            &mut history,
            &mut pending,
        )
        .await;
        assert!(matches!(result, Ok(None)));
        assert!(pending.is_empty());
    }

    // Paths containing `..` components are refused by `process_line`.
    #[tokio::test]
    async fn image_command_parent_dir_traversal_is_rejected() {
        let mut pending: Vec<Attachment> = Vec::new();
        let mut history = Some(InputHistory::new(vec![], Box::new(|_| {})));
        let result = process_line(
            "/image ../../../etc/passwd".to_owned(),
            false,
            &mut history,
            &mut pending,
        )
        .await;
        assert!(matches!(result, Ok(None)));
        assert!(pending.is_empty());
    }
}
813        .await;
814        assert!(matches!(result, Ok(None)));
815        assert!(pending.is_empty());
816    }
817}