vectorless 0.1.32

Reasoning-based Document Engine
Documentation
// Copyright (c) 2026 vectorless developers
// SPDX-License-Identifier: Apache-2.0

//! Phase 2: Dispatch Workers and collect results.

use tracing::{info, warn};

use crate::llm::LlmClient;

use super::super::Agent;
use super::super::config::{AgentConfig, WorkspaceContext};
use super::super::events::EventEmitter;
use super::super::prompts::DispatchEntry;
use super::super::state::OrchestratorState;
use super::super::worker::Worker;
use crate::query::QueryPlan;

/// Dispatch Workers in parallel and collect results.
pub async fn dispatch_and_collect(
    query: &str,
    dispatches: &[DispatchEntry],
    ws: &WorkspaceContext<'_>,
    config: &AgentConfig,
    llm: &LlmClient,
    state: &mut OrchestratorState,
    emitter: &EventEmitter,
    query_plan: &QueryPlan,
) {
    let futures: Vec<_> = dispatches
        .iter()
        .filter_map(|dispatch| {
            let doc = match ws.doc(dispatch.doc_idx) {
                Some(d) => d,
                None => {
                    warn!(doc_idx = dispatch.doc_idx, "Document not found, skipping");
                    return None;
                }
            };

            let query = query.to_string();
            let task = dispatch.task.clone();
            let worker_config = config.worker.clone();
            let doc_idx = dispatch.doc_idx;
            let doc_name = doc.doc_name.to_string();
            let llm = llm.clone();
            let sub_emitter = EventEmitter::noop();
            let worker_plan = query_plan.clone();

            Some(async move {
                emitter.emit_worker_dispatched(doc_idx, &doc_name, &task, &[]);
                let worker = Worker::new(
                    &query,
                    Some(&task),
                    doc,
                    worker_config,
                    llm,
                    sub_emitter,
                    worker_plan,
                );
                let result = worker.run().await;
                (doc_idx, doc_name, result)
            })
        })
        .collect();

    let results: Vec<_> = futures::future::join_all(futures).await;

    for (doc_idx, doc_name, result) in results {
        match result {
            Ok(output) => {
                info!(
                    doc_idx,
                    evidence = output.evidence.len(),
                    "Worker completed"
                );
                emitter.emit_worker_completed(
                    doc_idx,
                    &doc_name,
                    output.evidence.len(),
                    output.metrics.rounds_used,
                    output.metrics.llm_calls,
                    true,
                );
                state.collect_result(doc_idx, output);
            }
            Err(e) => {
                warn!(doc_idx, error = %e, "Worker failed");
                emitter.emit_worker_completed(doc_idx, &doc_name, 0, 0, 0, false);
            }
        }
    }
}