Skip to main content

tirea_agent_loop/runtime/loop_runner/
mod.rs

1//! Agent loop implementation with Phase-based plugin execution.
2//!
3//! The agent loop orchestrates the conversation between user, LLM, and tools:
4//!
5//! ```text
6//! User Input → LLM → Tool Calls? → Execute Tools → LLM → ... → Final Response
7//! ```
8//!
9//! # Phase Execution
10//!
11//! Each phase dispatches to its typed plugin hook:
12//!
13//! ```text
14//! RunStart (once)
15//!     │
16//!     ▼
17//! ┌─────────────────────────┐
18//! │      StepStart          │ ← plugins can apply state patches
19//! ├─────────────────────────┤
20//! │    BeforeInference      │ ← plugins can inject prompt context, filter tools
21//! ├─────────────────────────┤
22//! │      [LLM CALL]         │
23//! ├─────────────────────────┤
24//! │    AfterInference       │
25//! ├─────────────────────────┤
26//! │  ┌───────────────────┐  │
27//! │  │ BeforeToolExecute │  │ ← plugins can block/pending
28//! │  ├───────────────────┤  │
29//! │  │   [TOOL EXEC]     │  │
30//! │  ├───────────────────┤  │
31//! │  │ AfterToolExecute  │  │ ← plugins can add reminders
32//! │  └───────────────────┘  │
33//! ├─────────────────────────┤
34//! │       StepEnd           │
35//! └─────────────────────────┘
36//!     │
37//!     ▼
38//! RunEnd (once)
39//! ```
40
41mod config;
42mod core;
43mod event_envelope_meta;
44mod outcome;
45mod parallel_state_merge;
46mod plugin_runtime;
47mod run_state;
48mod state_commit;
49mod stream_core;
50mod stream_runner;
51mod tool_exec;
52mod truncation_recovery;
53
54use crate::contracts::io::ResumeDecisionAction;
55use crate::contracts::runtime::phase::Phase;
56use crate::contracts::runtime::state::{reduce_state_actions, AnyStateAction, ScopeContext};
57use crate::contracts::runtime::tool_call::{Tool, ToolResult};
58use crate::contracts::runtime::ActivityManager;
59use crate::contracts::runtime::{
60    DecisionReplayPolicy, RunIdentity, RunLifecycleAction, RunLifecycleState, StreamResult,
61    SuspendedCall, ToolCallResume, ToolCallResumeMode, ToolCallStatus, ToolExecutionRequest,
62    ToolExecutionResult,
63};
64use crate::contracts::thread::CheckpointReason;
65use crate::contracts::thread::{gen_message_id, Message, MessageMetadata, ToolCall};
66use crate::contracts::RunContext;
67use crate::contracts::{AgentEvent, RunAction, TerminationReason, ToolCallDecision};
68use crate::engine::convert::{assistant_message, assistant_tool_calls, tool_response};
69use crate::runtime::activity::ActivityHub;
70
71use crate::runtime::streaming::StreamCollector;
72use async_stream::stream;
73use futures::{Stream, StreamExt};
74use genai::Client;
75use serde_json::Value;
76use std::collections::{HashMap, HashSet, VecDeque};
77use std::future::Future;
78use std::pin::Pin;
79use std::sync::atomic::{AtomicU64, Ordering};
80use std::sync::Arc;
81use std::time::{Instant, SystemTime, UNIX_EPOCH};
82use uuid::Uuid;
83
84pub use crate::contracts::runtime::ToolExecutor;
85pub use crate::runtime::run_context::{
86    await_or_cancel, is_cancelled, CancelAware, RunCancellationToken, StateCommitError,
87    StateCommitter,
88};
89use config::StaticStepToolProvider;
90pub use config::{Agent, BaseAgent, GenaiLlmExecutor, LlmRetryPolicy};
91pub use config::{LlmEventStream, LlmExecutor};
92pub use config::{StepToolInput, StepToolProvider, StepToolSnapshot};
93#[cfg(test)]
94use core::build_messages;
95use core::{
96    build_request_for_filtered_tools, inference_inputs_from_step, suspended_calls_from_ctx,
97    tool_call_states_from_ctx, transition_tool_call_state, upsert_tool_call_state,
98    ToolCallStateSeed, ToolCallStateTransition,
99};
100pub use outcome::{tool_map, tool_map_from_arc, AgentLoopError};
101pub use outcome::{LoopOutcome, LoopStats, LoopUsage};
102#[cfg(test)]
103use plugin_runtime::emit_agent_phase;
104#[cfg(test)]
105use plugin_runtime::emit_cleanup_phases;
106use run_state::LoopRunState;
107pub use state_commit::ChannelStateCommitter;
108use state_commit::PendingDeltaCommitContext;
109use tirea_state::TrackedPatch;
110#[cfg(test)]
111use tokio_util::sync::CancellationToken;
112#[cfg(test)]
113use tool_exec::execute_single_tool_with_phases;
114#[cfg(test)]
115use tool_exec::execute_tools_parallel_with_phases;
116pub use tool_exec::ExecuteToolsOutcome;
117use tool_exec::{
118    apply_tool_results_impl, apply_tool_results_to_session, caller_context_for_tool_execution,
119    execute_single_tool_with_phases_deferred, step_metadata, ToolPhaseContext,
120};
121pub use tool_exec::{
122    execute_tools, execute_tools_with_behaviors, execute_tools_with_config,
123    ParallelToolExecutionMode, ParallelToolExecutor, SequentialToolExecutor,
124};
125
/// Fully resolved agent wiring ready for execution.
///
/// Contains everything needed to run an agent loop: the agent,
/// the resolved tool map, and the run policy. This is a pure data struct
/// that can be inspected, mutated, and tested independently.
pub struct ResolvedRun {
    /// The agent (model, behavior, execution strategies, ...).
    ///
    /// Exposed as a concrete [`BaseAgent`] so callers can mutate fields
    /// (model, plugins, tool_executor, ...) between resolution and execution.
    /// Converted to `Arc<dyn Agent>` at the execution boundary.
    pub agent: BaseAgent,
    /// Resolved tool map after filtering and wiring, keyed by tool id
    /// (see [`ResolvedRun::with_tool`]).
    pub tools: HashMap<String, Arc<dyn Tool>>,
    /// Typed per-run policy.
    pub run_policy: crate::contracts::RunPolicy,
    /// Optional lineage seed for nested tool-driven runs.
    pub parent_tool_call_id: Option<String>,
}
145
146impl ResolvedRun {
147    /// Add or replace a tool in the resolved tool map.
148    #[must_use]
149    pub fn with_tool(mut self, id: String, tool: Arc<dyn Tool>) -> Self {
150        self.tools.insert(id, tool);
151        self
152    }
153
154    /// Overlay tools from a map (insert-if-absent semantics).
155    pub fn overlay_tools(&mut self, tools: HashMap<String, Arc<dyn Tool>>) {
156        for (id, tool) in tools {
157            self.tools.entry(id).or_insert(tool);
158        }
159    }
160}
161
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch, and clamps
/// to `u64::MAX` on (theoretical) overflow of the millisecond count.
pub(crate) fn current_unix_millis() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
167
168fn ensure_run_identity(
169    agent: &dyn Agent,
170    run_ctx: &mut RunContext,
171    mut run_identity: RunIdentity,
172) -> RunIdentity {
173    if run_identity.run_id_opt().is_none() {
174        run_identity.run_id = Uuid::now_v7().to_string();
175    }
176    if run_identity.agent_id_opt().is_none() {
177        run_identity.agent_id = agent.id().to_string();
178    }
179    if run_identity.thread_id_opt().is_none() {
180        run_identity.thread_id = run_ctx.thread_id().to_string();
181    }
182    run_ctx.set_run_identity(run_identity.clone());
183    run_identity
184}
185
/// Test-only alias for [`sync_run_lifecycle_for_termination_with_context`].
#[cfg(test)]
pub(super) fn sync_run_lifecycle_for_termination(
    run_ctx: &mut RunContext,
    run_identity: &RunIdentity,
    termination: &TerminationReason,
) -> Result<(), AgentLoopError> {
    sync_run_lifecycle_for_termination_with_context(run_ctx, run_identity, termination)
}
194
195fn sync_run_lifecycle_for_termination_with_context(
196    run_ctx: &mut RunContext,
197    run_identity: &RunIdentity,
198    termination: &TerminationReason,
199) -> Result<(), AgentLoopError> {
200    if run_identity.run_id.trim().is_empty() {
201        return Ok(());
202    };
203
204    let (status, done_reason) = termination.to_run_status();
205
206    let base_state = run_ctx
207        .snapshot()
208        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
209    let actions = vec![AnyStateAction::new::<RunLifecycleState>(
210        RunLifecycleAction::Set {
211            id: run_identity.run_id.clone(),
212            status,
213            done_reason,
214            updated_at: current_unix_millis(),
215        },
216    )];
217    let patches = reduce_state_actions(actions, &base_state, "agent_loop", &ScopeContext::run())
218        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
219    run_ctx.add_thread_patches(patches);
220    Ok(())
221}
222
/// Which stage of the loop a run was in when it was cancelled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum CancellationStage {
    /// Cancelled while waiting on an LLM inference call.
    Inference,
    /// Cancelled while executing tool calls.
    ToolExecution,
}

/// User-visible continuation prompt appended after an inference-stage cancellation.
pub(super) const CANCELLATION_INFERENCE_USER_MESSAGE: &str =
    "The previous run was interrupted during inference. Please continue from the current context.";
/// User-visible continuation prompt appended after a tool-stage cancellation.
pub(super) const CANCELLATION_TOOL_USER_MESSAGE: &str =
    "The previous run was interrupted while using tools. Please continue from the current context.";
233
234pub(super) fn append_cancellation_user_message(run_ctx: &mut RunContext, stage: CancellationStage) {
235    let content = match stage {
236        CancellationStage::Inference => CANCELLATION_INFERENCE_USER_MESSAGE,
237        CancellationStage::ToolExecution => CANCELLATION_TOOL_USER_MESSAGE,
238    };
239    run_ctx.add_message(Arc::new(Message::user(content)));
240}
241
242pub(super) fn effective_llm_models(agent: &dyn Agent) -> Vec<String> {
243    let mut models = Vec::with_capacity(1 + agent.fallback_models().len());
244    models.push(agent.model().to_string());
245    for model in agent.fallback_models() {
246        if model.trim().is_empty() {
247            continue;
248        }
249        if !models.iter().any(|m| m == model) {
250            models.push(model.clone());
251        }
252    }
253    models
254}
255
256pub(super) fn effective_llm_models_from(
257    agent: &dyn Agent,
258    start_model: Option<&str>,
259) -> Vec<String> {
260    let models = effective_llm_models(agent);
261    let Some(start_model) = start_model.map(str::trim).filter(|model| !model.is_empty()) else {
262        return models;
263    };
264    let Some(index) = models.iter().position(|model| model == start_model) else {
265        return models;
266    };
267    models.into_iter().skip(index).collect()
268}
269
270pub(super) fn next_llm_model_after(agent: &dyn Agent, current_model: &str) -> Option<String> {
271    let models = effective_llm_models(agent);
272    let current_index = models.iter().position(|model| model == current_model)?;
273    models.into_iter().nth(current_index + 1)
274}
275
276pub(super) fn llm_retry_attempts(agent: &dyn Agent) -> usize {
277    agent.llm_retry_policy().max_attempts_per_model.max(1)
278}
279
/// Coarse classification of LLM provider errors, used for retry decisions
/// (see `is_retryable`) and telemetry labels (see `as_str`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum LlmErrorClass {
    /// HTTP 429 / provider rate limiting.
    RateLimit,
    /// Request or connect timeout (HTTP 408, "timed out", io `TimedOut`).
    Timeout,
    /// Transport failures (reset, broken pipe, EOF, refused connection).
    Connection,
    /// HTTP 503 / provider overloaded or temporarily unavailable.
    ServerUnavailable,
    /// Other 5xx server-side failures.
    ServerError,
    /// HTTP 401/403 or missing/invalid credentials.
    Auth,
    /// Other 4xx client-side request errors.
    ClientRequest,
    /// Anything that could not be classified.
    Unknown,
}
291
292impl LlmErrorClass {
293    pub(super) fn is_retryable(self) -> bool {
294        matches!(
295            self,
296            LlmErrorClass::RateLimit
297                | LlmErrorClass::Timeout
298                | LlmErrorClass::Connection
299                | LlmErrorClass::ServerUnavailable
300                | LlmErrorClass::ServerError
301        )
302    }
303
304    /// Stable string label for telemetry and structured logging.
305    pub(super) fn as_str(self) -> &'static str {
306        match self {
307            LlmErrorClass::RateLimit => "rate_limit",
308            LlmErrorClass::Timeout => "timeout",
309            LlmErrorClass::Connection => "connection",
310            LlmErrorClass::ServerUnavailable => "server_unavailable",
311            LlmErrorClass::ServerError => "server_error",
312            LlmErrorClass::Auth => "auth",
313            LlmErrorClass::ClientRequest => "client_request",
314            LlmErrorClass::Unknown => "unknown",
315        }
316    }
317}
318
319fn classify_llm_error_message(message: &str) -> LlmErrorClass {
320    let lower = message.to_ascii_lowercase();
321    if ["429", "too many requests", "rate limit"]
322        .iter()
323        .any(|p| lower.contains(p))
324    {
325        return LlmErrorClass::RateLimit;
326    }
327    if ["timeout", "timed out"].iter().any(|p| lower.contains(p)) {
328        return LlmErrorClass::Timeout;
329    }
330    if [
331        "connection",
332        "network",
333        "reset by peer",
334        "broken pipe",
335        "eof",
336        "connection refused",
337        "error sending request for url",
338    ]
339    .iter()
340    .any(|p| lower.contains(p))
341    {
342        return LlmErrorClass::Connection;
343    }
344    if ["503", "service unavailable", "unavailable", "temporar"]
345        .iter()
346        .any(|p| lower.contains(p))
347    {
348        return LlmErrorClass::ServerUnavailable;
349    }
350    if [
351        "500",
352        "502",
353        "504",
354        "server error",
355        "bad gateway",
356        "gateway timeout",
357    ]
358    .iter()
359    .any(|p| lower.contains(p))
360    {
361        return LlmErrorClass::ServerError;
362    }
363    if ["401", "403", "unauthorized", "forbidden", "invalid api key"]
364        .iter()
365        .any(|p| lower.contains(p))
366    {
367        return LlmErrorClass::Auth;
368    }
369    if ["400", "404", "422", "invalid_request", "bad request"]
370        .iter()
371        .any(|p| lower.contains(p))
372    {
373        return LlmErrorClass::ClientRequest;
374    }
375    LlmErrorClass::Unknown
376}
377
378fn classify_status_code(status_code: u16) -> LlmErrorClass {
379    match status_code {
380        408 => LlmErrorClass::Timeout,
381        429 => LlmErrorClass::RateLimit,
382        401 | 403 => LlmErrorClass::Auth,
383        400 | 404 | 422 => LlmErrorClass::ClientRequest,
384        503 => LlmErrorClass::ServerUnavailable,
385        500..=599 => LlmErrorClass::ServerError,
386        400..=499 => LlmErrorClass::ClientRequest,
387        _ => LlmErrorClass::Unknown,
388    }
389}
390
391fn classify_error_chain(error: &(dyn std::error::Error + 'static)) -> Option<LlmErrorClass> {
392    let mut current = Some(error);
393    while let Some(err) = current {
394        if let Some(io_error) = err.downcast_ref::<std::io::Error>() {
395            use std::io::ErrorKind;
396
397            let class = match io_error.kind() {
398                ErrorKind::TimedOut => Some(LlmErrorClass::Timeout),
399                ErrorKind::ConnectionAborted
400                | ErrorKind::ConnectionRefused
401                | ErrorKind::ConnectionReset
402                | ErrorKind::BrokenPipe
403                | ErrorKind::NotConnected
404                | ErrorKind::UnexpectedEof => Some(LlmErrorClass::Connection),
405                _ => None,
406            };
407            if class.is_some() {
408                return class;
409            }
410        }
411        current = err.source();
412    }
413    None
414}
415
416fn classify_webc_error(error: &genai::webc::Error) -> LlmErrorClass {
417    match error {
418        genai::webc::Error::ResponseFailedStatus { status, .. } => {
419            classify_status_code(status.as_u16())
420        }
421        genai::webc::Error::Reqwest(err) => classify_error_chain(err)
422            .unwrap_or_else(|| classify_llm_error_message(&err.to_string())),
423        _ => classify_llm_error_message(&error.to_string()),
424    }
425}
426
427/// Classify a provider error event body by parsing its structured fields.
428///
429/// Provider error events (OpenAI, Anthropic, Gemini) embed a JSON body with a
430/// `type` field that describes the error category. This function extracts that
431/// field and maps it to an [`LlmErrorClass`] without relying on keyword
432/// matching against stringified content.
433///
434/// Known error type strings:
435/// - OpenAI: `"server_error"`, `"rate_limit_error"`, `"invalid_request_error"`, …
436/// - Anthropic: `"overloaded_error"`, `"api_error"`, `"authentication_error"`, …
437fn classify_chat_response_body(body: &serde_json::Value) -> LlmErrorClass {
438    // Collect candidate type strings from both the top-level and nested error
439    // object.  OpenAI streams extract the inner error, so body["type"] is
440    // already the error type.  Anthropic / Gemini may wrap it in an envelope
441    // where body["type"] == "error" and the real type lives at
442    // body["error"]["type"].
443    let top_type = body.get("type").and_then(|v| v.as_str());
444    let nested_type = body
445        .get("error")
446        .and_then(|e| e.get("type"))
447        .and_then(|v| v.as_str());
448
449    // Try the nested (more specific) type first when the top-level type is a
450    // generic envelope marker like "error".
451    let candidates: &[&str] = match (top_type, nested_type) {
452        (_, Some(inner)) => &[inner, top_type.unwrap_or("")],
453        (Some(top), None) => &[top],
454        (None, None) => &[],
455    };
456
457    for t in candidates {
458        let lower = t.to_ascii_lowercase();
459        if lower.contains("rate_limit") {
460            return LlmErrorClass::RateLimit;
461        }
462        if lower.contains("overloaded") || lower.contains("unavailable") {
463            return LlmErrorClass::ServerUnavailable;
464        }
465        if lower.contains("timeout") {
466            return LlmErrorClass::Timeout;
467        }
468        if lower.contains("server_error") || lower.contains("api_error") {
469            return LlmErrorClass::ServerError;
470        }
471        if lower.contains("authentication") || lower.contains("permission") {
472            return LlmErrorClass::Auth;
473        }
474        if lower.contains("invalid_request") || lower.contains("not_found") {
475            return LlmErrorClass::ClientRequest;
476        }
477    }
478
479    // Fallback: check for an HTTP status code in the body (some providers
480    // include a numeric `status` or `code` field).
481    let status_code = body
482        .get("status")
483        .or_else(|| body.get("code"))
484        .and_then(|v| v.as_u64())
485        .and_then(|v| u16::try_from(v).ok());
486
487    if let Some(code) = status_code {
488        let class = classify_status_code(code);
489        if class != LlmErrorClass::Unknown {
490            return class;
491        }
492    }
493
494    LlmErrorClass::Unknown
495}
496
/// Map a [`genai::Error`] to a coarse [`LlmErrorClass`] used for retry
/// decisions and telemetry.
pub(super) fn classify_llm_error(error: &genai::Error) -> LlmErrorClass {
    match error {
        // HTTP failures carry a status code we can classify directly.
        genai::Error::HttpError { status, .. } => classify_status_code(status.as_u16()),
        // Streaming failures: prefer the typed source chain, fall back to
        // keyword matching on the textual cause.
        genai::Error::WebStream { cause, error, .. } => classify_error_chain(error.as_ref())
            .unwrap_or_else(|| classify_llm_error_message(cause)),
        genai::Error::WebAdapterCall { webc_error, .. }
        | genai::Error::WebModelCall { webc_error, .. } => classify_webc_error(webc_error),
        genai::Error::Internal(message) => classify_llm_error_message(message),

        // Mid-stream JSON parse failure — almost always caused by truncated /
        // corrupted SSE data due to a transient network issue.
        genai::Error::StreamParse { .. } => LlmErrorClass::Connection,

        // Provider returned no response body at all — treat as server-side fault.
        genai::Error::NoChatResponse { .. } => LlmErrorClass::ServerError,

        // Provider streamed an explicit error event with a structured body.
        genai::Error::ChatResponse { body, .. } => classify_chat_response_body(body),

        // Auth configuration errors from genai itself (not HTTP 401/403).
        genai::Error::RequiresApiKey { .. }
        | genai::Error::NoAuthResolver { .. }
        | genai::Error::NoAuthData { .. } => LlmErrorClass::Auth,

        // Last resort: keyword-match the stringified error.
        other => classify_llm_error_message(&other.to_string()),
    }
}
524
/// Test-only convenience: classify an error and report whether it is retryable.
#[cfg(test)]
pub(super) fn is_retryable_llm_error(error: &genai::Error) -> bool {
    classify_llm_error(error).is_retryable()
}

// Seed/counter for the retry-backoff jitter stream. The odd constant is the
// splitmix64 increment; values are further mixed in `mix_retry_entropy`.
static RETRY_BACKOFF_ENTROPY: AtomicU64 = AtomicU64::new(0x9e37_79b9_7f4a_7c15);
531
532#[derive(Debug, Clone, Default)]
533pub(super) struct RetryBackoffWindow {
534    started_at: Option<Instant>,
535}
536
537impl RetryBackoffWindow {
538    pub(super) fn elapsed_ms(&mut self) -> u64 {
539        self.started_at
540            .get_or_insert_with(Instant::now)
541            .elapsed()
542            .as_millis()
543            .try_into()
544            .unwrap_or(u64::MAX)
545    }
546
547    pub(super) fn reset(&mut self) {
548        self.started_at = None;
549    }
550}
551
/// Result of waiting out one retry-backoff interval.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum RetryBackoffOutcome {
    /// The backoff sleep completed; the caller may retry.
    Completed,
    /// The run was cancelled while sleeping.
    Cancelled,
    /// Waiting would exceed the policy's retry-time budget; stop retrying.
    BudgetExhausted,
}
558
/// splitmix64 finalizer: scrambles `seed` into a well-mixed 64-bit value.
///
/// Constants are the standard splitmix64 increment and multipliers; the
/// function is deterministic and bijective.
fn mix_retry_entropy(seed: u64) -> u64 {
    let mut z = seed.wrapping_add(0x9e37_79b9_7f4a_7c15);
    z ^= z >> 30;
    z = z.wrapping_mul(0xbf58_476d_1ce4_e5b9);
    z ^= z >> 27;
    z = z.wrapping_mul(0x94d0_49bb_1331_11eb);
    z ^ (z >> 31)
}
565
566fn next_retry_entropy() -> u64 {
567    let counter = RETRY_BACKOFF_ENTROPY.fetch_add(0x9e37_79b9_7f4a_7c15, Ordering::Relaxed);
568    let now = SystemTime::now()
569        .duration_since(UNIX_EPOCH)
570        .map(|duration| duration.as_nanos() as u64)
571        .unwrap_or(counter);
572    mix_retry_entropy(counter ^ now)
573}
574
575pub(super) fn retry_base_backoff_ms(policy: &LlmRetryPolicy, retry_attempt: usize) -> u64 {
576    let initial = policy.initial_backoff_ms;
577    let cap = policy.max_backoff_ms.max(policy.initial_backoff_ms);
578    let attempt = retry_attempt.max(1);
579    if attempt == 1 {
580        return initial.min(cap);
581    }
582    let shift = (attempt - 1).min(20) as u32;
583    let factor = 1u64.checked_shl(shift).unwrap_or(u64::MAX);
584    initial.saturating_mul(factor).min(cap)
585}
586
587fn jittered_backoff_ms(policy: &LlmRetryPolicy, retry_attempt: usize, entropy: u64) -> u64 {
588    let cap = policy.max_backoff_ms.max(policy.initial_backoff_ms);
589    let base_ms = retry_base_backoff_ms(policy, retry_attempt).min(cap);
590    let jitter_percent = policy.backoff_jitter_percent.min(100) as u64;
591    if jitter_percent == 0 || base_ms == 0 {
592        return base_ms;
593    }
594
595    let jitter_window = base_ms.saturating_mul(jitter_percent) / 100;
596    let lower = base_ms.saturating_sub(jitter_window);
597    let upper = base_ms.saturating_add(jitter_window).min(cap);
598    if upper <= lower {
599        return lower;
600    }
601
602    let span = upper - lower;
603    lower + (mix_retry_entropy(entropy) % (span.saturating_add(1)))
604}
605
606pub(super) fn retry_backoff_plan_ms(
607    policy: &LlmRetryPolicy,
608    retry_attempt: usize,
609    elapsed_ms: u64,
610    entropy: u64,
611) -> Option<u64> {
612    let wait_ms = jittered_backoff_ms(policy, retry_attempt, entropy);
613    match policy.max_retry_window_ms {
614        Some(budget_ms) if elapsed_ms.saturating_add(wait_ms) > budget_ms => None,
615        _ => Some(wait_ms),
616    }
617}
618
619pub(super) async fn wait_retry_backoff(
620    agent: &dyn Agent,
621    retry_attempt: usize,
622    retry_window: &mut RetryBackoffWindow,
623    run_cancellation_token: Option<&RunCancellationToken>,
624) -> RetryBackoffOutcome {
625    let elapsed_ms = retry_window.elapsed_ms();
626    let Some(wait_ms) = retry_backoff_plan_ms(
627        agent.llm_retry_policy(),
628        retry_attempt,
629        elapsed_ms,
630        next_retry_entropy(),
631    ) else {
632        return RetryBackoffOutcome::BudgetExhausted;
633    };
634    tracing::debug!(
635        attempt = retry_attempt,
636        backoff_ms = wait_ms,
637        elapsed_ms = elapsed_ms,
638        "waiting before LLM retry"
639    );
640    match await_or_cancel(
641        run_cancellation_token,
642        tokio::time::sleep(std::time::Duration::from_millis(wait_ms)),
643    )
644    .await
645    {
646        CancelAware::Cancelled => RetryBackoffOutcome::Cancelled,
647        CancelAware::Value(_) => RetryBackoffOutcome::Completed,
648    }
649}
650
/// Final outcome of [`run_llm_with_retry_and_fallback`].
pub(super) enum LlmAttemptOutcome<T> {
    /// A call succeeded.
    Success {
        /// The successful response value.
        value: T,
        /// The model that produced it.
        model: String,
        /// Total attempts made across all models, including this one.
        attempts: usize,
    },
    /// The run was cancelled mid-call or mid-backoff.
    Cancelled,
    /// Every candidate model failed (or the retry budget ran out).
    Exhausted {
        /// Human-readable description of the last failure.
        last_error: String,
        /// Telemetry label of the last error's class, if one was computed.
        last_error_class: Option<&'static str>,
        /// Total attempts made across all models.
        attempts: usize,
    },
}
664
/// Thin alias over [`is_cancelled`] for readability at call sites.
fn is_run_cancelled(token: Option<&RunCancellationToken>) -> bool {
    is_cancelled(token)
}
668
669pub(super) fn step_tool_provider_for_run(
670    agent: &dyn Agent,
671    tools: HashMap<String, Arc<dyn Tool>>,
672) -> Arc<dyn StepToolProvider> {
673    agent.step_tool_provider().unwrap_or_else(|| {
674        Arc::new(StaticStepToolProvider::new(tools)) as Arc<dyn StepToolProvider>
675    })
676}
677
678pub(super) fn llm_executor_for_run(agent: &dyn Agent) -> Arc<dyn LlmExecutor> {
679    agent
680        .llm_executor()
681        .unwrap_or_else(|| Arc::new(GenaiLlmExecutor::new(Client::default())))
682}
683
/// Ask the step tool provider for this step's tool snapshot.
///
/// # Errors
/// Propagates any [`AgentLoopError`] returned by the provider.
pub(super) async fn resolve_step_tool_snapshot(
    step_tool_provider: &Arc<dyn StepToolProvider>,
    run_ctx: &RunContext,
) -> Result<StepToolSnapshot, AgentLoopError> {
    step_tool_provider
        .provide(StepToolInput { state: run_ctx })
        .await
}
692
/// Record one completed loop step on the mutable run state.
fn mark_step_completed(run_state: &mut LoopRunState) {
    run_state.completed_steps += 1;
}
696
/// Concatenate a response `prefix` with a new `segment`, avoiding an extra
/// allocation when either side is empty.
fn stitch_response_text(prefix: &str, segment: &str) -> String {
    match (prefix.is_empty(), segment.is_empty()) {
        (true, _) => segment.to_string(),
        (_, true) => prefix.to_string(),
        (false, false) => {
            let mut combined = String::with_capacity(prefix.len() + segment.len());
            combined.push_str(prefix);
            combined.push_str(segment);
            combined
        }
    }
}
709
/// Append `segment` to the accumulated response prefix; empty segments are a no-op.
fn extend_response_prefix(prefix: &mut String, segment: &str) {
    if segment.is_empty() {
        return;
    }
    prefix.push_str(segment);
}
715
716fn build_loop_outcome(
717    run_ctx: RunContext,
718    termination: TerminationReason,
719    response: Option<String>,
720    run_state: &LoopRunState,
721    failure: Option<outcome::LoopFailure>,
722) -> LoopOutcome {
723    LoopOutcome {
724        run_ctx,
725        termination,
726        response: response.filter(|text| !text.is_empty()),
727        usage: run_state.usage(),
728        stats: run_state.stats(),
729        failure,
730    }
731}
732
/// Invoke an LLM call with per-model retries and fallback across models.
///
/// Candidate models come from [`effective_llm_models_from`] (primary first,
/// then fallbacks, optionally starting at `start_model`). When
/// `retry_current_model` is set, each model gets up to the policy's attempt
/// count as long as the error class is retryable; backoff between attempts is
/// handled by [`wait_retry_backoff`], whose shared time budget can end the
/// whole sequence early.
///
/// Returns [`LlmAttemptOutcome::Cancelled`] as soon as the cancellation token
/// fires (during a call or a backoff sleep), and
/// [`LlmAttemptOutcome::Exhausted`] carrying the last error once all
/// candidates fail. `unknown_error` seeds the error text in case no attempt
/// ever produces one.
pub(super) async fn run_llm_with_retry_and_fallback<T, Invoke, Fut>(
    agent: &dyn Agent,
    run_cancellation_token: Option<&RunCancellationToken>,
    retry_current_model: bool,
    start_model: Option<&str>,
    unknown_error: &str,
    mut invoke: Invoke,
) -> LlmAttemptOutcome<T>
where
    Invoke: FnMut(String) -> Fut,
    Fut: std::future::Future<Output = genai::Result<T>>,
{
    let mut last_llm_error = unknown_error.to_string();
    let mut last_error_class: Option<&'static str> = None;
    let model_candidates = effective_llm_models_from(agent, start_model);
    let max_attempts = llm_retry_attempts(agent);
    let mut total_attempts = 0usize;
    // The backoff window is shared across all models and attempts.
    let mut retry_window = RetryBackoffWindow::default();

    'models: for model in model_candidates {
        for attempt in 1..=max_attempts {
            total_attempts = total_attempts.saturating_add(1);
            // Race the in-flight call against cancellation.
            let response_res =
                match await_or_cancel(run_cancellation_token, invoke(model.clone())).await {
                    CancelAware::Cancelled => return LlmAttemptOutcome::Cancelled,
                    CancelAware::Value(resp) => resp,
                };

            match response_res {
                Ok(value) => {
                    if total_attempts > 1 {
                        tracing::info!(
                            model = %model,
                            attempts = total_attempts,
                            "LLM call succeeded after retries"
                        );
                    }
                    return LlmAttemptOutcome::Success {
                        value,
                        model,
                        attempts: total_attempts,
                    };
                }
                Err(e) => {
                    let error_class = classify_llm_error(&e);
                    last_error_class = Some(error_class.as_str());
                    let message = e.to_string();
                    last_llm_error =
                        format!("model='{model}' attempt={attempt}/{max_attempts}: {message}");
                    let can_retry_same_model =
                        retry_current_model && attempt < max_attempts && error_class.is_retryable();
                    tracing::warn!(
                        model = %model,
                        attempt = attempt,
                        max_attempts = max_attempts,
                        error_class = error_class.as_str(),
                        retryable = can_retry_same_model,
                        error = %message,
                        "LLM call failed"
                    );
                    if can_retry_same_model {
                        match wait_retry_backoff(
                            agent,
                            attempt,
                            &mut retry_window,
                            run_cancellation_token,
                        )
                        .await
                        {
                            RetryBackoffOutcome::Completed => continue,
                            RetryBackoffOutcome::Cancelled => {
                                return LlmAttemptOutcome::Cancelled;
                            }
                            RetryBackoffOutcome::BudgetExhausted => {
                                tracing::warn!(
                                    model = %model,
                                    attempt = attempt,
                                    "LLM retry budget exhausted"
                                );
                                last_llm_error =
                                    format!("{last_llm_error} (retry budget exhausted)");
                                // The budget spans all models: abandon the
                                // whole fallback chain, not just this model.
                                break 'models;
                            }
                        }
                    }
                    // Non-retryable error (or attempts spent) on this model:
                    // move on to the next fallback model.
                    break;
                }
            }
        }
    }

    LlmAttemptOutcome::Exhausted {
        last_error: last_llm_error,
        last_error_class,
        attempts: total_attempts,
    }
}
830
/// Run the `StepStart` and `BeforeInference` phase blocks and collect their outputs.
///
/// Returns, in order: the messages to send to the LLM, the filtered tool
/// names, the aggregated [`RunAction`], the inference request transforms, the
/// pending state patches, and the serialized state actions produced by the
/// plugins.
///
/// # Errors
/// Propagates any [`AgentLoopError`] from the phase block.
pub(super) async fn run_step_prepare_phases(
    run_ctx: &RunContext,
    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
    agent: &dyn Agent,
) -> Result<
    (
        Vec<Message>,
        Vec<String>,
        RunAction,
        Vec<std::sync::Arc<dyn tirea_contract::runtime::inference::InferenceRequestTransform>>,
        Vec<TrackedPatch>,
        Vec<tirea_contract::SerializedStateAction>,
    ),
    AgentLoopError,
> {
    let system_prompt = agent.system_prompt().to_string();
    let ((messages, filtered_tools, run_action, transforms), pending, actions) =
        plugin_runtime::run_phase_block(
            run_ctx,
            tool_descriptors,
            agent,
            &[Phase::StepStart, Phase::BeforeInference],
            |_| {},
            |step| inference_inputs_from_step(step, &system_prompt),
        )
        .await?;
    Ok((
        messages,
        filtered_tools,
        run_action,
        transforms,
        pending,
        actions,
    ))
}
866
/// Outputs of the StepStart/BeforeInference phase block, bundled for the caller
/// (see [`prepare_step_execution`]).
pub(super) struct PreparedStep {
    /// Conversation messages to send to the LLM.
    pub(super) messages: Vec<Message>,
    /// Tool names remaining after plugin filtering for this step.
    pub(super) filtered_tools: Vec<String>,
    /// Aggregated run action decided by the phase plugins.
    pub(super) run_action: RunAction,
    /// State patches produced by the phases, not yet committed.
    pub(super) pending_patches: Vec<TrackedPatch>,
    /// Serialized state actions emitted by the phases.
    pub(super) serialized_state_actions: Vec<tirea_contract::SerializedStateAction>,
    /// Transforms to apply to the inference request before the LLM call.
    pub(super) request_transforms:
        Vec<std::sync::Arc<dyn tirea_contract::runtime::inference::InferenceRequestTransform>>,
}
876
877pub(super) async fn prepare_step_execution(
878    run_ctx: &RunContext,
879    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
880    agent: &dyn Agent,
881) -> Result<PreparedStep, AgentLoopError> {
882    let (messages, filtered_tools, run_action, transforms, pending, actions) =
883        run_step_prepare_phases(run_ctx, tool_descriptors, agent).await?;
884    Ok(PreparedStep {
885        messages,
886        filtered_tools,
887        run_action,
888        pending_patches: pending,
889        serialized_state_actions: actions,
890        request_transforms: transforms,
891    })
892}
893
894pub(super) async fn apply_llm_error_cleanup(
895    run_ctx: &mut RunContext,
896    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
897    agent: &dyn Agent,
898    error_type: &'static str,
899    message: String,
900    error_class: Option<&str>,
901) -> Result<(), AgentLoopError> {
902    plugin_runtime::emit_cleanup_phases(
903        run_ctx,
904        tool_descriptors,
905        agent,
906        error_type,
907        message,
908        error_class,
909    )
910    .await
911}
912
/// Run the post-inference phases and record the assistant turn.
///
/// Sequence (order matters):
/// 1. `AfterInference` phase — plugins observe the LLM response; any
///    patches/actions they produce are accumulated on `run_ctx`.
/// 2. The assistant message is appended to the thread.
/// 3. `StepEnd` phase — its patches/actions are accumulated as well.
///
/// Returns the `RunAction` produced by the `AfterInference` phase block.
pub(super) async fn complete_step_after_inference(
    run_ctx: &mut RunContext,
    result: &StreamResult,
    step_meta: MessageMetadata,
    assistant_message_id: Option<String>,
    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
    agent: &dyn Agent,
) -> Result<RunAction, AgentLoopError> {
    let (run_action, pending, actions) = plugin_runtime::run_phase_block(
        run_ctx,
        tool_descriptors,
        agent,
        &[Phase::AfterInference],
        |step| {
            use crate::contracts::runtime::inference::LLMResponse;
            // Expose the completed stream result to AfterInference plugins.
            step.llm_response = Some(LLMResponse::success(result.clone()));
        },
        |step| step.run_action(),
    )
    .await?;
    run_ctx.add_thread_patches(pending);
    run_ctx.add_serialized_state_actions(actions);

    // Append the assistant turn before running StepEnd.
    let assistant = assistant_turn_message(result, step_meta, assistant_message_id);
    run_ctx.add_message(Arc::new(assistant));

    // `pending`/`actions` are intentionally shadowed for the StepEnd output.
    let (pending, actions) =
        plugin_runtime::emit_phase_block(Phase::StepEnd, run_ctx, tool_descriptors, agent, |_| {})
            .await?;
    run_ctx.add_thread_patches(pending);
    run_ctx.add_serialized_state_actions(actions);
    Ok(run_action)
}
946
947/// Emit events for a pending tool-call projection.
948pub(super) fn pending_tool_events(call: &SuspendedCall) -> Vec<AgentEvent> {
949    vec![
950        AgentEvent::ToolCallStart {
951            id: call.ticket.pending.id.clone(),
952            name: call.ticket.pending.name.clone(),
953        },
954        AgentEvent::ToolCallReady {
955            id: call.ticket.pending.id.clone(),
956            name: call.ticket.pending.name.clone(),
957            arguments: call.ticket.pending.arguments.clone(),
958        },
959    ]
960}
961
962pub(super) fn has_suspended_calls(run_ctx: &RunContext) -> bool {
963    !suspended_calls_from_ctx(run_ctx).is_empty()
964}
965
966pub(super) fn suspended_call_ids(run_ctx: &RunContext) -> HashSet<String> {
967    suspended_calls_from_ctx(run_ctx).into_keys().collect()
968}
969
970pub(super) fn newly_suspended_call_ids(
971    run_ctx: &RunContext,
972    baseline_ids: &HashSet<String>,
973) -> HashSet<String> {
974    suspended_calls_from_ctx(run_ctx)
975        .into_keys()
976        .filter(|id| !baseline_ids.contains(id))
977        .collect()
978}
979
980pub(super) fn suspended_call_pending_events(run_ctx: &RunContext) -> Vec<AgentEvent> {
981    let mut calls: Vec<SuspendedCall> = suspended_calls_from_ctx(run_ctx).into_values().collect();
982    calls.sort_by(|left, right| left.call_id.cmp(&right.call_id));
983    calls
984        .into_iter()
985        .flat_map(|call| pending_tool_events(&call))
986        .collect()
987}
988
989pub(super) fn suspended_call_pending_events_for_ids(
990    run_ctx: &RunContext,
991    call_ids: &HashSet<String>,
992) -> Vec<AgentEvent> {
993    if call_ids.is_empty() {
994        return Vec::new();
995    }
996    let mut calls: Vec<SuspendedCall> = suspended_calls_from_ctx(run_ctx)
997        .into_iter()
998        .filter_map(|(call_id, call)| call_ids.contains(&call_id).then_some(call))
999        .collect();
1000    calls.sort_by(|left, right| left.call_id.cmp(&right.call_id));
1001    calls
1002        .into_iter()
1003        .flat_map(|call| pending_tool_events(&call))
1004        .collect()
1005}
1006
/// Snapshot of run data captured once before executing a batch of tool
/// calls (see `prepare_tool_execution_context`).
pub(super) struct ToolExecutionContext {
    // State snapshot taken from the run context.
    pub(super) state: serde_json::Value,
    // Run policy cloned from the run context.
    pub(super) run_policy: tirea_contract::RunPolicy,
    // Identity of the current run.
    pub(super) run_identity: RunIdentity,
    // Caller context derived from the run context and the state snapshot.
    pub(super) caller_context: crate::contracts::runtime::tool_call::CallerContext,
}
1013
1014pub(super) fn prepare_tool_execution_context(
1015    run_ctx: &RunContext,
1016) -> Result<ToolExecutionContext, AgentLoopError> {
1017    let state = run_ctx
1018        .snapshot()
1019        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
1020    let caller_context = caller_context_for_tool_execution(run_ctx, &state);
1021    Ok(ToolExecutionContext {
1022        state,
1023        run_policy: run_ctx.run_policy().clone(),
1024        run_identity: run_ctx.run_identity().clone(),
1025        caller_context,
1026    })
1027}
1028
1029pub(super) async fn finalize_run_end(
1030    run_ctx: &mut RunContext,
1031    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
1032    agent: &dyn Agent,
1033) {
1034    plugin_runtime::emit_run_end_phase(run_ctx, tool_descriptors, agent).await
1035}
1036
1037fn normalize_termination_for_suspended_calls(
1038    run_ctx: &RunContext,
1039    termination: TerminationReason,
1040    response: Option<String>,
1041) -> (TerminationReason, Option<String>) {
1042    let final_termination = if !matches!(
1043        termination,
1044        TerminationReason::Error(_) | TerminationReason::Cancelled
1045    ) && has_suspended_calls(run_ctx)
1046    {
1047        TerminationReason::Suspended
1048    } else {
1049        termination
1050    };
1051    let final_response = if final_termination == TerminationReason::Suspended {
1052        None
1053    } else {
1054        response
1055    };
1056    (final_termination, final_response)
1057}
1058
1059async fn persist_run_termination(
1060    run_ctx: &mut RunContext,
1061    termination: &TerminationReason,
1062    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
1063    agent: &dyn Agent,
1064    run_identity: &RunIdentity,
1065    pending_delta_commit: &PendingDeltaCommitContext<'_>,
1066) -> Result<(), AgentLoopError> {
1067    sync_run_lifecycle_for_termination_with_context(run_ctx, run_identity, termination)?;
1068    finalize_run_end(run_ctx, tool_descriptors, agent).await;
1069    pending_delta_commit
1070        .commit_run_finished(run_ctx, termination)
1071        .await?;
1072    Ok(())
1073}
1074
1075fn stream_result_from_chat_response(response: &genai::chat::ChatResponse) -> StreamResult {
1076    let text = response
1077        .first_text()
1078        .map(|s| s.to_string())
1079        .unwrap_or_default();
1080    let tool_calls: Vec<crate::contracts::thread::ToolCall> = response
1081        .tool_calls()
1082        .into_iter()
1083        .map(|tc| {
1084            crate::contracts::thread::ToolCall::new(
1085                &tc.call_id,
1086                &tc.fn_name,
1087                tc.fn_arguments.clone(),
1088            )
1089        })
1090        .collect();
1091
1092    let usage = Some(crate::runtime::streaming::token_usage_from_genai(
1093        &response.usage,
1094    ));
1095    let stop_reason = response
1096        .stop_reason
1097        .as_ref()
1098        .and_then(crate::runtime::streaming::map_genai_stop_reason)
1099        .or({
1100            if !tool_calls.is_empty() {
1101                Some(tirea_contract::runtime::inference::StopReason::ToolUse)
1102            } else {
1103                Some(tirea_contract::runtime::inference::StopReason::EndTurn)
1104            }
1105        });
1106    StreamResult {
1107        text,
1108        tool_calls,
1109        usage,
1110        stop_reason,
1111    }
1112}
1113
1114fn assistant_turn_message(
1115    result: &StreamResult,
1116    step_meta: MessageMetadata,
1117    message_id: Option<String>,
1118) -> Message {
1119    let mut msg = if result.tool_calls.is_empty() {
1120        assistant_message(&result.text)
1121    } else {
1122        assistant_tool_calls(&result.text, result.tool_calls.clone())
1123    }
1124    .with_metadata(step_meta);
1125    if let Some(message_id) = message_id {
1126        msg = msg.with_id(message_id);
1127    }
1128    msg
1129}
1130
/// Result of draining queued resume decisions at run start.
struct RunStartDrainOutcome {
    // Events emitted while draining (tool results, state snapshots, ...).
    events: Vec<AgentEvent>,
    // True when at least one suspended call was matched to a decision
    // and processed.
    replayed: bool,
}
1135
1136fn decision_result_value(action: &ResumeDecisionAction, result: &Value) -> serde_json::Value {
1137    if result.is_null() {
1138        serde_json::Value::Bool(matches!(action, ResumeDecisionAction::Resume))
1139    } else {
1140        result.clone()
1141    }
1142}
1143
1144fn runtime_resume_inputs(run_ctx: &RunContext) -> HashMap<String, ToolCallResume> {
1145    let mut decisions = HashMap::new();
1146    for (call_id, state) in tool_call_states_from_ctx(run_ctx) {
1147        if !matches!(state.status, ToolCallStatus::Resuming) {
1148            continue;
1149        }
1150        let Some(mut resume) = state.resume else {
1151            continue;
1152        };
1153        if resume.decision_id.trim().is_empty() {
1154            resume.decision_id = call_id.clone();
1155        }
1156        decisions.insert(call_id, resume);
1157    }
1158    decisions
1159}
1160
/// Settle `Resuming` tool-call states whose suspended call no longer
/// exists ("orphans").
///
/// For each resume decision with no matching entry in `suspended`, the
/// stored state is transitioned to a terminal status: `Cancelled` for a
/// Cancel decision, `Failed` for a Resume decision (there is nothing left
/// to replay). States already at the target status with the same resume
/// payload are left untouched.
///
/// Returns `Ok(true)` when at least one non-empty patch was queued on
/// `run_ctx`.
fn settle_orphan_resuming_tool_states(
    run_ctx: &mut RunContext,
    suspended: &HashMap<String, SuspendedCall>,
    resumes: &HashMap<String, ToolCallResume>,
) -> Result<bool, AgentLoopError> {
    let states = tool_call_states_from_ctx(run_ctx);
    let mut changed = false;

    for (call_id, resume) in resumes {
        // Only orphans: calls that still have a suspended entry are handled
        // by the regular replay path.
        if suspended.contains_key(call_id) {
            continue;
        }
        let Some(state) = states.get(call_id).cloned() else {
            continue;
        };
        let target_status = match &resume.action {
            ResumeDecisionAction::Cancel => ToolCallStatus::Cancelled,
            ResumeDecisionAction::Resume => ToolCallStatus::Failed,
        };
        // Idempotence: skip if this exact settlement was already recorded.
        if state.status == target_status && state.resume.as_ref() == Some(resume) {
            continue;
        }

        let Some(next_state) = transition_tool_call_state(
            Some(state.clone()),
            ToolCallStateSeed {
                call_id: call_id.as_str(),
                tool_name: state.tool_name.as_str(),
                arguments: &state.arguments,
                status: state.status,
                resume_token: state.resume_token.clone(),
            },
            ToolCallStateTransition {
                status: target_status,
                resume_token: state.resume_token.clone(),
                resume: Some(resume.clone()),
                updated_at: current_unix_millis(),
            },
        ) else {
            // Transition rejected (no valid state change); leave as-is.
            continue;
        };

        let base_state = run_ctx
            .snapshot()
            .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
        let patch = upsert_tool_call_state(&base_state, call_id, next_state)?;
        if patch.patch().is_empty() {
            continue;
        }
        run_ctx.add_thread_patch(patch);
        changed = true;
    }

    Ok(changed)
}
1216
1217fn all_suspended_calls_have_resume(
1218    suspended: &HashMap<String, SuspendedCall>,
1219    resumes: &HashMap<String, ToolCallResume>,
1220) -> bool {
1221    suspended
1222        .keys()
1223        .all(|call_id| resumes.contains_key(call_id))
1224}
1225
/// Drain queued resume decisions at run start and replay the matching
/// suspended tool calls.
///
/// Flow:
/// 1. Collect `Resuming` tool-call states that carry a resume payload;
///    bail out early when there are none.
/// 2. Settle "orphan" resumes whose suspended call no longer exists.
/// 3. Under `DecisionReplayPolicy::BatchAllSuspended`, defer replay until
///    every suspended call has a resume decision.
/// 4. For each decision (sorted by call id for determinism): a Cancel
///    appends a denied tool result and clears the suspended-call scope
///    entry; a Resume re-executes the tool and records its result,
///    reminders, patches, and any follow-up suspension.
///
/// A trailing `StateSnapshot` event is emitted whenever thread state
/// changed.
async fn drain_resuming_tool_calls_and_replay(
    run_ctx: &mut RunContext,
    tools: &HashMap<String, Arc<dyn Tool>>,
    agent: &dyn Agent,
    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
) -> Result<RunStartDrainOutcome, AgentLoopError> {
    let decisions = runtime_resume_inputs(run_ctx);
    if decisions.is_empty() {
        return Ok(RunStartDrainOutcome {
            events: Vec::new(),
            replayed: false,
        });
    }

    let suspended = suspended_calls_from_ctx(run_ctx);
    let mut state_changed = false;
    if settle_orphan_resuming_tool_states(run_ctx, &suspended, &decisions)? {
        state_changed = true;
    }
    // Nothing suspended: only the orphan settlement (if any) is reported.
    if suspended.is_empty() {
        if state_changed {
            let snapshot = run_ctx
                .snapshot()
                .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
            return Ok(RunStartDrainOutcome {
                events: vec![AgentEvent::StateSnapshot { snapshot }],
                replayed: false,
            });
        }
        return Ok(RunStartDrainOutcome {
            events: Vec::new(),
            replayed: false,
        });
    }

    // Batch policy: wait until every suspended call has its decision
    // before replaying any of them.
    if matches!(
        agent.tool_executor().decision_replay_policy(),
        DecisionReplayPolicy::BatchAllSuspended
    ) && !all_suspended_calls_have_resume(&suspended, &decisions)
    {
        if state_changed {
            let snapshot = run_ctx
                .snapshot()
                .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
            return Ok(RunStartDrainOutcome {
                events: vec![AgentEvent::StateSnapshot { snapshot }],
                replayed: false,
            });
        }
        return Ok(RunStartDrainOutcome {
            events: Vec::new(),
            replayed: false,
        });
    }

    let mut events = Vec::new();
    // Sorted iteration keeps the replay (and event) order deterministic.
    let mut decision_ids: Vec<String> = decisions.keys().cloned().collect();
    decision_ids.sort();

    let mut replayed = false;

    for call_id in decision_ids {
        let Some(suspended_call) = suspended.get(&call_id).cloned() else {
            continue;
        };
        let Some(decision) = decisions.get(&call_id).cloned() else {
            continue;
        };
        replayed = true;
        let decision_result = decision_result_value(&decision.action, &decision.result);
        let resume_payload = ToolCallResume {
            result: decision_result.clone(),
            ..decision.clone()
        };
        events.push(AgentEvent::ToolCallResumed {
            target_id: suspended_call.call_id.clone(),
            result: decision_result.clone(),
        });

        match decision.action {
            ResumeDecisionAction::Cancel => {
                let cancel_reason = resume_payload.reason.clone();
                if upsert_tool_call_lifecycle_state(
                    run_ctx,
                    &suspended_call,
                    ToolCallStatus::Cancelled,
                    Some(resume_payload),
                )? {
                    state_changed = true;
                }
                events.push(append_denied_tool_result_message(
                    run_ctx,
                    &suspended_call.call_id,
                    Some(&suspended_call.tool_name),
                    cancel_reason.as_deref(),
                ));
                // Cancel path skips tool execution, so no automatic scope
                // cleanup runs. Delete just the suspended_call entry; keep
                // tool_call_state (Cancelled status) for audit.
                let cleanup_path = format!(
                    "__tool_call_scope.{}.suspended_call",
                    suspended_call.call_id
                );
                let cleanup_patch = tirea_state::Patch::with_ops(vec![tirea_state::Op::delete(
                    tirea_state::parse_path(&cleanup_path),
                )]);
                let tracked = tirea_state::TrackedPatch::new(cleanup_patch)
                    .with_source("framework:scope_cleanup");
                if !tracked.patch().is_empty() {
                    state_changed = true;
                    run_ctx.add_thread_patch(tracked);
                }
            }
            ResumeDecisionAction::Resume => {
                if upsert_tool_call_lifecycle_state(
                    run_ctx,
                    &suspended_call,
                    ToolCallStatus::Resuming,
                    Some(resume_payload.clone()),
                )? {
                    state_changed = true;
                }
                // Project the decision back into a concrete tool call; a
                // None here means this resume mode has nothing to replay.
                let Some(tool_call) = replay_tool_call_for_resolution(
                    run_ctx,
                    &suspended_call,
                    &ToolCallDecision {
                        target_id: suspended_call.call_id.clone(),
                        resume: resume_payload.clone(),
                    },
                ) else {
                    continue;
                };
                let state = run_ctx
                    .snapshot()
                    .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
                let tool = tools.get(&tool_call.name).cloned();
                let caller_context = caller_context_for_tool_execution(run_ctx, &state);
                let replay_phase_ctx = ToolPhaseContext {
                    tool_descriptors,
                    agent_behavior: Some(agent.behavior()),
                    activity_manager: tirea_contract::runtime::activity::NoOpActivityManager::arc(),
                    run_policy: run_ctx.run_policy(),
                    run_identity: run_ctx.run_identity().clone(),
                    caller_context,
                    thread_id: run_ctx.thread_id(),
                    thread_messages: run_ctx.messages(),
                    cancellation_token: None,
                };
                let replay_result = execute_single_tool_with_phases_deferred(
                    tool.as_deref(),
                    &tool_call,
                    &state,
                    &replay_phase_ctx,
                )
                .await?;

                // Record the tool response in the thread.
                let replay_msg_id = gen_message_id();
                let replay_msg = tool_response(&tool_call.id, &replay_result.execution.result)
                    .with_id(replay_msg_id.clone());
                run_ctx.add_message(Arc::new(replay_msg));

                // Plugin reminders become internal system messages.
                if !replay_result.reminders.is_empty() {
                    let msgs: Vec<Arc<Message>> = replay_result
                        .reminders
                        .iter()
                        .map(|reminder| {
                            Arc::new(Message::internal_system(format!(
                                "<system-reminder>{}</system-reminder>",
                                reminder
                            )))
                        })
                        .collect();
                    run_ctx.add_messages(msgs);
                }

                if let Some(patch) = replay_result.execution.patch.clone() {
                    state_changed = true;
                    run_ctx.add_thread_patch(patch);
                }
                if !replay_result.pending_patches.is_empty() {
                    state_changed = true;
                    run_ctx.add_thread_patches(replay_result.pending_patches.clone());
                }
                events.push(AgentEvent::ToolCallDone {
                    id: tool_call.id.clone(),
                    result: replay_result.execution.result,
                    patch: replay_result.execution.patch,
                    message_id: replay_msg_id,
                });

                // The replayed tool may itself suspend again; persist the
                // new suspension and announce it.
                if let Some(next_suspended_call) = replay_result.suspended_call.clone() {
                    let state = run_ctx
                        .snapshot()
                        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
                    let action = next_suspended_call.clone().into_state_action();
                    let patches = reduce_state_actions(
                        vec![action],
                        &state,
                        "agent_loop",
                        &ScopeContext::run(),
                    )
                    .map_err(|e| {
                        AgentLoopError::StateError(format!(
                            "failed to reduce suspended call action: {e}"
                        ))
                    })?;
                    for patch in patches {
                        if !patch.patch().is_empty() {
                            state_changed = true;
                            run_ctx.add_thread_patch(patch);
                        }
                    }
                    for event in pending_tool_events(&next_suspended_call) {
                        events.push(event);
                    }
                }
            }
        }
    }

    // No explicit clear_suspended_call needed: terminal-outcome tool calls
    // clear `__tool_call_scope.<call_id>.suspended_call` automatically in
    // execute_single_tool_with_phases_impl while preserving tool_call_state.

    if state_changed {
        let snapshot = run_ctx
            .snapshot()
            .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
        events.push(AgentEvent::StateSnapshot { snapshot });
    }

    Ok(RunStartDrainOutcome { events, replayed })
}
1459
1460async fn drain_run_start_resume_replay(
1461    run_ctx: &mut RunContext,
1462    tools: &HashMap<String, Arc<dyn Tool>>,
1463    agent: &dyn Agent,
1464    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
1465) -> Result<RunStartDrainOutcome, AgentLoopError> {
1466    drain_resuming_tool_calls_and_replay(run_ctx, tools, agent, tool_descriptors).await
1467}
1468
1469async fn commit_run_start_and_drain_replay(
1470    run_ctx: &mut RunContext,
1471    tools: &HashMap<String, Arc<dyn Tool>>,
1472    agent: &dyn Agent,
1473    active_tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
1474    pending_delta_commit: &PendingDeltaCommitContext<'_>,
1475) -> Result<RunStartDrainOutcome, AgentLoopError> {
1476    pending_delta_commit
1477        .commit(run_ctx, CheckpointReason::UserMessage, false)
1478        .await?;
1479
1480    let run_start_drain =
1481        drain_run_start_resume_replay(run_ctx, tools, agent, active_tool_descriptors).await?;
1482
1483    if run_start_drain.replayed {
1484        pending_delta_commit
1485            .commit(run_ctx, CheckpointReason::ToolResultsCommitted, false)
1486            .await?;
1487    }
1488
1489    Ok(run_start_drain)
1490}
1491
1492fn normalize_decision_tool_result(
1493    response: &serde_json::Value,
1494    fallback_arguments: &serde_json::Value,
1495) -> serde_json::Value {
1496    match response {
1497        serde_json::Value::Bool(_) => fallback_arguments.clone(),
1498        value => value.clone(),
1499    }
1500}
1501
1502fn denied_tool_result_for_call(
1503    run_ctx: &RunContext,
1504    call_id: &str,
1505    fallback_tool_name: Option<&str>,
1506    decision_reason: Option<&str>,
1507) -> ToolResult {
1508    let tool_name = fallback_tool_name
1509        .filter(|name| !name.is_empty())
1510        .map(str::to_string)
1511        .or_else(|| find_tool_call_in_messages(run_ctx, call_id).map(|call| call.name))
1512        .unwrap_or_else(|| "tool".to_string());
1513    let reason = decision_reason
1514        .map(str::to_string)
1515        .filter(|reason| !reason.trim().is_empty())
1516        .unwrap_or_else(|| "User denied the action".to_string());
1517    ToolResult::error(tool_name, reason)
1518}
1519
1520fn append_denied_tool_result_message(
1521    run_ctx: &mut RunContext,
1522    call_id: &str,
1523    fallback_tool_name: Option<&str>,
1524    decision_reason: Option<&str>,
1525) -> AgentEvent {
1526    let denied_result =
1527        denied_tool_result_for_call(run_ctx, call_id, fallback_tool_name, decision_reason);
1528    let message_id = gen_message_id();
1529    let denied_message = tool_response(call_id, &denied_result).with_id(message_id.clone());
1530    run_ctx.add_message(Arc::new(denied_message));
1531    AgentEvent::ToolCallDone {
1532        id: call_id.to_string(),
1533        result: denied_result,
1534        patch: None,
1535        message_id,
1536    }
1537}
1538
1539fn find_tool_call_in_messages(run_ctx: &RunContext, call_id: &str) -> Option<ToolCall> {
1540    run_ctx.messages().iter().rev().find_map(|message| {
1541        message
1542            .tool_calls
1543            .as_ref()
1544            .and_then(|calls| calls.iter().find(|call| call.id == call_id).cloned())
1545    })
1546}
1547
1548fn replay_tool_call_for_resolution(
1549    _run_ctx: &RunContext,
1550    suspended_call: &SuspendedCall,
1551    decision: &ToolCallDecision,
1552) -> Option<ToolCall> {
1553    if matches!(decision.resume.action, ResumeDecisionAction::Cancel) {
1554        return None;
1555    }
1556
1557    match suspended_call.ticket.resume_mode {
1558        ToolCallResumeMode::ReplayToolCall => Some(ToolCall::new(
1559            suspended_call.call_id.clone(),
1560            suspended_call.tool_name.clone(),
1561            suspended_call.arguments.clone(),
1562        )),
1563        ToolCallResumeMode::UseDecisionAsToolResult | ToolCallResumeMode::PassDecisionToTool => {
1564            Some(ToolCall::new(
1565                suspended_call.call_id.clone(),
1566                suspended_call.tool_name.clone(),
1567                normalize_decision_tool_result(&decision.resume.result, &suspended_call.arguments),
1568            ))
1569        }
1570    }
1571}
1572
/// Transition the stored lifecycle state for `suspended_call` to `status`,
/// queuing the resulting patch on `run_ctx`.
///
/// When no state exists yet, one is seeded at `Suspended` with the pending
/// id as the resume token. Returns `Ok(true)` only when a non-empty patch
/// was actually queued; a rejected transition or empty patch yields
/// `Ok(false)` with no side effects.
fn upsert_tool_call_lifecycle_state(
    run_ctx: &mut RunContext,
    suspended_call: &SuspendedCall,
    status: ToolCallStatus,
    resume: Option<ToolCallResume>,
) -> Result<bool, AgentLoopError> {
    let current_state = tool_call_states_from_ctx(run_ctx).remove(&suspended_call.call_id);
    let Some(tool_state) = transition_tool_call_state(
        current_state,
        // Seed used when no prior state exists for this call.
        ToolCallStateSeed {
            call_id: &suspended_call.call_id,
            tool_name: &suspended_call.tool_name,
            arguments: &suspended_call.arguments,
            status: ToolCallStatus::Suspended,
            resume_token: Some(suspended_call.ticket.pending.id.clone()),
        },
        ToolCallStateTransition {
            status,
            resume_token: Some(suspended_call.ticket.pending.id.clone()),
            resume,
            updated_at: current_unix_millis(),
        },
    ) else {
        // Transition rejected: nothing to record.
        return Ok(false);
    };

    let base_state = run_ctx
        .snapshot()
        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
    let patch = upsert_tool_call_state(&base_state, &suspended_call.call_id, tool_state)?;
    if patch.patch().is_empty() {
        return Ok(false);
    }
    run_ctx.add_thread_patch(patch);
    Ok(true)
}
1609
1610pub(super) fn resolve_suspended_call(
1611    run_ctx: &mut RunContext,
1612    response: &ToolCallDecision,
1613) -> Result<Option<DecisionReplayOutcome>, AgentLoopError> {
1614    let suspended_calls = suspended_calls_from_ctx(run_ctx);
1615    if suspended_calls.is_empty() {
1616        return Ok(None);
1617    }
1618
1619    let suspended_call = suspended_calls
1620        .get(&response.target_id)
1621        .cloned()
1622        .or_else(|| {
1623            suspended_calls
1624                .values()
1625                .find(|call| {
1626                    call.ticket.suspension.id == response.target_id
1627                        || call.ticket.pending.id == response.target_id
1628                        || call.call_id == response.target_id
1629                })
1630                .cloned()
1631        });
1632    let Some(suspended_call) = suspended_call else {
1633        return Ok(None);
1634    };
1635
1636    let _ = upsert_tool_call_lifecycle_state(
1637        run_ctx,
1638        &suspended_call,
1639        ToolCallStatus::Resuming,
1640        Some(response.resume.clone()),
1641    )?;
1642
1643    Ok(Some(DecisionReplayOutcome {
1644        events: Vec::new(),
1645        resolved_call_ids: vec![suspended_call.call_id],
1646    }))
1647}
1648
1649pub(super) fn drain_decision_channel(
1650    run_ctx: &mut RunContext,
1651    decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1652    pending_decisions: &mut VecDeque<ToolCallDecision>,
1653) -> Result<DecisionReplayOutcome, AgentLoopError> {
1654    let mut disconnected = false;
1655    if let Some(rx) = decision_rx.as_mut() {
1656        loop {
1657            match rx.try_recv() {
1658                Ok(response) => pending_decisions.push_back(response),
1659                Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
1660                Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1661                    disconnected = true;
1662                    break;
1663                }
1664            }
1665        }
1666    }
1667    if disconnected {
1668        *decision_rx = None;
1669    }
1670
1671    if pending_decisions.is_empty() {
1672        return Ok(DecisionReplayOutcome {
1673            events: Vec::new(),
1674            resolved_call_ids: Vec::new(),
1675        });
1676    }
1677
1678    let mut unresolved = VecDeque::new();
1679    let mut events = Vec::new();
1680    let mut resolved_call_ids = Vec::new();
1681    let mut seen = HashSet::new();
1682
1683    while let Some(response) = pending_decisions.pop_front() {
1684        if let Some(outcome) = resolve_suspended_call(run_ctx, &response)? {
1685            for call_id in outcome.resolved_call_ids {
1686                if seen.insert(call_id.clone()) {
1687                    resolved_call_ids.push(call_id);
1688                }
1689            }
1690            events.extend(outcome.events);
1691        } else {
1692            unresolved.push_back(response);
1693        }
1694    }
1695    *pending_decisions = unresolved;
1696
1697    Ok(DecisionReplayOutcome {
1698        events,
1699        resolved_call_ids,
1700    })
1701}
1702
1703async fn replay_after_decisions(
1704    run_ctx: &mut RunContext,
1705    decisions_applied: bool,
1706    step_tool_provider: &Arc<dyn StepToolProvider>,
1707    agent: &dyn Agent,
1708    active_tool_descriptors: &mut Vec<crate::contracts::runtime::tool_call::ToolDescriptor>,
1709    pending_delta_commit: &PendingDeltaCommitContext<'_>,
1710) -> Result<Vec<AgentEvent>, AgentLoopError> {
1711    if !decisions_applied {
1712        return Ok(Vec::new());
1713    }
1714
1715    let decision_tools = resolve_step_tool_snapshot(step_tool_provider, run_ctx).await?;
1716    *active_tool_descriptors = decision_tools.descriptors.clone();
1717
1718    let decision_drain = drain_run_start_resume_replay(
1719        run_ctx,
1720        &decision_tools.tools,
1721        agent,
1722        active_tool_descriptors,
1723    )
1724    .await?;
1725
1726    pending_delta_commit
1727        .commit(run_ctx, CheckpointReason::ToolResultsCommitted, false)
1728        .await?;
1729
1730    Ok(decision_drain.events)
1731}
1732
1733async fn apply_decisions_and_replay(
1734    run_ctx: &mut RunContext,
1735    decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1736    pending_decisions: &mut VecDeque<ToolCallDecision>,
1737    step_tool_provider: &Arc<dyn StepToolProvider>,
1738    agent: &dyn Agent,
1739    active_tool_descriptors: &mut Vec<crate::contracts::runtime::tool_call::ToolDescriptor>,
1740    pending_delta_commit: &PendingDeltaCommitContext<'_>,
1741) -> Result<Vec<AgentEvent>, AgentLoopError> {
1742    Ok(drain_and_replay_decisions(
1743        run_ctx,
1744        decision_rx,
1745        pending_decisions,
1746        None,
1747        DecisionReplayInputs {
1748            step_tool_provider,
1749            agent,
1750            active_tool_descriptors,
1751        },
1752        pending_delta_commit,
1753    )
1754    .await?
1755    .events)
1756}
1757
/// Result of draining queued tool-call decisions and replaying the run:
/// the events produced plus the ids of the suspended calls that were resolved.
pub(super) struct DecisionReplayOutcome {
    /// Events emitted while resolving decisions and during any replay.
    events: Vec<AgentEvent>,
    /// Ids of suspended tool calls resolved by the drained decisions
    /// (deduplicated by the drain logic).
    resolved_call_ids: Vec<String>,
}
1762
/// Borrowed inputs required to replay the run after decisions are applied.
struct DecisionReplayInputs<'a> {
    /// Provider used to re-resolve the per-step tool snapshot after decisions.
    step_tool_provider: &'a Arc<dyn StepToolProvider>,
    /// Agent driving the run (phase hooks, behavior).
    agent: &'a dyn Agent,
    /// Updated in place with the descriptors of the refreshed tool snapshot.
    active_tool_descriptors: &'a mut Vec<crate::contracts::runtime::tool_call::ToolDescriptor>,
}
1768
1769async fn drain_and_replay_decisions(
1770    run_ctx: &mut RunContext,
1771    decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1772    pending_decisions: &mut VecDeque<ToolCallDecision>,
1773    decision: Option<ToolCallDecision>,
1774    replay_inputs: DecisionReplayInputs<'_>,
1775    pending_delta_commit: &PendingDeltaCommitContext<'_>,
1776) -> Result<DecisionReplayOutcome, AgentLoopError> {
1777    if let Some(decision) = decision {
1778        pending_decisions.push_back(decision);
1779    }
1780    let decision_drain = drain_decision_channel(run_ctx, decision_rx, pending_decisions)?;
1781    let mut events = decision_drain.events;
1782    let replay_events = replay_after_decisions(
1783        run_ctx,
1784        !decision_drain.resolved_call_ids.is_empty(),
1785        replay_inputs.step_tool_provider,
1786        replay_inputs.agent,
1787        replay_inputs.active_tool_descriptors,
1788        pending_delta_commit,
1789    )
1790    .await?;
1791    events.extend(replay_events);
1792
1793    Ok(DecisionReplayOutcome {
1794        events,
1795        resolved_call_ids: decision_drain.resolved_call_ids,
1796    })
1797}
1798
1799async fn apply_decision_and_replay(
1800    run_ctx: &mut RunContext,
1801    response: ToolCallDecision,
1802    decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1803    pending_decisions: &mut VecDeque<ToolCallDecision>,
1804    replay_inputs: DecisionReplayInputs<'_>,
1805    pending_delta_commit: &PendingDeltaCommitContext<'_>,
1806) -> Result<DecisionReplayOutcome, AgentLoopError> {
1807    drain_and_replay_decisions(
1808        run_ctx,
1809        decision_rx,
1810        pending_decisions,
1811        Some(response),
1812        replay_inputs,
1813        pending_delta_commit,
1814    )
1815    .await
1816}
1817
1818async fn recv_decision(
1819    decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1820) -> Option<ToolCallDecision> {
1821    let rx = decision_rx.as_mut()?;
1822    rx.recv().await
1823}
1824
1825/// Run the full agent loop until completion, suspension, cancellation, or error.
1826///
1827/// This is the primary non-streaming entry point. Tools are passed directly
1828/// and used as the default tool set unless the agent's step_tool_provider is set
1829/// (for dynamic per-step tool resolution).
1830pub async fn run_loop(
1831    agent: &dyn Agent,
1832    tools: HashMap<String, Arc<dyn Tool>>,
1833    run_ctx: RunContext,
1834    cancellation_token: Option<RunCancellationToken>,
1835    state_committer: Option<Arc<dyn StateCommitter>>,
1836    decision_rx: Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1837) -> LoopOutcome {
1838    let run_identity = run_ctx.run_identity().clone();
1839    run_loop_with_context(
1840        agent,
1841        tools,
1842        run_ctx,
1843        run_identity,
1844        cancellation_token,
1845        state_committer,
1846        decision_rx,
1847    )
1848    .await
1849}
1850
/// Run the full agent loop until completion, suspension, cancellation, or error.
///
/// This is the primary non-streaming entry point. Tools are passed directly
/// and used as the default tool set unless the agent's step_tool_provider is set
/// (for dynamic per-step tool resolution).
///
/// High-level flow (see module docs for the phase diagram):
/// RunStart → loop { apply decisions → cancellation check → resolve step tools
/// → prepare step → LLM (with retry/fallback) → commit assistant turn →
/// truncation recovery? → execute tools → commit tool results → apply
/// decisions → suspension/termination checks } → termination via
/// `terminate_run!`, which persists state and builds the `LoopOutcome`.
pub async fn run_loop_with_context(
    agent: &dyn Agent,
    tools: HashMap<String, Arc<dyn Tool>>,
    mut run_ctx: RunContext,
    run_identity: RunIdentity,
    cancellation_token: Option<RunCancellationToken>,
    state_committer: Option<Arc<dyn StateCommitter>>,
    mut decision_rx: Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
) -> LoopOutcome {
    // --- Run setup: identity, executors, mutable loop state. ---
    let run_identity = ensure_run_identity(agent, &mut run_ctx, run_identity);
    let executor = llm_executor_for_run(agent);
    let tool_executor = agent.tool_executor();
    let mut run_state = LoopRunState::new();
    let mut pending_decisions = VecDeque::new();
    let run_cancellation_token = cancellation_token;
    // Last assistant text; returned on BehaviorRequested/NaturalEnd terminations.
    let mut last_text = String::new();
    // Accumulates truncated output across truncation-recovery continuations.
    let mut continued_response_prefix = String::new();
    let step_tool_provider = step_tool_provider_for_run(agent, tools);
    let run_id = run_identity.run_id.clone();
    // Snapshot of calls already suspended before RunStart, so newly suspended
    // calls introduced by RunStart can be detected below.
    let baseline_suspended_call_ids = suspended_call_ids(&run_ctx);
    let pending_delta_commit =
        PendingDeltaCommitContext::new(&run_identity, state_committer.as_ref());
    // Resolve the initial tool snapshot; failure terminates before RunStart
    // (cannot use terminate_run! here — the macro is defined after this point
    // and termination persistence is not yet meaningful).
    let initial_step_tools = match resolve_step_tool_snapshot(&step_tool_provider, &run_ctx).await {
        Ok(snapshot) => snapshot,
        Err(error) => {
            let msg = error.to_string();
            return build_loop_outcome(
                run_ctx,
                TerminationReason::Error(msg.clone()),
                None,
                &run_state,
                Some(outcome::LoopFailure::State(msg)),
            );
        }
    };
    let StepToolSnapshot {
        tools: initial_tools,
        descriptors: initial_descriptors,
    } = initial_step_tools;
    let mut active_tool_descriptors = initial_descriptors;

    // Single exit path for the whole run: normalizes the termination reason
    // for suspended calls, persists termination state, and returns the final
    // LoopOutcome. A persistence failure downgrades to an Error termination.
    // A macro (rather than a closure/fn) so it can `return` from this function
    // and borrow the surrounding locals mutably.
    macro_rules! terminate_run {
        ($termination:expr, $response:expr, $failure:expr) => {{
            let reason: TerminationReason = $termination;
            let (final_termination, final_response) =
                normalize_termination_for_suspended_calls(&run_ctx, reason, $response);
            if let Err(error) = persist_run_termination(
                &mut run_ctx,
                &final_termination,
                &active_tool_descriptors,
                agent,
                &run_identity,
                &pending_delta_commit,
            )
            .await
            {
                let msg = error.to_string();
                return build_loop_outcome(
                    run_ctx,
                    TerminationReason::Error(msg.clone()),
                    None,
                    &run_state,
                    Some(outcome::LoopFailure::State(msg)),
                );
            }
            return build_loop_outcome(
                run_ctx,
                final_termination,
                final_response,
                &run_state,
                $failure,
            );
        }};
    }

    // Phase: RunStart
    let (pending, actions) = match plugin_runtime::emit_phase_block(
        Phase::RunStart,
        &run_ctx,
        &active_tool_descriptors,
        agent,
        |_| {},
    )
    .await
    {
        Ok(result) => result,
        Err(error) => {
            let msg = error.to_string();
            terminate_run!(
                TerminationReason::Error(msg.clone()),
                None,
                Some(outcome::LoopFailure::State(msg))
            );
        }
    };
    // Fold RunStart plugin output (patches + serialized actions) into the context.
    run_ctx.add_thread_patches(pending);
    run_ctx.add_serialized_state_actions(actions);
    if let Err(error) = commit_run_start_and_drain_replay(
        &mut run_ctx,
        &initial_tools,
        agent,
        &active_tool_descriptors,
        &pending_delta_commit,
    )
    .await
    {
        let msg = error.to_string();
        terminate_run!(
            TerminationReason::Error(msg.clone()),
            None,
            Some(outcome::LoopFailure::State(msg))
        );
    }
    // If RunStart itself suspended new tool calls, stop here as Suspended.
    let run_start_new_suspended = newly_suspended_call_ids(&run_ctx, &baseline_suspended_call_ids);
    if !run_start_new_suspended.is_empty() {
        terminate_run!(TerminationReason::Suspended, None, None);
    }
    // --- Main step loop: one iteration per inference round. ---
    loop {
        // Drain any tool-call decisions that arrived between steps and replay
        // resolved suspended calls before the next inference.
        if let Err(error) = apply_decisions_and_replay(
            &mut run_ctx,
            &mut decision_rx,
            &mut pending_decisions,
            &step_tool_provider,
            agent,
            &mut active_tool_descriptors,
            &pending_delta_commit,
        )
        .await
        {
            let msg = error.to_string();
            terminate_run!(
                TerminationReason::Error(msg.clone()),
                None,
                Some(outcome::LoopFailure::State(msg))
            );
        }

        if is_run_cancelled(run_cancellation_token.as_ref()) {
            terminate_run!(TerminationReason::Cancelled, None, None);
        }

        // Re-resolve the tool snapshot each step (dynamic per-step tools).
        let step_tools = match resolve_step_tool_snapshot(&step_tool_provider, &run_ctx).await {
            Ok(snapshot) => snapshot,
            Err(e) => {
                let msg = e.to_string();
                terminate_run!(
                    TerminationReason::Error(msg.clone()),
                    None,
                    Some(outcome::LoopFailure::State(msg))
                );
            }
        };
        active_tool_descriptors = step_tools.descriptors.clone();

        // Phases StepStart/BeforeInference: plugins may patch state, inject
        // prompt context, and filter tools.
        let prepared = match prepare_step_execution(&run_ctx, &active_tool_descriptors, agent).await
        {
            Ok(v) => v,
            Err(e) => {
                let msg = e.to_string();
                terminate_run!(
                    TerminationReason::Error(msg.clone()),
                    None,
                    Some(outcome::LoopFailure::State(msg))
                );
            }
        };
        run_ctx.add_thread_patches(prepared.pending_patches);

        match prepared.run_action {
            RunAction::Continue => {}
            RunAction::Terminate(reason) => {
                // A behavior-requested stop returns the last assistant text;
                // other pre-inference terminations carry no response.
                let response = if matches!(reason, TerminationReason::BehaviorRequested) {
                    Some(last_text.clone())
                } else {
                    None
                };
                terminate_run!(reason, response, None);
            }
        }

        // Call LLM with unified retry + fallback model strategy.
        let messages = prepared.messages;
        let filtered_tools = prepared.filtered_tools;
        let request_transforms = prepared.request_transforms;
        let chat_options = agent.chat_options().cloned();
        let attempt_outcome = run_llm_with_retry_and_fallback(
            agent,
            run_cancellation_token.as_ref(),
            true,
            None,
            "unknown llm error",
            |model| {
                // Each attempt rebuilds the request so per-attempt transforms
                // apply against the current tool filter.
                let request = build_request_for_filtered_tools(
                    &messages,
                    &step_tools.tools,
                    &filtered_tools,
                    &request_transforms,
                );
                let executor = executor.clone();
                let chat_options = chat_options.clone();
                async move {
                    executor
                        .exec_chat_response(&model, request, chat_options.as_ref())
                        .await
                }
            },
        )
        .await;

        let response = match attempt_outcome {
            LlmAttemptOutcome::Success {
                value, attempts, ..
            } => {
                run_state.record_llm_attempts(attempts);
                value
            }
            LlmAttemptOutcome::Cancelled => {
                // Record the cancellation point in the thread before exiting.
                append_cancellation_user_message(&mut run_ctx, CancellationStage::Inference);
                terminate_run!(TerminationReason::Cancelled, None, None);
            }
            LlmAttemptOutcome::Exhausted {
                last_error,
                last_error_class,
                attempts,
            } => {
                run_state.record_llm_attempts(attempts);
                // Give plugins a chance to clean up after the LLM failure
                // before terminating with the LLM error.
                if let Err(phase_error) = apply_llm_error_cleanup(
                    &mut run_ctx,
                    &active_tool_descriptors,
                    agent,
                    "llm_exec_error",
                    last_error.clone(),
                    last_error_class,
                )
                .await
                {
                    let msg = phase_error.to_string();
                    terminate_run!(
                        TerminationReason::Error(msg.clone()),
                        None,
                        Some(outcome::LoopFailure::State(msg))
                    );
                }
                terminate_run!(
                    TerminationReason::Error(last_error.clone()),
                    None,
                    Some(outcome::LoopFailure::Llm(last_error))
                );
            }
        };

        let result = stream_result_from_chat_response(&response);
        run_state.update_from_response(&result);
        // Stitch any truncation-recovery prefix onto this round's text.
        last_text = stitch_response_text(&continued_response_prefix, &result.text);

        // Add assistant message
        let assistant_msg_id = gen_message_id();
        let step_meta = step_metadata(Some(run_id.clone()), run_state.completed_steps as u32);
        // Phase AfterInference + assistant message append; may request termination.
        let post_inference_action = match complete_step_after_inference(
            &mut run_ctx,
            &result,
            step_meta.clone(),
            Some(assistant_msg_id.clone()),
            &active_tool_descriptors,
            agent,
        )
        .await
        {
            Ok(action) => action,
            Err(e) => {
                let msg = e.to_string();
                terminate_run!(
                    TerminationReason::Error(msg.clone()),
                    None,
                    Some(outcome::LoopFailure::State(msg))
                );
            }
        };
        // Checkpoint the assistant turn before running any tools.
        if let Err(error) = pending_delta_commit
            .commit(
                &mut run_ctx,
                CheckpointReason::AssistantTurnCommitted,
                false,
            )
            .await
        {
            let msg = error.to_string();
            terminate_run!(
                TerminationReason::Error(msg.clone()),
                None,
                Some(outcome::LoopFailure::State(msg))
            );
        }

        mark_step_completed(&mut run_state);

        // Truncation recovery: if the model hit max_tokens with no tool
        // calls, inject a continuation prompt and re-enter inference.
        if truncation_recovery::should_retry(&result, &mut run_state) {
            extend_response_prefix(&mut continued_response_prefix, &result.text);
            let prompt = truncation_recovery::continuation_message();
            run_ctx.add_message(Arc::new(prompt));
            continue;
        }
        // Not truncated (or no retry): drop any accumulated prefix.
        continued_response_prefix.clear();

        let post_inference_termination = match &post_inference_action {
            RunAction::Terminate(reason) => Some(reason.clone()),
            _ => None,
        };

        // Only `Stopped` termination is deferred past tool execution so the
        // current round's tools complete (e.g. MaxRounds lets tools finish).
        // All other reasons terminate immediately before tool execution.
        if let Some(reason) = &post_inference_termination {
            if !matches!(reason, TerminationReason::Stopped(_)) {
                terminate_run!(reason.clone(), Some(last_text.clone()), None);
            }
        }

        if !result.needs_tools() {
            run_state.record_step_without_tools();
            let reason = post_inference_termination.unwrap_or(TerminationReason::NaturalEnd);
            terminate_run!(reason, Some(last_text.clone()), None);
        }

        // Execute tools with phase hooks using configured execution strategy.
        let tool_context = match prepare_tool_execution_context(&run_ctx) {
            Ok(ctx) => ctx,
            Err(e) => {
                let msg = e.to_string();
                terminate_run!(
                    TerminationReason::Error(msg.clone()),
                    None,
                    Some(outcome::LoopFailure::State(msg))
                );
            }
        };
        // Snapshot thread messages/version so tools see a stable view.
        let thread_messages_for_tools = run_ctx.messages().to_vec();
        let thread_version_for_tools = run_ctx.version();

        let tool_exec_future = tool_executor.execute(ToolExecutionRequest {
            tools: &step_tools.tools,
            calls: &result.tool_calls,
            state: &tool_context.state,
            tool_descriptors: &active_tool_descriptors,
            agent_behavior: Some(agent.behavior()),
            // Non-streaming path: no activity sink, use the no-op manager.
            activity_manager: tirea_contract::runtime::activity::NoOpActivityManager::arc(),
            run_policy: &tool_context.run_policy,
            run_identity: tool_context.run_identity.clone(),
            caller_context: tool_context.caller_context.clone(),
            thread_id: run_ctx.thread_id(),
            thread_messages: &thread_messages_for_tools,
            state_version: thread_version_for_tools,
            cancellation_token: run_cancellation_token.as_ref(),
        });
        let results = tool_exec_future.await.map_err(AgentLoopError::from);

        let results = match results {
            Ok(r) => r,
            Err(AgentLoopError::Cancelled) => {
                append_cancellation_user_message(&mut run_ctx, CancellationStage::ToolExecution);
                terminate_run!(TerminationReason::Cancelled, None, None);
            }
            Err(e) => {
                let msg = e.to_string();
                terminate_run!(
                    TerminationReason::Error(msg.clone()),
                    None,
                    Some(outcome::LoopFailure::State(msg))
                );
            }
        };

        if let Err(_e) = apply_tool_results_to_session(
            &mut run_ctx,
            &results,
            Some(step_meta),
            tool_executor.requires_parallel_patch_conflict_check(),
        ) {
            // On error, we can't easily rollback RunContext, so just terminate
            let msg = _e.to_string();
            terminate_run!(
                TerminationReason::Error(msg.clone()),
                None,
                Some(outcome::LoopFailure::State(msg))
            );
        }
        // Checkpoint tool results before processing decisions.
        if let Err(error) = pending_delta_commit
            .commit(&mut run_ctx, CheckpointReason::ToolResultsCommitted, false)
            .await
        {
            let msg = error.to_string();
            terminate_run!(
                TerminationReason::Error(msg.clone()),
                None,
                Some(outcome::LoopFailure::State(msg))
            );
        }

        // Decisions may have arrived during tool execution; drain and replay
        // so suspended calls resolved mid-round are handled this iteration.
        if let Err(error) = apply_decisions_and_replay(
            &mut run_ctx,
            &mut decision_rx,
            &mut pending_decisions,
            &step_tool_provider,
            agent,
            &mut active_tool_descriptors,
            &pending_delta_commit,
        )
        .await
        {
            let msg = error.to_string();
            terminate_run!(
                TerminationReason::Error(msg.clone()),
                None,
                Some(outcome::LoopFailure::State(msg))
            );
        }

        // If ALL tools are suspended (no completed results), terminate immediately.
        if has_suspended_calls(&run_ctx) {
            let has_completed = results
                .iter()
                .any(|r| !matches!(r.outcome, crate::contracts::ToolCallOutcome::Suspended));
            if !has_completed {
                terminate_run!(TerminationReason::Suspended, None, None);
            }
        }

        // Deferred post-inference termination: tools from the current round
        // have completed; stop the loop before the next inference.
        if let Some(reason) = post_inference_termination {
            terminate_run!(reason, Some(last_text.clone()), None);
        }

        // Track tool-step metrics for loop stats and plugin consumers.
        let error_count = results
            .iter()
            .filter(|r| r.execution.result.is_error())
            .count();
        run_state.record_tool_step(&result.tool_calls, error_count);
    }
}
2299
2300/// Run the agent loop with streaming output.
2301///
2302/// Returns a stream of AgentEvent for real-time updates. Tools are passed
2303/// directly and used as the default tool set unless the agent's step_tool_provider
2304/// is set (for dynamic per-step tool resolution).
2305pub fn run_loop_stream(
2306    agent: Arc<dyn Agent>,
2307    tools: HashMap<String, Arc<dyn Tool>>,
2308    run_ctx: RunContext,
2309    cancellation_token: Option<RunCancellationToken>,
2310    state_committer: Option<Arc<dyn StateCommitter>>,
2311    decision_rx: Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
2312) -> Pin<Box<dyn Stream<Item = AgentEvent> + Send>> {
2313    let run_identity = run_ctx.run_identity().clone();
2314    run_loop_stream_with_context(
2315        agent,
2316        tools,
2317        run_ctx,
2318        run_identity,
2319        cancellation_token,
2320        state_committer,
2321        decision_rx,
2322    )
2323}
2324
2325pub fn run_loop_stream_with_context(
2326    agent: Arc<dyn Agent>,
2327    tools: HashMap<String, Arc<dyn Tool>>,
2328    mut run_ctx: RunContext,
2329    run_identity: RunIdentity,
2330    cancellation_token: Option<RunCancellationToken>,
2331    state_committer: Option<Arc<dyn StateCommitter>>,
2332    decision_rx: Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
2333) -> Pin<Box<dyn Stream<Item = AgentEvent> + Send>> {
2334    let run_identity = ensure_run_identity(agent.as_ref(), &mut run_ctx, run_identity);
2335    stream_runner::run_stream(
2336        agent,
2337        tools,
2338        run_ctx,
2339        run_identity,
2340        cancellation_token,
2341        state_committer,
2342        decision_rx,
2343    )
2344}
2345
2346#[cfg(test)]
2347mod tests;