use std::collections::VecDeque;
use std::io::{BufReader, IsTerminal};
use tokio::sync::mpsc;
use zeph_core::channel::{Attachment, AttachmentKind, Channel, ChannelError, ChannelMessage};
use crate::line_editor::{self, ReadLineResult};
const STDIN_CHANNEL_CAPACITY: usize = 32;
type PersistFn = Box<dyn Fn(&str) + Send>;
struct InputHistory {
entries: VecDeque<String>,
persist_fn: PersistFn,
max_len: usize,
}
impl InputHistory {
fn new(entries: Vec<String>, persist_fn: PersistFn) -> Self {
Self {
entries: VecDeque::from(entries),
persist_fn,
max_len: 1000,
}
}
fn entries(&self) -> &VecDeque<String> {
&self.entries
}
fn add(&mut self, line: &str) {
if line.is_empty() {
return;
}
if self.entries.back().is_some_and(|last| last == line) {
return;
}
if self.entries.len() == self.max_len {
self.entries.pop_front();
}
self.entries.push_back(line.to_owned());
(self.persist_fn)(line);
}
}
impl std::fmt::Debug for InputHistory {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("InputHistory")
.field("entries_len", &self.entries.len())
.finish_non_exhaustive()
}
}
async fn process_line(
line: String,
is_tty: bool,
history: &mut Option<InputHistory>,
pending_attachments: &mut Vec<Attachment>,
) -> Result<Option<ChannelMessage>, ()> {
let trimmed = line.trim();
match trimmed {
"exit" | "quit" | "/exit" | "/quit" => return Err(()),
"" => {
if is_tty {
return Err(());
}
return Ok(None);
}
_ => {}
}
if let Some(h) = history {
h.add(trimmed);
}
if let Some(path) = trimmed.strip_prefix("/image").map(str::trim) {
if path.is_empty() {
println!("Zeph: Usage: /image <path>");
return Ok(None);
}
let path_owned = path.to_owned();
match tokio::fs::read(&path_owned).await {
Err(e) => {
println!("Zeph: File not found: {path_owned}: {e}");
}
Ok(data) => {
let filename = std::path::Path::new(&path_owned)
.file_name()
.and_then(|n| n.to_str())
.map(str::to_owned);
let size = data.len();
pending_attachments.push(Attachment {
kind: AttachmentKind::Image,
data,
filename,
});
println!("Zeph: Image attached: {path_owned} ({size} bytes). Send your message.");
}
}
return Ok(None);
}
let attachments = std::mem::take(pending_attachments);
Ok(Some(ChannelMessage {
text: trimmed.to_string(),
attachments,
}))
}
async fn run_tty_reader(mut history: Option<InputHistory>, tx: mpsc::Sender<ChannelMessage>) {
let mut pending_attachments: Vec<Attachment> = Vec::new();
loop {
let entries: Vec<String> = history
.as_ref()
.map(|h| h.entries().iter().cloned().collect())
.unwrap_or_default();
let Ok(Ok(result)) =
tokio::task::spawn_blocking(move || line_editor::read_line("You: ", &entries)).await
else {
break;
};
let line = match result {
ReadLineResult::Interrupted | ReadLineResult::Eof => break,
ReadLineResult::Line(l) => l,
};
match process_line(line, true, &mut history, &mut pending_attachments).await {
Err(()) => break,
Ok(None) => {}
Ok(Some(msg)) => {
if tx.send(msg).await.is_err() {
break;
}
}
}
}
}
async fn run_piped_reader(mut history: Option<InputHistory>, tx: mpsc::Sender<ChannelMessage>) {
tracing::debug!("stdin is not a terminal, using piped input mode");
let (line_tx, mut line_rx) = mpsc::channel::<Result<ReadLineResult, std::io::Error>>(1);
std::thread::spawn(move || {
let stdin = std::io::stdin();
let mut reader = BufReader::new(stdin);
loop {
let result = line_editor::read_line_piped(&mut reader);
let is_eof = matches!(result, Ok(ReadLineResult::Eof));
if line_tx.blocking_send(result).is_err() || is_eof {
break;
}
}
});
let mut pending_attachments: Vec<Attachment> = Vec::new();
loop {
let Some(Ok(result)) = line_rx.recv().await else {
break;
};
let line = match result {
ReadLineResult::Interrupted | ReadLineResult::Eof => break,
ReadLineResult::Line(l) => l,
};
match process_line(line, false, &mut history, &mut pending_attachments).await {
Err(()) => break,
Ok(None) => {}
Ok(Some(msg)) => {
if tx.send(msg).await.is_err() {
break;
}
}
}
}
}
fn spawn_stdin_reader(
is_tty: bool,
history: Option<InputHistory>,
tx: mpsc::Sender<ChannelMessage>,
) {
tokio::spawn(async move {
if is_tty {
run_tty_reader(history, tx).await;
} else {
run_piped_reader(history, tx).await;
}
});
}
struct PendingReader {
history: Option<InputHistory>,
is_tty: bool,
}
impl std::fmt::Debug for PendingReader {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PendingReader")
.field("is_tty", &self.is_tty)
.finish_non_exhaustive()
}
}
#[derive(Debug)]
pub struct CliChannel {
accumulated: String,
input_rx: Option<mpsc::Receiver<ChannelMessage>>,
pending: Option<PendingReader>,
}
impl CliChannel {
#[must_use]
pub fn new() -> Self {
let is_tty = std::io::stdin().is_terminal();
Self {
accumulated: String::new(),
input_rx: None,
pending: Some(PendingReader {
history: None,
is_tty,
}),
}
}
#[must_use]
pub fn with_history(entries: Vec<String>, persist_fn: impl Fn(&str) + Send + 'static) -> Self {
let is_tty = std::io::stdin().is_terminal();
let history = InputHistory::new(entries, Box::new(persist_fn));
Self {
accumulated: String::new(),
input_rx: None,
pending: Some(PendingReader {
history: Some(history),
is_tty,
}),
}
}
fn ensure_reader(&mut self) -> &mut mpsc::Receiver<ChannelMessage> {
if self.input_rx.is_none() {
let pending = self
.pending
.take()
.expect("PendingReader consumed before input_rx was set");
let (tx, rx) = mpsc::channel(STDIN_CHANNEL_CAPACITY);
spawn_stdin_reader(pending.is_tty, pending.history, tx);
self.input_rx = Some(rx);
}
self.input_rx.as_mut().expect("input_rx set above")
}
}
impl Default for CliChannel {
fn default() -> Self {
Self::new()
}
}
impl Channel for CliChannel {
async fn recv(&mut self) -> Result<Option<ChannelMessage>, ChannelError> {
Ok(self.ensure_reader().recv().await)
}
async fn send(&mut self, text: &str) -> Result<(), ChannelError> {
println!("Zeph: {text}");
Ok(())
}
async fn send_chunk(&mut self, chunk: &str) -> Result<(), ChannelError> {
use std::io::{Write, stdout};
print!("{chunk}");
stdout().flush()?;
self.accumulated.push_str(chunk);
Ok(())
}
async fn flush_chunks(&mut self) -> Result<(), ChannelError> {
println!();
self.accumulated.clear();
Ok(())
}
async fn confirm(&mut self, prompt: &str) -> Result<bool, ChannelError> {
if !std::io::stdin().is_terminal() {
tracing::debug!("non-interactive stdin, auto-declining confirmation");
return Ok(false);
}
let prompt = format!("{prompt} [y/N]: ");
let result = tokio::task::spawn_blocking(move || line_editor::read_line(&prompt, &[]))
.await
.map_err(|e| ChannelError::Other(e.to_string()))?
.map_err(ChannelError::Io)?;
match result {
ReadLineResult::Line(line) => Ok(line.trim().eq_ignore_ascii_case("y")),
ReadLineResult::Interrupted | ReadLineResult::Eof => Ok(false),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn cli_channel_default() {
let ch = CliChannel::default();
let _ = format!("{ch:?}");
}
#[tokio::test]
async fn cli_channel_send_chunk_accumulates() {
let mut ch = CliChannel::new();
ch.send_chunk("hello").await.unwrap();
ch.send_chunk(" ").await.unwrap();
ch.send_chunk("world").await.unwrap();
assert_eq!(ch.accumulated, "hello world");
}
#[tokio::test]
async fn cli_channel_flush_chunks_clears_buffer() {
let mut ch = CliChannel::new();
ch.send_chunk("test").await.unwrap();
ch.flush_chunks().await.unwrap();
assert!(ch.accumulated.is_empty());
}
#[test]
fn cli_channel_try_recv_returns_none() {
let mut ch = CliChannel::new();
assert!(ch.try_recv().is_none());
}
#[test]
fn cli_channel_new() {
let ch = CliChannel::new();
assert!(ch.accumulated.is_empty());
}
#[tokio::test]
async fn cli_channel_send_returns_ok() {
let mut ch = CliChannel::new();
ch.send("test message").await.unwrap();
}
#[tokio::test]
async fn cli_channel_flush_returns_ok() {
let mut ch = CliChannel::new();
ch.flush_chunks().await.unwrap();
}
#[tokio::test]
async fn image_command_valid_file_stores_in_pending() {
use std::io::Write;
let mut tmp = tempfile::NamedTempFile::new().unwrap();
let image_bytes = b"\x89PNG\r\n\x1a\nfake-image-data";
tmp.write_all(image_bytes).unwrap();
tmp.flush().unwrap();
let path = tmp.path().to_str().unwrap().to_owned();
let data = tokio::fs::read(&path).await.unwrap();
let filename = std::path::Path::new(&path)
.file_name()
.and_then(|n| n.to_str())
.map(str::to_owned);
let mut pending_attachments: Vec<Attachment> = Vec::new();
pending_attachments.push(Attachment {
kind: AttachmentKind::Image,
data: data.clone(),
filename,
});
assert_eq!(pending_attachments.len(), 1);
assert_eq!(pending_attachments[0].data, image_bytes);
assert_eq!(pending_attachments[0].kind, AttachmentKind::Image);
let taken = std::mem::take(&mut pending_attachments);
assert!(pending_attachments.is_empty());
assert_eq!(taken.len(), 1);
}
#[tokio::test]
async fn image_command_missing_file_is_handled_gracefully() {
let result = tokio::fs::read("/nonexistent/path/image.png").await;
assert!(result.is_err());
assert_eq!(result.unwrap_err().kind(), std::io::ErrorKind::NotFound);
}
#[test]
fn image_command_empty_args_detected() {
let trimmed = "/image";
let arg = trimmed.strip_prefix("/image").map_or("", str::trim);
assert!(arg.is_empty());
let trimmed_space = "/image ";
let arg_space = trimmed_space.strip_prefix("/image").map_or("", str::trim);
assert!(arg_space.is_empty());
}
#[test]
fn cli_channel_new_has_empty_accumulated() {
let ch = CliChannel::new();
assert!(ch.accumulated.is_empty());
}
#[test]
fn cli_channel_with_history_constructs_ok() {
let ch = CliChannel::with_history(vec![], |_| {});
assert!(ch.accumulated.is_empty());
}
#[test]
fn input_history_add_and_dedup() {
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
let persisted = Arc::new(AtomicUsize::new(0));
let p = persisted.clone();
let mut history = InputHistory::new(
vec![],
Box::new(move |_| {
p.fetch_add(1, Ordering::Relaxed);
}),
);
history.add("hello");
history.add("hello"); history.add("world");
assert_eq!(history.entries().len(), 2);
assert_eq!(history.entries()[0], "hello");
assert_eq!(persisted.load(Ordering::Relaxed), 2);
}
#[test]
fn input_history_ignores_empty() {
let mut history = InputHistory::new(vec![], Box::new(|_| {}));
history.add("");
assert_eq!(history.entries().len(), 0);
}
#[tokio::test]
async fn recv_is_cancel_safe_via_mpsc_buffer() {
let (tx, rx) = mpsc::channel::<ChannelMessage>(32);
let mut ch = CliChannel {
accumulated: String::new(),
input_rx: Some(rx),
pending: None,
};
tx.send(ChannelMessage {
text: "hello".to_string(),
attachments: vec![],
})
.await
.unwrap();
drop(ch.recv());
let result = ch.recv().await.unwrap();
assert!(result.is_some());
assert_eq!(result.unwrap().text, "hello");
}
}