#[cfg(test)]
#[path = "input_tests.rs"]
mod tests;
use anyhow::{Result, bail};
use log::debug;
use crate::factory::AgentFactory;
use crate::resume;
use zag_orch::listen;
use zag_orch::messaging;
/// Arguments accepted by [`run_input`].
///
/// The selector fields (`input_name`, `ps`, `session`, `latest`/`active`,
/// `global`) decide which session receives the message; the remaining fields
/// control how the message is built and how the reply is printed.
pub(crate) struct InputParams {
    /// Explicit session ID. Used verbatim when neither `input_name` nor `ps`
    /// is given.
    pub session: Option<String>,
    /// Message text. When absent (and not in `stream` mode) the message is
    /// read from stdin instead.
    pub message: Option<String>,
    /// Forwarded to `listen::resolve_session_log` when no name, `ps` value or
    /// explicit session ID was supplied.
    pub latest: bool,
    /// Forwarded to `listen::resolve_session_log` together with `latest`.
    pub active: bool,
    /// Value handed to `listen::resolve_session_from_ps` to obtain the
    /// session ID.
    pub ps: Option<String>,
    /// Session name to look up in the session store. Takes precedence over
    /// every other selector.
    pub input_name: Option<String>,
    /// Search all session stores / the global index rather than only the
    /// store for `root`.
    pub global: bool,
    /// Read one message per non-empty stdin line and send them through a
    /// streaming resume. Only accepted for the `claude` provider.
    pub stream: bool,
    /// Output format. Values matched by `run_input` are `"json"`,
    /// `"json-pretty"` and `"stream-json"`; anything else (or `None`) prints
    /// plain text.
    pub output: Option<String>,
    /// Root passed to the session store, the resume lookup and the agent
    /// factory.
    pub root: Option<String>,
    /// Suppress the informational log line and the "no output" notice.
    pub quiet: bool,
    /// Passed straight to `messaging::maybe_wrap_message`.
    pub raw: bool,
    /// Paths turned into attachments whose formatted prefix is prepended to
    /// the message (non-stream, non-FIFO path only).
    pub files: Vec<String>,
}
/// Test-only shim that forwards to `messaging::maybe_wrap_message`.
///
/// It is compiled only under `cfg(test)`, so the unit tests in the `tests`
/// module can reach the wrapping logic through this module. The actual
/// wrapping rules live in `zag_orch::messaging`.
// NOTE(review): presumably `raw = true` returns the message unwrapped — confirm
// against `messaging::maybe_wrap_message`.
#[cfg(test)]
pub(crate) fn maybe_wrap_message(message: &str, raw: bool) -> String {
messaging::maybe_wrap_message(message, raw)
}
fn resolve_input_session_id(
session: Option<&str>,
latest: bool,
active: bool,
ps: Option<&str>,
input_name: Option<&str>,
global: bool,
root: Option<&str>,
) -> Result<String> {
if let Some(name) = input_name {
let store = if global {
zag_agent::session::SessionStore::load_all().unwrap_or_default()
} else {
zag_agent::session::SessionStore::load(root).unwrap_or_default()
};
if let Some(entry) = store.find_by_name(name) {
return Ok(entry.session_id.clone());
}
bail!("No session found with name '{name}'");
}
if let Some(ps_value) = ps {
return listen::resolve_session_from_ps(ps_value);
}
if let Some(id) = session {
return Ok(id.to_string());
}
if latest || active {
let log_path = listen::resolve_session_log(None, latest, active, root)?;
if let Some(stem) = log_path.file_stem().and_then(|s| s.to_str()) {
return Ok(stem.to_string());
}
bail!(
"Could not extract session ID from log path: {}",
log_path.display()
);
}
if global {
let global_dir = crate::config::Config::global_base_dir();
if let Ok(index) = zag_agent::session_log::load_global_index(&global_dir) {
if let Some(entry) = index
.sessions
.iter()
.max_by(|a, b| a.started_at.cmp(&b.started_at))
{
return Ok(entry.session_id.clone());
}
}
bail!(
"No sessions found globally. Use --session, --latest, --active, or --ps to specify one."
);
} else {
let store = zag_agent::session::SessionStore::load(root).unwrap_or_default();
if let Some(entry) = store.latest() {
return Ok(entry.session_id.clone());
}
bail!(
"No sessions found. Use --session, --latest, --active, --ps, or --global to specify one."
);
}
}
/// Sends a user message to an existing session and prints the agent's reply.
///
/// Steps:
///
/// 1. Resolve the target session ID from the selector fields of `params`
///    via `resolve_input_session_id`.
/// 2. If a FIFO path exists for that session, deliver the message with
///    `send_via_fifo` and return. Only `message`, `raw` and `quiet` are used
///    on this path.
/// 3. Otherwise look the session up with `resume::resolve_resume_target` and
///    resume it through an agent created by `AgentFactory`:
///    * `stream` (Claude only): every non-empty stdin line is collected until
///      EOF, then each is sent as its own user message. Events are printed as
///      JSON lines for `"json"` / `"stream-json"`, otherwise the text blocks
///      of assistant messages are printed.
///    * otherwise: one message is taken from `message` or, if absent, from
///      stdin (trimmed), prefixed with the attachments listed in `files`, and
///      sent. Output is `"stream-json"` events (Claude only), `"json"`,
///      `"json-pretty"`, or the final result text.
///
/// NOTE(review): `files` is not applied on the FIFO path or in `stream`
/// mode, and `message` is ignored in `stream` mode — confirm this is intended.
///
/// # Errors
///
/// Returns an error when no session can be resolved, when `stream` is
/// requested for a non-Claude provider, when no message is available, when an
/// attachment cannot be loaded, or when the agent / stdout / serialization
/// reports a failure.
pub(crate) async fn run_input(params: InputParams) -> Result<()> {
    let InputParams {
        session,
        message,
        latest,
        active,
        ps,
        input_name,
        global,
        stream,
        output,
        root,
        quiet,
        raw,
        files,
    } = params;

    let resolved_id = resolve_input_session_id(
        session.as_deref(),
        latest,
        active,
        ps.as_deref(),
        input_name.as_deref(),
        global,
        root.as_deref(),
    )?;
    debug!("Input command: resolved session ID = {resolved_id}");

    // An existing FIFO takes priority over resuming the session ourselves.
    let fifo = zag_orch::spawn::fifo_path(&resolved_id);
    if fifo.exists() {
        return send_via_fifo(message.as_deref(), &resolved_id, raw, quiet).await;
    }

    let target = resume::resolve_resume_target(&resolved_id, root.as_deref())
        .ok_or_else(|| anyhow::anyhow!("No session found for '{resolved_id}'"))?;
    let provider = &target.entry.provider;
    // Fall back to our own ID when the entry records no provider-side ID.
    let provider_session_id = target
        .entry
        .provider_session_id
        .as_deref()
        .unwrap_or(&resolved_id);
    // An empty model string means "no model recorded".
    let model = if target.entry.model.is_empty() {
        None
    } else {
        Some(target.entry.model.clone())
    };
    debug!(
        "Input command: provider={provider}, provider_session_id={provider_session_id}, model={model:?}"
    );

    if !quiet {
        // Truncate by characters, not bytes: a byte-range slice panics when
        // the cut falls inside a multi-byte character, and the ID may come
        // verbatim from the command line.
        let short_id: String = resolved_id.chars().take(8).collect();
        log::info!(
            "Sending to {} session {}",
            crate::capitalize(provider),
            short_id
        );
    }

    if stream {
        if provider != "claude" {
            bail!("Streaming input (--stream) is only supported for Claude sessions");
        }
        let mut agent = AgentFactory::create(provider, None, model, root.clone(), false, vec![])?;
        // Report a failed downcast as an error instead of panicking.
        let claude_agent = agent
            .as_any_mut()
            .downcast_mut::<crate::claude::Claude>()
            .ok_or_else(|| anyhow::anyhow!("Failed to get Claude agent"))?;
        let mut streaming = claude_agent.execute_streaming_resume(provider_session_id)?;

        // Collect every non-empty stdin line up front; a read error ends the
        // collection the same way EOF does.
        let stdin_task = {
            let stdin = tokio::io::stdin();
            let reader = tokio::io::BufReader::new(stdin);
            tokio::spawn(async move {
                use tokio::io::AsyncBufReadExt;
                let mut lines = reader.lines();
                let mut messages = Vec::new();
                while let Ok(Some(line)) = lines.next_line().await {
                    if !line.is_empty() {
                        messages.push(line);
                    }
                }
                messages
            })
        };
        let messages = stdin_task.await?;
        for line in messages {
            let wrapped = messaging::maybe_wrap_message(&line, raw);
            streaming.send_user_message(&wrapped).await?;
        }
        streaming.close_input();

        let output_format = output.as_deref().unwrap_or("text");
        while let Some(event) = streaming.next_event().await? {
            match output_format {
                "json" | "stream-json" => {
                    println!("{}", serde_json::to_string(&event)?);
                }
                _ => {
                    if let zag_agent::output::Event::AssistantMessage { ref content, .. } = event {
                        for block in content {
                            if let zag_agent::output::ContentBlock::Text { text } = block {
                                print!("{text}");
                            }
                        }
                        // stdout is line-buffered; flush so text without a
                        // trailing newline becomes visible as it arrives.
                        std::io::Write::flush(&mut std::io::stdout())?;
                    }
                }
            }
        }
        streaming.wait().await?;
    } else {
        // Message from the argument, or the whole of stdin (trimmed).
        let msg = match message {
            Some(text) => text,
            None => {
                let mut buf = String::new();
                std::io::Read::read_to_string(&mut std::io::stdin(), &mut buf)?;
                let trimmed = buf.trim().to_string();
                if trimmed.is_empty() {
                    bail!("No message provided. Pass a message argument or pipe to stdin.");
                }
                trimmed
            }
        };

        // Attachments are rendered as a prefix placed before the message.
        let msg = if files.is_empty() {
            msg
        } else {
            let attachments = files
                .iter()
                .map(|f| zag_agent::attachment::Attachment::from_path(std::path::Path::new(f)))
                .collect::<Result<Vec<_>>>()?;
            let prefix = zag_agent::attachment::format_attachments_prefix(&attachments);
            format!("{prefix}{msg}")
        };
        let msg = messaging::maybe_wrap_message(&msg, raw);
        debug!("Input command: sending message ({} bytes)", msg.len());

        let mut agent = AgentFactory::create(provider, None, model, root.clone(), false, vec![])?;
        let output_format = output.as_deref();
        if provider == "claude" && output_format == Some("stream-json") {
            // Claude + stream-json: emit each event as one JSON line.
            let claude_agent = agent
                .as_any_mut()
                .downcast_mut::<crate::claude::Claude>()
                .ok_or_else(|| anyhow::anyhow!("Failed to get Claude agent"))?;
            let mut streaming = claude_agent.execute_streaming_resume(provider_session_id)?;
            streaming.send_user_message(&msg).await?;
            streaming.close_input();
            while let Some(event) = streaming.next_event().await? {
                println!("{}", serde_json::to_string(&event)?);
            }
            streaming.wait().await?;
        } else {
            match agent
                .run_resume_with_prompt(provider_session_id, &msg)
                .await?
            {
                Some(agent_output) => {
                    let format = output_format.unwrap_or("text");
                    match format {
                        "json" => {
                            println!("{}", serde_json::to_string(&agent_output)?);
                        }
                        "json-pretty" => {
                            println!("{}", serde_json::to_string_pretty(&agent_output)?);
                        }
                        _ => {
                            if let Some(text) = agent_output.final_result() {
                                println!("{text}");
                            }
                        }
                    }
                }
                None => {
                    if !quiet {
                        eprintln!("Agent produced no output");
                    }
                }
            }
        }
    }
    Ok(())
}
/// Delivers a message to an interactive session through its FIFO.
///
/// The message is taken from `message` when given; otherwise all of stdin is
/// read and trimmed. It is then passed through
/// `messaging::maybe_wrap_message` (with `raw`) and handed to
/// `messaging::send_via_fifo`. Unless `quiet` is set, an info line naming the
/// first eight characters of `session_id` is logged.
///
/// # Errors
///
/// Fails when stdin cannot be read, when no message text is available, or
/// when `messaging::send_via_fifo` reports an error.
async fn send_via_fifo(
    message: Option<&str>,
    session_id: &str,
    raw: bool,
    quiet: bool,
) -> Result<()> {
    let msg = match message {
        Some(text) => text.to_string(),
        None => {
            let mut buf = String::new();
            std::io::Read::read_to_string(&mut std::io::stdin(), &mut buf)?;
            let trimmed = buf.trim().to_string();
            if trimmed.is_empty() {
                bail!("No message provided. Pass a message argument or pipe to stdin.");
            }
            trimmed
        }
    };
    let msg = messaging::maybe_wrap_message(&msg, raw);
    debug!(
        "Sending to interactive session {} via FIFO ({} bytes)",
        session_id,
        msg.len()
    );
    if !quiet {
        // Truncate by characters, not bytes: slicing at a fixed byte offset
        // panics when it lands inside a multi-byte character.
        let short_id: String = session_id.chars().take(8).collect();
        log::info!("Sending to interactive session {}", short_id);
    }
    messaging::send_via_fifo(session_id, &msg).await
}