phi_core/agent_loop/parallel.rs
1use super::config::*;
2use super::core::{agent_loop, agent_loop_continue};
3use super::helpers::derive_config_segment;
4use crate::types::*;
5use chrono::Utc;
6use futures::future::join_all;
7use std::sync::Arc;
8use tokio::sync::mpsc;
9
10/// Internal: run N agent loop branches concurrently and collect their outcomes.
11///
12/// Uses `futures::future::join_all` (not `tokio::spawn`) for the branch futures so
13/// the `'static` bound is not imposed on `AgentLoopConfig` hook fields. Each branch
14/// gets its own forwarding task (`tokio::spawn`) that intercepts `AgentEnd.usage`
15/// and forwards all events to the shared `tx`.
16async fn run_parallel_branches(
17 prompts: Vec<AgentMessage>,
18 contexts: Vec<AgentContext>,
19 configs: Vec<AgentLoopConfig>,
20 tx: &mpsc::UnboundedSender<AgentEvent>,
21 cancel: &tokio_util::sync::CancellationToken,
22) -> Vec<ParallelLoopOutcome> {
23 let branch_futures: Vec<_> = contexts
24 .into_iter()
25 .zip(configs)
26 .enumerate()
27 .map(|(i, (mut ctx, config))| {
28 // Branch loop_ids are set deterministically by the dispatcher
29 // (`agent_loop_parallel` below, lines that build `branch_contexts`). A `None` here
30 // would mean the dispatcher contract was violated upstream — surface that loudly
31 // rather than silently falling back to the empty string.
32 let loop_id = ctx.loop_id.clone().unwrap_or_else(|| {
33 tracing::warn!(
34 "run_parallel_branches: branch context {} missing loop_id; this should \
35 have been set by the dispatcher. Falling back to empty string.",
36 i
37 );
38 String::new()
39 });
40 let prompts = prompts.clone();
41 let main_tx = tx.clone();
42 let cancel = cancel.clone();
43
44 async move {
45 let (branch_tx, branch_rx) = mpsc::unbounded_channel::<AgentEvent>();
46 let (usage_tx, usage_rx) = tokio::sync::oneshot::channel::<Usage>();
47
48 // Record context size BEFORE the branch mutates it.
49 let original_context_len = ctx.messages.len();
50
51 // Forwarder: intercepts AgentEnd for usage, forwards all events to main_tx.
52 // tokio::spawn is valid here: branch_rx, cloned main_tx, and usage_tx are all
53 // owned Send + 'static values.
54 tokio::spawn(async move {
55 let mut branch_rx = branch_rx;
56 let mut last_usage = Usage::default();
57 while let Some(event) = branch_rx.recv().await {
58 if let AgentEvent::AgentEnd { ref usage, .. } = event {
59 last_usage = usage.clone();
60 }
61 main_tx.send(event).ok();
62 }
63 // branch_tx is dropped when agent_loop returns -> recv() yields None ->
64 // send usage back, unblocking usage_rx.await below.
65 usage_tx.send(last_usage).ok();
66 });
67
68 // Route to agent_loop_continue when prompts is empty: the user query
69 // is already in the context (agent_loop_continue mode). Preconditions
70 // (non-empty context, not ending on assistant) are asserted by
71 // agent_loop_parallel before dispatch.
72 let new_messages = if prompts.is_empty() {
73 agent_loop_continue(&mut ctx, &config, branch_tx, cancel).await
74 } else {
75 agent_loop(prompts, &mut ctx, &config, branch_tx, cancel).await
76 };
77 let usage = usage_rx.await.unwrap_or_default();
78
79 ParallelLoopOutcome {
80 config_index: i,
81 loop_id,
82 context: ctx,
83 new_messages,
84 usage,
85 original_context_len,
86 }
87 }
88 })
89 .collect();
90
91 join_all(branch_futures).await
92}
93
94/// Run multiple agent loop configurations concurrently from a shared base context,
95/// evaluate the results with the supplied strategy, and return the selected outcome.
96///
97/// This is the foundation for evaluational parallelism. The standard single-loop case
98/// is a transparent special case: one config + [`crate::evaluation::TransparentEvaluation`].
99///
100/// # Branch cloning
101///
102/// `base_context` is cloned once per config entry. Tools are `Arc`-shared (zero copy);
103/// the message history is deep-cloned so branches start from identical state but diverge
104/// independently. All branches inherit the same `session_id` for traceability.
105///
106/// # Loop IDs
107///
108/// Each branch receives a distinct `loop_id`:
109/// ```text
110/// "{session_id}.{config_segment}.{N}"
111/// ```
112/// where `config_segment` is derived from `config.config_id` or auto-derived from
113/// provider + model + thinking level via [`derive_config_segment`].
114///
115/// # Events
116///
117/// Events from all branches are forwarded to `tx` interleaved. Each branch's
118/// `AgentStart` carries a distinct `loop_id` for demultiplexing. A
119/// [`AgentEvent::ParallelLoopStart`] / [`AgentEvent::ParallelLoopEnd`] pair
120/// brackets the entire parallel execution.
121///
122/// # Session continuity
123///
124/// Feed `selected_context` into [`agent_loop_continue`] to resume the session
125/// normally after the parallel evaluation --- this is a single-loop operation,
126/// not a special session mode.
127///
128/// # `agent_loop_continue` mode
129///
130/// When `prompts` is empty, each branch is dispatched via [`agent_loop_continue`]
131/// instead of [`agent_loop`]. This supports the "resume from existing context"
132/// pattern where the user query is already the last message in `base_context`.
133/// The same preconditions as `agent_loop_continue` apply: `base_context.messages`
134/// must be non-empty and must not end on an assistant message.
135pub async fn agent_loop_parallel(
136 prompts: Vec<AgentMessage>,
137 mut base_context: AgentContext,
138 configs: Vec<AgentLoopConfig>,
139 strategy: Arc<dyn EvaluationStrategy>,
140 tx: mpsc::UnboundedSender<AgentEvent>,
141 cancel: tokio_util::sync::CancellationToken,
142) -> ParallelLoopResult {
143 assert!(
144 !configs.is_empty(),
145 "agent_loop_parallel requires at least one config"
146 );
147
148 // agent_loop_continue mode precondition guards.
149 if prompts.is_empty() {
150 assert!(
151 !base_context.messages.is_empty(),
152 "agent_loop_parallel with empty prompts requires non-empty base_context.messages \
153 (agent_loop_continue mode)"
154 );
155 assert!(
156 base_context.messages.last().map(|m| m.role()) != Some("assistant"),
157 "agent_loop_parallel with empty prompts requires context NOT ending on an \
158 assistant message (agent_loop_continue mode)"
159 );
160 }
161
162 // Ensure shared session / agent identity. `get_or_insert_with` populates the field if None
163 // and returns `&mut String` either way — clone for the owned local we need below.
164 base_context
165 .agent_id
166 .get_or_insert_with(|| uuid::Uuid::new_v4().to_string());
167 let session_id = base_context
168 .session_id
169 .get_or_insert_with(|| uuid::Uuid::new_v4().to_string())
170 .clone();
171
172 // Assign deterministic loop_ids: {session_id}.{config_segment}.{N}
173 let loop_ids: Vec<String> = configs
174 .iter()
175 .enumerate()
176 .map(|(i, cfg)| format!("{}.{}.{}", session_id, derive_config_segment(cfg), i + 1))
177 .collect();
178
179 tx.send(AgentEvent::ParallelLoopStart {
180 session_id: session_id.clone(),
181 loop_ids: loop_ids.clone(),
182 timestamp: Utc::now(),
183 })
184 .ok();
185
186 // Clone base context per branch; set individual loop_ids.
187 let branch_contexts: Vec<AgentContext> = loop_ids
188 .iter()
189 .map(|lid| {
190 let mut ctx = base_context.clone();
191 ctx.loop_id = Some(lid.clone());
192 ctx
193 })
194 .collect();
195
196 let outcomes =
197 run_parallel_branches(prompts.clone(), branch_contexts, configs, &tx, &cancel).await;
198
199 let (decision, eval_usage) = strategy.evaluate(&prompts, &outcomes, &tx, cancel).await;
200 let selected_index = match decision {
201 EvaluationDecision::Select(i) => i.min(outcomes.len() - 1),
202 };
203
204 tx.send(AgentEvent::ParallelLoopEnd {
205 session_id,
206 selected_loop_id: outcomes[selected_index].loop_id.clone(),
207 selected_config_index: selected_index,
208 evaluation_usage: eval_usage.clone(),
209 timestamp: Utc::now(),
210 })
211 .ok();
212
213 let total_usage = outcomes
214 .iter()
215 .fold(Usage::default(), |acc, o| acc.combine(&o.usage))
216 .combine(&eval_usage);
217
218 // Destructure outcomes: pull out the selected one, keep the rest.
219 let mut all_outcomes = outcomes;
220 let selected = all_outcomes.remove(selected_index);
221
222 ParallelLoopResult {
223 selected_context: selected.context,
224 selected_messages: selected.new_messages,
225 selected_index,
226 all_outcomes,
227 total_usage,
228 }
229}