mod analyze;
mod dispatch;
mod evaluate;
mod replan;
use tracing::info;
use crate::llm::LlmClient;
use crate::query::QueryPlan;
use super::Agent;
use super::config::{AgentConfig, Output, WorkspaceContext};
use super::events::EventEmitter;
use super::state::OrchestratorState;
use super::tools::orchestrator as orch_tools;
use analyze::{AnalyzeOutcome, analyze};
use evaluate::evaluate;
use replan::replan;
/// Upper bound on the supervisor loop's iteration counter.
///
/// The loop in `Orchestrator::run` exits once its counter reaches this value;
/// the counter advances each time a replan yields new dispatches.
const MAX_SUPERVISOR_ITERATIONS: u32 = 3;
/// Agent that analyzes a query, dispatches work per document, and drives an
/// evaluate/replan supervisor loop before producing the final `Output`.
///
/// All fields are consumed by `Agent::run`, which destructures `self`.
pub struct Orchestrator<'a> {
/// The query text, held as an owned `String`.
query: String,
/// Borrowed workspace context; queried for its document count and handed to
/// the analysis, dispatch, and replan steps.
ws: &'a WorkspaceContext<'a>,
/// Agent configuration, forwarded to the dispatch step.
config: AgentConfig,
/// LLM client shared by the analyze, dispatch, evaluate, and replan steps.
llm: LlmClient,
/// Sink for orchestrator lifecycle events (started / completed / answer).
emitter: EventEmitter,
/// Forwarded to the analysis step; when `true`, `run` also leaves the
/// supervisor loop after the first dispatch round without evaluating evidence.
skip_analysis: bool,
/// Query plan whose intent and complexity are logged at start-up and which
/// is passed on to analysis, dispatch, and finalization.
query_plan: QueryPlan,
}
impl<'a> Orchestrator<'a> {
pub fn new(
query: &str,
ws: &'a WorkspaceContext<'a>,
config: AgentConfig,
llm: LlmClient,
emitter: EventEmitter,
skip_analysis: bool,
query_plan: QueryPlan,
) -> Self {
Self {
query: query.to_string(),
ws,
config,
llm,
emitter,
skip_analysis,
query_plan,
}
}
}
impl<'a> Agent for Orchestrator<'a> {
    type Output = Output;

    /// Returns the fixed agent name, `"orchestrator"`.
    fn name(&self) -> &str {
        "orchestrator"
    }

    /// Runs the full orchestration and returns the final [`Output`].
    ///
    /// Steps:
    /// 1. `analyze` decides the initial dispatches, or short-circuits with
    ///    "already answered" / "no results".
    /// 2. The supervisor loop dispatches the current plan, then (unless
    ///    `skip_analysis` is set) evaluates the collected evidence and
    ///    replans while it is insufficient, bounded by
    ///    `MAX_SUPERVISOR_ITERATIONS`.
    /// 3. If any evidence was collected, `finalize_output` builds the answer;
    ///    otherwise the state is converted into an output with an empty answer.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `analyze`, `evaluate`, `replan`, or
    /// `finalize_output`.
    async fn run(self) -> crate::error::Result<Output> {
        let Orchestrator {
            query,
            ws,
            config,
            llm,
            emitter,
            skip_analysis,
            query_plan,
        } = self;

        info!(
            docs = ws.doc_count(),
            skip_analysis,
            intent = %query_plan.intent,
            complexity = %query_plan.complexity,
            "Orchestrator starting"
        );
        emitter.emit_orchestrator_started(&query, ws.doc_count(), skip_analysis);

        let mut state = OrchestratorState::new();
        // LLM calls made by the orchestrator itself (analysis, evaluation, replanning).
        let mut orch_llm_calls: u32 = 0;

        let initial_dispatches = match analyze(
            &query,
            ws,
            &mut state,
            &emitter,
            skip_analysis,
            &query_plan,
            &llm,
        )
        .await?
        {
            AnalyzeOutcome::Proceed {
                dispatches,
                llm_calls,
            } => {
                orch_llm_calls += llm_calls;
                dispatches
            }
            AnalyzeOutcome::AlreadyAnswered { llm_calls } => {
                let mut output = Output::empty();
                output.answer = "Already answered by cross-document search.".to_string();
                emitter.emit_orchestrator_completed(0, orch_llm_calls + llm_calls, 0);
                return Ok(output);
            }
            AnalyzeOutcome::NoResults { llm_calls } => {
                emitter.emit_orchestrator_completed(0, orch_llm_calls + llm_calls, 0);
                return Ok(Output::empty());
            }
        };

        let mut current_dispatches = initial_dispatches;
        // Counts replans that produced new dispatches; also feeds the confidence score.
        let mut iteration: u32 = 0;
        let mut eval_sufficient = false;

        loop {
            if iteration >= MAX_SUPERVISOR_ITERATIONS {
                info!(iteration, "Supervisor loop budget exhausted");
                break;
            }

            if !current_dispatches.is_empty() {
                info!(
                    docs = current_dispatches.len(),
                    docs_list = ?current_dispatches.iter().map(|d| d.doc_idx).collect::<Vec<_>>(),
                    iteration,
                    "Dispatching Workers"
                );
                dispatch::dispatch_and_collect(
                    &query,
                    &current_dispatches,
                    ws,
                    &config,
                    &llm,
                    &mut state,
                    &emitter,
                    &query_plan,
                )
                .await;
            }

            if state.all_evidence.is_empty() {
                info!("No evidence collected from any Worker");
                break;
            }

            if skip_analysis {
                // Evidence is known to be non-empty here (checked just above),
                // so it is accepted as sufficient without an evaluation call.
                eval_sufficient = true;
                break;
            }

            let eval_result = evaluate(&query, &state.all_evidence, &llm).await?;
            orch_llm_calls += 1;

            if eval_result.sufficient {
                eval_sufficient = true;
                info!(
                    evidence = state.all_evidence.len(),
                    iteration, "Evidence sufficient — exiting supervisor loop"
                );
                break;
            }

            info!(
                evidence = state.all_evidence.len(),
                missing = eval_result.missing_info.len(),
                iteration,
                "Evidence insufficient — replanning"
            );

            // NOTE(review): when this replan raises `iteration` to
            // MAX_SUPERVISOR_ITERATIONS, the loop exits at the top of the next
            // pass and the dispatches planned here are never executed — confirm
            // whether that final replan call is intended.
            let doc_cards_text = orch_tools::ls_docs(ws).feedback;
            let replan_result = replan(
                &query,
                &eval_result.missing_info,
                &state.all_evidence,
                &state.dispatched,
                ws.doc_count(),
                &doc_cards_text,
                &llm,
            )
            .await?;
            orch_llm_calls += 1;

            if replan_result.dispatches.is_empty() {
                info!("Replan produced no new dispatches — exiting supervisor loop");
                break;
            }

            current_dispatches = replan_result.dispatches;
            iteration += 1;
        }

        if state.all_evidence.is_empty() {
            emitter.emit_orchestrator_completed(0, orch_llm_calls, 0);
            return Ok(state.into_output(String::new()));
        }

        // Evidence is non-empty past the early return above, hence `no_evidence = false`.
        let confidence = compute_confidence(eval_sufficient, iteration, false);
        let multi_doc = ws.doc_count() > 1;

        finalize_output(
            &query,
            &state,
            &emitter,
            orch_llm_calls,
            multi_doc,
            query_plan.intent,
            confidence,
        )
        .await
    }
}
/// Derives a heuristic confidence score from how the supervisor loop ended.
///
/// * `no_evidence` — yields `0.0` regardless of the other arguments.
/// * `eval_sufficient` — selects the scoring band: start at `0.95`, lose
///   `0.15` per replan round, never below `0.5` when `true`; start at `0.4`,
///   lose `0.1` per round, never below `0.1` when `false`.
/// * `replan_rounds` — number of replan rounds to penalise.
fn compute_confidence(eval_sufficient: bool, replan_rounds: u32, no_evidence: bool) -> f32 {
    if no_evidence {
        return 0.0;
    }

    // (starting score, penalty per replan round, lower bound)
    let (base, penalty, floor): (f32, f32, f32) = if eval_sufficient {
        (0.95, 0.15, 0.5)
    } else {
        (0.4, 0.1, 0.1)
    };

    let rounds = replan_rounds as f32;
    (base - rounds * penalty).max(floor)
}
/// Reranks the collected evidence into an answer and assembles the final `Output`.
///
/// Calls `crate::rerank::process` with the query, the evidence in `state`,
/// and the `multi_doc` / `intent` / `confidence` hints. The result is then
/// folded into an output cloned from `state`: the answer is attached, the
/// orchestrator's and reranker's LLM calls are added to the output's metrics,
/// and the confidence is replaced by the reranker's value.
///
/// Emits an answer-completed event when the answer is non-empty, and always
/// emits an orchestrator-completed event before returning.
///
/// # Errors
///
/// Propagates any error returned by `crate::rerank::process`.
pub async fn finalize_output(
    query: &str,
    state: &OrchestratorState,
    emitter: &EventEmitter,
    orch_llm_calls: u32,
    multi_doc: bool,
    intent: crate::query::QueryIntent,
    confidence: f32,
) -> crate::error::Result<Output> {
    let reranked =
        crate::rerank::process(query, &state.all_evidence, multi_doc, intent, confidence).await?;

    // Only announce an answer when the reranker actually produced one.
    if !reranked.answer.is_empty() {
        let answer_len = reranked.answer.len();
        emitter.emit_answer_completed(answer_len, "medium");
    }

    let mut output = state.clone_results_into_output(reranked.answer);
    output.metrics.llm_calls += orch_llm_calls + reranked.llm_calls;
    output.confidence = reranked.confidence;

    emitter.emit_orchestrator_completed(
        output.evidence.len(),
        output.metrics.llm_calls,
        output.metrics.rounds_used,
    );
    info!(
        evidence = output.evidence.len(),
        llm_calls = output.metrics.llm_calls,
        confidence = output.confidence,
        "Orchestrator complete"
    );

    Ok(output)
}