// phi_core/agent_loop/parallel.rs

1use super::config::*;
2use super::core::{agent_loop, agent_loop_continue};
3use super::helpers::derive_config_segment;
4use crate::types::*;
5use chrono::Utc;
6use futures::future::join_all;
7use std::sync::Arc;
8use tokio::sync::mpsc;
9
10/// Internal: run N agent loop branches concurrently and collect their outcomes.
11///
12/// Uses `futures::future::join_all` (not `tokio::spawn`) for the branch futures so
13/// the `'static` bound is not imposed on `AgentLoopConfig` hook fields. Each branch
14/// gets its own forwarding task (`tokio::spawn`) that intercepts `AgentEnd.usage`
15/// and forwards all events to the shared `tx`.
16async fn run_parallel_branches(
17    prompts: Vec<AgentMessage>,
18    contexts: Vec<AgentContext>,
19    configs: Vec<AgentLoopConfig>,
20    tx: &mpsc::UnboundedSender<AgentEvent>,
21    cancel: &tokio_util::sync::CancellationToken,
22) -> Vec<ParallelLoopOutcome> {
23    let branch_futures: Vec<_> = contexts
24        .into_iter()
25        .zip(configs)
26        .enumerate()
27        .map(|(i, (mut ctx, config))| {
28            // Branch loop_ids are set deterministically by the dispatcher
29            // (`agent_loop_parallel` below, lines that build `branch_contexts`). A `None` here
30            // would mean the dispatcher contract was violated upstream — surface that loudly
31            // rather than silently falling back to the empty string.
32            let loop_id = ctx.loop_id.clone().unwrap_or_else(|| {
33                tracing::warn!(
34                    "run_parallel_branches: branch context {} missing loop_id; this should \
35                     have been set by the dispatcher. Falling back to empty string.",
36                    i
37                );
38                String::new()
39            });
40            let prompts = prompts.clone();
41            let main_tx = tx.clone();
42            let cancel = cancel.clone();
43
44            async move {
45                let (branch_tx, branch_rx) = mpsc::unbounded_channel::<AgentEvent>();
46                let (usage_tx, usage_rx) = tokio::sync::oneshot::channel::<Usage>();
47
48                // Record context size BEFORE the branch mutates it.
49                let original_context_len = ctx.messages.len();
50
51                // Forwarder: intercepts AgentEnd for usage, forwards all events to main_tx.
52                // tokio::spawn is valid here: branch_rx, cloned main_tx, and usage_tx are all
53                // owned Send + 'static values.
54                tokio::spawn(async move {
55                    let mut branch_rx = branch_rx;
56                    let mut last_usage = Usage::default();
57                    while let Some(event) = branch_rx.recv().await {
58                        if let AgentEvent::AgentEnd { ref usage, .. } = event {
59                            last_usage = usage.clone();
60                        }
61                        main_tx.send(event).ok();
62                    }
63                    // branch_tx is dropped when agent_loop returns -> recv() yields None ->
64                    // send usage back, unblocking usage_rx.await below.
65                    usage_tx.send(last_usage).ok();
66                });
67
68                // Route to agent_loop_continue when prompts is empty: the user query
69                // is already in the context (agent_loop_continue mode). Preconditions
70                // (non-empty context, not ending on assistant) are asserted by
71                // agent_loop_parallel before dispatch.
72                let new_messages = if prompts.is_empty() {
73                    agent_loop_continue(&mut ctx, &config, branch_tx, cancel).await
74                } else {
75                    agent_loop(prompts, &mut ctx, &config, branch_tx, cancel).await
76                };
77                let usage = usage_rx.await.unwrap_or_default();
78
79                ParallelLoopOutcome {
80                    config_index: i,
81                    loop_id,
82                    context: ctx,
83                    new_messages,
84                    usage,
85                    original_context_len,
86                }
87            }
88        })
89        .collect();
90
91    join_all(branch_futures).await
92}
93
94/// Run multiple agent loop configurations concurrently from a shared base context,
95/// evaluate the results with the supplied strategy, and return the selected outcome.
96///
97/// This is the foundation for evaluational parallelism. The standard single-loop case
98/// is a transparent special case: one config + [`crate::evaluation::TransparentEvaluation`].
99///
100/// # Branch cloning
101///
102/// `base_context` is cloned once per config entry. Tools are `Arc`-shared (zero copy);
103/// the message history is deep-cloned so branches start from identical state but diverge
104/// independently. All branches inherit the same `session_id` for traceability.
105///
106/// # Loop IDs
107///
108/// Each branch receives a distinct `loop_id`:
109/// ```text
110/// "{session_id}.{config_segment}.{N}"
111/// ```
112/// where `config_segment` is derived from `config.config_id` or auto-derived from
113/// provider + model + thinking level via [`derive_config_segment`].
114///
115/// # Events
116///
117/// Events from all branches are forwarded to `tx` interleaved. Each branch's
118/// `AgentStart` carries a distinct `loop_id` for demultiplexing. A
119/// [`AgentEvent::ParallelLoopStart`] / [`AgentEvent::ParallelLoopEnd`] pair
120/// brackets the entire parallel execution.
121///
122/// # Session continuity
123///
124/// Feed `selected_context` into [`agent_loop_continue`] to resume the session
125/// normally after the parallel evaluation --- this is a single-loop operation,
126/// not a special session mode.
127///
128/// # `agent_loop_continue` mode
129///
130/// When `prompts` is empty, each branch is dispatched via [`agent_loop_continue`]
131/// instead of [`agent_loop`]. This supports the "resume from existing context"
132/// pattern where the user query is already the last message in `base_context`.
133/// The same preconditions as `agent_loop_continue` apply: `base_context.messages`
134/// must be non-empty and must not end on an assistant message.
135pub async fn agent_loop_parallel(
136    prompts: Vec<AgentMessage>,
137    mut base_context: AgentContext,
138    configs: Vec<AgentLoopConfig>,
139    strategy: Arc<dyn EvaluationStrategy>,
140    tx: mpsc::UnboundedSender<AgentEvent>,
141    cancel: tokio_util::sync::CancellationToken,
142) -> ParallelLoopResult {
143    assert!(
144        !configs.is_empty(),
145        "agent_loop_parallel requires at least one config"
146    );
147
148    // agent_loop_continue mode precondition guards.
149    if prompts.is_empty() {
150        assert!(
151            !base_context.messages.is_empty(),
152            "agent_loop_parallel with empty prompts requires non-empty base_context.messages \
153             (agent_loop_continue mode)"
154        );
155        assert!(
156            base_context.messages.last().map(|m| m.role()) != Some("assistant"),
157            "agent_loop_parallel with empty prompts requires context NOT ending on an \
158             assistant message (agent_loop_continue mode)"
159        );
160    }
161
162    // Ensure shared session / agent identity. `get_or_insert_with` populates the field if None
163    // and returns `&mut String` either way — clone for the owned local we need below.
164    base_context
165        .agent_id
166        .get_or_insert_with(|| uuid::Uuid::new_v4().to_string());
167    let session_id = base_context
168        .session_id
169        .get_or_insert_with(|| uuid::Uuid::new_v4().to_string())
170        .clone();
171
172    // Assign deterministic loop_ids: {session_id}.{config_segment}.{N}
173    let loop_ids: Vec<String> = configs
174        .iter()
175        .enumerate()
176        .map(|(i, cfg)| format!("{}.{}.{}", session_id, derive_config_segment(cfg), i + 1))
177        .collect();
178
179    tx.send(AgentEvent::ParallelLoopStart {
180        session_id: session_id.clone(),
181        loop_ids: loop_ids.clone(),
182        timestamp: Utc::now(),
183    })
184    .ok();
185
186    // Clone base context per branch; set individual loop_ids.
187    let branch_contexts: Vec<AgentContext> = loop_ids
188        .iter()
189        .map(|lid| {
190            let mut ctx = base_context.clone();
191            ctx.loop_id = Some(lid.clone());
192            ctx
193        })
194        .collect();
195
196    let outcomes =
197        run_parallel_branches(prompts.clone(), branch_contexts, configs, &tx, &cancel).await;
198
199    let (decision, eval_usage) = strategy.evaluate(&prompts, &outcomes, &tx, cancel).await;
200    let selected_index = match decision {
201        EvaluationDecision::Select(i) => i.min(outcomes.len() - 1),
202    };
203
204    tx.send(AgentEvent::ParallelLoopEnd {
205        session_id,
206        selected_loop_id: outcomes[selected_index].loop_id.clone(),
207        selected_config_index: selected_index,
208        evaluation_usage: eval_usage.clone(),
209        timestamp: Utc::now(),
210    })
211    .ok();
212
213    let total_usage = outcomes
214        .iter()
215        .fold(Usage::default(), |acc, o| acc.combine(&o.usage))
216        .combine(&eval_usage);
217
218    // Destructure outcomes: pull out the selected one, keep the rest.
219    let mut all_outcomes = outcomes;
220    let selected = all_outcomes.remove(selected_index);
221
222    ParallelLoopResult {
223        selected_context: selected.context,
224        selected_messages: selected.new_messages,
225        selected_index,
226        all_outcomes,
227        total_usage,
228    }
229}