Skip to main content

tirea_agent_loop/runtime/loop_runner/
mod.rs

1//! Agent loop implementation with Phase-based plugin execution.
2//!
3//! The agent loop orchestrates the conversation between user, LLM, and tools:
4//!
5//! ```text
6//! User Input → LLM → Tool Calls? → Execute Tools → LLM → ... → Final Response
7//! ```
8//!
9//! # Phase Execution
10//!
11//! Each phase dispatches to its typed plugin hook:
12//!
13//! ```text
14//! RunStart (once)
15//!     │
16//!     ▼
17//! ┌─────────────────────────┐
18//! │      StepStart          │ ← plugins can apply state patches
19//! ├─────────────────────────┤
20//! │    BeforeInference      │ ← plugins can inject prompt context, filter tools
21//! ├─────────────────────────┤
22//! │      [LLM CALL]         │
23//! ├─────────────────────────┤
24//! │    AfterInference       │
25//! ├─────────────────────────┤
26//! │  ┌───────────────────┐  │
27//! │  │ BeforeToolExecute │  │ ← plugins can block/pending
28//! │  ├───────────────────┤  │
29//! │  │   [TOOL EXEC]     │  │
30//! │  ├───────────────────┤  │
31//! │  │ AfterToolExecute  │  │ ← plugins can add reminders
32//! │  └───────────────────┘  │
33//! ├─────────────────────────┤
34//! │       StepEnd           │
35//! └─────────────────────────┘
36//!     │
37//!     ▼
38//! RunEnd (once)
39//! ```
40
41mod config;
42mod core;
43mod outcome;
44mod plugin_runtime;
45mod run_state;
46mod state_commit;
47mod stream_core;
48mod stream_runner;
49mod tool_exec;
50
51use crate::contracts::plugin::phase::Phase;
52use crate::contracts::runtime::ActivityManager;
53use crate::contracts::runtime::{
54    StopPolicy, StreamResult, ToolExecutionRequest, ToolExecutionResult,
55};
56use crate::contracts::thread::CheckpointReason;
57use crate::contracts::thread::{gen_message_id, Message, MessageMetadata};
58use crate::contracts::tool::Tool;
59use crate::contracts::RunContext;
60use crate::contracts::StopReason;
61use crate::contracts::{AgentEvent, FrontendToolInvocation, Interaction, TerminationReason};
62use crate::engine::convert::{assistant_message, assistant_tool_calls, tool_response};
63use crate::engine::stop_conditions::check_stop_policies;
64use crate::runtime::activity::ActivityHub;
65
66use crate::runtime::streaming::StreamCollector;
67use async_stream::stream;
68use futures::{Stream, StreamExt};
69use genai::Client;
70use std::collections::HashMap;
71use std::future::Future;
72use std::pin::Pin;
73use std::sync::Arc;
74use uuid::Uuid;
75
76#[cfg(test)]
77use crate::contracts::plugin::AgentPlugin;
78pub use crate::contracts::runtime::{LlmExecutor, ToolExecutor};
79pub use crate::runtime::run_context::{
80    await_or_cancel, is_cancelled, CancelAware, RunCancellationToken, StateCommitError,
81    StateCommitter, TOOL_SCOPE_CALLER_AGENT_ID_KEY, TOOL_SCOPE_CALLER_MESSAGES_KEY,
82    TOOL_SCOPE_CALLER_STATE_KEY, TOOL_SCOPE_CALLER_THREAD_ID_KEY,
83};
84use config::StaticStepToolProvider;
85pub use config::{AgentConfig, GenaiLlmExecutor, LlmRetryPolicy};
86pub use config::{StepToolInput, StepToolProvider, StepToolSnapshot};
87#[cfg(test)]
88use core::build_messages;
89use core::{
90    build_request_for_filtered_tools, clear_agent_pending_interaction, drain_agent_outbox,
91    inference_inputs_from_step, pending_frontend_invocation_from_ctx, pending_interaction_from_ctx,
92    set_agent_pending_interaction,
93};
94pub use outcome::{tool_map, tool_map_from_arc, AgentLoopError};
95pub use outcome::{LoopOutcome, LoopStats, LoopUsage};
96#[cfg(test)]
97use plugin_runtime::emit_phase_checked;
98use plugin_runtime::{
99    emit_cleanup_phases_and_apply, emit_phase_block, emit_run_end_phase, run_phase_block,
100};
101use run_state::{effective_stop_conditions, RunState};
102pub use state_commit::ChannelStateCommitter;
103use state_commit::PendingDeltaCommitContext;
104use std::time::{SystemTime, UNIX_EPOCH};
105use tirea_state::TrackedPatch;
106#[cfg(test)]
107use tokio_util::sync::CancellationToken;
108#[cfg(test)]
109use tool_exec::execute_tools_parallel_with_phases;
110use tool_exec::{
111    apply_tool_results_impl, apply_tool_results_to_session, execute_single_tool_with_phases,
112    scope_with_tool_caller_context, step_metadata, ToolPhaseContext,
113};
114pub use tool_exec::{
115    execute_tools, execute_tools_with_config, execute_tools_with_plugins,
116    execute_tools_with_plugins_and_executor, ParallelToolExecutor, SequentialToolExecutor,
117};
118
119/// Fully resolved agent wiring ready for execution.
120///
121/// Contains everything needed to run an agent loop: the loop configuration,
122/// the resolved tool map, and the runtime config. This is a pure data struct
123/// that can be inspected, mutated, and tested independently.
124pub struct ResolvedRun {
125    /// Loop configuration (model, plugins, stop conditions, ...).
126    pub config: AgentConfig,
127    /// Resolved tool map after filtering and wiring.
128    pub tools: HashMap<String, Arc<dyn Tool>>,
129    /// Runtime configuration (user_id, run_id, ...).
130    pub run_config: crate::contracts::RunConfig,
131}
132
133impl ResolvedRun {
134    /// Add or replace a tool in the resolved tool map.
135    #[must_use]
136    pub fn with_tool(mut self, id: String, tool: Arc<dyn Tool>) -> Self {
137        self.tools.insert(id, tool);
138        self
139    }
140
141    /// Add a plugin to the resolved config.
142    #[must_use]
143    pub fn with_plugin(mut self, plugin: Arc<dyn crate::contracts::plugin::AgentPlugin>) -> Self {
144        self.config.plugins.push(plugin);
145        self
146    }
147
148    /// Overlay tools from a tool registry (insert-if-absent semantics).
149    pub fn overlay_tool_registry(&mut self, registry: &dyn crate::contracts::ToolRegistry) {
150        for (id, tool) in registry.snapshot() {
151            self.tools.entry(id).or_insert(tool);
152        }
153    }
154}
155
156fn uuid_v7() -> String {
157    Uuid::now_v7().simple().to_string()
158}
159
/// Milliseconds elapsed since the Unix epoch according to the system clock.
///
/// Returns `0` if the clock reads earlier than the epoch, and saturates at
/// `u64::MAX` if the millisecond count does not fit in a `u64`.
pub(crate) fn current_unix_millis() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
165
166#[derive(Debug, Clone, Copy, PartialEq, Eq)]
167pub(super) enum CancellationStage {
168    Inference,
169    ToolExecution,
170}
171
172pub(super) const CANCELLATION_INFERENCE_USER_MESSAGE: &str =
173    "The previous run was interrupted during inference. Please continue from the current context.";
174pub(super) const CANCELLATION_TOOL_USER_MESSAGE: &str =
175    "The previous run was interrupted while using tools. Please continue from the current context.";
176
177pub(super) fn append_cancellation_user_message(run_ctx: &mut RunContext, stage: CancellationStage) {
178    let content = match stage {
179        CancellationStage::Inference => CANCELLATION_INFERENCE_USER_MESSAGE,
180        CancellationStage::ToolExecution => CANCELLATION_TOOL_USER_MESSAGE,
181    };
182    run_ctx.add_message(Arc::new(Message::user(content)));
183}
184
185pub(super) fn effective_llm_models(config: &AgentConfig) -> Vec<String> {
186    let mut models = Vec::with_capacity(1 + config.fallback_models.len());
187    models.push(config.model.clone());
188    for model in &config.fallback_models {
189        if model.trim().is_empty() {
190            continue;
191        }
192        if !models.iter().any(|m| m == model) {
193            models.push(model.clone());
194        }
195    }
196    models
197}
198
199pub(super) fn llm_retry_attempts(config: &AgentConfig) -> usize {
200    config.llm_retry_policy.max_attempts_per_model.max(1)
201}
202
203pub(super) fn is_retryable_llm_error(message: &str) -> bool {
204    let lower = message.to_ascii_lowercase();
205    let non_retryable = [
206        "401",
207        "403",
208        "404",
209        "400",
210        "422",
211        "unauthorized",
212        "forbidden",
213        "invalid api key",
214        "invalid_request",
215        "bad request",
216    ];
217    if non_retryable.iter().any(|p| lower.contains(p)) {
218        return false;
219    }
220    let retryable = [
221        "429",
222        "too many requests",
223        "rate limit",
224        "timeout",
225        "timed out",
226        "temporar",
227        "connection",
228        "network",
229        "unavailable",
230        "server error",
231        "502",
232        "503",
233        "504",
234        "reset by peer",
235        "eof",
236    ];
237    retryable.iter().any(|p| lower.contains(p))
238}
239
240pub(super) fn retry_backoff_ms(config: &AgentConfig, retry_index: usize) -> u64 {
241    let initial = config.llm_retry_policy.initial_backoff_ms;
242    let cap = config
243        .llm_retry_policy
244        .max_backoff_ms
245        .max(config.llm_retry_policy.initial_backoff_ms);
246    if retry_index == 0 {
247        return initial.min(cap);
248    }
249    let shift = (retry_index - 1).min(20) as u32;
250    let factor = 1u64.checked_shl(shift).unwrap_or(u64::MAX);
251    initial.saturating_mul(factor).min(cap)
252}
253
254pub(super) async fn wait_retry_backoff(
255    config: &AgentConfig,
256    retry_index: usize,
257    run_cancellation_token: Option<&RunCancellationToken>,
258) -> bool {
259    let wait_ms = retry_backoff_ms(config, retry_index);
260    match await_or_cancel(
261        run_cancellation_token,
262        tokio::time::sleep(std::time::Duration::from_millis(wait_ms)),
263    )
264    .await
265    {
266        CancelAware::Cancelled => true,
267        CancelAware::Value(_) => false,
268    }
269}
270
271pub(super) enum LlmAttemptOutcome<T> {
272    Success {
273        value: T,
274        model: String,
275        attempts: usize,
276    },
277    Cancelled,
278    Exhausted {
279        last_error: String,
280        attempts: usize,
281    },
282}
283
284fn is_run_cancelled(token: Option<&RunCancellationToken>) -> bool {
285    is_cancelled(token)
286}
287
288pub(super) fn step_tool_provider_for_run(
289    config: &AgentConfig,
290    tools: HashMap<String, Arc<dyn Tool>>,
291) -> Arc<dyn StepToolProvider> {
292    config.step_tool_provider.clone().unwrap_or_else(|| {
293        Arc::new(StaticStepToolProvider::new(tools)) as Arc<dyn StepToolProvider>
294    })
295}
296
297pub(super) fn llm_executor_for_run(config: &AgentConfig) -> Arc<dyn LlmExecutor> {
298    config
299        .llm_executor
300        .clone()
301        .unwrap_or_else(|| Arc::new(GenaiLlmExecutor::new(Client::default())))
302}
303
304pub(super) async fn resolve_step_tool_snapshot(
305    step_tool_provider: &Arc<dyn StepToolProvider>,
306    run_ctx: &RunContext,
307) -> Result<StepToolSnapshot, AgentLoopError> {
308    step_tool_provider
309        .provide(StepToolInput { state: run_ctx })
310        .await
311}
312
313fn mark_step_completed(run_state: &mut RunState) {
314    run_state.completed_steps += 1;
315}
316
317fn build_loop_outcome(
318    run_ctx: RunContext,
319    termination: TerminationReason,
320    response: Option<String>,
321    run_state: &RunState,
322    failure: Option<outcome::LoopFailure>,
323) -> LoopOutcome {
324    LoopOutcome {
325        run_ctx,
326        termination,
327        response: response.filter(|text| !text.is_empty()),
328        usage: run_state.usage(),
329        stats: run_state.stats(),
330        failure,
331    }
332}
333
334fn stop_reason_for_step(
335    run_state: &RunState,
336    result: &StreamResult,
337    run_ctx: &RunContext,
338    stop_conditions: &[Arc<dyn StopPolicy>],
339) -> Option<StopReason> {
340    let stop_input = run_state.to_policy_input(result, run_ctx);
341    check_stop_policies(stop_conditions, &stop_input)
342}
343
344pub(super) async fn run_llm_with_retry_and_fallback<T, Invoke, Fut>(
345    config: &AgentConfig,
346    run_cancellation_token: Option<&RunCancellationToken>,
347    retry_current_model: bool,
348    unknown_error: &str,
349    mut invoke: Invoke,
350) -> LlmAttemptOutcome<T>
351where
352    Invoke: FnMut(String) -> Fut,
353    Fut: std::future::Future<Output = genai::Result<T>>,
354{
355    let mut last_llm_error = unknown_error.to_string();
356    let model_candidates = effective_llm_models(config);
357    let max_attempts = llm_retry_attempts(config);
358    let mut total_attempts = 0usize;
359
360    for model in model_candidates {
361        for attempt in 1..=max_attempts {
362            total_attempts = total_attempts.saturating_add(1);
363            let response_res =
364                match await_or_cancel(run_cancellation_token, invoke(model.clone())).await {
365                    CancelAware::Cancelled => return LlmAttemptOutcome::Cancelled,
366                    CancelAware::Value(resp) => resp,
367                };
368
369            match response_res {
370                Ok(value) => {
371                    return LlmAttemptOutcome::Success {
372                        value,
373                        model,
374                        attempts: total_attempts,
375                    };
376                }
377                Err(e) => {
378                    let message = e.to_string();
379                    last_llm_error =
380                        format!("model='{model}' attempt={attempt}/{max_attempts}: {message}");
381                    let can_retry_same_model = retry_current_model
382                        && attempt < max_attempts
383                        && is_retryable_llm_error(&message);
384                    if can_retry_same_model {
385                        let cancelled =
386                            wait_retry_backoff(config, attempt, run_cancellation_token).await;
387                        if cancelled {
388                            return LlmAttemptOutcome::Cancelled;
389                        }
390                        continue;
391                    }
392                    break;
393                }
394            }
395        }
396    }
397
398    LlmAttemptOutcome::Exhausted {
399        last_error: last_llm_error,
400        attempts: total_attempts,
401    }
402}
403
404pub(super) async fn run_step_prepare_phases(
405    run_ctx: &RunContext,
406    tool_descriptors: &[crate::contracts::tool::ToolDescriptor],
407    config: &AgentConfig,
408) -> Result<(Vec<Message>, Vec<String>, bool, Vec<TrackedPatch>), AgentLoopError> {
409    let ((messages, filtered_tools, skip_inference), pending) = run_phase_block(
410        run_ctx,
411        tool_descriptors,
412        &config.plugins,
413        &[Phase::StepStart, Phase::BeforeInference],
414        |_| {},
415        |step| inference_inputs_from_step(step, &config.system_prompt),
416    )
417    .await?;
418    Ok((messages, filtered_tools, skip_inference, pending))
419}
420
421pub(super) struct PreparedStep {
422    pub(super) messages: Vec<Message>,
423    pub(super) filtered_tools: Vec<String>,
424    pub(super) skip_inference: bool,
425    pub(super) pending_patches: Vec<TrackedPatch>,
426}
427
428pub(super) async fn prepare_step_execution(
429    run_ctx: &RunContext,
430    tool_descriptors: &[crate::contracts::tool::ToolDescriptor],
431    config: &AgentConfig,
432) -> Result<PreparedStep, AgentLoopError> {
433    let (messages, filtered_tools, skip_inference, pending) =
434        run_step_prepare_phases(run_ctx, tool_descriptors, config).await?;
435    Ok(PreparedStep {
436        messages,
437        filtered_tools,
438        skip_inference,
439        pending_patches: pending,
440    })
441}
442
443pub(super) async fn apply_llm_error_cleanup(
444    run_ctx: &mut RunContext,
445    tool_descriptors: &[crate::contracts::tool::ToolDescriptor],
446    plugins: &[Arc<dyn crate::contracts::plugin::AgentPlugin>],
447    error_type: &'static str,
448    message: String,
449) -> Result<(), AgentLoopError> {
450    emit_cleanup_phases_and_apply(run_ctx, tool_descriptors, plugins, error_type, message).await
451}
452
453pub(super) async fn complete_step_after_inference(
454    run_ctx: &mut RunContext,
455    result: &StreamResult,
456    step_meta: MessageMetadata,
457    assistant_message_id: Option<String>,
458    tool_descriptors: &[crate::contracts::tool::ToolDescriptor],
459    plugins: &[Arc<dyn crate::contracts::plugin::AgentPlugin>],
460) -> Result<(), AgentLoopError> {
461    let pending = emit_phase_block(
462        Phase::AfterInference,
463        run_ctx,
464        tool_descriptors,
465        plugins,
466        |step| {
467            step.response = Some(result.clone());
468        },
469    )
470    .await?;
471    run_ctx.add_thread_patches(pending);
472
473    let assistant = assistant_turn_message(result, step_meta, assistant_message_id);
474    run_ctx.add_message(Arc::new(assistant));
475
476    let pending =
477        emit_phase_block(Phase::StepEnd, run_ctx, tool_descriptors, plugins, |_| {}).await?;
478    run_ctx.add_thread_patches(pending);
479    Ok(())
480}
481
482pub(super) fn interaction_requested_pending_events(interaction: &Interaction) -> [AgentEvent; 2] {
483    [
484        AgentEvent::InteractionRequested {
485            interaction: interaction.clone(),
486        },
487        AgentEvent::Pending {
488            interaction: interaction.clone(),
489        },
490    ]
491}
492
493/// Emit events for a pending frontend tool invocation.
494///
495/// When a `FrontendToolInvocation` is available, emits standard `ToolCallStart` +
496/// `ToolCallReady` events using the frontend invocation's identity. This makes
497/// frontend tools appear as regular tool calls in the event stream.
498///
499/// Falls back to legacy `InteractionRequested` + `Pending` events when no
500/// `FrontendToolInvocation` is provided.
501pub(super) fn pending_tool_events(
502    interaction: &Interaction,
503    frontend_invocation: Option<&FrontendToolInvocation>,
504) -> Vec<AgentEvent> {
505    if let Some(inv) = frontend_invocation {
506        vec![
507            AgentEvent::ToolCallStart {
508                id: inv.call_id.clone(),
509                name: inv.tool_name.clone(),
510            },
511            AgentEvent::ToolCallReady {
512                id: inv.call_id.clone(),
513                name: inv.tool_name.clone(),
514                arguments: inv.arguments.clone(),
515            },
516        ]
517    } else {
518        interaction_requested_pending_events(interaction).to_vec()
519    }
520}
521
522pub(super) struct ToolExecutionContext {
523    pub(super) state: serde_json::Value,
524    pub(super) run_config: tirea_contract::RunConfig,
525}
526
527pub(super) fn prepare_tool_execution_context(
528    run_ctx: &RunContext,
529    config: Option<&AgentConfig>,
530) -> Result<ToolExecutionContext, AgentLoopError> {
531    let state = run_ctx
532        .snapshot()
533        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
534    let run_config = scope_with_tool_caller_context(run_ctx, &state, config)?;
535    Ok(ToolExecutionContext { state, run_config })
536}
537
538pub(super) async fn finalize_run_end(
539    run_ctx: &mut RunContext,
540    tool_descriptors: &[crate::contracts::tool::ToolDescriptor],
541    plugins: &[Arc<dyn crate::contracts::plugin::AgentPlugin>],
542) {
543    emit_run_end_phase(run_ctx, tool_descriptors, plugins).await
544}
545
546fn stream_result_from_chat_response(response: &genai::chat::ChatResponse) -> StreamResult {
547    let text = response
548        .first_text()
549        .map(|s| s.to_string())
550        .unwrap_or_default();
551    let tool_calls: Vec<crate::contracts::thread::ToolCall> = response
552        .tool_calls()
553        .into_iter()
554        .map(|tc| {
555            crate::contracts::thread::ToolCall::new(
556                &tc.call_id,
557                &tc.fn_name,
558                tc.fn_arguments.clone(),
559            )
560        })
561        .collect();
562
563    StreamResult {
564        text,
565        tool_calls,
566        usage: Some(response.usage.clone()),
567    }
568}
569
570fn assistant_turn_message(
571    result: &StreamResult,
572    step_meta: MessageMetadata,
573    message_id: Option<String>,
574) -> Message {
575    let mut msg = if result.tool_calls.is_empty() {
576        assistant_message(&result.text)
577    } else {
578        assistant_tool_calls(&result.text, result.tool_calls.clone())
579    }
580    .with_metadata(step_meta);
581    if let Some(message_id) = message_id {
582        msg = msg.with_id(message_id);
583    }
584    msg
585}
586
587async fn drain_run_start_outbox_and_replay_nonstream(
588    run_ctx: &mut RunContext,
589    tools: &HashMap<String, Arc<dyn Tool>>,
590    config: &AgentConfig,
591    tool_descriptors: &[crate::contracts::tool::ToolDescriptor],
592) -> Result<bool, AgentLoopError> {
593    let outbox = drain_agent_outbox(run_ctx, "agent_outbox_run_start_nonstream")?;
594    if outbox.replay_tool_calls.is_empty() {
595        return Ok(false);
596    }
597
598    for tool_call in &outbox.replay_tool_calls {
599        let state = run_ctx
600            .snapshot()
601            .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
602        let tool = tools.get(&tool_call.name).cloned();
603        let rt_for_replay = scope_with_tool_caller_context(run_ctx, &state, Some(config))?;
604        let replay_phase_ctx = ToolPhaseContext {
605            tool_descriptors,
606            plugins: &config.plugins,
607            activity_manager: None,
608            run_config: &rt_for_replay,
609            thread_id: run_ctx.thread_id(),
610            thread_messages: run_ctx.messages(),
611            cancellation_token: None,
612        };
613        let replay_result =
614            execute_single_tool_with_phases(tool.as_deref(), tool_call, &state, &replay_phase_ctx)
615                .await?;
616
617        let replay_msg = tool_response(&tool_call.id, &replay_result.execution.result);
618        run_ctx.add_message(Arc::new(replay_msg));
619
620        if !replay_result.reminders.is_empty() {
621            let msgs: Vec<Arc<Message>> = replay_result
622                .reminders
623                .iter()
624                .map(|reminder| {
625                    Arc::new(Message::internal_system(format!(
626                        "<system-reminder>{}</system-reminder>",
627                        reminder
628                    )))
629                })
630                .collect();
631            run_ctx.add_messages(msgs);
632        }
633
634        if let Some(patch) = replay_result.execution.patch {
635            run_ctx.add_thread_patch(patch);
636        }
637        if !replay_result.pending_patches.is_empty() {
638            run_ctx.add_thread_patches(replay_result.pending_patches);
639        }
640
641        if let Some(new_interaction) = replay_result.pending_interaction {
642            let new_frontend_invocation = replay_result.pending_frontend_invocation;
643            let state = run_ctx
644                .snapshot()
645                .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
646            let patch =
647                set_agent_pending_interaction(&state, new_interaction, new_frontend_invocation)?;
648            if !patch.patch().is_empty() {
649                run_ctx.add_thread_patch(patch);
650            }
651            return Ok(true);
652        }
653    }
654
655    let state = run_ctx
656        .snapshot()
657        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
658    let clear_patch = clear_agent_pending_interaction(&state)?;
659    if !clear_patch.patch().is_empty() {
660        run_ctx.add_thread_patch(clear_patch);
661    }
662
663    Ok(true)
664}
665
666/// Run the full agent loop until completion or a stop condition is met.
667///
668/// This is the primary non-streaming entry point. Tools are passed directly
669/// and used as the default tool set unless `config.step_tool_provider` is set
670/// (for dynamic per-step tool resolution).
671pub async fn run_loop(
672    config: &AgentConfig,
673    tools: HashMap<String, Arc<dyn Tool>>,
674    mut run_ctx: RunContext,
675    cancellation_token: Option<RunCancellationToken>,
676    state_committer: Option<Arc<dyn StateCommitter>>,
677) -> LoopOutcome {
678    let executor = llm_executor_for_run(config);
679    let mut run_state = RunState::new();
680    let stop_conditions = effective_stop_conditions(config);
681    let run_cancellation_token = cancellation_token;
682    let mut last_text = String::new();
683    let step_tool_provider = step_tool_provider_for_run(config, tools);
684    let run_identity = stream_core::resolve_stream_run_identity(&mut run_ctx);
685    let run_id = run_identity.run_id;
686    let parent_run_id = run_identity.parent_run_id;
687    let pending_delta_commit =
688        PendingDeltaCommitContext::new(&run_id, parent_run_id.as_deref(), state_committer.as_ref());
689    let initial_step_tools = match resolve_step_tool_snapshot(&step_tool_provider, &run_ctx).await {
690        Ok(snapshot) => snapshot,
691        Err(error) => {
692            return build_loop_outcome(
693                run_ctx,
694                TerminationReason::Error,
695                None,
696                &run_state,
697                Some(outcome::LoopFailure::State(error.to_string())),
698            );
699        }
700    };
701    let StepToolSnapshot {
702        tools: initial_tools,
703        descriptors: initial_descriptors,
704    } = initial_step_tools;
705    let mut active_tool_descriptors = initial_descriptors;
706
707    macro_rules! terminate_run {
708        ($termination:expr, $response:expr, $failure:expr) => {{
709            finalize_run_end(&mut run_ctx, &active_tool_descriptors, &config.plugins).await;
710            if let Err(error) = pending_delta_commit
711                .commit(&mut run_ctx, CheckpointReason::RunFinished, true)
712                .await
713            {
714                return build_loop_outcome(
715                    run_ctx,
716                    TerminationReason::Error,
717                    None,
718                    &run_state,
719                    Some(outcome::LoopFailure::State(error.to_string())),
720                );
721            }
722            return build_loop_outcome(run_ctx, $termination, $response, &run_state, $failure);
723        }};
724    }
725
726    // Phase: RunStart
727    let pending = match emit_phase_block(
728        Phase::RunStart,
729        &run_ctx,
730        &active_tool_descriptors,
731        &config.plugins,
732        |_| {},
733    )
734    .await
735    {
736        Ok(pending) => pending,
737        Err(error) => {
738            terminate_run!(
739                TerminationReason::Error,
740                None,
741                Some(outcome::LoopFailure::State(error.to_string()))
742            );
743        }
744    };
745    run_ctx.add_thread_patches(pending);
746    if let Err(error) = pending_delta_commit
747        .commit(&mut run_ctx, CheckpointReason::UserMessage, false)
748        .await
749    {
750        terminate_run!(
751            TerminationReason::Error,
752            None,
753            Some(outcome::LoopFailure::State(error.to_string()))
754        );
755    }
756
757    let replayed_at_run_start = match drain_run_start_outbox_and_replay_nonstream(
758        &mut run_ctx,
759        &initial_tools,
760        config,
761        &active_tool_descriptors,
762    )
763    .await
764    {
765        Ok(replayed) => replayed,
766        Err(error) => {
767            terminate_run!(
768                TerminationReason::Error,
769                None,
770                Some(outcome::LoopFailure::State(error.to_string()))
771            );
772        }
773    };
774    if replayed_at_run_start {
775        if let Err(error) = pending_delta_commit
776            .commit(&mut run_ctx, CheckpointReason::ToolResultsCommitted, false)
777            .await
778        {
779            terminate_run!(
780                TerminationReason::Error,
781                None,
782                Some(outcome::LoopFailure::State(error.to_string()))
783            );
784        }
785    }
786    if pending_interaction_from_ctx(&run_ctx).is_some() {
787        terminate_run!(TerminationReason::PendingInteraction, None, None);
788    }
789
790    loop {
791        if is_run_cancelled(run_cancellation_token.as_ref()) {
792            terminate_run!(TerminationReason::Cancelled, None, None);
793        }
794
795        let step_tools = match resolve_step_tool_snapshot(&step_tool_provider, &run_ctx).await {
796            Ok(snapshot) => snapshot,
797            Err(e) => {
798                terminate_run!(
799                    TerminationReason::Error,
800                    None,
801                    Some(outcome::LoopFailure::State(e.to_string()))
802                );
803            }
804        };
805        active_tool_descriptors = step_tools.descriptors.clone();
806
807        let prepared =
808            match prepare_step_execution(&run_ctx, &active_tool_descriptors, config).await {
809                Ok(v) => v,
810                Err(e) => {
811                    terminate_run!(
812                        TerminationReason::Error,
813                        None,
814                        Some(outcome::LoopFailure::State(e.to_string()))
815                    );
816                }
817            };
818        run_ctx.add_thread_patches(prepared.pending_patches);
819
820        if prepared.skip_inference {
821            if pending_interaction_from_ctx(&run_ctx).is_some() {
822                terminate_run!(TerminationReason::PendingInteraction, None, None);
823            }
824            terminate_run!(
825                TerminationReason::PluginRequested,
826                Some(last_text.clone()),
827                None
828            );
829        }
830
831        // Call LLM with unified retry + fallback model strategy.
832        let messages = prepared.messages;
833        let filtered_tools = prepared.filtered_tools;
834        let attempt_outcome = run_llm_with_retry_and_fallback(
835            config,
836            run_cancellation_token.as_ref(),
837            true,
838            "unknown llm error",
839            |model| {
840                let request =
841                    build_request_for_filtered_tools(&messages, &step_tools.tools, &filtered_tools);
842                let executor = executor.clone();
843                async move {
844                    executor
845                        .exec_chat_response(&model, request, config.chat_options.as_ref())
846                        .await
847                }
848            },
849        )
850        .await;
851
852        let response = match attempt_outcome {
853            LlmAttemptOutcome::Success {
854                value, attempts, ..
855            } => {
856                run_state.record_llm_attempts(attempts);
857                value
858            }
859            LlmAttemptOutcome::Cancelled => {
860                append_cancellation_user_message(&mut run_ctx, CancellationStage::Inference);
861                terminate_run!(TerminationReason::Cancelled, None, None);
862            }
863            LlmAttemptOutcome::Exhausted {
864                last_error,
865                attempts,
866            } => {
867                run_state.record_llm_attempts(attempts);
868                if let Err(phase_error) = apply_llm_error_cleanup(
869                    &mut run_ctx,
870                    &active_tool_descriptors,
871                    &config.plugins,
872                    "llm_exec_error",
873                    last_error.clone(),
874                )
875                .await
876                {
877                    terminate_run!(
878                        TerminationReason::Error,
879                        None,
880                        Some(outcome::LoopFailure::State(phase_error.to_string()))
881                    );
882                }
883                terminate_run!(
884                    TerminationReason::Error,
885                    None,
886                    Some(outcome::LoopFailure::Llm(last_error))
887                );
888            }
889        };
890
891        let result = stream_result_from_chat_response(&response);
892        run_state.update_from_response(&result);
893        last_text = result.text.clone();
894
895        // Add assistant message
896        let assistant_msg_id = gen_message_id();
897        let step_meta = step_metadata(Some(run_id.clone()), run_state.completed_steps as u32);
898        if let Err(e) = complete_step_after_inference(
899            &mut run_ctx,
900            &result,
901            step_meta.clone(),
902            Some(assistant_msg_id.clone()),
903            &active_tool_descriptors,
904            &config.plugins,
905        )
906        .await
907        {
908            terminate_run!(
909                TerminationReason::Error,
910                None,
911                Some(outcome::LoopFailure::State(e.to_string()))
912            );
913        }
914        if let Err(error) = pending_delta_commit
915            .commit(
916                &mut run_ctx,
917                CheckpointReason::AssistantTurnCommitted,
918                false,
919            )
920            .await
921        {
922            terminate_run!(
923                TerminationReason::Error,
924                None,
925                Some(outcome::LoopFailure::State(error.to_string()))
926            );
927        }
928
929        mark_step_completed(&mut run_state);
930
931        if !result.needs_tools() {
932            run_state.record_step_without_tools();
933            if let Some(reason) =
934                stop_reason_for_step(&run_state, &result, &run_ctx, &stop_conditions)
935            {
936                terminate_run!(TerminationReason::Stopped(reason), None, None);
937            }
938            terminate_run!(TerminationReason::NaturalEnd, Some(last_text.clone()), None);
939        }
940
941        // Execute tools with phase hooks using configured execution strategy.
942        let tool_context = match prepare_tool_execution_context(&run_ctx, Some(config)) {
943            Ok(ctx) => ctx,
944            Err(e) => {
945                terminate_run!(
946                    TerminationReason::Error,
947                    None,
948                    Some(outcome::LoopFailure::State(e.to_string()))
949                );
950            }
951        };
952        let thread_messages_for_tools = run_ctx.messages().to_vec();
953        let thread_version_for_tools = run_ctx.version();
954
955        let tool_exec_future = config.tool_executor.execute(ToolExecutionRequest {
956            tools: &step_tools.tools,
957            calls: &result.tool_calls,
958            state: &tool_context.state,
959            tool_descriptors: &active_tool_descriptors,
960            plugins: &config.plugins,
961            activity_manager: None,
962            run_config: &tool_context.run_config,
963            thread_id: run_ctx.thread_id(),
964            thread_messages: &thread_messages_for_tools,
965            state_version: thread_version_for_tools,
966            cancellation_token: run_cancellation_token.as_ref(),
967        });
968        let results = tool_exec_future.await.map_err(AgentLoopError::from);
969
970        let results = match results {
971            Ok(r) => r,
972            Err(AgentLoopError::Cancelled { .. }) => {
973                append_cancellation_user_message(&mut run_ctx, CancellationStage::ToolExecution);
974                terminate_run!(TerminationReason::Cancelled, None, None);
975            }
976            Err(e) => {
977                terminate_run!(
978                    TerminationReason::Error,
979                    None,
980                    Some(outcome::LoopFailure::State(e.to_string()))
981                );
982            }
983        };
984
985        let applied = match apply_tool_results_to_session(
986            &mut run_ctx,
987            &results,
988            Some(step_meta),
989            config
990                .tool_executor
991                .requires_parallel_patch_conflict_check(),
992        ) {
993            Ok(a) => a,
994            Err(_e) => {
995                // On error, we can't easily rollback RunContext, so just terminate
996                terminate_run!(
997                    TerminationReason::Error,
998                    None,
999                    Some(outcome::LoopFailure::State(_e.to_string()))
1000                );
1001            }
1002        };
1003        if let Err(error) = pending_delta_commit
1004            .commit(&mut run_ctx, CheckpointReason::ToolResultsCommitted, false)
1005            .await
1006        {
1007            terminate_run!(
1008                TerminationReason::Error,
1009                None,
1010                Some(outcome::LoopFailure::State(error.to_string()))
1011            );
1012        }
1013
1014        // Pause if any tool is waiting for client response.
1015        if let Some(_interaction) = applied.pending_interaction {
1016            terminate_run!(TerminationReason::PendingInteraction, None, None);
1017        }
1018
1019        // Track tool-step metrics for post-tool stop condition evaluation.
1020        let error_count = results
1021            .iter()
1022            .filter(|r| r.execution.result.is_error())
1023            .count();
1024        run_state.record_tool_step(&result.tool_calls, error_count);
1025
1026        // Check stop conditions.
1027        if let Some(reason) = stop_reason_for_step(&run_state, &result, &run_ctx, &stop_conditions)
1028        {
1029            terminate_run!(TerminationReason::Stopped(reason), None, None);
1030        }
1031    }
1032}
1033
1034/// Run the agent loop with streaming output.
1035///
1036/// Returns a stream of AgentEvent for real-time updates. Tools are passed
1037/// directly and used as the default tool set unless `config.step_tool_provider`
1038/// is set (for dynamic per-step tool resolution).
1039pub fn run_loop_stream(
1040    config: AgentConfig,
1041    tools: HashMap<String, Arc<dyn Tool>>,
1042    run_ctx: RunContext,
1043    cancellation_token: Option<RunCancellationToken>,
1044    state_committer: Option<Arc<dyn StateCommitter>>,
1045) -> Pin<Box<dyn Stream<Item = AgentEvent> + Send>> {
1046    stream_runner::run_loop_stream_impl(config, tools, run_ctx, cancellation_token, state_committer)
1047}
1048
1049#[cfg(test)]
1050mod tests;