Skip to main content

phi_core/agent_loop/
core.rs

1//! Primary entry points for the agent loop: `agent_loop` and `agent_loop_continue`.
2
3use super::config::*;
4use super::helpers::apply_input_filters;
5use super::run::run_loop;
6use crate::types::*;
7use tokio::sync::mpsc;
8
9/*
10DESIGN: Why agent_loop takes these separate parameters — each plays a different role:
11  `prompts`  = NEW INPUT    — the messages being added THIS call (taken by value; appended to
12                              context; also emitted as MessageStart/End inside the first TurnStart)
13  `context`  = ACCUMULATOR  — the full conversation history (system prompt + all past turns);
14                              mutated in-place as the loop runs each turn
15  `config`   = STATIC       — model, callbacks, limits; never changes within a single call
16  `tx`       = OBSERVER     — channel to push real-time AgentEvents to external callers (UI, logger)
17  `cancel`   = ABORT SIGNAL — cooperative cancellation; any code holding this token can stop the loop
18
19Why return Vec<AgentMessage> (not the whole context)?
20The caller already holds `context` via the `&mut` reference. Returning only the NEW messages
21from this call avoids duplicating the entire history — the caller can append to their own copy.
22*/
23/// Start an agent loop with new prompt messages.
24///
25/// Appends `prompts` to `context`, runs the full hook/event lifecycle (see module doc),
26/// and returns only the messages produced by this call. Events are pushed to `tx` in real time.
27pub async fn agent_loop(
28    prompts: Vec<AgentMessage>, // NEW INPUT — added to context and emitted inside first TurnStart
29    context: &mut AgentContext, // ACCUMULATOR — full history; grows in-place each turn
30    config: &AgentLoopConfig, // STATIC SETTINGS — model, tools, callbacks; unchanged during the loop
31    tx: mpsc::UnboundedSender<AgentEvent>, // OBSERVER — taken by value; all AgentEvents pushed here
32    cancel: tokio_util::sync::CancellationToken, // ABORT — checked between every major step; child tokens for tools
33) -> Vec<AgentMessage> {
34    // Populate identity IDs once at the top, generating UUIDs for any the caller didn't supply.
35    // Writing them back to context lets a subsequent agent_loop_continue() call inherit them.
36    // Hoisting here eliminates the prior `.clone().unwrap()` scatter — every AgentStart/AgentEnd
37    // emission below reuses the owned local copies.
38    let (agent_id, session_id, loop_id) = ensure_loop_ids(context);
39
40    // before_loop hook — fires before AgentStart; false aborts the entire loop
41    if let Some(ref before_loop) = config.before_loop {
42        if !before_loop(&context.messages, 0).await {
43            tx.send(AgentEvent::AgentEnd {
44                loop_id: loop_id.clone(),
45                messages: vec![],
46                usage: Usage::default(),
47                timestamp: chrono::Utc::now(),
48                rejection: None,
49            })
50            .ok();
51            return vec![];
52        }
53    }
54
55    tx.send(AgentEvent::AgentStart {
56        agent_id: agent_id.clone(),
57        session_id: session_id.clone(),
58        loop_id: loop_id.clone(),
59        parent_loop_id: context.parent_loop_id.clone(), // None for origin calls
60        continuation_kind: context
61            .continuation_kind
62            .clone()
63            .unwrap_or(ContinuationKind::Initial),
64        timestamp: chrono::Utc::now(),
65        metadata: None,
66        config_snapshot: Some(build_config_snapshot(config, context)),
67    })
68    .ok();
69
70    // !!!SECURITY!!!: Apply input filters before adding prompts to context.
71    // Reject → emit InputRejected + AgentEnd and return immediately (no LLM call made).
72    // Warn  → warning text appended to the last user message so the LLM sees it.
73    // Pass  → prompts returned unchanged.
74    let prompts = match apply_input_filters(prompts, &config.input_filters, &tx, &loop_id).await {
75        Ok(filtered) => filtered,
76        Err(reason) => {
77            // AgentEnd with rejection: pre-run rejection is the one case where
78            // AgentEnd.rejection is Some — the agent never actually started.
79            tx.send(AgentEvent::AgentEnd {
80                loop_id: loop_id.clone(),
81                messages: vec![],
82                usage: Usage::default(),
83                timestamp: chrono::Utc::now(),
84                rejection: Some(reason),
85            })
86            .ok();
87            return vec![];
88        }
89    };
90
91    let mut new_messages: Vec<AgentMessage> = prompts.clone();
92
93    // Add prompts to context
94    for prompt in &prompts {
95        context.messages.push(prompt.clone());
96    }
97
98    // Classify prompts into user_context (they're user messages)
99    for prompt in &prompts {
100        context.user_context.push(prompt.clone());
101    }
102
103    // Run the main loop (streaming + tools + steering + limits + callbacks)
104    let loop_usage = run_loop(
105        context,
106        &mut new_messages,
107        config,
108        &tx,
109        &cancel,
110        Some(&prompts),
111    )
112    .await;
113
114    tx.send(AgentEvent::AgentEnd {
115        loop_id,
116        messages: new_messages.clone(),
117        usage: loop_usage.clone(),
118        timestamp: chrono::Utc::now(),
119        rejection: None,
120    })
121    .ok();
122    // after_loop hook — fires after AgentEnd
123    if let Some(ref after_loop) = config.after_loop {
124        after_loop(&new_messages, &loop_usage).await;
125    }
126    new_messages
127}
128
129/*
130DESIGN: agent_loop_continue vs agent_loop
131Unlike agent_loop, this takes NO `prompts` — the conversation already exists in `context`.
132Used for retries and session-branching scenarios where the caller has already appended messages
133(or queued them via steering/follow-up callbacks) and simply wants to resume execution.
134No TurnStart/MessageStart events for prior context are re-emitted — the loop starts at turn 0
135from whatever state context.messages is in.
136*/
137/// Resume an agent loop from existing context without new prompts.
138///
139/// Use for retries, session branching, or re-runs from a specific point. The context must be
140/// non-empty and must not end with an assistant message. New follow-up/steering messages can
141/// be injected via `config.get_follow_up_messages` / `config.get_steering_messages`.
142///
143/// Returns only the messages produced by this continuation call.
144pub async fn agent_loop_continue(
145    context: &mut AgentContext, // ACCUMULATOR — existing history (must be non-empty, not end on assistant)
146    config: &AgentLoopConfig,   // STATIC SETTINGS — same config as the original call
147    tx: mpsc::UnboundedSender<AgentEvent>, // OBSERVER — all AgentEvents pushed here
148    cancel: tokio_util::sync::CancellationToken, // ABORT — fresh or shared token for this continuation
149) -> Vec<AgentMessage> {
150    // Identity must carry over from the originating loop. These are set by Agent::prompt_*
151    // (or by the direct caller who bootstrapped the session). Silent UUID generation here
152    // would mean every continuation gets a different identity — breaking ancestry tracking.
153    assert!(
154        context.agent_id.is_some(),
155        "agent_loop_continue requires context.agent_id to be set — \
156         identity must carry over from the originating loop"
157    );
158    assert!(
159        context.session_id.is_some(),
160        "agent_loop_continue requires context.session_id to be set — \
161         the session must be established before a continuation"
162    );
163
164    assert!(
165        !context.messages.is_empty(),
166        "Cannot continue: no messages in context"
167    );
168
169    // LLM APIs require strict alternation: user → assistant → user → assistant → …
170    if let Some(last) = context.messages.last() {
171        assert!(
172            last.role() != "assistant",
173            "Cannot continue from assistant message"
174        );
175    }
176
177    let mut new_messages: Vec<AgentMessage> = Vec::new();
178
179    // Classify existing messages into streams (if not already populated)
180    if context.user_context.is_empty() && context.inrun_context.is_empty() {
181        for msg in &context.messages {
182            match msg.as_llm() {
183                Some(Message::User { .. }) => context.user_context.push(msg.clone()),
184                Some(Message::Assistant { .. }) | Some(Message::ToolResult { .. }) => {
185                    context
186                        .inrun_context
187                        .push(crate::types::InRunEntry::Live(msg.clone()));
188                }
189                _ => {} // Extension messages go to neither stream
190            }
191        }
192    }
193
194    // Hoist identity IDs into owned locals once. agent_id and session_id are asserted Some
195    // above; loop_id is generated here if the Agent wrapper didn't supply one. Subsequent
196    // event emissions reuse these clones rather than re-`.unwrap()`-ing the Option fields.
197    let agent_id = context
198        .agent_id
199        .as_ref()
200        .expect("asserted Some above")
201        .clone();
202    let session_id = context
203        .session_id
204        .as_ref()
205        .expect("asserted Some above")
206        .clone();
207    let loop_id = context
208        .loop_id
209        .get_or_insert_with(|| uuid::Uuid::new_v4().to_string())
210        .clone();
211
212    // before_loop hook — fires before AgentStart; false aborts the entire loop
213    if let Some(ref before_loop) = config.before_loop {
214        if !before_loop(&context.messages, 0).await {
215            tx.send(AgentEvent::AgentEnd {
216                loop_id: loop_id.clone(),
217                messages: vec![],
218                usage: Usage::default(),
219                timestamp: chrono::Utc::now(),
220                rejection: None,
221            })
222            .ok();
223            return vec![];
224        }
225    }
226
227    tx.send(AgentEvent::AgentStart {
228        agent_id,
229        session_id,
230        loop_id: loop_id.clone(),
231        parent_loop_id: context.parent_loop_id.clone(), // set by Agent wrapper
232        continuation_kind: context
233            .continuation_kind
234            .clone()
235            .unwrap_or(ContinuationKind::Initial),
236        timestamp: chrono::Utc::now(),
237        metadata: None,
238        config_snapshot: Some(build_config_snapshot(config, context)),
239    })
240    .ok();
241
242    let loop_usage = run_loop(context, &mut new_messages, config, &tx, &cancel, None).await;
243
244    tx.send(AgentEvent::AgentEnd {
245        loop_id,
246        messages: new_messages.clone(),
247        usage: loop_usage.clone(),
248        timestamp: chrono::Utc::now(),
249        rejection: None,
250    })
251    .ok();
252    // after_loop hook — fires after AgentEnd
253    if let Some(ref after_loop) = config.after_loop {
254        after_loop(&new_messages, &loop_usage).await;
255    }
256    new_messages
257}
258
259/// Ensure `context.agent_id`, `context.session_id`, and `context.loop_id` are populated.
260///
261/// Generates a fresh UUID for any that are `None`, writes back into `context`, and returns
262/// owned clones of all three. Used by `agent_loop` (origin call) to hoist identity into
263/// locals once so subsequent event emissions don't repeat `.clone().unwrap()` patterns
264/// that would panic if the invariant ever drifted.
265fn ensure_loop_ids(ctx: &mut AgentContext) -> (String, String, String) {
266    let agent_id = ctx
267        .agent_id
268        .get_or_insert_with(|| uuid::Uuid::new_v4().to_string())
269        .clone();
270    let session_id = ctx
271        .session_id
272        .get_or_insert_with(|| uuid::Uuid::new_v4().to_string())
273        .clone();
274    let loop_id = ctx
275        .loop_id
276        .get_or_insert_with(|| uuid::Uuid::new_v4().to_string())
277        .clone();
278    (agent_id, session_id, loop_id)
279}
280
281/// Build a `LoopConfigSnapshot` from the current `AgentLoopConfig` and `AgentContext`.
282fn build_config_snapshot(
283    config: &AgentLoopConfig,
284    context: &AgentContext,
285) -> crate::session::LoopConfigSnapshot {
286    // Extract config_id from the loop_id's config_segment if available.
287    // loop_id format: "{session_id}.{config_segment}.{N}"
288    let config_id = context
289        .loop_id
290        .as_deref()
291        .and_then(|lid| {
292            let session_id = context.session_id.as_deref().unwrap_or("");
293            lid.strip_prefix(session_id)
294                .and_then(|rest| rest.strip_prefix('.'))
295                .and_then(|rest| rest.rsplit_once('.'))
296                .map(|(seg, _n)| seg.to_string())
297        })
298        .or_else(|| config.config_id.clone());
299
300    crate::session::LoopConfigSnapshot {
301        model: config.model_config.id.clone(),
302        provider: config.model_config.provider.clone(),
303        config_id,
304        name: Some(config.model_config.name.clone()),
305        api: Some(config.model_config.api),
306        base_url: Some(config.model_config.base_url.clone()),
307        reasoning: Some(config.model_config.reasoning),
308        context_window: Some(config.model_config.context_window),
309        max_tokens: Some(config.model_config.max_tokens),
310        thinking_level: Some(config.thinking_level),
311        temperature: config.temperature,
312    }
313}
314
#[cfg(test)]
mod tests {
    use super::*;

    /// Starting from a default context, every ID is generated and stored on the context.
    #[test]
    fn ensure_loop_ids_populates_missing_fields() {
        let mut context = AgentContext::default();
        assert!(context.agent_id.is_none());
        assert!(context.session_id.is_none());
        assert!(context.loop_id.is_none());

        let (agent, session, loop_id) = ensure_loop_ids(&mut context);

        assert!(!agent.is_empty());
        assert!(!session.is_empty());
        assert!(!loop_id.is_empty());
        assert_eq!(context.agent_id.as_ref(), Some(&agent));
        assert_eq!(context.session_id.as_ref(), Some(&session));
        assert_eq!(context.loop_id.as_ref(), Some(&loop_id));
    }

    /// A second call returns exactly what the first call generated.
    #[test]
    fn ensure_loop_ids_idempotent() {
        let mut context = AgentContext::default();

        let first = ensure_loop_ids(&mut context);
        let second = ensure_loop_ids(&mut context);

        assert_eq!(first.0, second.0);
        assert_eq!(first.1, second.1);
        assert_eq!(first.2, second.2);
    }

    /// Caller-supplied IDs are kept; only the missing one is generated.
    #[test]
    fn ensure_loop_ids_preserves_existing() {
        // loop_id is deliberately left as None so it has to be generated.
        let mut context = AgentContext {
            agent_id: Some(String::from("agent-x")),
            session_id: Some(String::from("session-y")),
            ..AgentContext::default()
        };

        let (agent, session, loop_id) = ensure_loop_ids(&mut context);

        assert_eq!(agent, "agent-x");
        assert_eq!(session, "session-y");
        assert!(!loop_id.is_empty());
        assert_eq!(context.loop_id.as_ref(), Some(&loop_id));
    }
}