use crate::agent_blueprint::AgentBlueprint;
use crate::ai_provider::AiRequest;
use crate::Event;
use tokio::sync::{broadcast, mpsc, oneshot};
#[derive(Debug, Clone)]
pub struct PipelineContext {
pub input: String,
pub output: String,
pub agent: String,
pub stage: usize,
pub total_stages: usize,
}
#[derive(Debug, Clone)]
pub struct PipelineStage {
pub agent_name: String,
pub blueprint: AgentBlueprint,
pub args: String,
}
#[derive(Debug, Clone)]
pub struct PipelineTrace {
pub stages: Vec<PipelineContext>,
pub final_output: String,
}
pub async fn execute_pipeline(
stages: Vec<PipelineStage>,
ai_tx: mpsc::Sender<AiRequest>,
event_tx: broadcast::Sender<Event>,
) -> Result<PipelineTrace, String> {
let total = stages.len();
let mut trace = PipelineTrace {
stages: Vec::with_capacity(total),
final_output: String::new(),
};
let mut previous_output = String::new();
let _ = event_tx.send(Event {
sender: "Pipeline".to_string(),
action: "announces".to_string(),
content: format!("Starting semantic pipeline with {} stages...", total),
});
for (i, stage) in stages.iter().enumerate() {
let stage_num = i + 1;
let _ = event_tx.send(Event {
sender: "Pipeline".to_string(),
action: "announces".to_string(),
content: format!("[{}/{}] Invoking agent '{}'...", stage_num, total, stage.agent_name),
});
let prompt = build_stage_prompt(stage, &previous_output, stage_num, total);
let (resp_tx, resp_rx) = oneshot::channel::<String>();
let request = AiRequest {
agent_name: format!("pipeline:{}", stage.agent_name),
prompt,
is_json_format: false,
is_autonomous: false,
iq_level: stage.blueprint.base_iq,
current_pos: [0.0, 0.0, 0.0],
nearby_objects: String::new(),
response_tx: Some(resp_tx),
};
if ai_tx.send(request).await.is_err() {
return Err(format!("Pipeline stage {}: Failed to send request to AI engine", stage_num));
}
let output = match tokio::time::timeout(
std::time::Duration::from_secs(120),
resp_rx,
).await {
Ok(Ok(text)) => text,
Ok(Err(_)) => return Err(format!("Pipeline stage {}: AI engine dropped the response channel", stage_num)),
Err(_) => return Err(format!("Pipeline stage {}: Timed out after 120s", stage_num)),
};
let preview = if output.len() > 200 {
format!("{}...", &output[..200])
} else {
output.clone()
};
let _ = event_tx.send(Event {
sender: format!("pipeline:{}", stage.agent_name),
action: "speaks".to_string(),
content: preview,
});
let ctx = PipelineContext {
input: if previous_output.is_empty() { stage.args.clone() } else { previous_output.clone() },
output: output.clone(),
agent: stage.agent_name.clone(),
stage: stage_num,
total_stages: total,
};
trace.stages.push(ctx);
previous_output = output;
}
trace.final_output = previous_output;
let _ = event_tx.send(Event {
sender: "Pipeline".to_string(),
action: "announces".to_string(),
content: format!("Pipeline complete. {} stages executed.", total),
});
Ok(trace)
}
fn build_stage_prompt(
stage: &PipelineStage,
previous_output: &str,
stage_num: usize,
total: usize,
) -> String {
let mut prompt = String::new();
prompt.push_str(&format!("SYSTEM: {}\n\n", stage.blueprint.system_prompt));
prompt.push_str(&format!(
"You are stage {}/{} in a semantic pipeline. ",
stage_num, total
));
if !previous_output.is_empty() {
prompt.push_str("The previous agent produced the following output. Use it as your primary input context:\n\n");
prompt.push_str("--- PREVIOUS AGENT OUTPUT ---\n");
prompt.push_str(previous_output);
prompt.push_str("\n--- END PREVIOUS OUTPUT ---\n\n");
}
if !stage.args.is_empty() {
prompt.push_str(&format!("USER REQUEST: {}\n\n", stage.args));
}
prompt.push_str("Respond with your analysis/output directly. Be thorough but concise. Do not wrap in JSON.");
prompt
}
pub fn parse_gsh_file(content: &str) -> Vec<String> {
content
.lines()
.map(|line| line.trim())
.filter(|line| !line.is_empty() && !line.starts_with('#'))
.map(|line| line.to_string())
.collect()
}