1use std::collections::VecDeque;
19use std::io::{BufReader, IsTerminal};
20
21use tokio::sync::mpsc;
22use zeph_core::channel::{
23 Attachment, AttachmentKind, Channel, ChannelError, ChannelMessage, ElicitationField,
24 ElicitationFieldType, ElicitationRequest, ElicitationResponse,
25};
26
27use crate::line_editor::{self, ReadLineResult};
28
/// Capacity of the bounded channel that carries parsed input messages from the
/// background stdin reader to [`CliChannel`].
const STDIN_CHANNEL_CAPACITY: usize = 32;
30
/// Callback invoked with every line that is newly appended to the history, so
/// the owner can persist it.
type PersistFn = Box<dyn Fn(&str) + Send>;

/// Bounded in-memory input history with write-through persistence.
struct InputHistory {
    /// Recorded lines, oldest at the front and newest at the back.
    entries: VecDeque<String>,
    /// Called once for each line that is actually appended by [`InputHistory::add`].
    persist_fn: PersistFn,
    /// Maximum number of entries kept in memory.
    max_len: usize,
}

impl InputHistory {
    /// Default cap on the number of entries kept in memory.
    const DEFAULT_MAX_LEN: usize = 1000;

    /// Creates a history pre-populated with `entries` (oldest first).
    ///
    /// If more than the cap is supplied, only the newest entries are kept so
    /// the in-memory size is bounded from the start. `persist_fn` is not
    /// called for the pre-loaded entries.
    fn new(entries: Vec<String>, persist_fn: PersistFn) -> Self {
        let max_len = Self::DEFAULT_MAX_LEN;
        let mut entries = VecDeque::from(entries);
        // Drop the oldest entries that do not fit under the cap.
        let excess = entries.len().saturating_sub(max_len);
        drop(entries.drain(..excess));
        Self {
            entries,
            persist_fn,
            max_len,
        }
    }

    /// Returns the recorded lines, oldest first.
    fn entries(&self) -> &VecDeque<String> {
        &self.entries
    }

    /// Appends `line` to the history and persists it.
    ///
    /// Empty lines and immediate repeats of the newest entry are ignored (and
    /// not persisted). When the history is full, the oldest entries are
    /// evicted to make room.
    fn add(&mut self, line: &str) {
        if line.is_empty() || self.entries.back().is_some_and(|last| last == line) {
            return;
        }
        // `>=` rather than `==`: never let an over-long history keep growing.
        while self.entries.len() >= self.max_len {
            if self.entries.pop_front().is_none() {
                break;
            }
        }
        self.entries.push_back(line.to_owned());
        (self.persist_fn)(line);
    }
}
66
67impl std::fmt::Debug for InputHistory {
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 f.debug_struct("InputHistory")
70 .field("entries_len", &self.entries.len())
71 .finish_non_exhaustive()
72 }
73}
74
75async fn process_line(
79 line: String,
80 is_tty: bool,
81 history: &mut Option<InputHistory>,
82 pending_attachments: &mut Vec<Attachment>,
83) -> Result<Option<ChannelMessage>, ()> {
84 let trimmed = line.trim();
85
86 match trimmed {
87 "exit" | "quit" | "/exit" | "/quit" => return Err(()),
88 "" => {
89 if is_tty {
91 return Err(());
92 }
93 return Ok(None);
94 }
95 _ => {}
96 }
97
98 if let Some(h) = history {
99 h.add(trimmed);
100 }
101
102 if let Some(path) = trimmed.strip_prefix("/image").map(str::trim) {
103 if path.is_empty() {
104 println!("Zeph: Usage: /image <path>");
105 return Ok(None);
106 }
107 let path_owned = path.to_owned();
108 let p = std::path::Path::new(&path_owned);
109 if p.is_absolute() || p.components().any(|c| c == std::path::Component::ParentDir) {
110 println!("Zeph: Invalid image path: path traversal not allowed");
111 return Ok(None);
112 }
113 match tokio::fs::read(&path_owned).await {
114 Err(e) => {
115 println!("Zeph: Cannot read image {path_owned}: {e}");
116 }
117 Ok(data) => {
118 let filename = std::path::Path::new(&path_owned)
119 .file_name()
120 .and_then(|n| n.to_str())
121 .map(str::to_owned);
122 let size = data.len();
123 pending_attachments.push(Attachment {
124 kind: AttachmentKind::Image,
125 data,
126 filename,
127 });
128 println!("Zeph: Image attached: {path_owned} ({size} bytes). Send your message.");
129 }
130 }
131 return Ok(None);
132 }
133
134 let attachments = std::mem::take(pending_attachments);
135 Ok(Some(ChannelMessage {
136 text: trimmed.to_string(),
137 attachments,
138 }))
139}
140
141async fn run_tty_reader(mut history: Option<InputHistory>, tx: mpsc::Sender<ChannelMessage>) {
146 let mut pending_attachments: Vec<Attachment> = Vec::new();
147
148 loop {
149 let entries: Vec<String> = history
150 .as_ref()
151 .map(|h| h.entries().iter().cloned().collect())
152 .unwrap_or_default();
153
154 let Ok(Ok(result)) =
158 tokio::task::spawn_blocking(move || line_editor::read_line("You: ", &entries)).await
159 else {
160 break;
161 };
162
163 let line = match result {
164 ReadLineResult::Interrupted | ReadLineResult::Eof => break,
165 ReadLineResult::Line(l) => l,
166 };
167
168 match process_line(line, true, &mut history, &mut pending_attachments).await {
169 Err(()) => break,
170 Ok(None) => {}
171 Ok(Some(msg)) => {
172 if tx.send(msg).await.is_err() {
173 break;
174 }
175 }
176 }
177 }
178}
179
180async fn run_piped_reader(mut history: Option<InputHistory>, tx: mpsc::Sender<ChannelMessage>) {
186 tracing::debug!("stdin is not a terminal, using piped input mode");
187
188 let (line_tx, mut line_rx) = mpsc::channel::<Result<ReadLineResult, std::io::Error>>(1);
189
190 std::thread::spawn(move || {
191 let stdin = std::io::stdin();
192 let mut reader = BufReader::new(stdin);
193 loop {
194 let result = line_editor::read_line_piped(&mut reader);
195 let is_eof = matches!(result, Ok(ReadLineResult::Eof));
196 if line_tx.blocking_send(result).is_err() || is_eof {
197 break;
198 }
199 }
200 });
201
202 let mut pending_attachments: Vec<Attachment> = Vec::new();
203
204 loop {
205 let Some(Ok(result)) = line_rx.recv().await else {
206 break;
207 };
208
209 let line = match result {
210 ReadLineResult::Interrupted | ReadLineResult::Eof => break,
211 ReadLineResult::Line(l) => l,
212 };
213
214 match process_line(line, false, &mut history, &mut pending_attachments).await {
215 Err(()) => break,
216 Ok(None) => {}
217 Ok(Some(msg)) => {
218 if tx.send(msg).await.is_err() {
219 break;
220 }
221 }
222 }
223 }
224}
225
226fn spawn_stdin_reader(
231 is_tty: bool,
232 history: Option<InputHistory>,
233 tx: mpsc::Sender<ChannelMessage>,
234) {
235 tokio::spawn(async move {
236 if is_tty {
237 run_tty_reader(history, tx).await;
238 } else {
239 run_piped_reader(history, tx).await;
240 }
241 });
242}
243
/// Configuration for a stdin reader that has not been started yet.
///
/// Held by [`CliChannel`] until the reader task is spawned, at which point it
/// is consumed.
struct PendingReader {
    /// Optional input history handed to the reader task.
    history: Option<InputHistory>,
    /// Whether stdin was a terminal when the channel was constructed.
    is_tty: bool,
}
252
253impl std::fmt::Debug for PendingReader {
254 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
255 f.debug_struct("PendingReader")
256 .field("is_tty", &self.is_tty)
257 .finish_non_exhaustive()
258 }
259}
260
/// Channel implementation backed by the process's stdin and stdout.
///
/// Input is read by a background task that is started lazily the first time
/// the receiver is needed; output is printed to stdout.
#[derive(Debug)]
pub struct CliChannel {
    /// Text of the chunks streamed since the last `flush_chunks` call.
    accumulated: String,
    /// Receiving end of the stdin reader channel; `None` until the reader has
    /// been started.
    input_rx: Option<mpsc::Receiver<ChannelMessage>>,
    /// Reader configuration waiting to be consumed when the reader is started.
    pending: Option<PendingReader>,
}
299
300impl CliChannel {
301 #[must_use]
308 pub fn new() -> Self {
309 let is_tty = std::io::stdin().is_terminal();
310 Self {
311 accumulated: String::new(),
312 input_rx: None,
313 pending: Some(PendingReader {
314 history: None,
315 is_tty,
316 }),
317 }
318 }
319
320 #[must_use]
341 pub fn with_history(entries: Vec<String>, persist_fn: impl Fn(&str) + Send + 'static) -> Self {
342 let is_tty = std::io::stdin().is_terminal();
343 let history = InputHistory::new(entries, Box::new(persist_fn));
344 Self {
345 accumulated: String::new(),
346 input_rx: None,
347 pending: Some(PendingReader {
348 history: Some(history),
349 is_tty,
350 }),
351 }
352 }
353
354 fn ensure_reader(&mut self) -> &mut mpsc::Receiver<ChannelMessage> {
357 if self.input_rx.is_none() {
358 let pending = self
359 .pending
360 .take()
361 .expect("PendingReader consumed before input_rx was set");
362 let (tx, rx) = mpsc::channel(STDIN_CHANNEL_CAPACITY);
363 spawn_stdin_reader(pending.is_tty, pending.history, tx);
364 self.input_rx = Some(rx);
365 }
366 self.input_rx.as_mut().expect("input_rx set above")
367 }
368}
369
/// The default channel is the same as [`CliChannel::new`]: no input history.
impl Default for CliChannel {
    fn default() -> Self {
        Self::new()
    }
}
375
376impl Channel for CliChannel {
377 #[cfg_attr(
383 feature = "profiling",
384 tracing::instrument(name = "channel.cli.recv", skip_all, fields(msg_len = tracing::field::Empty))
385 )]
386 async fn recv(&mut self) -> Result<Option<ChannelMessage>, ChannelError> {
387 Ok(self.ensure_reader().recv().await)
388 }
389
390 #[cfg_attr(
403 feature = "profiling",
404 tracing::instrument(name = "channel.cli.send", skip_all, fields(msg_len = %text.len()))
405 )]
406 async fn send(&mut self, text: &str) -> Result<(), ChannelError> {
407 println!("Zeph: {text}");
408 Ok(())
409 }
410
411 async fn send_chunk(&mut self, chunk: &str) -> Result<(), ChannelError> {
423 use std::io::{Write, stdout};
424 print!("{chunk}");
425 stdout().flush()?;
426 self.accumulated.push_str(chunk);
427 Ok(())
428 }
429
430 async fn flush_chunks(&mut self) -> Result<(), ChannelError> {
439 println!();
440 self.accumulated.clear();
441 Ok(())
442 }
443
444 async fn confirm(&mut self, prompt: &str) -> Result<bool, ChannelError> {
455 if !std::io::stdin().is_terminal() {
456 tracing::debug!("non-interactive stdin, auto-declining confirmation");
457 return Ok(false);
458 }
459 let prompt = format!("{prompt} [y/N]: ");
460 let result = tokio::task::spawn_blocking(move || line_editor::read_line(&prompt, &[]))
463 .await
464 .map_err(ChannelError::other)?
465 .map_err(ChannelError::Io)?;
466
467 match result {
468 ReadLineResult::Line(line) => Ok(line.trim().eq_ignore_ascii_case("y")),
469 ReadLineResult::Interrupted | ReadLineResult::Eof => Ok(false),
470 }
471 }
472
473 async fn elicit(
493 &mut self,
494 request: ElicitationRequest,
495 ) -> Result<ElicitationResponse, ChannelError> {
496 if !std::io::stdin().is_terminal() {
497 tracing::warn!(
498 server = request.server_name,
499 "non-interactive stdin, auto-declining elicitation"
500 );
501 return Ok(ElicitationResponse::Declined);
502 }
503
504 println!(
505 "\n[MCP server '{}' is requesting input]",
506 request.server_name
507 );
508 println!("{}", request.message);
509
510 let mut values = serde_json::Map::new();
511 for field in &request.fields {
512 let prompt = build_field_prompt(field);
513 let field_name = field.name.clone();
514 let result = tokio::task::spawn_blocking(move || line_editor::read_line(&prompt, &[]))
517 .await
518 .map_err(ChannelError::other)?
519 .map_err(ChannelError::Io)?;
520
521 match result {
522 ReadLineResult::Line(line) => {
523 let trimmed = line.trim().to_owned();
524 if let Some(value) = coerce_field_value(&trimmed, &field.field_type) {
525 values.insert(field_name, value);
526 } else {
527 println!(
528 "Invalid input for '{}' (expected {:?}), declining.",
529 field_name, field.field_type
530 );
531 return Ok(ElicitationResponse::Declined);
532 }
533 }
534 ReadLineResult::Interrupted | ReadLineResult::Eof => {
535 return Ok(ElicitationResponse::Cancelled);
536 }
537 }
538 }
539
540 Ok(ElicitationResponse::Accepted(serde_json::Value::Object(
541 values,
542 )))
543 }
544}
545
546fn build_field_prompt(field: &ElicitationField) -> String {
552 let type_hint = match &field.field_type {
553 ElicitationFieldType::Boolean => " [true/false]",
554 ElicitationFieldType::Integer | ElicitationFieldType::Number => " [number]",
555 ElicitationFieldType::Enum(opts) if !opts.is_empty() => {
556 return format!(
558 "{}{}: ",
559 field.name,
560 field
561 .description
562 .as_deref()
563 .map(|d| format!(" ({d})"))
564 .unwrap_or_default()
565 ) + &format!("[{}]: ", opts.join("/"));
566 }
567 _ => "",
568 };
569 format!(
570 "{}{}{}",
571 field.name,
572 field
573 .description
574 .as_deref()
575 .map(|d| format!(" ({d})"))
576 .unwrap_or_default(),
577 if type_hint.is_empty() {
578 ": ".to_owned()
579 } else {
580 format!("{type_hint}: ")
581 }
582 )
583}
584
585fn coerce_field_value(raw: &str, field_type: &ElicitationFieldType) -> Option<serde_json::Value> {
588 match field_type {
589 ElicitationFieldType::String => Some(serde_json::Value::String(raw.to_owned())),
590 ElicitationFieldType::Boolean => match raw.to_ascii_lowercase().as_str() {
591 "true" | "yes" | "1" => Some(serde_json::Value::Bool(true)),
592 "false" | "no" | "0" => Some(serde_json::Value::Bool(false)),
593 _ => None,
594 },
595 ElicitationFieldType::Integer => raw
596 .parse::<i64>()
597 .ok()
598 .map(|n| serde_json::Value::Number(n.into())),
599 ElicitationFieldType::Number => raw
600 .parse::<f64>()
601 .ok()
602 .and_then(serde_json::Number::from_f64)
603 .map(serde_json::Value::Number),
604 ElicitationFieldType::Enum(opts) => {
605 if opts.iter().any(|o| o == raw) {
606 Some(serde_json::Value::String(raw.to_owned()))
607 } else {
608 None
609 }
610 }
611 }
612}
613
/// Unit tests for the CLI channel, its input history, and `/image` handling.
#[cfg(test)]
mod tests {
    use super::*;

    /// The default channel can be constructed and formatted with `Debug`.
    #[test]
    fn cli_channel_default() {
        let ch = CliChannel::default();
        let _ = format!("{ch:?}");
    }

    /// Streamed chunks are appended to the accumulation buffer in order.
    #[tokio::test]
    async fn cli_channel_send_chunk_accumulates() {
        let mut ch = CliChannel::new();
        ch.send_chunk("hello").await.unwrap();
        ch.send_chunk(" ").await.unwrap();
        ch.send_chunk("world").await.unwrap();
        assert_eq!(ch.accumulated, "hello world");
    }

    /// Flushing empties the accumulation buffer.
    #[tokio::test]
    async fn cli_channel_flush_chunks_clears_buffer() {
        let mut ch = CliChannel::new();
        ch.send_chunk("test").await.unwrap();
        ch.flush_chunks().await.unwrap();
        assert!(ch.accumulated.is_empty());
    }

    /// `try_recv` is not defined in this file — presumably a default method of
    /// the `Channel` trait; it must report that nothing is available.
    #[test]
    fn cli_channel_try_recv_returns_none() {
        let mut ch = CliChannel::new();
        assert!(ch.try_recv().is_none());
    }

    /// A new channel starts with an empty accumulation buffer.
    #[test]
    fn cli_channel_new() {
        let ch = CliChannel::new();
        assert!(ch.accumulated.is_empty());
    }

    /// Sending a complete message succeeds.
    #[tokio::test]
    async fn cli_channel_send_returns_ok() {
        let mut ch = CliChannel::new();
        ch.send("test message").await.unwrap();
    }

    /// Flushing with nothing accumulated succeeds.
    #[tokio::test]
    async fn cli_channel_flush_returns_ok() {
        let mut ch = CliChannel::new();
        ch.flush_chunks().await.unwrap();
    }

    /// Mirrors the steps `process_line` takes for a readable image file: read
    /// the bytes, derive the file name, queue the attachment, then take the
    /// queue. NOTE(review): this does not call `process_line` itself.
    #[tokio::test]
    async fn image_command_valid_file_stores_in_pending() {
        use std::io::Write;

        let mut tmp = tempfile::NamedTempFile::new().unwrap();
        let image_bytes = b"\x89PNG\r\n\x1a\nfake-image-data";
        tmp.write_all(image_bytes).unwrap();
        tmp.flush().unwrap();

        let path = tmp.path().to_str().unwrap().to_owned();

        let data = tokio::fs::read(&path).await.unwrap();
        let filename = std::path::Path::new(&path)
            .file_name()
            .and_then(|n| n.to_str())
            .map(str::to_owned);

        let mut pending_attachments: Vec<Attachment> = Vec::new();
        pending_attachments.push(Attachment {
            kind: AttachmentKind::Image,
            data: data.clone(),
            filename,
        });

        assert_eq!(pending_attachments.len(), 1);
        assert_eq!(pending_attachments[0].data, image_bytes);
        assert_eq!(pending_attachments[0].kind, AttachmentKind::Image);

        // Taking the queue hands over the attachments and leaves it empty.
        let taken = std::mem::take(&mut pending_attachments);
        assert!(pending_attachments.is_empty());
        assert_eq!(taken.len(), 1);
    }

    /// Reading a missing file yields a `NotFound` error rather than a panic.
    #[tokio::test]
    async fn image_command_missing_file_is_handled_gracefully() {
        let result = tokio::fs::read("/nonexistent/path/image.png").await;
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().kind(), std::io::ErrorKind::NotFound);
    }

    /// `/image` with no argument, or with only whitespace after it, leaves an
    /// empty path once the prefix is stripped and the rest trimmed.
    #[test]
    fn image_command_empty_args_detected() {
        let trimmed = "/image";
        let arg = trimmed.strip_prefix("/image").map_or("", str::trim);
        assert!(arg.is_empty());

        let trimmed_space = "/image ";
        let arg_space = trimmed_space.strip_prefix("/image").map_or("", str::trim);
        assert!(arg_space.is_empty());
    }

    /// A new channel starts with an empty accumulation buffer.
    #[test]
    fn cli_channel_new_has_empty_accumulated() {
        let ch = CliChannel::new();
        assert!(ch.accumulated.is_empty());
    }

    /// A channel can be constructed with an empty history and a no-op
    /// persistence callback.
    #[test]
    fn cli_channel_with_history_constructs_ok() {
        let ch = CliChannel::with_history(vec![], |_| {});
        assert!(ch.accumulated.is_empty());
    }

    /// Consecutive duplicates are stored and persisted only once.
    #[test]
    fn input_history_add_and_dedup() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};

        let persisted = Arc::new(AtomicUsize::new(0));
        let p = persisted.clone();
        let mut history = InputHistory::new(
            vec![],
            Box::new(move |_| {
                p.fetch_add(1, Ordering::Relaxed);
            }),
        );
        history.add("hello");
        // Immediate repeat of the newest entry: ignored and not persisted.
        history.add("hello");
        history.add("world");
        assert_eq!(history.entries().len(), 2);
        assert_eq!(history.entries()[0], "hello");
        assert_eq!(persisted.load(Ordering::Relaxed), 2);
    }

    /// Empty lines are never recorded.
    #[test]
    fn input_history_ignores_empty() {
        let mut history = InputHistory::new(vec![], Box::new(|_| {}));
        history.add("");
        assert_eq!(history.entries().len(), 0);
    }

    /// Dropping a `recv` future before it completes must not lose a message
    /// that is already buffered in the channel.
    #[tokio::test]
    async fn recv_is_cancel_safe_via_mpsc_buffer() {
        let (tx, rx) = mpsc::channel::<ChannelMessage>(32);
        // Install the receiver directly so no stdin reader is spawned.
        let mut ch = CliChannel {
            accumulated: String::new(),
            input_rx: Some(rx),
            pending: None,
        };

        tx.send(ChannelMessage {
            text: "hello".to_string(),
            attachments: vec![],
        })
        .await
        .unwrap();

        // Create a recv future and drop it without ever polling it.
        drop(ch.recv());

        // The buffered message is still delivered by the next recv.
        let result = ch.recv().await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().text, "hello");
    }

    /// An absolute path passed to `/image` is rejected and nothing is queued.
    #[tokio::test]
    async fn image_command_absolute_path_is_rejected() {
        let mut pending: Vec<Attachment> = Vec::new();
        let mut history = Some(InputHistory::new(vec![], Box::new(|_| {})));
        let result = process_line(
            "/image /etc/passwd".to_owned(),
            false,
            &mut history,
            &mut pending,
        )
        .await;
        assert!(matches!(result, Ok(None)));
        assert!(pending.is_empty());
    }

    /// A path containing `..` passed to `/image` is rejected and nothing is
    /// queued.
    #[tokio::test]
    async fn image_command_parent_dir_traversal_is_rejected() {
        let mut pending: Vec<Attachment> = Vec::new();
        let mut history = Some(InputHistory::new(vec![], Box::new(|_| {})));
        let result = process_line(
            "/image ../../../etc/passwd".to_owned(),
            false,
            &mut history,
            &mut pending,
        )
        .await;
        assert!(matches!(result, Ok(None)));
        assert!(pending.is_empty());
    }
}