Skip to main content

tirea_agent_loop/runtime/loop_runner/
mod.rs

1//! Agent loop implementation with Phase-based plugin execution.
2//!
3//! The agent loop orchestrates the conversation between user, LLM, and tools:
4//!
5//! ```text
6//! User Input → LLM → Tool Calls? → Execute Tools → LLM → ... → Final Response
7//! ```
8//!
9//! # Phase Execution
10//!
11//! Each phase dispatches to its typed plugin hook:
12//!
13//! ```text
14//! RunStart (once)
15//!     │
16//!     ▼
17//! ┌─────────────────────────┐
18//! │      StepStart          │ ← plugins can apply state patches
19//! ├─────────────────────────┤
20//! │    BeforeInference      │ ← plugins can inject prompt context, filter tools
21//! ├─────────────────────────┤
22//! │      [LLM CALL]         │
23//! ├─────────────────────────┤
24//! │    AfterInference       │
25//! ├─────────────────────────┤
26//! │  ┌───────────────────┐  │
27//! │  │ BeforeToolExecute │  │ ← plugins can block/pending
28//! │  ├───────────────────┤  │
29//! │  │   [TOOL EXEC]     │  │
30//! │  ├───────────────────┤  │
31//! │  │ AfterToolExecute  │  │ ← plugins can add reminders
32//! │  └───────────────────┘  │
33//! ├─────────────────────────┤
34//! │       StepEnd           │
35//! └─────────────────────────┘
36//!     │
37//!     ▼
38//! RunEnd (once)
39//! ```
40
41mod config;
42mod core;
43mod event_envelope_meta;
44mod outcome;
45mod parallel_state_merge;
46mod plugin_runtime;
47mod run_state;
48mod state_commit;
49mod stream_core;
50mod stream_runner;
51mod tool_exec;
52mod truncation_recovery;
53
54use crate::contracts::io::ResumeDecisionAction;
55use crate::contracts::runtime::phase::Phase;
56use crate::contracts::runtime::state::{reduce_state_actions, AnyStateAction, ScopeContext};
57use crate::contracts::runtime::tool_call::{Tool, ToolResult};
58use crate::contracts::runtime::ActivityManager;
59use crate::contracts::runtime::{
60    DecisionReplayPolicy, RunLifecycleAction, RunLifecycleState, StreamResult, SuspendedCall,
61    ToolCallResume, ToolCallResumeMode, ToolCallStatus, ToolExecutionRequest, ToolExecutionResult,
62};
63use crate::contracts::thread::CheckpointReason;
64use crate::contracts::thread::{gen_message_id, Message, MessageMetadata, ToolCall};
65use crate::contracts::RunContext;
66use crate::contracts::{AgentEvent, RunAction, RunOrigin, TerminationReason, ToolCallDecision};
67use crate::engine::convert::{assistant_message, assistant_tool_calls, tool_response};
68use crate::runtime::activity::ActivityHub;
69
70use crate::runtime::streaming::StreamCollector;
71use async_stream::stream;
72use futures::{Stream, StreamExt};
73use genai::Client;
74use serde_json::Value;
75use std::collections::{HashMap, HashSet, VecDeque};
76use std::future::Future;
77use std::pin::Pin;
78use std::sync::atomic::{AtomicU64, Ordering};
79use std::sync::Arc;
80use std::time::{Instant, SystemTime, UNIX_EPOCH};
81use uuid::Uuid;
82
83pub use crate::contracts::runtime::ToolExecutor;
84pub use crate::runtime::run_context::{
85    await_or_cancel, is_cancelled, CancelAware, RunCancellationToken, StateCommitError,
86    StateCommitter, TOOL_SCOPE_CALLER_AGENT_ID_KEY, TOOL_SCOPE_CALLER_MESSAGES_KEY,
87    TOOL_SCOPE_CALLER_STATE_KEY, TOOL_SCOPE_CALLER_THREAD_ID_KEY,
88};
89use config::StaticStepToolProvider;
90pub use config::{Agent, BaseAgent, GenaiLlmExecutor, LlmRetryPolicy};
91pub use config::{LlmEventStream, LlmExecutor};
92pub use config::{StepToolInput, StepToolProvider, StepToolSnapshot};
93#[cfg(test)]
94use core::build_messages;
95use core::{
96    build_request_for_filtered_tools, inference_inputs_from_step, suspended_calls_from_ctx,
97    tool_call_states_from_ctx, transition_tool_call_state, upsert_tool_call_state,
98    ToolCallStateSeed, ToolCallStateTransition,
99};
100pub use outcome::{tool_map, tool_map_from_arc, AgentLoopError};
101pub use outcome::{LoopOutcome, LoopStats, LoopUsage};
102#[cfg(test)]
103use plugin_runtime::emit_agent_phase;
104#[cfg(test)]
105use plugin_runtime::emit_cleanup_phases;
106use run_state::LoopRunState;
107pub use state_commit::ChannelStateCommitter;
108use state_commit::PendingDeltaCommitContext;
109use tirea_state::TrackedPatch;
110#[cfg(test)]
111use tokio_util::sync::CancellationToken;
112#[cfg(test)]
113use tool_exec::execute_single_tool_with_phases;
114#[cfg(test)]
115use tool_exec::execute_tools_parallel_with_phases;
116pub use tool_exec::ExecuteToolsOutcome;
117use tool_exec::{
118    apply_tool_results_impl, apply_tool_results_to_session,
119    execute_single_tool_with_phases_deferred, scope_with_tool_caller_context, step_metadata,
120    ToolPhaseContext,
121};
122pub use tool_exec::{
123    execute_tools, execute_tools_with_behaviors, execute_tools_with_config,
124    ParallelToolExecutionMode, ParallelToolExecutor, SequentialToolExecutor,
125};
126
/// Fully resolved agent wiring ready for execution.
///
/// Contains everything needed to run an agent loop: the agent,
/// the resolved tool map, and the runtime config. This is a pure data struct
/// that can be inspected, mutated, and tested independently.
///
/// All fields are public; `with_tool` and `overlay_tools` are convenience
/// helpers for editing the tool map.
pub struct ResolvedRun {
    /// The agent (model, behavior, execution strategies, ...).
    ///
    /// Exposed as a concrete [`BaseAgent`] so callers can mutate fields
    /// (model, plugins, tool_executor, ...) between resolution and execution.
    /// Converted to `Arc<dyn Agent>` at the execution boundary.
    pub agent: BaseAgent,
    /// Resolved tool map after filtering and wiring, keyed by tool id.
    pub tools: HashMap<String, Arc<dyn Tool>>,
    /// Runtime configuration (user_id, run_id, ...).
    pub run_config: crate::contracts::RunConfig,
}
144
145impl ResolvedRun {
146    /// Add or replace a tool in the resolved tool map.
147    #[must_use]
148    pub fn with_tool(mut self, id: String, tool: Arc<dyn Tool>) -> Self {
149        self.tools.insert(id, tool);
150        self
151    }
152
153    /// Overlay tools from a map (insert-if-absent semantics).
154    pub fn overlay_tools(&mut self, tools: HashMap<String, Arc<dyn Tool>>) {
155        for (id, tool) in tools {
156            self.tools.entry(id).or_insert(tool);
157        }
158    }
159}
160
/// Identity of one run: its id, optional parent run, owning agent and origin.
///
/// Built either directly via [`RunExecutionContext::new`] or from the values
/// stored in a run config via [`RunExecutionContext::from_run_config`].
#[derive(Debug, Clone)]
pub struct RunExecutionContext {
    /// Identifier of this run.
    pub run_id: String,
    /// Identifier of the parent run, when one was recorded.
    pub parent_run_id: Option<String>,
    /// Identifier of the agent executing the run.
    pub agent_id: String,
    /// Where the run originated from.
    pub origin: RunOrigin,
}
168
169impl RunExecutionContext {
170    pub const RUN_ID_KEY: &'static str = "run_id";
171    pub const PARENT_RUN_ID_KEY: &'static str = "parent_run_id";
172    pub const AGENT_ID_KEY: &'static str = "agent_id";
173    pub const ORIGIN_KEY: &'static str = "origin";
174
175    #[must_use]
176    pub const fn new(
177        run_id: String,
178        parent_run_id: Option<String>,
179        agent_id: String,
180        origin: RunOrigin,
181    ) -> Self {
182        Self {
183            run_id,
184            parent_run_id,
185            agent_id,
186            origin,
187        }
188    }
189
190    #[must_use]
191    pub fn from_run_config(run_ctx: &RunContext) -> Self {
192        let run_id = run_ctx
193            .run_config
194            .value(Self::RUN_ID_KEY)
195            .and_then(Value::as_str)
196            .map(str::trim)
197            .filter(|id| !id.is_empty())
198            .map(ToOwned::to_owned)
199            .unwrap_or_else(uuid_v7);
200
201        let parent_run_id = run_ctx
202            .run_config
203            .value(Self::PARENT_RUN_ID_KEY)
204            .and_then(Value::as_str)
205            .map(str::trim)
206            .filter(|id| !id.is_empty())
207            .map(ToOwned::to_owned);
208
209        let agent_id = run_ctx
210            .run_config
211            .value(Self::AGENT_ID_KEY)
212            .and_then(Value::as_str)
213            .map(str::trim)
214            .filter(|id| !id.is_empty())
215            .map(ToOwned::to_owned)
216            .unwrap_or_else(|| run_ctx.thread_id().to_string());
217
218        let origin = run_ctx
219            .run_config
220            .value(Self::ORIGIN_KEY)
221            .and_then(|value| serde_json::from_value::<RunOrigin>(value.clone()).ok())
222            .unwrap_or_default();
223
224        Self::new(run_id, parent_run_id, agent_id, origin)
225    }
226}
227
228fn uuid_v7() -> String {
229    Uuid::now_v7().simple().to_string()
230}
231
232fn bind_execution_context_to_run_config(
233    run_ctx: &mut RunContext,
234    execution_ctx: &RunExecutionContext,
235) {
236    let _ = run_ctx.run_config.set(
237        RunExecutionContext::RUN_ID_KEY,
238        execution_ctx.run_id.clone(),
239    );
240    if let Some(parent_run_id) = execution_ctx.parent_run_id.as_deref() {
241        let _ = run_ctx.run_config.set(
242            RunExecutionContext::PARENT_RUN_ID_KEY,
243            parent_run_id.to_string(),
244        );
245    }
246    let _ = run_ctx.run_config.set(
247        RunExecutionContext::AGENT_ID_KEY,
248        execution_ctx.agent_id.clone(),
249    );
250    if let Ok(origin_value) = serde_json::to_value(execution_ctx.origin) {
251        let _ = run_ctx
252            .run_config
253            .set(RunExecutionContext::ORIGIN_KEY, origin_value);
254    }
255}
256
/// Milliseconds elapsed since the Unix epoch according to the system clock.
///
/// Returns `0` when the clock reads earlier than the epoch and saturates at
/// `u64::MAX` if the millisecond count does not fit in a `u64`.
pub(crate) fn current_unix_millis() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
262
/// Test-only entry point that forwards unchanged to
/// [`sync_run_lifecycle_for_termination_with_context`].
#[cfg(test)]
pub(super) fn sync_run_lifecycle_for_termination(
    run_ctx: &mut RunContext,
    execution_ctx: &RunExecutionContext,
    termination: &TerminationReason,
) -> Result<(), AgentLoopError> {
    sync_run_lifecycle_for_termination_with_context(run_ctx, execution_ctx, termination)
}
271
272fn sync_run_lifecycle_for_termination_with_context(
273    run_ctx: &mut RunContext,
274    execution_ctx: &RunExecutionContext,
275    termination: &TerminationReason,
276) -> Result<(), AgentLoopError> {
277    if execution_ctx.run_id.trim().is_empty() {
278        return Ok(());
279    };
280
281    let (status, done_reason) = termination.to_run_status();
282
283    let base_state = run_ctx
284        .snapshot()
285        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
286    let actions = vec![AnyStateAction::new::<RunLifecycleState>(
287        RunLifecycleAction::Set {
288            id: execution_ctx.run_id.clone(),
289            status,
290            done_reason,
291            updated_at: current_unix_millis(),
292        },
293    )];
294    let patches = reduce_state_actions(actions, &base_state, "agent_loop", &ScopeContext::run())
295        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
296    run_ctx.add_thread_patches(patches);
297    Ok(())
298}
299
/// Stage of the loop that was in progress when a run was cancelled.
///
/// Used by `append_cancellation_user_message` to choose which notice to add.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum CancellationStage {
    /// Cancelled while inference was in progress.
    Inference,
    /// Cancelled while tools were being used.
    ToolExecution,
}
305
/// User message appended for [`CancellationStage::Inference`].
pub(super) const CANCELLATION_INFERENCE_USER_MESSAGE: &str =
    "The previous run was interrupted during inference. Please continue from the current context.";
/// User message appended for [`CancellationStage::ToolExecution`].
pub(super) const CANCELLATION_TOOL_USER_MESSAGE: &str =
    "The previous run was interrupted while using tools. Please continue from the current context.";
310
311pub(super) fn append_cancellation_user_message(run_ctx: &mut RunContext, stage: CancellationStage) {
312    let content = match stage {
313        CancellationStage::Inference => CANCELLATION_INFERENCE_USER_MESSAGE,
314        CancellationStage::ToolExecution => CANCELLATION_TOOL_USER_MESSAGE,
315    };
316    run_ctx.add_message(Arc::new(Message::user(content)));
317}
318
319pub(super) fn effective_llm_models(agent: &dyn Agent) -> Vec<String> {
320    let mut models = Vec::with_capacity(1 + agent.fallback_models().len());
321    models.push(agent.model().to_string());
322    for model in agent.fallback_models() {
323        if model.trim().is_empty() {
324            continue;
325        }
326        if !models.iter().any(|m| m == model) {
327            models.push(model.clone());
328        }
329    }
330    models
331}
332
333pub(super) fn effective_llm_models_from(
334    agent: &dyn Agent,
335    start_model: Option<&str>,
336) -> Vec<String> {
337    let models = effective_llm_models(agent);
338    let Some(start_model) = start_model.map(str::trim).filter(|model| !model.is_empty()) else {
339        return models;
340    };
341    let Some(index) = models.iter().position(|model| model == start_model) else {
342        return models;
343    };
344    models.into_iter().skip(index).collect()
345}
346
347pub(super) fn next_llm_model_after(agent: &dyn Agent, current_model: &str) -> Option<String> {
348    let models = effective_llm_models(agent);
349    let current_index = models.iter().position(|model| model == current_model)?;
350    models.into_iter().nth(current_index + 1)
351}
352
353pub(super) fn llm_retry_attempts(agent: &dyn Agent) -> usize {
354    agent.llm_retry_policy().max_attempts_per_model.max(1)
355}
356
357#[derive(Debug, Clone, Copy, PartialEq, Eq)]
358pub(super) enum LlmErrorClass {
359    RateLimit,
360    Timeout,
361    Connection,
362    ServerUnavailable,
363    ServerError,
364    Auth,
365    ClientRequest,
366    Unknown,
367}
368
369impl LlmErrorClass {
370    pub(super) fn is_retryable(self) -> bool {
371        matches!(
372            self,
373            LlmErrorClass::RateLimit
374                | LlmErrorClass::Timeout
375                | LlmErrorClass::Connection
376                | LlmErrorClass::ServerUnavailable
377                | LlmErrorClass::ServerError
378        )
379    }
380
381    /// Stable string label for telemetry and structured logging.
382    pub(super) fn as_str(self) -> &'static str {
383        match self {
384            LlmErrorClass::RateLimit => "rate_limit",
385            LlmErrorClass::Timeout => "timeout",
386            LlmErrorClass::Connection => "connection",
387            LlmErrorClass::ServerUnavailable => "server_unavailable",
388            LlmErrorClass::ServerError => "server_error",
389            LlmErrorClass::Auth => "auth",
390            LlmErrorClass::ClientRequest => "client_request",
391            LlmErrorClass::Unknown => "unknown",
392        }
393    }
394}
395
396fn classify_llm_error_message(message: &str) -> LlmErrorClass {
397    let lower = message.to_ascii_lowercase();
398    if ["429", "too many requests", "rate limit"]
399        .iter()
400        .any(|p| lower.contains(p))
401    {
402        return LlmErrorClass::RateLimit;
403    }
404    if ["timeout", "timed out"].iter().any(|p| lower.contains(p)) {
405        return LlmErrorClass::Timeout;
406    }
407    if [
408        "connection",
409        "network",
410        "reset by peer",
411        "broken pipe",
412        "eof",
413        "connection refused",
414        "error sending request for url",
415    ]
416    .iter()
417    .any(|p| lower.contains(p))
418    {
419        return LlmErrorClass::Connection;
420    }
421    if ["503", "service unavailable", "unavailable", "temporar"]
422        .iter()
423        .any(|p| lower.contains(p))
424    {
425        return LlmErrorClass::ServerUnavailable;
426    }
427    if [
428        "500",
429        "502",
430        "504",
431        "server error",
432        "bad gateway",
433        "gateway timeout",
434    ]
435    .iter()
436    .any(|p| lower.contains(p))
437    {
438        return LlmErrorClass::ServerError;
439    }
440    if ["401", "403", "unauthorized", "forbidden", "invalid api key"]
441        .iter()
442        .any(|p| lower.contains(p))
443    {
444        return LlmErrorClass::Auth;
445    }
446    if ["400", "404", "422", "invalid_request", "bad request"]
447        .iter()
448        .any(|p| lower.contains(p))
449    {
450        return LlmErrorClass::ClientRequest;
451    }
452    LlmErrorClass::Unknown
453}
454
455fn classify_status_code(status_code: u16) -> LlmErrorClass {
456    match status_code {
457        408 => LlmErrorClass::Timeout,
458        429 => LlmErrorClass::RateLimit,
459        401 | 403 => LlmErrorClass::Auth,
460        400 | 404 | 422 => LlmErrorClass::ClientRequest,
461        503 => LlmErrorClass::ServerUnavailable,
462        500..=599 => LlmErrorClass::ServerError,
463        400..=499 => LlmErrorClass::ClientRequest,
464        _ => LlmErrorClass::Unknown,
465    }
466}
467
468fn classify_error_chain(error: &(dyn std::error::Error + 'static)) -> Option<LlmErrorClass> {
469    let mut current = Some(error);
470    while let Some(err) = current {
471        if let Some(io_error) = err.downcast_ref::<std::io::Error>() {
472            use std::io::ErrorKind;
473
474            let class = match io_error.kind() {
475                ErrorKind::TimedOut => Some(LlmErrorClass::Timeout),
476                ErrorKind::ConnectionAborted
477                | ErrorKind::ConnectionRefused
478                | ErrorKind::ConnectionReset
479                | ErrorKind::BrokenPipe
480                | ErrorKind::NotConnected
481                | ErrorKind::UnexpectedEof => Some(LlmErrorClass::Connection),
482                _ => None,
483            };
484            if class.is_some() {
485                return class;
486            }
487        }
488        current = err.source();
489    }
490    None
491}
492
493fn classify_webc_error(error: &genai::webc::Error) -> LlmErrorClass {
494    match error {
495        genai::webc::Error::ResponseFailedStatus { status, .. } => {
496            classify_status_code(status.as_u16())
497        }
498        genai::webc::Error::Reqwest(err) => classify_error_chain(err)
499            .unwrap_or_else(|| classify_llm_error_message(&err.to_string())),
500        _ => classify_llm_error_message(&error.to_string()),
501    }
502}
503
504/// Classify a provider error event body by parsing its structured fields.
505///
506/// Provider error events (OpenAI, Anthropic, Gemini) embed a JSON body with a
507/// `type` field that describes the error category. This function extracts that
508/// field and maps it to an [`LlmErrorClass`] without relying on keyword
509/// matching against stringified content.
510///
511/// Known error type strings:
512/// - OpenAI: `"server_error"`, `"rate_limit_error"`, `"invalid_request_error"`, …
513/// - Anthropic: `"overloaded_error"`, `"api_error"`, `"authentication_error"`, …
514fn classify_chat_response_body(body: &serde_json::Value) -> LlmErrorClass {
515    // Collect candidate type strings from both the top-level and nested error
516    // object.  OpenAI streams extract the inner error, so body["type"] is
517    // already the error type.  Anthropic / Gemini may wrap it in an envelope
518    // where body["type"] == "error" and the real type lives at
519    // body["error"]["type"].
520    let top_type = body.get("type").and_then(|v| v.as_str());
521    let nested_type = body
522        .get("error")
523        .and_then(|e| e.get("type"))
524        .and_then(|v| v.as_str());
525
526    // Try the nested (more specific) type first when the top-level type is a
527    // generic envelope marker like "error".
528    let candidates: &[&str] = match (top_type, nested_type) {
529        (_, Some(inner)) => &[inner, top_type.unwrap_or("")],
530        (Some(top), None) => &[top],
531        (None, None) => &[],
532    };
533
534    for t in candidates {
535        let lower = t.to_ascii_lowercase();
536        if lower.contains("rate_limit") {
537            return LlmErrorClass::RateLimit;
538        }
539        if lower.contains("overloaded") || lower.contains("unavailable") {
540            return LlmErrorClass::ServerUnavailable;
541        }
542        if lower.contains("timeout") {
543            return LlmErrorClass::Timeout;
544        }
545        if lower.contains("server_error") || lower.contains("api_error") {
546            return LlmErrorClass::ServerError;
547        }
548        if lower.contains("authentication") || lower.contains("permission") {
549            return LlmErrorClass::Auth;
550        }
551        if lower.contains("invalid_request") || lower.contains("not_found") {
552            return LlmErrorClass::ClientRequest;
553        }
554    }
555
556    // Fallback: check for an HTTP status code in the body (some providers
557    // include a numeric `status` or `code` field).
558    let status_code = body
559        .get("status")
560        .or_else(|| body.get("code"))
561        .and_then(|v| v.as_u64())
562        .and_then(|v| u16::try_from(v).ok());
563
564    if let Some(code) = status_code {
565        let class = classify_status_code(code);
566        if class != LlmErrorClass::Unknown {
567            return class;
568        }
569    }
570
571    LlmErrorClass::Unknown
572}
573
574pub(super) fn classify_llm_error(error: &genai::Error) -> LlmErrorClass {
575    match error {
576        genai::Error::HttpError { status, .. } => classify_status_code(status.as_u16()),
577        genai::Error::WebStream { cause, error, .. } => classify_error_chain(error.as_ref())
578            .unwrap_or_else(|| classify_llm_error_message(cause)),
579        genai::Error::WebAdapterCall { webc_error, .. }
580        | genai::Error::WebModelCall { webc_error, .. } => classify_webc_error(webc_error),
581        genai::Error::Internal(message) => classify_llm_error_message(message),
582
583        // Mid-stream JSON parse failure — almost always caused by truncated /
584        // corrupted SSE data due to a transient network issue.
585        genai::Error::StreamParse { .. } => LlmErrorClass::Connection,
586
587        // Provider returned no response body at all — treat as server-side fault.
588        genai::Error::NoChatResponse { .. } => LlmErrorClass::ServerError,
589
590        // Provider streamed an explicit error event with a structured body.
591        genai::Error::ChatResponse { body, .. } => classify_chat_response_body(body),
592
593        // Auth configuration errors from genai itself (not HTTP 401/403).
594        genai::Error::RequiresApiKey { .. }
595        | genai::Error::NoAuthResolver { .. }
596        | genai::Error::NoAuthData { .. } => LlmErrorClass::Auth,
597
598        other => classify_llm_error_message(&other.to_string()),
599    }
600}
601
/// Test helper: whether `error` falls into a retryable [`LlmErrorClass`].
#[cfg(test)]
pub(super) fn is_retryable_llm_error(error: &genai::Error) -> bool {
    let class = classify_llm_error(error);
    class.is_retryable()
}
606
/// Process-wide counter that `next_retry_entropy` advances on every call and
/// mixes into the retry jitter. It starts at the same constant that is added
/// on each step.
static RETRY_BACKOFF_ENTROPY: AtomicU64 = AtomicU64::new(0x9e37_79b9_7f4a_7c15);
608
609#[derive(Debug, Clone, Default)]
610pub(super) struct RetryBackoffWindow {
611    started_at: Option<Instant>,
612}
613
614impl RetryBackoffWindow {
615    pub(super) fn elapsed_ms(&mut self) -> u64 {
616        self.started_at
617            .get_or_insert_with(Instant::now)
618            .elapsed()
619            .as_millis()
620            .try_into()
621            .unwrap_or(u64::MAX)
622    }
623
624    pub(super) fn reset(&mut self) {
625        self.started_at = None;
626    }
627}
628
/// Result of waiting out a retry backoff (see `wait_retry_backoff`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum RetryBackoffOutcome {
    /// The backoff sleep ran to completion.
    Completed,
    /// The run was cancelled while sleeping.
    Cancelled,
    /// No sleep was performed because the planned wait would exceed the
    /// policy's retry window.
    BudgetExhausted,
}
635
/// Scrambles `entropy` into a well-mixed 64-bit value.
///
/// The arithmetic (add the increment, then two xor-shift/multiply rounds and
/// a final xor-shift) matches the SplitMix64 output function. All operations
/// wrap, so the function is total and deterministic.
fn mix_retry_entropy(entropy: u64) -> u64 {
    const INCREMENT: u64 = 0x9e37_79b9_7f4a_7c15;
    const FIRST_MULTIPLIER: u64 = 0xbf58_476d_1ce4_e5b9;
    const SECOND_MULTIPLIER: u64 = 0x94d0_49bb_1331_11eb;

    let seeded = entropy.wrapping_add(INCREMENT);
    let first_round = (seeded ^ (seeded >> 30)).wrapping_mul(FIRST_MULTIPLIER);
    let second_round = (first_round ^ (first_round >> 27)).wrapping_mul(SECOND_MULTIPLIER);
    second_round ^ (second_round >> 31)
}
642
643fn next_retry_entropy() -> u64 {
644    let counter = RETRY_BACKOFF_ENTROPY.fetch_add(0x9e37_79b9_7f4a_7c15, Ordering::Relaxed);
645    let now = SystemTime::now()
646        .duration_since(UNIX_EPOCH)
647        .map(|duration| duration.as_nanos() as u64)
648        .unwrap_or(counter);
649    mix_retry_entropy(counter ^ now)
650}
651
652pub(super) fn retry_base_backoff_ms(policy: &LlmRetryPolicy, retry_attempt: usize) -> u64 {
653    let initial = policy.initial_backoff_ms;
654    let cap = policy.max_backoff_ms.max(policy.initial_backoff_ms);
655    let attempt = retry_attempt.max(1);
656    if attempt == 1 {
657        return initial.min(cap);
658    }
659    let shift = (attempt - 1).min(20) as u32;
660    let factor = 1u64.checked_shl(shift).unwrap_or(u64::MAX);
661    initial.saturating_mul(factor).min(cap)
662}
663
664fn jittered_backoff_ms(policy: &LlmRetryPolicy, retry_attempt: usize, entropy: u64) -> u64 {
665    let cap = policy.max_backoff_ms.max(policy.initial_backoff_ms);
666    let base_ms = retry_base_backoff_ms(policy, retry_attempt).min(cap);
667    let jitter_percent = policy.backoff_jitter_percent.min(100) as u64;
668    if jitter_percent == 0 || base_ms == 0 {
669        return base_ms;
670    }
671
672    let jitter_window = base_ms.saturating_mul(jitter_percent) / 100;
673    let lower = base_ms.saturating_sub(jitter_window);
674    let upper = base_ms.saturating_add(jitter_window).min(cap);
675    if upper <= lower {
676        return lower;
677    }
678
679    let span = upper - lower;
680    lower + (mix_retry_entropy(entropy) % (span.saturating_add(1)))
681}
682
683pub(super) fn retry_backoff_plan_ms(
684    policy: &LlmRetryPolicy,
685    retry_attempt: usize,
686    elapsed_ms: u64,
687    entropy: u64,
688) -> Option<u64> {
689    let wait_ms = jittered_backoff_ms(policy, retry_attempt, entropy);
690    match policy.max_retry_window_ms {
691        Some(budget_ms) if elapsed_ms.saturating_add(wait_ms) > budget_ms => None,
692        _ => Some(wait_ms),
693    }
694}
695
696pub(super) async fn wait_retry_backoff(
697    agent: &dyn Agent,
698    retry_attempt: usize,
699    retry_window: &mut RetryBackoffWindow,
700    run_cancellation_token: Option<&RunCancellationToken>,
701) -> RetryBackoffOutcome {
702    let elapsed_ms = retry_window.elapsed_ms();
703    let Some(wait_ms) = retry_backoff_plan_ms(
704        agent.llm_retry_policy(),
705        retry_attempt,
706        elapsed_ms,
707        next_retry_entropy(),
708    ) else {
709        return RetryBackoffOutcome::BudgetExhausted;
710    };
711    tracing::debug!(
712        attempt = retry_attempt,
713        backoff_ms = wait_ms,
714        elapsed_ms = elapsed_ms,
715        "waiting before LLM retry"
716    );
717    match await_or_cancel(
718        run_cancellation_token,
719        tokio::time::sleep(std::time::Duration::from_millis(wait_ms)),
720    )
721    .await
722    {
723        CancelAware::Cancelled => RetryBackoffOutcome::Cancelled,
724        CancelAware::Value(_) => RetryBackoffOutcome::Completed,
725    }
726}
727
/// Final result of `run_llm_with_retry_and_fallback`.
pub(super) enum LlmAttemptOutcome<T> {
    /// An invocation succeeded.
    Success {
        /// Value returned by the successful invocation.
        value: T,
        /// Model that produced the value.
        model: String,
        /// Total invocations made across all models, including this one.
        attempts: usize,
    },
    /// The run was cancelled during an invocation or a backoff wait.
    Cancelled,
    /// Every candidate model failed, or the retry budget ran out.
    Exhausted {
        /// Description of the last failure (or the caller-supplied default
        /// when no invocation was made).
        last_error: String,
        /// `LlmErrorClass::as_str` label of the last failure, if any occurred.
        last_error_class: Option<&'static str>,
        /// Total invocations made across all models.
        attempts: usize,
    },
}
741
/// Module-local alias for [`is_cancelled`]; forwards `token` unchanged.
fn is_run_cancelled(token: Option<&RunCancellationToken>) -> bool {
    is_cancelled(token)
}
745
746pub(super) fn step_tool_provider_for_run(
747    agent: &dyn Agent,
748    tools: HashMap<String, Arc<dyn Tool>>,
749) -> Arc<dyn StepToolProvider> {
750    agent.step_tool_provider().unwrap_or_else(|| {
751        Arc::new(StaticStepToolProvider::new(tools)) as Arc<dyn StepToolProvider>
752    })
753}
754
755pub(super) fn llm_executor_for_run(agent: &dyn Agent) -> Arc<dyn LlmExecutor> {
756    agent
757        .llm_executor()
758        .unwrap_or_else(|| Arc::new(GenaiLlmExecutor::new(Client::default())))
759}
760
761pub(super) async fn resolve_step_tool_snapshot(
762    step_tool_provider: &Arc<dyn StepToolProvider>,
763    run_ctx: &RunContext,
764) -> Result<StepToolSnapshot, AgentLoopError> {
765    step_tool_provider
766        .provide(StepToolInput { state: run_ctx })
767        .await
768}
769
/// Increments the run state's `completed_steps` counter by one.
fn mark_step_completed(run_state: &mut LoopRunState) {
    run_state.completed_steps += 1;
}
773
/// Returns `prefix` followed by `segment` as a new string.
///
/// When either side is empty the other is simply copied.
fn stitch_response_text(prefix: &str, segment: &str) -> String {
    match (prefix.is_empty(), segment.is_empty()) {
        (true, _) => segment.to_owned(),
        (_, true) => prefix.to_owned(),
        _ => [prefix, segment].concat(),
    }
}
786
/// Appends `segment` to `prefix` in place; an empty segment leaves `prefix`
/// untouched.
fn extend_response_prefix(prefix: &mut String, segment: &str) {
    if segment.is_empty() {
        return;
    }
    *prefix += segment;
}
792
793fn build_loop_outcome(
794    run_ctx: RunContext,
795    termination: TerminationReason,
796    response: Option<String>,
797    run_state: &LoopRunState,
798    failure: Option<outcome::LoopFailure>,
799) -> LoopOutcome {
800    LoopOutcome {
801        run_ctx,
802        termination,
803        response: response.filter(|text| !text.is_empty()),
804        usage: run_state.usage(),
805        stats: run_state.stats(),
806        failure,
807    }
808}
809
/// Invokes the LLM with per-model retries and fallback across models.
///
/// Candidate models come from `effective_llm_models_from(agent, start_model)`
/// and are tried in order. Each model gets up to `llm_retry_attempts(agent)`
/// invocations of `invoke`, each raced against `run_cancellation_token`.
///
/// - The first `Ok` ends the whole call with [`LlmAttemptOutcome::Success`].
/// - On `Err`, the same model is retried only when `retry_current_model` is
///   set, attempts remain, and the error class is retryable; a backoff wait
///   precedes the retry. Otherwise the loop moves on to the next model.
/// - If the backoff reports the retry budget as exhausted, no further model
///   is tried.
/// - Cancellation during an invocation or a backoff wait returns
///   [`LlmAttemptOutcome::Cancelled`].
///
/// When nothing succeeds, [`LlmAttemptOutcome::Exhausted`] carries the last
/// error description (`unknown_error` if `invoke` never ran), the last error
/// class label, and the total number of invocations.
pub(super) async fn run_llm_with_retry_and_fallback<T, Invoke, Fut>(
    agent: &dyn Agent,
    run_cancellation_token: Option<&RunCancellationToken>,
    retry_current_model: bool,
    start_model: Option<&str>,
    unknown_error: &str,
    mut invoke: Invoke,
) -> LlmAttemptOutcome<T>
where
    Invoke: FnMut(String) -> Fut,
    Fut: std::future::Future<Output = genai::Result<T>>,
{
    let mut last_llm_error = unknown_error.to_string();
    let mut last_error_class: Option<&'static str> = None;
    let model_candidates = effective_llm_models_from(agent, start_model);
    let max_attempts = llm_retry_attempts(agent);
    // Counts invocations across all models; `attempt` below is per model.
    let mut total_attempts = 0usize;
    // One retry window is shared by every model tried in this call.
    let mut retry_window = RetryBackoffWindow::default();

    'models: for model in model_candidates {
        for attempt in 1..=max_attempts {
            total_attempts = total_attempts.saturating_add(1);
            let response_res =
                match await_or_cancel(run_cancellation_token, invoke(model.clone())).await {
                    CancelAware::Cancelled => return LlmAttemptOutcome::Cancelled,
                    CancelAware::Value(resp) => resp,
                };

            match response_res {
                Ok(value) => {
                    if total_attempts > 1 {
                        tracing::info!(
                            model = %model,
                            attempts = total_attempts,
                            "LLM call succeeded after retries"
                        );
                    }
                    return LlmAttemptOutcome::Success {
                        value,
                        model,
                        attempts: total_attempts,
                    };
                }
                Err(e) => {
                    let error_class = classify_llm_error(&e);
                    last_error_class = Some(error_class.as_str());
                    let message = e.to_string();
                    last_llm_error =
                        format!("model='{model}' attempt={attempt}/{max_attempts}: {message}");
                    let can_retry_same_model =
                        retry_current_model && attempt < max_attempts && error_class.is_retryable();
                    tracing::warn!(
                        model = %model,
                        attempt = attempt,
                        max_attempts = max_attempts,
                        error_class = error_class.as_str(),
                        retryable = can_retry_same_model,
                        error = %message,
                        "LLM call failed"
                    );
                    if can_retry_same_model {
                        match wait_retry_backoff(
                            agent,
                            attempt,
                            &mut retry_window,
                            run_cancellation_token,
                        )
                        .await
                        {
                            // Backoff finished: next attempt on the same model.
                            RetryBackoffOutcome::Completed => continue,
                            RetryBackoffOutcome::Cancelled => {
                                return LlmAttemptOutcome::Cancelled;
                            }
                            RetryBackoffOutcome::BudgetExhausted => {
                                tracing::warn!(
                                    model = %model,
                                    attempt = attempt,
                                    "LLM retry budget exhausted"
                                );
                                last_llm_error =
                                    format!("{last_llm_error} (retry budget exhausted)");
                                // Stop entirely: remaining models are skipped too.
                                break 'models;
                            }
                        }
                    }
                    // Not retrying this model: fall through to the next candidate.
                    break;
                }
            }
        }
    }

    LlmAttemptOutcome::Exhausted {
        last_error: last_llm_error,
        last_error_class,
        attempts: total_attempts,
    }
}
907
908pub(super) async fn run_step_prepare_phases(
909    run_ctx: &RunContext,
910    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
911    agent: &dyn Agent,
912) -> Result<
913    (
914        Vec<Message>,
915        Vec<String>,
916        RunAction,
917        Vec<std::sync::Arc<dyn tirea_contract::runtime::inference::InferenceRequestTransform>>,
918        Vec<TrackedPatch>,
919        Vec<tirea_contract::SerializedStateAction>,
920    ),
921    AgentLoopError,
922> {
923    let system_prompt = agent.system_prompt().to_string();
924    let ((messages, filtered_tools, run_action, transforms), pending, actions) =
925        plugin_runtime::run_phase_block(
926            run_ctx,
927            tool_descriptors,
928            agent,
929            &[Phase::StepStart, Phase::BeforeInference],
930            |_| {},
931            |step| inference_inputs_from_step(step, &system_prompt),
932        )
933        .await?;
934    Ok((
935        messages,
936        filtered_tools,
937        run_action,
938        transforms,
939        pending,
940        actions,
941    ))
942}
943
/// Named form of the tuple returned by `run_step_prepare_phases`; built by
/// `prepare_step_execution`.
pub(super) struct PreparedStep {
    /// Messages produced by the prepare phases for the inference request.
    pub(super) messages: Vec<Message>,
    /// Filtered tool list produced by the prepare phases.
    pub(super) filtered_tools: Vec<String>,
    /// Run action reported by the prepare phases.
    pub(super) run_action: RunAction,
    /// Patches collected during the prepare phases, not yet applied here.
    pub(super) pending_patches: Vec<TrackedPatch>,
    /// Serialized state actions collected during the prepare phases.
    pub(super) serialized_state_actions: Vec<tirea_contract::SerializedStateAction>,
    /// Inference request transforms collected during the prepare phases.
    pub(super) request_transforms:
        Vec<std::sync::Arc<dyn tirea_contract::runtime::inference::InferenceRequestTransform>>,
}
953
954pub(super) async fn prepare_step_execution(
955    run_ctx: &RunContext,
956    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
957    agent: &dyn Agent,
958) -> Result<PreparedStep, AgentLoopError> {
959    let (messages, filtered_tools, run_action, transforms, pending, actions) =
960        run_step_prepare_phases(run_ctx, tool_descriptors, agent).await?;
961    Ok(PreparedStep {
962        messages,
963        filtered_tools,
964        run_action,
965        pending_patches: pending,
966        serialized_state_actions: actions,
967        request_transforms: transforms,
968    })
969}
970
971pub(super) async fn apply_llm_error_cleanup(
972    run_ctx: &mut RunContext,
973    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
974    agent: &dyn Agent,
975    error_type: &'static str,
976    message: String,
977    error_class: Option<&str>,
978) -> Result<(), AgentLoopError> {
979    plugin_runtime::emit_cleanup_phases(
980        run_ctx,
981        tool_descriptors,
982        agent,
983        error_type,
984        message,
985        error_class,
986    )
987    .await
988}
989
990pub(super) async fn complete_step_after_inference(
991    run_ctx: &mut RunContext,
992    result: &StreamResult,
993    step_meta: MessageMetadata,
994    assistant_message_id: Option<String>,
995    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
996    agent: &dyn Agent,
997) -> Result<RunAction, AgentLoopError> {
998    let (run_action, pending, actions) = plugin_runtime::run_phase_block(
999        run_ctx,
1000        tool_descriptors,
1001        agent,
1002        &[Phase::AfterInference],
1003        |step| {
1004            use crate::contracts::runtime::inference::LLMResponse;
1005            step.llm_response = Some(LLMResponse::success(result.clone()));
1006        },
1007        |step| step.run_action(),
1008    )
1009    .await?;
1010    run_ctx.add_thread_patches(pending);
1011    run_ctx.add_serialized_state_actions(actions);
1012
1013    let assistant = assistant_turn_message(result, step_meta, assistant_message_id);
1014    run_ctx.add_message(Arc::new(assistant));
1015
1016    let (pending, actions) =
1017        plugin_runtime::emit_phase_block(Phase::StepEnd, run_ctx, tool_descriptors, agent, |_| {})
1018            .await?;
1019    run_ctx.add_thread_patches(pending);
1020    run_ctx.add_serialized_state_actions(actions);
1021    Ok(run_action)
1022}
1023
1024/// Emit events for a pending tool-call projection.
1025pub(super) fn pending_tool_events(call: &SuspendedCall) -> Vec<AgentEvent> {
1026    vec![
1027        AgentEvent::ToolCallStart {
1028            id: call.ticket.pending.id.clone(),
1029            name: call.ticket.pending.name.clone(),
1030        },
1031        AgentEvent::ToolCallReady {
1032            id: call.ticket.pending.id.clone(),
1033            name: call.ticket.pending.name.clone(),
1034            arguments: call.ticket.pending.arguments.clone(),
1035        },
1036    ]
1037}
1038
1039pub(super) fn has_suspended_calls(run_ctx: &RunContext) -> bool {
1040    !suspended_calls_from_ctx(run_ctx).is_empty()
1041}
1042
1043pub(super) fn suspended_call_ids(run_ctx: &RunContext) -> HashSet<String> {
1044    suspended_calls_from_ctx(run_ctx).into_keys().collect()
1045}
1046
1047pub(super) fn newly_suspended_call_ids(
1048    run_ctx: &RunContext,
1049    baseline_ids: &HashSet<String>,
1050) -> HashSet<String> {
1051    suspended_calls_from_ctx(run_ctx)
1052        .into_keys()
1053        .filter(|id| !baseline_ids.contains(id))
1054        .collect()
1055}
1056
1057pub(super) fn suspended_call_pending_events(run_ctx: &RunContext) -> Vec<AgentEvent> {
1058    let mut calls: Vec<SuspendedCall> = suspended_calls_from_ctx(run_ctx).into_values().collect();
1059    calls.sort_by(|left, right| left.call_id.cmp(&right.call_id));
1060    calls
1061        .into_iter()
1062        .flat_map(|call| pending_tool_events(&call))
1063        .collect()
1064}
1065
1066pub(super) fn suspended_call_pending_events_for_ids(
1067    run_ctx: &RunContext,
1068    call_ids: &HashSet<String>,
1069) -> Vec<AgentEvent> {
1070    if call_ids.is_empty() {
1071        return Vec::new();
1072    }
1073    let mut calls: Vec<SuspendedCall> = suspended_calls_from_ctx(run_ctx)
1074        .into_iter()
1075        .filter_map(|(call_id, call)| call_ids.contains(&call_id).then_some(call))
1076        .collect();
1077    calls.sort_by(|left, right| left.call_id.cmp(&right.call_id));
1078    calls
1079        .into_iter()
1080        .flat_map(|call| pending_tool_events(&call))
1081        .collect()
1082}
1083
/// State snapshot and run configuration prepared for tool execution; built
/// by `prepare_tool_execution_context`.
pub(super) struct ToolExecutionContext {
    /// Snapshot of the run context state taken via `RunContext::snapshot`.
    pub(super) state: serde_json::Value,
    /// Run config returned by `scope_with_tool_caller_context` for that snapshot.
    pub(super) run_config: tirea_contract::RunConfig,
}
1088
1089pub(super) fn prepare_tool_execution_context(
1090    run_ctx: &RunContext,
1091) -> Result<ToolExecutionContext, AgentLoopError> {
1092    let state = run_ctx
1093        .snapshot()
1094        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
1095    let run_config = scope_with_tool_caller_context(run_ctx, &state)?;
1096    Ok(ToolExecutionContext { state, run_config })
1097}
1098
1099pub(super) async fn finalize_run_end(
1100    run_ctx: &mut RunContext,
1101    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
1102    agent: &dyn Agent,
1103) {
1104    plugin_runtime::emit_run_end_phase(run_ctx, tool_descriptors, agent).await
1105}
1106
1107fn normalize_termination_for_suspended_calls(
1108    run_ctx: &RunContext,
1109    termination: TerminationReason,
1110    response: Option<String>,
1111) -> (TerminationReason, Option<String>) {
1112    let final_termination = if !matches!(
1113        termination,
1114        TerminationReason::Error(_) | TerminationReason::Cancelled
1115    ) && has_suspended_calls(run_ctx)
1116    {
1117        TerminationReason::Suspended
1118    } else {
1119        termination
1120    };
1121    let final_response = if final_termination == TerminationReason::Suspended {
1122        None
1123    } else {
1124        response
1125    };
1126    (final_termination, final_response)
1127}
1128
1129async fn persist_run_termination(
1130    run_ctx: &mut RunContext,
1131    termination: &TerminationReason,
1132    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
1133    agent: &dyn Agent,
1134    execution_ctx: &RunExecutionContext,
1135    pending_delta_commit: &PendingDeltaCommitContext<'_>,
1136) -> Result<(), AgentLoopError> {
1137    sync_run_lifecycle_for_termination_with_context(run_ctx, execution_ctx, termination)?;
1138    finalize_run_end(run_ctx, tool_descriptors, agent).await;
1139    pending_delta_commit
1140        .commit_run_finished(run_ctx, termination)
1141        .await?;
1142    Ok(())
1143}
1144
1145fn stream_result_from_chat_response(response: &genai::chat::ChatResponse) -> StreamResult {
1146    let text = response
1147        .first_text()
1148        .map(|s| s.to_string())
1149        .unwrap_or_default();
1150    let tool_calls: Vec<crate::contracts::thread::ToolCall> = response
1151        .tool_calls()
1152        .into_iter()
1153        .map(|tc| {
1154            crate::contracts::thread::ToolCall::new(
1155                &tc.call_id,
1156                &tc.fn_name,
1157                tc.fn_arguments.clone(),
1158            )
1159        })
1160        .collect();
1161
1162    let usage = Some(crate::runtime::streaming::token_usage_from_genai(
1163        &response.usage,
1164    ));
1165    let stop_reason = response
1166        .stop_reason
1167        .as_ref()
1168        .and_then(crate::runtime::streaming::map_genai_stop_reason)
1169        .or({
1170            if !tool_calls.is_empty() {
1171                Some(tirea_contract::runtime::inference::StopReason::ToolUse)
1172            } else {
1173                Some(tirea_contract::runtime::inference::StopReason::EndTurn)
1174            }
1175        });
1176    StreamResult {
1177        text,
1178        tool_calls,
1179        usage,
1180        stop_reason,
1181    }
1182}
1183
1184fn assistant_turn_message(
1185    result: &StreamResult,
1186    step_meta: MessageMetadata,
1187    message_id: Option<String>,
1188) -> Message {
1189    let mut msg = if result.tool_calls.is_empty() {
1190        assistant_message(&result.text)
1191    } else {
1192        assistant_tool_calls(&result.text, result.tool_calls.clone())
1193    }
1194    .with_metadata(step_meta);
1195    if let Some(message_id) = message_id {
1196        msg = msg.with_id(message_id);
1197    }
1198    msg
1199}
1200
/// Result of draining resume decisions at run start.
struct RunStartDrainOutcome {
    /// Events produced while draining, in emission order.
    events: Vec<AgentEvent>,
    /// `true` when at least one suspended call had its decision processed.
    replayed: bool,
}
1205
1206fn decision_result_value(action: &ResumeDecisionAction, result: &Value) -> serde_json::Value {
1207    if result.is_null() {
1208        serde_json::Value::Bool(matches!(action, ResumeDecisionAction::Resume))
1209    } else {
1210        result.clone()
1211    }
1212}
1213
1214fn runtime_resume_inputs(run_ctx: &RunContext) -> HashMap<String, ToolCallResume> {
1215    let mut decisions = HashMap::new();
1216    for (call_id, state) in tool_call_states_from_ctx(run_ctx) {
1217        if !matches!(state.status, ToolCallStatus::Resuming) {
1218            continue;
1219        }
1220        let Some(mut resume) = state.resume else {
1221            continue;
1222        };
1223        if resume.decision_id.trim().is_empty() {
1224            resume.decision_id = call_id.clone();
1225        }
1226        decisions.insert(call_id, resume);
1227    }
1228    decisions
1229}
1230
1231fn settle_orphan_resuming_tool_states(
1232    run_ctx: &mut RunContext,
1233    suspended: &HashMap<String, SuspendedCall>,
1234    resumes: &HashMap<String, ToolCallResume>,
1235) -> Result<bool, AgentLoopError> {
1236    let states = tool_call_states_from_ctx(run_ctx);
1237    let mut changed = false;
1238
1239    for (call_id, resume) in resumes {
1240        if suspended.contains_key(call_id) {
1241            continue;
1242        }
1243        let Some(state) = states.get(call_id).cloned() else {
1244            continue;
1245        };
1246        let target_status = match &resume.action {
1247            ResumeDecisionAction::Cancel => ToolCallStatus::Cancelled,
1248            ResumeDecisionAction::Resume => ToolCallStatus::Failed,
1249        };
1250        if state.status == target_status && state.resume.as_ref() == Some(resume) {
1251            continue;
1252        }
1253
1254        let Some(next_state) = transition_tool_call_state(
1255            Some(state.clone()),
1256            ToolCallStateSeed {
1257                call_id: call_id.as_str(),
1258                tool_name: state.tool_name.as_str(),
1259                arguments: &state.arguments,
1260                status: state.status,
1261                resume_token: state.resume_token.clone(),
1262            },
1263            ToolCallStateTransition {
1264                status: target_status,
1265                resume_token: state.resume_token.clone(),
1266                resume: Some(resume.clone()),
1267                updated_at: current_unix_millis(),
1268            },
1269        ) else {
1270            continue;
1271        };
1272
1273        let base_state = run_ctx
1274            .snapshot()
1275            .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
1276        let patch = upsert_tool_call_state(&base_state, call_id, next_state)?;
1277        if patch.patch().is_empty() {
1278            continue;
1279        }
1280        run_ctx.add_thread_patch(patch);
1281        changed = true;
1282    }
1283
1284    Ok(changed)
1285}
1286
1287fn all_suspended_calls_have_resume(
1288    suspended: &HashMap<String, SuspendedCall>,
1289    resumes: &HashMap<String, ToolCallResume>,
1290) -> bool {
1291    suspended
1292        .keys()
1293        .all(|call_id| resumes.contains_key(call_id))
1294}
1295
1296async fn drain_resuming_tool_calls_and_replay(
1297    run_ctx: &mut RunContext,
1298    tools: &HashMap<String, Arc<dyn Tool>>,
1299    agent: &dyn Agent,
1300    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
1301) -> Result<RunStartDrainOutcome, AgentLoopError> {
1302    let decisions = runtime_resume_inputs(run_ctx);
1303    if decisions.is_empty() {
1304        return Ok(RunStartDrainOutcome {
1305            events: Vec::new(),
1306            replayed: false,
1307        });
1308    }
1309
1310    let suspended = suspended_calls_from_ctx(run_ctx);
1311    let mut state_changed = false;
1312    if settle_orphan_resuming_tool_states(run_ctx, &suspended, &decisions)? {
1313        state_changed = true;
1314    }
1315    if suspended.is_empty() {
1316        if state_changed {
1317            let snapshot = run_ctx
1318                .snapshot()
1319                .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
1320            return Ok(RunStartDrainOutcome {
1321                events: vec![AgentEvent::StateSnapshot { snapshot }],
1322                replayed: false,
1323            });
1324        }
1325        return Ok(RunStartDrainOutcome {
1326            events: Vec::new(),
1327            replayed: false,
1328        });
1329    }
1330
1331    if matches!(
1332        agent.tool_executor().decision_replay_policy(),
1333        DecisionReplayPolicy::BatchAllSuspended
1334    ) && !all_suspended_calls_have_resume(&suspended, &decisions)
1335    {
1336        if state_changed {
1337            let snapshot = run_ctx
1338                .snapshot()
1339                .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
1340            return Ok(RunStartDrainOutcome {
1341                events: vec![AgentEvent::StateSnapshot { snapshot }],
1342                replayed: false,
1343            });
1344        }
1345        return Ok(RunStartDrainOutcome {
1346            events: Vec::new(),
1347            replayed: false,
1348        });
1349    }
1350
1351    let mut events = Vec::new();
1352    let mut decision_ids: Vec<String> = decisions.keys().cloned().collect();
1353    decision_ids.sort();
1354
1355    let mut replayed = false;
1356
1357    for call_id in decision_ids {
1358        let Some(suspended_call) = suspended.get(&call_id).cloned() else {
1359            continue;
1360        };
1361        let Some(decision) = decisions.get(&call_id).cloned() else {
1362            continue;
1363        };
1364        replayed = true;
1365        let decision_result = decision_result_value(&decision.action, &decision.result);
1366        let resume_payload = ToolCallResume {
1367            result: decision_result.clone(),
1368            ..decision.clone()
1369        };
1370        events.push(AgentEvent::ToolCallResumed {
1371            target_id: suspended_call.call_id.clone(),
1372            result: decision_result.clone(),
1373        });
1374
1375        match decision.action {
1376            ResumeDecisionAction::Cancel => {
1377                let cancel_reason = resume_payload.reason.clone();
1378                if upsert_tool_call_lifecycle_state(
1379                    run_ctx,
1380                    &suspended_call,
1381                    ToolCallStatus::Cancelled,
1382                    Some(resume_payload),
1383                )? {
1384                    state_changed = true;
1385                }
1386                events.push(append_denied_tool_result_message(
1387                    run_ctx,
1388                    &suspended_call.call_id,
1389                    Some(&suspended_call.tool_name),
1390                    cancel_reason.as_deref(),
1391                ));
1392                // Cancel path skips tool execution, so no automatic scope
1393                // cleanup runs. Delete just the suspended_call entry; keep
1394                // tool_call_state (Cancelled status) for audit.
1395                let cleanup_path = format!(
1396                    "__tool_call_scope.{}.suspended_call",
1397                    suspended_call.call_id
1398                );
1399                let cleanup_patch = tirea_state::Patch::with_ops(vec![tirea_state::Op::delete(
1400                    tirea_state::parse_path(&cleanup_path),
1401                )]);
1402                let tracked = tirea_state::TrackedPatch::new(cleanup_patch)
1403                    .with_source("framework:scope_cleanup");
1404                if !tracked.patch().is_empty() {
1405                    state_changed = true;
1406                    run_ctx.add_thread_patch(tracked);
1407                }
1408            }
1409            ResumeDecisionAction::Resume => {
1410                if upsert_tool_call_lifecycle_state(
1411                    run_ctx,
1412                    &suspended_call,
1413                    ToolCallStatus::Resuming,
1414                    Some(resume_payload.clone()),
1415                )? {
1416                    state_changed = true;
1417                }
1418                let Some(tool_call) = replay_tool_call_for_resolution(
1419                    run_ctx,
1420                    &suspended_call,
1421                    &ToolCallDecision {
1422                        target_id: suspended_call.call_id.clone(),
1423                        resume: resume_payload.clone(),
1424                    },
1425                ) else {
1426                    continue;
1427                };
1428                let state = run_ctx
1429                    .snapshot()
1430                    .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
1431                let tool = tools.get(&tool_call.name).cloned();
1432                let rt_for_replay = scope_with_tool_caller_context(run_ctx, &state)?;
1433                let replay_phase_ctx = ToolPhaseContext {
1434                    tool_descriptors,
1435                    agent_behavior: Some(agent.behavior()),
1436                    activity_manager: tirea_contract::runtime::activity::NoOpActivityManager::arc(),
1437                    run_config: &rt_for_replay,
1438                    thread_id: run_ctx.thread_id(),
1439                    thread_messages: run_ctx.messages(),
1440                    cancellation_token: None,
1441                };
1442                let replay_result = execute_single_tool_with_phases_deferred(
1443                    tool.as_deref(),
1444                    &tool_call,
1445                    &state,
1446                    &replay_phase_ctx,
1447                )
1448                .await?;
1449
1450                let replay_msg_id = gen_message_id();
1451                let replay_msg = tool_response(&tool_call.id, &replay_result.execution.result)
1452                    .with_id(replay_msg_id.clone());
1453                run_ctx.add_message(Arc::new(replay_msg));
1454
1455                if !replay_result.reminders.is_empty() {
1456                    let msgs: Vec<Arc<Message>> = replay_result
1457                        .reminders
1458                        .iter()
1459                        .map(|reminder| {
1460                            Arc::new(Message::internal_system(format!(
1461                                "<system-reminder>{}</system-reminder>",
1462                                reminder
1463                            )))
1464                        })
1465                        .collect();
1466                    run_ctx.add_messages(msgs);
1467                }
1468
1469                if let Some(patch) = replay_result.execution.patch.clone() {
1470                    state_changed = true;
1471                    run_ctx.add_thread_patch(patch);
1472                }
1473                if !replay_result.pending_patches.is_empty() {
1474                    state_changed = true;
1475                    run_ctx.add_thread_patches(replay_result.pending_patches.clone());
1476                }
1477                events.push(AgentEvent::ToolCallDone {
1478                    id: tool_call.id.clone(),
1479                    result: replay_result.execution.result,
1480                    patch: replay_result.execution.patch,
1481                    message_id: replay_msg_id,
1482                });
1483
1484                if let Some(next_suspended_call) = replay_result.suspended_call.clone() {
1485                    let state = run_ctx
1486                        .snapshot()
1487                        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
1488                    let action = next_suspended_call.clone().into_state_action();
1489                    let patches = reduce_state_actions(
1490                        vec![action],
1491                        &state,
1492                        "agent_loop",
1493                        &ScopeContext::run(),
1494                    )
1495                    .map_err(|e| {
1496                        AgentLoopError::StateError(format!(
1497                            "failed to reduce suspended call action: {e}"
1498                        ))
1499                    })?;
1500                    for patch in patches {
1501                        if !patch.patch().is_empty() {
1502                            state_changed = true;
1503                            run_ctx.add_thread_patch(patch);
1504                        }
1505                    }
1506                    for event in pending_tool_events(&next_suspended_call) {
1507                        events.push(event);
1508                    }
1509                }
1510            }
1511        }
1512    }
1513
1514    // No explicit clear_suspended_call needed: terminal-outcome tool calls
1515    // clear `__tool_call_scope.<call_id>.suspended_call` automatically in
1516    // execute_single_tool_with_phases_impl while preserving tool_call_state.
1517
1518    if state_changed {
1519        let snapshot = run_ctx
1520            .snapshot()
1521            .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
1522        events.push(AgentEvent::StateSnapshot { snapshot });
1523    }
1524
1525    Ok(RunStartDrainOutcome { events, replayed })
1526}
1527
1528async fn drain_run_start_resume_replay(
1529    run_ctx: &mut RunContext,
1530    tools: &HashMap<String, Arc<dyn Tool>>,
1531    agent: &dyn Agent,
1532    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
1533) -> Result<RunStartDrainOutcome, AgentLoopError> {
1534    drain_resuming_tool_calls_and_replay(run_ctx, tools, agent, tool_descriptors).await
1535}
1536
1537async fn commit_run_start_and_drain_replay(
1538    run_ctx: &mut RunContext,
1539    tools: &HashMap<String, Arc<dyn Tool>>,
1540    agent: &dyn Agent,
1541    active_tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
1542    pending_delta_commit: &PendingDeltaCommitContext<'_>,
1543) -> Result<RunStartDrainOutcome, AgentLoopError> {
1544    pending_delta_commit
1545        .commit(run_ctx, CheckpointReason::UserMessage, false)
1546        .await?;
1547
1548    let run_start_drain =
1549        drain_run_start_resume_replay(run_ctx, tools, agent, active_tool_descriptors).await?;
1550
1551    if run_start_drain.replayed {
1552        pending_delta_commit
1553            .commit(run_ctx, CheckpointReason::ToolResultsCommitted, false)
1554            .await?;
1555    }
1556
1557    Ok(run_start_drain)
1558}
1559
1560fn normalize_decision_tool_result(
1561    response: &serde_json::Value,
1562    fallback_arguments: &serde_json::Value,
1563) -> serde_json::Value {
1564    match response {
1565        serde_json::Value::Bool(_) => fallback_arguments.clone(),
1566        value => value.clone(),
1567    }
1568}
1569
1570fn denied_tool_result_for_call(
1571    run_ctx: &RunContext,
1572    call_id: &str,
1573    fallback_tool_name: Option<&str>,
1574    decision_reason: Option<&str>,
1575) -> ToolResult {
1576    let tool_name = fallback_tool_name
1577        .filter(|name| !name.is_empty())
1578        .map(str::to_string)
1579        .or_else(|| find_tool_call_in_messages(run_ctx, call_id).map(|call| call.name))
1580        .unwrap_or_else(|| "tool".to_string());
1581    let reason = decision_reason
1582        .map(str::to_string)
1583        .filter(|reason| !reason.trim().is_empty())
1584        .unwrap_or_else(|| "User denied the action".to_string());
1585    ToolResult::error(tool_name, reason)
1586}
1587
1588fn append_denied_tool_result_message(
1589    run_ctx: &mut RunContext,
1590    call_id: &str,
1591    fallback_tool_name: Option<&str>,
1592    decision_reason: Option<&str>,
1593) -> AgentEvent {
1594    let denied_result =
1595        denied_tool_result_for_call(run_ctx, call_id, fallback_tool_name, decision_reason);
1596    let message_id = gen_message_id();
1597    let denied_message = tool_response(call_id, &denied_result).with_id(message_id.clone());
1598    run_ctx.add_message(Arc::new(denied_message));
1599    AgentEvent::ToolCallDone {
1600        id: call_id.to_string(),
1601        result: denied_result,
1602        patch: None,
1603        message_id,
1604    }
1605}
1606
1607fn find_tool_call_in_messages(run_ctx: &RunContext, call_id: &str) -> Option<ToolCall> {
1608    run_ctx.messages().iter().rev().find_map(|message| {
1609        message
1610            .tool_calls
1611            .as_ref()
1612            .and_then(|calls| calls.iter().find(|call| call.id == call_id).cloned())
1613    })
1614}
1615
1616fn replay_tool_call_for_resolution(
1617    _run_ctx: &RunContext,
1618    suspended_call: &SuspendedCall,
1619    decision: &ToolCallDecision,
1620) -> Option<ToolCall> {
1621    if matches!(decision.resume.action, ResumeDecisionAction::Cancel) {
1622        return None;
1623    }
1624
1625    match suspended_call.ticket.resume_mode {
1626        ToolCallResumeMode::ReplayToolCall => Some(ToolCall::new(
1627            suspended_call.call_id.clone(),
1628            suspended_call.tool_name.clone(),
1629            suspended_call.arguments.clone(),
1630        )),
1631        ToolCallResumeMode::UseDecisionAsToolResult | ToolCallResumeMode::PassDecisionToTool => {
1632            Some(ToolCall::new(
1633                suspended_call.call_id.clone(),
1634                suspended_call.tool_name.clone(),
1635                normalize_decision_tool_result(&decision.resume.result, &suspended_call.arguments),
1636            ))
1637        }
1638    }
1639}
1640
1641fn upsert_tool_call_lifecycle_state(
1642    run_ctx: &mut RunContext,
1643    suspended_call: &SuspendedCall,
1644    status: ToolCallStatus,
1645    resume: Option<ToolCallResume>,
1646) -> Result<bool, AgentLoopError> {
1647    let current_state = tool_call_states_from_ctx(run_ctx).remove(&suspended_call.call_id);
1648    let Some(tool_state) = transition_tool_call_state(
1649        current_state,
1650        ToolCallStateSeed {
1651            call_id: &suspended_call.call_id,
1652            tool_name: &suspended_call.tool_name,
1653            arguments: &suspended_call.arguments,
1654            status: ToolCallStatus::Suspended,
1655            resume_token: Some(suspended_call.ticket.pending.id.clone()),
1656        },
1657        ToolCallStateTransition {
1658            status,
1659            resume_token: Some(suspended_call.ticket.pending.id.clone()),
1660            resume,
1661            updated_at: current_unix_millis(),
1662        },
1663    ) else {
1664        return Ok(false);
1665    };
1666
1667    let base_state = run_ctx
1668        .snapshot()
1669        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
1670    let patch = upsert_tool_call_state(&base_state, &suspended_call.call_id, tool_state)?;
1671    if patch.patch().is_empty() {
1672        return Ok(false);
1673    }
1674    run_ctx.add_thread_patch(patch);
1675    Ok(true)
1676}
1677
1678pub(super) fn resolve_suspended_call(
1679    run_ctx: &mut RunContext,
1680    response: &ToolCallDecision,
1681) -> Result<Option<DecisionReplayOutcome>, AgentLoopError> {
1682    let suspended_calls = suspended_calls_from_ctx(run_ctx);
1683    if suspended_calls.is_empty() {
1684        return Ok(None);
1685    }
1686
1687    let suspended_call = suspended_calls
1688        .get(&response.target_id)
1689        .cloned()
1690        .or_else(|| {
1691            suspended_calls
1692                .values()
1693                .find(|call| {
1694                    call.ticket.suspension.id == response.target_id
1695                        || call.ticket.pending.id == response.target_id
1696                        || call.call_id == response.target_id
1697                })
1698                .cloned()
1699        });
1700    let Some(suspended_call) = suspended_call else {
1701        return Ok(None);
1702    };
1703
1704    let _ = upsert_tool_call_lifecycle_state(
1705        run_ctx,
1706        &suspended_call,
1707        ToolCallStatus::Resuming,
1708        Some(response.resume.clone()),
1709    )?;
1710
1711    Ok(Some(DecisionReplayOutcome {
1712        events: Vec::new(),
1713        resolved_call_ids: vec![suspended_call.call_id],
1714    }))
1715}
1716
1717pub(super) fn drain_decision_channel(
1718    run_ctx: &mut RunContext,
1719    decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1720    pending_decisions: &mut VecDeque<ToolCallDecision>,
1721) -> Result<DecisionReplayOutcome, AgentLoopError> {
1722    let mut disconnected = false;
1723    if let Some(rx) = decision_rx.as_mut() {
1724        loop {
1725            match rx.try_recv() {
1726                Ok(response) => pending_decisions.push_back(response),
1727                Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
1728                Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1729                    disconnected = true;
1730                    break;
1731                }
1732            }
1733        }
1734    }
1735    if disconnected {
1736        *decision_rx = None;
1737    }
1738
1739    if pending_decisions.is_empty() {
1740        return Ok(DecisionReplayOutcome {
1741            events: Vec::new(),
1742            resolved_call_ids: Vec::new(),
1743        });
1744    }
1745
1746    let mut unresolved = VecDeque::new();
1747    let mut events = Vec::new();
1748    let mut resolved_call_ids = Vec::new();
1749    let mut seen = HashSet::new();
1750
1751    while let Some(response) = pending_decisions.pop_front() {
1752        if let Some(outcome) = resolve_suspended_call(run_ctx, &response)? {
1753            for call_id in outcome.resolved_call_ids {
1754                if seen.insert(call_id.clone()) {
1755                    resolved_call_ids.push(call_id);
1756                }
1757            }
1758            events.extend(outcome.events);
1759        } else {
1760            unresolved.push_back(response);
1761        }
1762    }
1763    *pending_decisions = unresolved;
1764
1765    Ok(DecisionReplayOutcome {
1766        events,
1767        resolved_call_ids,
1768    })
1769}
1770
1771async fn replay_after_decisions(
1772    run_ctx: &mut RunContext,
1773    decisions_applied: bool,
1774    step_tool_provider: &Arc<dyn StepToolProvider>,
1775    agent: &dyn Agent,
1776    active_tool_descriptors: &mut Vec<crate::contracts::runtime::tool_call::ToolDescriptor>,
1777    pending_delta_commit: &PendingDeltaCommitContext<'_>,
1778) -> Result<Vec<AgentEvent>, AgentLoopError> {
1779    if !decisions_applied {
1780        return Ok(Vec::new());
1781    }
1782
1783    let decision_tools = resolve_step_tool_snapshot(step_tool_provider, run_ctx).await?;
1784    *active_tool_descriptors = decision_tools.descriptors.clone();
1785
1786    let decision_drain = drain_run_start_resume_replay(
1787        run_ctx,
1788        &decision_tools.tools,
1789        agent,
1790        active_tool_descriptors,
1791    )
1792    .await?;
1793
1794    pending_delta_commit
1795        .commit(run_ctx, CheckpointReason::ToolResultsCommitted, false)
1796        .await?;
1797
1798    Ok(decision_drain.events)
1799}
1800
1801async fn apply_decisions_and_replay(
1802    run_ctx: &mut RunContext,
1803    decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1804    pending_decisions: &mut VecDeque<ToolCallDecision>,
1805    step_tool_provider: &Arc<dyn StepToolProvider>,
1806    agent: &dyn Agent,
1807    active_tool_descriptors: &mut Vec<crate::contracts::runtime::tool_call::ToolDescriptor>,
1808    pending_delta_commit: &PendingDeltaCommitContext<'_>,
1809) -> Result<Vec<AgentEvent>, AgentLoopError> {
1810    Ok(drain_and_replay_decisions(
1811        run_ctx,
1812        decision_rx,
1813        pending_decisions,
1814        None,
1815        DecisionReplayInputs {
1816            step_tool_provider,
1817            agent,
1818            active_tool_descriptors,
1819        },
1820        pending_delta_commit,
1821    )
1822    .await?
1823    .events)
1824}
1825
/// Result of applying tool-call decisions to suspended calls.
pub(super) struct DecisionReplayOutcome {
    /// Events accumulated while resolving and replaying decisions.
    events: Vec<AgentEvent>,
    /// Call ids of the suspended calls that a decision was matched to.
    resolved_call_ids: Vec<String>,
}
1830
/// Borrowed inputs that `drain_and_replay_decisions` forwards to
/// `replay_after_decisions`, bundled to keep the argument lists short.
struct DecisionReplayInputs<'a> {
    /// Provider used to resolve the tool snapshot for the replay.
    step_tool_provider: &'a Arc<dyn StepToolProvider>,
    /// Agent passed through to the replay drain.
    agent: &'a dyn Agent,
    /// Descriptor list that the replay overwrites with the freshly resolved
    /// snapshot's descriptors.
    active_tool_descriptors: &'a mut Vec<crate::contracts::runtime::tool_call::ToolDescriptor>,
}
1836
1837async fn drain_and_replay_decisions(
1838    run_ctx: &mut RunContext,
1839    decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1840    pending_decisions: &mut VecDeque<ToolCallDecision>,
1841    decision: Option<ToolCallDecision>,
1842    replay_inputs: DecisionReplayInputs<'_>,
1843    pending_delta_commit: &PendingDeltaCommitContext<'_>,
1844) -> Result<DecisionReplayOutcome, AgentLoopError> {
1845    if let Some(decision) = decision {
1846        pending_decisions.push_back(decision);
1847    }
1848    let decision_drain = drain_decision_channel(run_ctx, decision_rx, pending_decisions)?;
1849    let mut events = decision_drain.events;
1850    let replay_events = replay_after_decisions(
1851        run_ctx,
1852        !decision_drain.resolved_call_ids.is_empty(),
1853        replay_inputs.step_tool_provider,
1854        replay_inputs.agent,
1855        replay_inputs.active_tool_descriptors,
1856        pending_delta_commit,
1857    )
1858    .await?;
1859    events.extend(replay_events);
1860
1861    Ok(DecisionReplayOutcome {
1862        events,
1863        resolved_call_ids: decision_drain.resolved_call_ids,
1864    })
1865}
1866
1867async fn apply_decision_and_replay(
1868    run_ctx: &mut RunContext,
1869    response: ToolCallDecision,
1870    decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1871    pending_decisions: &mut VecDeque<ToolCallDecision>,
1872    replay_inputs: DecisionReplayInputs<'_>,
1873    pending_delta_commit: &PendingDeltaCommitContext<'_>,
1874) -> Result<DecisionReplayOutcome, AgentLoopError> {
1875    drain_and_replay_decisions(
1876        run_ctx,
1877        decision_rx,
1878        pending_decisions,
1879        Some(response),
1880        replay_inputs,
1881        pending_delta_commit,
1882    )
1883    .await
1884}
1885
1886async fn recv_decision(
1887    decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1888) -> Option<ToolCallDecision> {
1889    let rx = decision_rx.as_mut()?;
1890    rx.recv().await
1891}
1892
1893/// Run the full agent loop until completion, suspension, cancellation, or error.
1894///
1895/// This is the primary non-streaming entry point. Tools are passed directly
1896/// and used as the default tool set unless the agent's step_tool_provider is set
1897/// (for dynamic per-step tool resolution).
1898pub async fn run_loop(
1899    agent: &dyn Agent,
1900    tools: HashMap<String, Arc<dyn Tool>>,
1901    run_ctx: RunContext,
1902    cancellation_token: Option<RunCancellationToken>,
1903    state_committer: Option<Arc<dyn StateCommitter>>,
1904    decision_rx: Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1905) -> LoopOutcome {
1906    let execution_ctx = RunExecutionContext::from_run_config(&run_ctx);
1907    run_loop_with_context(
1908        agent,
1909        tools,
1910        run_ctx,
1911        execution_ctx,
1912        cancellation_token,
1913        state_committer,
1914        decision_rx,
1915    )
1916    .await
1917}
1918
/// Run the full agent loop with a caller-supplied [`RunExecutionContext`]
/// until completion, suspension, cancellation, or error.
///
/// This is the non-streaming loop body; [`run_loop`] derives the execution
/// context from the run config and then calls this function. Tools are passed
/// directly and used as the default tool set unless the agent's
/// step_tool_provider is set (for dynamic per-step tool resolution).
///
/// Every exit path returns a [`LoopOutcome`] built by `build_loop_outcome`;
/// failures are reported inside the outcome rather than as a `Result`.
///
/// Each loop iteration performs, in order:
///
/// 1. apply queued decisions and replay resolved calls,
/// 2. check for cancellation,
/// 3. resolve the step's tools and prepare the step,
/// 4. call the LLM (with retry and fallback),
/// 5. complete the step and commit the assistant turn,
/// 6. optionally re-enter inference for truncation recovery,
/// 7. execute tools, apply and commit their results,
/// 8. apply decisions again and evaluate suspension / deferred termination.
pub async fn run_loop_with_context(
    agent: &dyn Agent,
    tools: HashMap<String, Arc<dyn Tool>>,
    mut run_ctx: RunContext,
    execution_ctx: RunExecutionContext,
    cancellation_token: Option<RunCancellationToken>,
    state_committer: Option<Arc<dyn StateCommitter>>,
    mut decision_rx: Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
) -> LoopOutcome {
    bind_execution_context_to_run_config(&mut run_ctx, &execution_ctx);

    let executor = llm_executor_for_run(agent);
    let tool_executor = agent.tool_executor();
    let mut run_state = LoopRunState::new();
    let mut pending_decisions = VecDeque::new();
    let run_cancellation_token = cancellation_token;
    // Text of the most recent assistant response (stitched with any
    // truncation-recovery prefix); returned as the response on termination.
    let mut last_text = String::new();
    // Accumulates text from truncated responses across continuation rounds.
    let mut continued_response_prefix = String::new();
    let step_tool_provider = step_tool_provider_for_run(agent, tools);
    let run_id = execution_ctx.run_id.clone();
    // Suspended calls that already exist before this run starts; used below to
    // detect calls newly suspended during RunStart.
    let baseline_suspended_call_ids = suspended_call_ids(&run_ctx);
    let pending_delta_commit =
        PendingDeltaCommitContext::new(&execution_ctx, state_committer.as_ref());
    // This early failure returns directly: `terminate_run!` is not defined yet
    // at this point, so no termination is persisted for it.
    let initial_step_tools = match resolve_step_tool_snapshot(&step_tool_provider, &run_ctx).await {
        Ok(snapshot) => snapshot,
        Err(error) => {
            let msg = error.to_string();
            return build_loop_outcome(
                run_ctx,
                TerminationReason::Error(msg.clone()),
                None,
                &run_state,
                Some(outcome::LoopFailure::State(msg)),
            );
        }
    };
    let StepToolSnapshot {
        tools: initial_tools,
        descriptors: initial_descriptors,
    } = initial_step_tools;
    let mut active_tool_descriptors = initial_descriptors;

    // Common exit path. Normalizes the termination reason/response against any
    // suspended calls, persists the termination, and returns from the
    // enclosing function. If persisting fails, the outcome is replaced by an
    // error outcome carrying the persistence failure.
    macro_rules! terminate_run {
        ($termination:expr, $response:expr, $failure:expr) => {{
            let reason: TerminationReason = $termination;
            let (final_termination, final_response) =
                normalize_termination_for_suspended_calls(&run_ctx, reason, $response);
            if let Err(error) = persist_run_termination(
                &mut run_ctx,
                &final_termination,
                &active_tool_descriptors,
                agent,
                &execution_ctx,
                &pending_delta_commit,
            )
            .await
            {
                let msg = error.to_string();
                return build_loop_outcome(
                    run_ctx,
                    TerminationReason::Error(msg.clone()),
                    None,
                    &run_state,
                    Some(outcome::LoopFailure::State(msg)),
                );
            }
            return build_loop_outcome(
                run_ctx,
                final_termination,
                final_response,
                &run_state,
                $failure,
            );
        }};
    }

    // Phase: RunStart
    let (pending, actions) = match plugin_runtime::emit_phase_block(
        Phase::RunStart,
        &run_ctx,
        &active_tool_descriptors,
        agent,
        |_| {},
    )
    .await
    {
        Ok(result) => result,
        Err(error) => {
            let msg = error.to_string();
            terminate_run!(
                TerminationReason::Error(msg.clone()),
                None,
                Some(outcome::LoopFailure::State(msg))
            );
        }
    };
    run_ctx.add_thread_patches(pending);
    run_ctx.add_serialized_state_actions(actions);
    if let Err(error) = commit_run_start_and_drain_replay(
        &mut run_ctx,
        &initial_tools,
        agent,
        &active_tool_descriptors,
        &pending_delta_commit,
    )
    .await
    {
        let msg = error.to_string();
        terminate_run!(
            TerminationReason::Error(msg.clone()),
            None,
            Some(outcome::LoopFailure::State(msg))
        );
    }
    // If RunStart introduced suspended calls that were not in the baseline,
    // stop here and report the run as suspended.
    let run_start_new_suspended = newly_suspended_call_ids(&run_ctx, &baseline_suspended_call_ids);
    if !run_start_new_suspended.is_empty() {
        terminate_run!(TerminationReason::Suspended, None, None);
    }
    loop {
        // Apply any queued decisions before starting the step. Only the error
        // is inspected; the returned events are dropped in this non-streaming
        // path.
        if let Err(error) = apply_decisions_and_replay(
            &mut run_ctx,
            &mut decision_rx,
            &mut pending_decisions,
            &step_tool_provider,
            agent,
            &mut active_tool_descriptors,
            &pending_delta_commit,
        )
        .await
        {
            let msg = error.to_string();
            terminate_run!(
                TerminationReason::Error(msg.clone()),
                None,
                Some(outcome::LoopFailure::State(msg))
            );
        }

        if is_run_cancelled(run_cancellation_token.as_ref()) {
            terminate_run!(TerminationReason::Cancelled, None, None);
        }

        // Tools are re-resolved on every iteration.
        let step_tools = match resolve_step_tool_snapshot(&step_tool_provider, &run_ctx).await {
            Ok(snapshot) => snapshot,
            Err(e) => {
                let msg = e.to_string();
                terminate_run!(
                    TerminationReason::Error(msg.clone()),
                    None,
                    Some(outcome::LoopFailure::State(msg))
                );
            }
        };
        active_tool_descriptors = step_tools.descriptors.clone();

        let prepared = match prepare_step_execution(&run_ctx, &active_tool_descriptors, agent).await
        {
            Ok(v) => v,
            Err(e) => {
                let msg = e.to_string();
                terminate_run!(
                    TerminationReason::Error(msg.clone()),
                    None,
                    Some(outcome::LoopFailure::State(msg))
                );
            }
        };
        run_ctx.add_thread_patches(prepared.pending_patches);

        // Step preparation may request termination before inference. Only a
        // `BehaviorRequested` termination carries the last response text.
        match prepared.run_action {
            RunAction::Continue => {}
            RunAction::Terminate(reason) => {
                let response = if matches!(reason, TerminationReason::BehaviorRequested) {
                    Some(last_text.clone())
                } else {
                    None
                };
                terminate_run!(reason, response, None);
            }
        }

        // Call LLM with unified retry + fallback model strategy.
        let messages = prepared.messages;
        let filtered_tools = prepared.filtered_tools;
        let request_transforms = prepared.request_transforms;
        let chat_options = agent.chat_options().cloned();
        let attempt_outcome = run_llm_with_retry_and_fallback(
            agent,
            run_cancellation_token.as_ref(),
            true,
            None,
            "unknown llm error",
            // Invoked per attempt with the model to use; builds a fresh
            // request each time.
            |model| {
                let request = build_request_for_filtered_tools(
                    &messages,
                    &step_tools.tools,
                    &filtered_tools,
                    &request_transforms,
                );
                let executor = executor.clone();
                let chat_options = chat_options.clone();
                async move {
                    executor
                        .exec_chat_response(&model, request, chat_options.as_ref())
                        .await
                }
            },
        )
        .await;

        let response = match attempt_outcome {
            LlmAttemptOutcome::Success {
                value, attempts, ..
            } => {
                run_state.record_llm_attempts(attempts);
                value
            }
            LlmAttemptOutcome::Cancelled => {
                append_cancellation_user_message(&mut run_ctx, CancellationStage::Inference);
                terminate_run!(TerminationReason::Cancelled, None, None);
            }
            LlmAttemptOutcome::Exhausted {
                last_error,
                last_error_class,
                attempts,
            } => {
                run_state.record_llm_attempts(attempts);
                // Run the LLM-error cleanup first; if the cleanup itself
                // fails, that failure is reported instead of the LLM error.
                if let Err(phase_error) = apply_llm_error_cleanup(
                    &mut run_ctx,
                    &active_tool_descriptors,
                    agent,
                    "llm_exec_error",
                    last_error.clone(),
                    last_error_class,
                )
                .await
                {
                    let msg = phase_error.to_string();
                    terminate_run!(
                        TerminationReason::Error(msg.clone()),
                        None,
                        Some(outcome::LoopFailure::State(msg))
                    );
                }
                terminate_run!(
                    TerminationReason::Error(last_error.clone()),
                    None,
                    Some(outcome::LoopFailure::Llm(last_error))
                );
            }
        };

        let result = stream_result_from_chat_response(&response);
        run_state.update_from_response(&result);
        last_text = stitch_response_text(&continued_response_prefix, &result.text);

        // Add assistant message
        let assistant_msg_id = gen_message_id();
        let step_meta = step_metadata(Some(run_id.clone()), run_state.completed_steps as u32);
        let post_inference_action = match complete_step_after_inference(
            &mut run_ctx,
            &result,
            step_meta.clone(),
            Some(assistant_msg_id.clone()),
            &active_tool_descriptors,
            agent,
        )
        .await
        {
            Ok(action) => action,
            Err(e) => {
                let msg = e.to_string();
                terminate_run!(
                    TerminationReason::Error(msg.clone()),
                    None,
                    Some(outcome::LoopFailure::State(msg))
                );
            }
        };
        if let Err(error) = pending_delta_commit
            .commit(
                &mut run_ctx,
                CheckpointReason::AssistantTurnCommitted,
                false,
            )
            .await
        {
            let msg = error.to_string();
            terminate_run!(
                TerminationReason::Error(msg.clone()),
                None,
                Some(outcome::LoopFailure::State(msg))
            );
        }

        mark_step_completed(&mut run_state);

        // Truncation recovery: if the model hit max_tokens with no tool
        // calls, inject a continuation prompt and re-enter inference.
        if truncation_recovery::should_retry(&result, &mut run_state) {
            extend_response_prefix(&mut continued_response_prefix, &result.text);
            let prompt = truncation_recovery::continuation_message();
            run_ctx.add_message(Arc::new(prompt));
            continue;
        }
        continued_response_prefix.clear();

        let post_inference_termination = match &post_inference_action {
            RunAction::Terminate(reason) => Some(reason.clone()),
            _ => None,
        };

        // Only `Stopped` termination is deferred past tool execution so the
        // current round's tools complete (e.g. MaxRounds lets tools finish).
        // All other reasons terminate immediately before tool execution.
        if let Some(reason) = &post_inference_termination {
            if !matches!(reason, TerminationReason::Stopped(_)) {
                terminate_run!(reason.clone(), Some(last_text.clone()), None);
            }
        }

        // No tool calls requested: the run ends here, either with the deferred
        // termination reason or as a natural end.
        if !result.needs_tools() {
            run_state.record_step_without_tools();
            let reason = post_inference_termination.unwrap_or(TerminationReason::NaturalEnd);
            terminate_run!(reason, Some(last_text.clone()), None);
        }

        // Execute tools with phase hooks using configured execution strategy.
        let tool_context = match prepare_tool_execution_context(&run_ctx) {
            Ok(ctx) => ctx,
            Err(e) => {
                let msg = e.to_string();
                terminate_run!(
                    TerminationReason::Error(msg.clone()),
                    None,
                    Some(outcome::LoopFailure::State(msg))
                );
            }
        };
        // Copy the messages and version out of `run_ctx` so the execution
        // request holds its own values for them.
        let thread_messages_for_tools = run_ctx.messages().to_vec();
        let thread_version_for_tools = run_ctx.version();

        let tool_exec_future = tool_executor.execute(ToolExecutionRequest {
            tools: &step_tools.tools,
            calls: &result.tool_calls,
            state: &tool_context.state,
            tool_descriptors: &active_tool_descriptors,
            agent_behavior: Some(agent.behavior()),
            // The non-streaming loop supplies a no-op activity manager.
            activity_manager: tirea_contract::runtime::activity::NoOpActivityManager::arc(),
            run_config: &tool_context.run_config,
            thread_id: run_ctx.thread_id(),
            thread_messages: &thread_messages_for_tools,
            state_version: thread_version_for_tools,
            cancellation_token: run_cancellation_token.as_ref(),
        });
        let results = tool_exec_future.await.map_err(AgentLoopError::from);

        let results = match results {
            Ok(r) => r,
            Err(AgentLoopError::Cancelled) => {
                append_cancellation_user_message(&mut run_ctx, CancellationStage::ToolExecution);
                terminate_run!(TerminationReason::Cancelled, None, None);
            }
            Err(e) => {
                let msg = e.to_string();
                terminate_run!(
                    TerminationReason::Error(msg.clone()),
                    None,
                    Some(outcome::LoopFailure::State(msg))
                );
            }
        };

        // NOTE(review): `_e` is read below despite its underscore prefix.
        if let Err(_e) = apply_tool_results_to_session(
            &mut run_ctx,
            &results,
            Some(step_meta),
            tool_executor.requires_parallel_patch_conflict_check(),
        ) {
            // On error, we can't easily rollback RunContext, so just terminate
            let msg = _e.to_string();
            terminate_run!(
                TerminationReason::Error(msg.clone()),
                None,
                Some(outcome::LoopFailure::State(msg))
            );
        }
        if let Err(error) = pending_delta_commit
            .commit(&mut run_ctx, CheckpointReason::ToolResultsCommitted, false)
            .await
        {
            let msg = error.to_string();
            terminate_run!(
                TerminationReason::Error(msg.clone()),
                None,
                Some(outcome::LoopFailure::State(msg))
            );
        }

        // Decisions that arrived during tool execution are applied now, before
        // the suspension check below. Returned events are dropped here too.
        if let Err(error) = apply_decisions_and_replay(
            &mut run_ctx,
            &mut decision_rx,
            &mut pending_decisions,
            &step_tool_provider,
            agent,
            &mut active_tool_descriptors,
            &pending_delta_commit,
        )
        .await
        {
            let msg = error.to_string();
            terminate_run!(
                TerminationReason::Error(msg.clone()),
                None,
                Some(outcome::LoopFailure::State(msg))
            );
        }

        // If ALL tools are suspended (no completed results), terminate immediately.
        if has_suspended_calls(&run_ctx) {
            let has_completed = results
                .iter()
                .any(|r| !matches!(r.outcome, crate::contracts::ToolCallOutcome::Suspended));
            if !has_completed {
                terminate_run!(TerminationReason::Suspended, None, None);
            }
        }

        // Deferred post-inference termination: tools from the current round
        // have completed; stop the loop before the next inference.
        if let Some(reason) = post_inference_termination {
            terminate_run!(reason, Some(last_text.clone()), None);
        }

        // Track tool-step metrics for loop stats and plugin consumers.
        let error_count = results
            .iter()
            .filter(|r| r.execution.result.is_error())
            .count();
        run_state.record_tool_step(&result.tool_calls, error_count);
    }
}
2366
2367/// Run the agent loop with streaming output.
2368///
2369/// Returns a stream of AgentEvent for real-time updates. Tools are passed
2370/// directly and used as the default tool set unless the agent's step_tool_provider
2371/// is set (for dynamic per-step tool resolution).
2372pub fn run_loop_stream(
2373    agent: Arc<dyn Agent>,
2374    tools: HashMap<String, Arc<dyn Tool>>,
2375    run_ctx: RunContext,
2376    cancellation_token: Option<RunCancellationToken>,
2377    state_committer: Option<Arc<dyn StateCommitter>>,
2378    decision_rx: Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
2379) -> Pin<Box<dyn Stream<Item = AgentEvent> + Send>> {
2380    let execution_ctx = RunExecutionContext::from_run_config(&run_ctx);
2381    run_loop_stream_with_context(
2382        agent,
2383        tools,
2384        run_ctx,
2385        execution_ctx,
2386        cancellation_token,
2387        state_committer,
2388        decision_rx,
2389    )
2390}
2391
/// Run the agent loop with streaming output and a caller-supplied
/// [`RunExecutionContext`].
///
/// Binds `execution_ctx` to the run config of `run_ctx`, then hands every
/// argument to `stream_runner::run_stream` and returns its event stream.
/// [`run_loop_stream`] calls this after deriving the context from the run
/// config.
pub fn run_loop_stream_with_context(
    agent: Arc<dyn Agent>,
    tools: HashMap<String, Arc<dyn Tool>>,
    mut run_ctx: RunContext,
    execution_ctx: RunExecutionContext,
    cancellation_token: Option<RunCancellationToken>,
    state_committer: Option<Arc<dyn StateCommitter>>,
    decision_rx: Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
) -> Pin<Box<dyn Stream<Item = AgentEvent> + Send>> {
    bind_execution_context_to_run_config(&mut run_ctx, &execution_ctx);

    stream_runner::run_stream(
        agent,
        tools,
        run_ctx,
        execution_ctx,
        cancellation_token,
        state_committer,
        decision_rx,
    )
}
2413
2414#[cfg(test)]
2415mod tests;