use crate::collect::extract_last_assistant_message;
use anyhow::{Result, bail};
use log::debug;
use zag_agent::session::SessionStore;
pub struct PipeParams {
pub session_ids: Vec<String>,
pub tag: Option<String>,
pub prompt: String,
pub provider: Option<String>,
pub model: Option<String>,
pub root: Option<String>,
pub auto_approve: bool,
pub system_prompt: Option<String>,
pub add_dirs: Vec<String>,
pub size: Option<String>,
pub max_turns: Option<u32>,
pub output: Option<String>,
pub json: bool,
pub quiet: bool,
}
fn resolve_pipe_sessions(
session_ids: &[String],
tag: Option<&str>,
root: Option<&str>,
) -> Result<Vec<String>> {
let mut ids = session_ids.to_vec();
if let Some(t) = tag {
let store = SessionStore::load(root)?;
let tagged = store.find_by_tag(t);
if tagged.is_empty() && ids.is_empty() {
bail!("No sessions found with tag '{t}'");
}
for entry in tagged {
if !ids.contains(&entry.session_id) {
ids.push(entry.session_id.clone());
}
}
}
if ids.is_empty() {
bail!("No sessions specified. Provide session IDs or --tag.");
}
Ok(ids)
}
fn build_context(session_ids: &[String], root: Option<&str>) -> Result<String> {
let mut parts = Vec::new();
for (i, id) in session_ids.iter().enumerate() {
let result = extract_last_assistant_message(id, root);
match result {
Some(text) => {
if session_ids.len() == 1 {
parts.push(format!(
"<session-result session=\"{}\">\n{}\n</session-result>",
&id[..id.len().min(8)],
text
));
} else {
parts.push(format!(
"<session-result index=\"{}\" session=\"{}\">\n{}\n</session-result>",
i + 1,
&id[..id.len().min(8)],
text
));
}
}
None => {
log::warn!("No result found for session {id}");
}
}
}
if parts.is_empty() {
bail!("No results available from the specified sessions");
}
Ok(parts.join("\n\n"))
}
pub async fn pipe_sessions(params: &PipeParams) -> Result<zag_agent::output::AgentOutput> {
let session_ids = resolve_pipe_sessions(
¶ms.session_ids,
params.tag.as_deref(),
params.root.as_deref(),
)?;
debug!(
"Pipe: collecting results from {} session(s)",
session_ids.len()
);
let context = build_context(&session_ids, params.root.as_deref())?;
let full_prompt = format!(
"Here are results from previous agent sessions:\n\n{}\n\n{}",
context, params.prompt
);
debug!(
"Pipe: running exec with combined prompt ({} bytes)",
full_prompt.len()
);
let provider =
zag_agent::config::resolve_provider(params.provider.as_deref(), params.root.as_deref())?;
let mut builder = zag_agent::builder::AgentBuilder::new().provider(&provider);
if let Some(ref model) = params.model {
builder = builder.model(model);
}
if let Some(ref root) = params.root {
builder = builder.root(root);
}
if params.auto_approve {
builder = builder.auto_approve(true);
}
if let Some(ref sp) = params.system_prompt {
builder = builder.system_prompt(sp);
}
for dir in ¶ms.add_dirs {
builder = builder.add_dir(dir);
}
if let Some(ref size) = params.size {
builder = builder.size(size);
}
if let Some(turns) = params.max_turns {
builder = builder.max_turns(turns);
}
if params.quiet {
builder = builder.quiet(true);
}
builder.exec(&full_prompt).await
}
pub async fn run_pipe(params: PipeParams) -> Result<()> {
let output = pipe_sessions(¶ms).await?;
let format = params
.output
.as_deref()
.unwrap_or(if params.json { "json" } else { "text" });
match format {
"json" => println!("{}", serde_json::to_string(&output)?),
"json-pretty" => println!("{}", serde_json::to_string_pretty(&output)?),
_ => {
if let Some(text) = output.final_result() {
println!("{text}");
}
}
}
Ok(())
}
#[cfg(test)]
#[path = "pipe_tests.rs"]
mod tests;