// tirea_agent_loop/runtime/loop_runner/mod.rs
//! Agent loop implementation with Phase-based plugin execution.
//!
//! The agent loop orchestrates the conversation between user, LLM, and tools:
//!
//! ```text
//! User Input → LLM → Tool Calls? → Execute Tools → LLM → ... → Final Response
//! ```
//!
//! # Phase Execution
//!
//! Each phase dispatches to its typed plugin hook:
//!
//! ```text
//! RunStart (once)
//!     │
//!     ▼
//! ┌─────────────────────────┐
//! │      StepStart          │ ← plugins can apply state patches
//! ├─────────────────────────┤
//! │    BeforeInference      │ ← plugins can inject prompt context, filter tools
//! ├─────────────────────────┤
//! │      [LLM CALL]         │
//! ├─────────────────────────┤
//! │    AfterInference       │
//! ├─────────────────────────┤
//! │  ┌───────────────────┐  │
//! │  │ BeforeToolExecute │  │ ← plugins can block/pending
//! │  ├───────────────────┤  │
//! │  │   [TOOL EXEC]     │  │
//! │  ├───────────────────┤  │
//! │  │ AfterToolExecute  │  │ ← plugins can add reminders
//! │  └───────────────────┘  │
//! ├─────────────────────────┤
//! │       StepEnd           │
//! └─────────────────────────┘
//!     │
//!     ▼
//! RunEnd (once)
//! ```

41mod config;
42mod core;
43mod event_envelope_meta;
44mod outcome;
45mod parallel_state_merge;
46mod plugin_runtime;
47mod run_state;
48mod state_commit;
49mod stream_core;
50mod stream_runner;
51mod tool_exec;
52mod truncation_recovery;
53
54use crate::contracts::io::ResumeDecisionAction;
55use crate::contracts::runtime::phase::Phase;
56use crate::contracts::runtime::state::{reduce_state_actions, AnyStateAction, ScopeContext};
57use crate::contracts::runtime::tool_call::{Tool, ToolResult};
58use crate::contracts::runtime::ActivityManager;
59use crate::contracts::runtime::{
60    DecisionReplayPolicy, RunLifecycleAction, RunLifecycleState, StreamResult, SuspendedCall,
61    ToolCallResume, ToolCallResumeMode, ToolCallStatus, ToolExecutionRequest, ToolExecutionResult,
62};
63use crate::contracts::thread::CheckpointReason;
64use crate::contracts::thread::{gen_message_id, Message, MessageMetadata, ToolCall};
65use crate::contracts::RunContext;
66use crate::contracts::{AgentEvent, RunAction, RunOrigin, TerminationReason, ToolCallDecision};
67use crate::engine::convert::{assistant_message, assistant_tool_calls, tool_response};
68use crate::runtime::activity::ActivityHub;
69
70use crate::runtime::streaming::StreamCollector;
71use async_stream::stream;
72use futures::{Stream, StreamExt};
73use genai::Client;
74use serde_json::Value;
75use std::collections::{HashMap, HashSet, VecDeque};
76use std::future::Future;
77use std::pin::Pin;
78use std::sync::atomic::{AtomicU64, Ordering};
79use std::sync::Arc;
80use std::time::{Instant, SystemTime, UNIX_EPOCH};
81use uuid::Uuid;
82
83pub use crate::contracts::runtime::ToolExecutor;
84pub use crate::runtime::run_context::{
85    await_or_cancel, is_cancelled, CancelAware, RunCancellationToken, StateCommitError,
86    StateCommitter, TOOL_SCOPE_CALLER_AGENT_ID_KEY, TOOL_SCOPE_CALLER_MESSAGES_KEY,
87    TOOL_SCOPE_CALLER_STATE_KEY, TOOL_SCOPE_CALLER_THREAD_ID_KEY,
88};
89use config::StaticStepToolProvider;
90pub use config::{Agent, BaseAgent, GenaiLlmExecutor, LlmRetryPolicy};
91pub use config::{LlmEventStream, LlmExecutor};
92pub use config::{StepToolInput, StepToolProvider, StepToolSnapshot};
93#[cfg(test)]
94use core::build_messages;
95use core::{
96    build_request_for_filtered_tools, inference_inputs_from_step, suspended_calls_from_ctx,
97    tool_call_states_from_ctx, transition_tool_call_state, upsert_tool_call_state,
98    ToolCallStateSeed, ToolCallStateTransition,
99};
100pub use outcome::{tool_map, tool_map_from_arc, AgentLoopError};
101pub use outcome::{LoopOutcome, LoopStats, LoopUsage};
102#[cfg(test)]
103use plugin_runtime::emit_agent_phase;
104#[cfg(test)]
105use plugin_runtime::emit_cleanup_phases;
106use run_state::LoopRunState;
107pub use state_commit::ChannelStateCommitter;
108use state_commit::PendingDeltaCommitContext;
109use tirea_state::TrackedPatch;
110#[cfg(test)]
111use tokio_util::sync::CancellationToken;
112#[cfg(test)]
113use tool_exec::execute_single_tool_with_phases;
114#[cfg(test)]
115use tool_exec::execute_tools_parallel_with_phases;
116pub use tool_exec::ExecuteToolsOutcome;
117use tool_exec::{
118    apply_tool_results_impl, apply_tool_results_to_session,
119    execute_single_tool_with_phases_deferred, scope_with_tool_caller_context, step_metadata,
120    ToolPhaseContext,
121};
122pub use tool_exec::{
123    execute_tools, execute_tools_with_behaviors, execute_tools_with_config,
124    ParallelToolExecutionMode, ParallelToolExecutor, SequentialToolExecutor,
125};
126
/// Fully resolved agent wiring ready for execution.
///
/// Contains everything needed to run an agent loop: the agent,
/// the resolved tool map, and the runtime config. This is a pure data struct
/// that can be inspected, mutated, and tested independently.
pub struct ResolvedRun {
    /// The agent (model, behavior, execution strategies, ...).
    ///
    /// Exposed as a concrete [`BaseAgent`] so callers can mutate fields
    /// (model, plugins, tool_executor, ...) between resolution and execution.
    /// Converted to `Arc<dyn Agent>` at the execution boundary.
    pub agent: BaseAgent,
    /// Resolved tool map after filtering and wiring, keyed by tool id.
    pub tools: HashMap<String, Arc<dyn Tool>>,
    /// Runtime configuration (user_id, run_id, ...).
    pub run_config: crate::contracts::RunConfig,
}
144
145impl ResolvedRun {
146    /// Add or replace a tool in the resolved tool map.
147    #[must_use]
148    pub fn with_tool(mut self, id: String, tool: Arc<dyn Tool>) -> Self {
149        self.tools.insert(id, tool);
150        self
151    }
152
153    /// Overlay tools from a map (insert-if-absent semantics).
154    pub fn overlay_tools(&mut self, tools: HashMap<String, Arc<dyn Tool>>) {
155        for (id, tool) in tools {
156            self.tools.entry(id).or_insert(tool);
157        }
158    }
159}
160
/// Identity of a single run, persisted into the run config so nested or
/// resumed runs can recover it (see `RunExecutionContext::from_run_config`).
#[derive(Debug, Clone)]
pub struct RunExecutionContext {
    /// Unique identifier of this run.
    pub run_id: String,
    /// Identifier of the parent run when spawned by another run, if any.
    pub parent_run_id: Option<String>,
    /// Identifier of the agent executing the run.
    pub agent_id: String,
    /// How the run was started.
    pub origin: RunOrigin,
}
168
169impl RunExecutionContext {
170    pub const RUN_ID_KEY: &'static str = "run_id";
171    pub const PARENT_RUN_ID_KEY: &'static str = "parent_run_id";
172    pub const AGENT_ID_KEY: &'static str = "agent_id";
173    pub const ORIGIN_KEY: &'static str = "origin";
174
175    #[must_use]
176    pub const fn new(
177        run_id: String,
178        parent_run_id: Option<String>,
179        agent_id: String,
180        origin: RunOrigin,
181    ) -> Self {
182        Self {
183            run_id,
184            parent_run_id,
185            agent_id,
186            origin,
187        }
188    }
189
190    #[must_use]
191    pub fn from_run_config(run_ctx: &RunContext) -> Self {
192        let run_id = run_ctx
193            .run_config
194            .value(Self::RUN_ID_KEY)
195            .and_then(Value::as_str)
196            .map(str::trim)
197            .filter(|id| !id.is_empty())
198            .map(ToOwned::to_owned)
199            .unwrap_or_else(uuid_v7);
200
201        let parent_run_id = run_ctx
202            .run_config
203            .value(Self::PARENT_RUN_ID_KEY)
204            .and_then(Value::as_str)
205            .map(str::trim)
206            .filter(|id| !id.is_empty())
207            .map(ToOwned::to_owned);
208
209        let agent_id = run_ctx
210            .run_config
211            .value(Self::AGENT_ID_KEY)
212            .and_then(Value::as_str)
213            .map(str::trim)
214            .filter(|id| !id.is_empty())
215            .map(ToOwned::to_owned)
216            .unwrap_or_else(|| run_ctx.thread_id().to_string());
217
218        let origin = run_ctx
219            .run_config
220            .value(Self::ORIGIN_KEY)
221            .and_then(|value| serde_json::from_value::<RunOrigin>(value.clone()).ok())
222            .unwrap_or_default();
223
224        Self::new(run_id, parent_run_id, agent_id, origin)
225    }
226}
227
228fn uuid_v7() -> String {
229    Uuid::now_v7().simple().to_string()
230}
231
232fn bind_execution_context_to_run_config(
233    run_ctx: &mut RunContext,
234    execution_ctx: &RunExecutionContext,
235) {
236    let _ = run_ctx.run_config.set(
237        RunExecutionContext::RUN_ID_KEY,
238        execution_ctx.run_id.clone(),
239    );
240    if let Some(parent_run_id) = execution_ctx.parent_run_id.as_deref() {
241        let _ = run_ctx.run_config.set(
242            RunExecutionContext::PARENT_RUN_ID_KEY,
243            parent_run_id.to_string(),
244        );
245    }
246    let _ = run_ctx.run_config.set(
247        RunExecutionContext::AGENT_ID_KEY,
248        execution_ctx.agent_id.clone(),
249    );
250    if let Ok(origin_value) = serde_json::to_value(execution_ctx.origin) {
251        let _ = run_ctx
252            .run_config
253            .set(RunExecutionContext::ORIGIN_KEY, origin_value);
254    }
255}
256
/// Wall-clock time as milliseconds since the Unix epoch.
///
/// Saturates at `u64::MAX` on overflow and returns 0 if the system clock is
/// set before the epoch.
pub(crate) fn current_unix_millis() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
262
/// Test-only alias for [`sync_run_lifecycle_for_termination_with_context`].
#[cfg(test)]
pub(super) fn sync_run_lifecycle_for_termination(
    run_ctx: &mut RunContext,
    execution_ctx: &RunExecutionContext,
    termination: &TerminationReason,
) -> Result<(), AgentLoopError> {
    sync_run_lifecycle_for_termination_with_context(run_ctx, execution_ctx, termination)
}
271
/// Record the terminal run-lifecycle status for `execution_ctx.run_id` as
/// thread patches on `run_ctx`.
///
/// No-op when the run id is blank. The patch is produced by reducing a single
/// `RunLifecycleAction::Set` against a snapshot of the current state.
///
/// # Errors
///
/// Returns [`AgentLoopError::StateError`] when the state snapshot or the
/// action reduction fails.
fn sync_run_lifecycle_for_termination_with_context(
    run_ctx: &mut RunContext,
    execution_ctx: &RunExecutionContext,
    termination: &TerminationReason,
) -> Result<(), AgentLoopError> {
    // Without a run id there is nothing to key the lifecycle entry on.
    if execution_ctx.run_id.trim().is_empty() {
        return Ok(());
    };

    let (status, done_reason) = termination.to_run_status();

    let base_state = run_ctx
        .snapshot()
        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
    let actions = vec![AnyStateAction::new::<RunLifecycleState>(
        RunLifecycleAction::Set {
            id: execution_ctx.run_id.clone(),
            status,
            done_reason,
            updated_at: current_unix_millis(),
        },
    )];
    let patches = reduce_state_actions(actions, &base_state, "agent_loop", &ScopeContext::run())
        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
    run_ctx.add_thread_patches(patches);
    Ok(())
}
299
/// Loop stage during which a cancellation was observed; selects the
/// continuation message appended when the run resumes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum CancellationStage {
    /// Cancelled while waiting on the LLM.
    Inference,
    /// Cancelled while executing tool calls.
    ToolExecution,
}
305
/// User message appended when a run resumes after an inference interruption.
pub(super) const CANCELLATION_INFERENCE_USER_MESSAGE: &str =
    "The previous run was interrupted during inference. Please continue from the current context.";
/// User message appended when a run resumes after a tool-execution interruption.
pub(super) const CANCELLATION_TOOL_USER_MESSAGE: &str =
    "The previous run was interrupted while using tools. Please continue from the current context.";
310
311pub(super) fn append_cancellation_user_message(run_ctx: &mut RunContext, stage: CancellationStage) {
312    let content = match stage {
313        CancellationStage::Inference => CANCELLATION_INFERENCE_USER_MESSAGE,
314        CancellationStage::ToolExecution => CANCELLATION_TOOL_USER_MESSAGE,
315    };
316    run_ctx.add_message(Arc::new(Message::user(content)));
317}
318
319pub(super) fn effective_llm_models(agent: &dyn Agent) -> Vec<String> {
320    let mut models = Vec::with_capacity(1 + agent.fallback_models().len());
321    models.push(agent.model().to_string());
322    for model in agent.fallback_models() {
323        if model.trim().is_empty() {
324            continue;
325        }
326        if !models.iter().any(|m| m == model) {
327            models.push(model.clone());
328        }
329    }
330    models
331}
332
333pub(super) fn effective_llm_models_from(
334    agent: &dyn Agent,
335    start_model: Option<&str>,
336) -> Vec<String> {
337    let models = effective_llm_models(agent);
338    let Some(start_model) = start_model.map(str::trim).filter(|model| !model.is_empty()) else {
339        return models;
340    };
341    let Some(index) = models.iter().position(|model| model == start_model) else {
342        return models;
343    };
344    models.into_iter().skip(index).collect()
345}
346
347pub(super) fn next_llm_model_after(agent: &dyn Agent, current_model: &str) -> Option<String> {
348    let models = effective_llm_models(agent);
349    let current_index = models.iter().position(|model| model == current_model)?;
350    models.into_iter().nth(current_index + 1)
351}
352
/// Maximum LLM attempts per model from the agent's retry policy, at least 1.
pub(super) fn llm_retry_attempts(agent: &dyn Agent) -> usize {
    agent.llm_retry_policy().max_attempts_per_model.max(1)
}
356
/// Coarse classification of LLM provider errors, used to decide retry vs.
/// fail-fast and as a telemetry label (see `as_str`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum LlmErrorClass {
    /// HTTP 429 / provider rate limiting.
    RateLimit,
    /// Request or connection timeout.
    Timeout,
    /// Transport-level failure (reset, refused, broken pipe, EOF).
    Connection,
    /// HTTP 503 / provider overloaded or temporarily unavailable.
    ServerUnavailable,
    /// Other server-side (5xx) failure.
    ServerError,
    /// Authentication/authorization failure (401/403, invalid API key).
    Auth,
    /// Client-side request error (400/404/422); retrying will not help.
    ClientRequest,
    /// Unclassifiable error.
    Unknown,
}
368
369impl LlmErrorClass {
370    pub(super) fn is_retryable(self) -> bool {
371        matches!(
372            self,
373            LlmErrorClass::RateLimit
374                | LlmErrorClass::Timeout
375                | LlmErrorClass::Connection
376                | LlmErrorClass::ServerUnavailable
377                | LlmErrorClass::ServerError
378        )
379    }
380
381    /// Stable string label for telemetry and structured logging.
382    pub(super) fn as_str(self) -> &'static str {
383        match self {
384            LlmErrorClass::RateLimit => "rate_limit",
385            LlmErrorClass::Timeout => "timeout",
386            LlmErrorClass::Connection => "connection",
387            LlmErrorClass::ServerUnavailable => "server_unavailable",
388            LlmErrorClass::ServerError => "server_error",
389            LlmErrorClass::Auth => "auth",
390            LlmErrorClass::ClientRequest => "client_request",
391            LlmErrorClass::Unknown => "unknown",
392        }
393    }
394}
395
396fn classify_llm_error_message(message: &str) -> LlmErrorClass {
397    let lower = message.to_ascii_lowercase();
398    if ["429", "too many requests", "rate limit"]
399        .iter()
400        .any(|p| lower.contains(p))
401    {
402        return LlmErrorClass::RateLimit;
403    }
404    if ["timeout", "timed out"].iter().any(|p| lower.contains(p)) {
405        return LlmErrorClass::Timeout;
406    }
407    if [
408        "connection",
409        "network",
410        "reset by peer",
411        "broken pipe",
412        "eof",
413        "connection refused",
414        "error sending request for url",
415    ]
416    .iter()
417    .any(|p| lower.contains(p))
418    {
419        return LlmErrorClass::Connection;
420    }
421    if ["503", "service unavailable", "unavailable", "temporar"]
422        .iter()
423        .any(|p| lower.contains(p))
424    {
425        return LlmErrorClass::ServerUnavailable;
426    }
427    if [
428        "500",
429        "502",
430        "504",
431        "server error",
432        "bad gateway",
433        "gateway timeout",
434    ]
435    .iter()
436    .any(|p| lower.contains(p))
437    {
438        return LlmErrorClass::ServerError;
439    }
440    if ["401", "403", "unauthorized", "forbidden", "invalid api key"]
441        .iter()
442        .any(|p| lower.contains(p))
443    {
444        return LlmErrorClass::Auth;
445    }
446    if ["400", "404", "422", "invalid_request", "bad request"]
447        .iter()
448        .any(|p| lower.contains(p))
449    {
450        return LlmErrorClass::ClientRequest;
451    }
452    LlmErrorClass::Unknown
453}
454
455fn classify_status_code(status_code: u16) -> LlmErrorClass {
456    match status_code {
457        408 => LlmErrorClass::Timeout,
458        429 => LlmErrorClass::RateLimit,
459        401 | 403 => LlmErrorClass::Auth,
460        400 | 404 | 422 => LlmErrorClass::ClientRequest,
461        503 => LlmErrorClass::ServerUnavailable,
462        500..=599 => LlmErrorClass::ServerError,
463        400..=499 => LlmErrorClass::ClientRequest,
464        _ => LlmErrorClass::Unknown,
465    }
466}
467
468fn classify_error_chain(error: &(dyn std::error::Error + 'static)) -> Option<LlmErrorClass> {
469    let mut current = Some(error);
470    while let Some(err) = current {
471        if let Some(io_error) = err.downcast_ref::<std::io::Error>() {
472            use std::io::ErrorKind;
473
474            let class = match io_error.kind() {
475                ErrorKind::TimedOut => Some(LlmErrorClass::Timeout),
476                ErrorKind::ConnectionAborted
477                | ErrorKind::ConnectionRefused
478                | ErrorKind::ConnectionReset
479                | ErrorKind::BrokenPipe
480                | ErrorKind::NotConnected
481                | ErrorKind::UnexpectedEof => Some(LlmErrorClass::Connection),
482                _ => None,
483            };
484            if class.is_some() {
485                return class;
486            }
487        }
488        current = err.source();
489    }
490    None
491}
492
/// Classify a low-level `genai` web-client error, preferring the HTTP status
/// code and the typed reqwest error chain over keyword matching.
fn classify_webc_error(error: &genai::webc::Error) -> LlmErrorClass {
    match error {
        genai::webc::Error::ResponseFailedStatus { status, .. } => {
            classify_status_code(status.as_u16())
        }
        // Inspect the typed error chain first; fall back to message keywords.
        genai::webc::Error::Reqwest(err) => classify_error_chain(err)
            .unwrap_or_else(|| classify_llm_error_message(&err.to_string())),
        _ => classify_llm_error_message(&error.to_string()),
    }
}
503
504/// Classify a provider error event body by parsing its structured fields.
505///
506/// Provider error events (OpenAI, Anthropic, Gemini) embed a JSON body with a
507/// `type` field that describes the error category. This function extracts that
508/// field and maps it to an [`LlmErrorClass`] without relying on keyword
509/// matching against stringified content.
510///
511/// Known error type strings:
512/// - OpenAI: `"server_error"`, `"rate_limit_error"`, `"invalid_request_error"`, …
513/// - Anthropic: `"overloaded_error"`, `"api_error"`, `"authentication_error"`, …
514fn classify_chat_response_body(body: &serde_json::Value) -> LlmErrorClass {
515    // Collect candidate type strings from both the top-level and nested error
516    // object.  OpenAI streams extract the inner error, so body["type"] is
517    // already the error type.  Anthropic / Gemini may wrap it in an envelope
518    // where body["type"] == "error" and the real type lives at
519    // body["error"]["type"].
520    let top_type = body.get("type").and_then(|v| v.as_str());
521    let nested_type = body
522        .get("error")
523        .and_then(|e| e.get("type"))
524        .and_then(|v| v.as_str());
525
526    // Try the nested (more specific) type first when the top-level type is a
527    // generic envelope marker like "error".
528    let candidates: &[&str] = match (top_type, nested_type) {
529        (_, Some(inner)) => &[inner, top_type.unwrap_or("")],
530        (Some(top), None) => &[top],
531        (None, None) => &[],
532    };
533
534    for t in candidates {
535        let lower = t.to_ascii_lowercase();
536        if lower.contains("rate_limit") {
537            return LlmErrorClass::RateLimit;
538        }
539        if lower.contains("overloaded") || lower.contains("unavailable") {
540            return LlmErrorClass::ServerUnavailable;
541        }
542        if lower.contains("timeout") {
543            return LlmErrorClass::Timeout;
544        }
545        if lower.contains("server_error") || lower.contains("api_error") {
546            return LlmErrorClass::ServerError;
547        }
548        if lower.contains("authentication") || lower.contains("permission") {
549            return LlmErrorClass::Auth;
550        }
551        if lower.contains("invalid_request") || lower.contains("not_found") {
552            return LlmErrorClass::ClientRequest;
553        }
554    }
555
556    // Fallback: check for an HTTP status code in the body (some providers
557    // include a numeric `status` or `code` field).
558    let status_code = body
559        .get("status")
560        .or_else(|| body.get("code"))
561        .and_then(|v| v.as_u64())
562        .and_then(|v| u16::try_from(v).ok());
563
564    if let Some(code) = status_code {
565        let class = classify_status_code(code);
566        if class != LlmErrorClass::Unknown {
567            return class;
568        }
569    }
570
571    LlmErrorClass::Unknown
572}
573
/// Map a [`genai::Error`] to an [`LlmErrorClass`], preferring structured
/// fields (status codes, typed error chains, JSON bodies) and resorting to
/// keyword matching on stringified messages only as a last step.
pub(super) fn classify_llm_error(error: &genai::Error) -> LlmErrorClass {
    match error {
        genai::Error::HttpError { status, .. } => classify_status_code(status.as_u16()),
        genai::Error::WebStream { cause, error, .. } => classify_error_chain(error.as_ref())
            .unwrap_or_else(|| classify_llm_error_message(cause)),
        genai::Error::WebAdapterCall { webc_error, .. }
        | genai::Error::WebModelCall { webc_error, .. } => classify_webc_error(webc_error),
        genai::Error::Internal(message) => classify_llm_error_message(message),

        // Mid-stream JSON parse failure — almost always caused by truncated /
        // corrupted SSE data due to a transient network issue.
        genai::Error::StreamParse { .. } => LlmErrorClass::Connection,

        // Provider returned no response body at all — treat as server-side fault.
        genai::Error::NoChatResponse { .. } => LlmErrorClass::ServerError,

        // Provider streamed an explicit error event with a structured body.
        genai::Error::ChatResponse { body, .. } => classify_chat_response_body(body),

        // Auth configuration errors from genai itself (not HTTP 401/403).
        genai::Error::RequiresApiKey { .. }
        | genai::Error::NoAuthResolver { .. }
        | genai::Error::NoAuthData { .. } => LlmErrorClass::Auth,

        other => classify_llm_error_message(&other.to_string()),
    }
}
601
/// Test helper: whether the classified error class permits a retry.
#[cfg(test)]
pub(super) fn is_retryable_llm_error(error: &genai::Error) -> bool {
    classify_llm_error(error).is_retryable()
}
606
/// Per-process counter seeding the retry-jitter entropy stream; seeded and
/// stepped with the 64-bit golden-ratio constant used by SplitMix64.
static RETRY_BACKOFF_ENTROPY: AtomicU64 = AtomicU64::new(0x9e37_79b9_7f4a_7c15);
608
609#[derive(Debug, Clone, Default)]
610pub(super) struct RetryBackoffWindow {
611    started_at: Option<Instant>,
612}
613
614impl RetryBackoffWindow {
615    pub(super) fn elapsed_ms(&mut self) -> u64 {
616        self.started_at
617            .get_or_insert_with(Instant::now)
618            .elapsed()
619            .as_millis()
620            .try_into()
621            .unwrap_or(u64::MAX)
622    }
623
624    pub(super) fn reset(&mut self) {
625        self.started_at = None;
626    }
627}
628
/// Result of waiting out one retry backoff interval.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum RetryBackoffOutcome {
    /// The sleep completed; the caller may retry.
    Completed,
    /// The run was cancelled mid-wait.
    Cancelled,
    /// The policy's total retry window would be exceeded; stop retrying.
    BudgetExhausted,
}
635
/// Avalanche the input into a well-mixed 64-bit value (SplitMix64 step:
/// golden-ratio increment followed by the xor-shift/multiply finalizer).
fn mix_retry_entropy(entropy: u64) -> u64 {
    let mut z = entropy.wrapping_add(0x9e37_79b9_7f4a_7c15);
    z = (z ^ (z >> 30)).wrapping_mul(0xbf58_476d_1ce4_e5b9);
    z = (z ^ (z >> 27)).wrapping_mul(0x94d0_49bb_1331_11eb);
    z ^ (z >> 31)
}
642
643fn next_retry_entropy() -> u64 {
644    let counter = RETRY_BACKOFF_ENTROPY.fetch_add(0x9e37_79b9_7f4a_7c15, Ordering::Relaxed);
645    let now = SystemTime::now()
646        .duration_since(UNIX_EPOCH)
647        .map(|duration| duration.as_nanos() as u64)
648        .unwrap_or(counter);
649    mix_retry_entropy(counter ^ now)
650}
651
652pub(super) fn retry_base_backoff_ms(policy: &LlmRetryPolicy, retry_attempt: usize) -> u64 {
653    let initial = policy.initial_backoff_ms;
654    let cap = policy.max_backoff_ms.max(policy.initial_backoff_ms);
655    let attempt = retry_attempt.max(1);
656    if attempt == 1 {
657        return initial.min(cap);
658    }
659    let shift = (attempt - 1).min(20) as u32;
660    let factor = 1u64.checked_shl(shift).unwrap_or(u64::MAX);
661    initial.saturating_mul(factor).min(cap)
662}
663
664fn jittered_backoff_ms(policy: &LlmRetryPolicy, retry_attempt: usize, entropy: u64) -> u64 {
665    let cap = policy.max_backoff_ms.max(policy.initial_backoff_ms);
666    let base_ms = retry_base_backoff_ms(policy, retry_attempt).min(cap);
667    let jitter_percent = policy.backoff_jitter_percent.min(100) as u64;
668    if jitter_percent == 0 || base_ms == 0 {
669        return base_ms;
670    }
671
672    let jitter_window = base_ms.saturating_mul(jitter_percent) / 100;
673    let lower = base_ms.saturating_sub(jitter_window);
674    let upper = base_ms.saturating_add(jitter_window).min(cap);
675    if upper <= lower {
676        return lower;
677    }
678
679    let span = upper - lower;
680    lower + (mix_retry_entropy(entropy) % (span.saturating_add(1)))
681}
682
683pub(super) fn retry_backoff_plan_ms(
684    policy: &LlmRetryPolicy,
685    retry_attempt: usize,
686    elapsed_ms: u64,
687    entropy: u64,
688) -> Option<u64> {
689    let wait_ms = jittered_backoff_ms(policy, retry_attempt, entropy);
690    match policy.max_retry_window_ms {
691        Some(budget_ms) if elapsed_ms.saturating_add(wait_ms) > budget_ms => None,
692        _ => Some(wait_ms),
693    }
694}
695
/// Sleep for the jittered backoff interval before the next LLM retry.
///
/// Returns [`RetryBackoffOutcome::BudgetExhausted`] without sleeping when the
/// policy's total retry window would be exceeded, and
/// [`RetryBackoffOutcome::Cancelled`] if the run token fires mid-sleep.
pub(super) async fn wait_retry_backoff(
    agent: &dyn Agent,
    retry_attempt: usize,
    retry_window: &mut RetryBackoffWindow,
    run_cancellation_token: Option<&RunCancellationToken>,
) -> RetryBackoffOutcome {
    // First call opens the window, so the budget spans all retries of a run.
    let elapsed_ms = retry_window.elapsed_ms();
    let Some(wait_ms) = retry_backoff_plan_ms(
        agent.llm_retry_policy(),
        retry_attempt,
        elapsed_ms,
        next_retry_entropy(),
    ) else {
        return RetryBackoffOutcome::BudgetExhausted;
    };
    tracing::debug!(
        attempt = retry_attempt,
        backoff_ms = wait_ms,
        elapsed_ms = elapsed_ms,
        "waiting before LLM retry"
    );
    match await_or_cancel(
        run_cancellation_token,
        tokio::time::sleep(std::time::Duration::from_millis(wait_ms)),
    )
    .await
    {
        CancelAware::Cancelled => RetryBackoffOutcome::Cancelled,
        CancelAware::Value(_) => RetryBackoffOutcome::Completed,
    }
}
727
/// Outcome of [`run_llm_with_retry_and_fallback`].
pub(super) enum LlmAttemptOutcome<T> {
    /// The call succeeded on `model` after `attempts` total tries.
    Success {
        value: T,
        model: String,
        attempts: usize,
    },
    /// The run was cancelled before a response was obtained.
    Cancelled,
    /// Every model/attempt combination failed (or the retry budget ran out).
    Exhausted {
        /// Human-readable description of the final failure.
        last_error: String,
        /// Telemetry label of the final failure's error class, if classified.
        last_error_class: Option<&'static str>,
        attempts: usize,
    },
}
741
/// Convenience wrapper over [`is_cancelled`]; an absent token means
/// "not cancelled".
fn is_run_cancelled(token: Option<&RunCancellationToken>) -> bool {
    is_cancelled(token)
}
745
746pub(super) fn step_tool_provider_for_run(
747    agent: &dyn Agent,
748    tools: HashMap<String, Arc<dyn Tool>>,
749) -> Arc<dyn StepToolProvider> {
750    agent.step_tool_provider().unwrap_or_else(|| {
751        Arc::new(StaticStepToolProvider::new(tools)) as Arc<dyn StepToolProvider>
752    })
753}
754
/// Resolve the LLM executor for a run, defaulting to a [`GenaiLlmExecutor`]
/// over a default genai [`Client`] when the agent supplies none.
pub(super) fn llm_executor_for_run(agent: &dyn Agent) -> Arc<dyn LlmExecutor> {
    agent
        .llm_executor()
        .unwrap_or_else(|| Arc::new(GenaiLlmExecutor::new(Client::default())))
}
760
/// Ask the step tool provider for the tool snapshot of the upcoming step.
///
/// # Errors
///
/// Propagates any [`AgentLoopError`] returned by the provider.
pub(super) async fn resolve_step_tool_snapshot(
    step_tool_provider: &Arc<dyn StepToolProvider>,
    run_ctx: &RunContext,
) -> Result<StepToolSnapshot, AgentLoopError> {
    step_tool_provider
        .provide(StepToolInput { state: run_ctx })
        .await
}
769
/// Bump the completed-step counter after a loop step finishes.
fn mark_step_completed(run_state: &mut LoopRunState) {
    run_state.completed_steps += 1;
}
773
/// Concatenate an accumulated response prefix with a new streamed segment,
/// avoiding an allocation when either side is empty.
fn stitch_response_text(prefix: &str, segment: &str) -> String {
    match (prefix.is_empty(), segment.is_empty()) {
        (true, _) => segment.to_string(),
        (_, true) => prefix.to_string(),
        _ => {
            let mut stitched = String::with_capacity(prefix.len() + segment.len());
            stitched.push_str(prefix);
            stitched.push_str(segment);
            stitched
        }
    }
}
786
/// Append a streamed segment onto the accumulated prefix in place; empty
/// segments are ignored.
fn extend_response_prefix(prefix: &mut String, segment: &str) {
    if segment.is_empty() {
        return;
    }
    prefix.push_str(segment);
}
792
/// Assemble the final [`LoopOutcome`] from the finished run's parts.
fn build_loop_outcome(
    run_ctx: RunContext,
    termination: TerminationReason,
    response: Option<String>,
    run_state: &LoopRunState,
    failure: Option<outcome::LoopFailure>,
) -> LoopOutcome {
    LoopOutcome {
        run_ctx,
        termination,
        // An empty accumulated response is reported as "no response".
        response: response.filter(|text| !text.is_empty()),
        usage: run_state.usage(),
        stats: run_state.stats(),
        failure,
    }
}
809
/// Invoke an LLM call with per-model retries and ordered model fallback.
///
/// Candidate models come from [`effective_llm_models_from`] (primary first,
/// then fallbacks, optionally resuming at `start_model`). Each candidate gets
/// up to the policy's per-model attempt limit; an attempt is retried on the
/// same model only when `retry_current_model` is set and the error class is
/// retryable. `unknown_error` seeds the reported error text in case no
/// attempt ever yields a message.
///
/// Returns:
/// - [`LlmAttemptOutcome::Success`] with the value, winning model, and total
///   attempt count;
/// - [`LlmAttemptOutcome::Cancelled`] as soon as the run token fires (also
///   mid-backoff);
/// - [`LlmAttemptOutcome::Exhausted`] once every candidate fails or the
///   retry time budget runs out.
pub(super) async fn run_llm_with_retry_and_fallback<T, Invoke, Fut>(
    agent: &dyn Agent,
    run_cancellation_token: Option<&RunCancellationToken>,
    retry_current_model: bool,
    start_model: Option<&str>,
    unknown_error: &str,
    mut invoke: Invoke,
) -> LlmAttemptOutcome<T>
where
    Invoke: FnMut(String) -> Fut,
    Fut: std::future::Future<Output = genai::Result<T>>,
{
    let mut last_llm_error = unknown_error.to_string();
    let mut last_error_class: Option<&'static str> = None;
    let model_candidates = effective_llm_models_from(agent, start_model);
    let max_attempts = llm_retry_attempts(agent);
    let mut total_attempts = 0usize;
    // One backoff window shared across all models bounds total retry time.
    let mut retry_window = RetryBackoffWindow::default();

    'models: for model in model_candidates {
        for attempt in 1..=max_attempts {
            total_attempts = total_attempts.saturating_add(1);
            let response_res =
                match await_or_cancel(run_cancellation_token, invoke(model.clone())).await {
                    CancelAware::Cancelled => return LlmAttemptOutcome::Cancelled,
                    CancelAware::Value(resp) => resp,
                };

            match response_res {
                Ok(value) => {
                    if total_attempts > 1 {
                        tracing::info!(
                            model = %model,
                            attempts = total_attempts,
                            "LLM call succeeded after retries"
                        );
                    }
                    return LlmAttemptOutcome::Success {
                        value,
                        model,
                        attempts: total_attempts,
                    };
                }
                Err(e) => {
                    let error_class = classify_llm_error(&e);
                    last_error_class = Some(error_class.as_str());
                    let message = e.to_string();
                    last_llm_error =
                        format!("model='{model}' attempt={attempt}/{max_attempts}: {message}");
                    let can_retry_same_model =
                        retry_current_model && attempt < max_attempts && error_class.is_retryable();
                    tracing::warn!(
                        model = %model,
                        attempt = attempt,
                        max_attempts = max_attempts,
                        error_class = error_class.as_str(),
                        retryable = can_retry_same_model,
                        error = %message,
                        "LLM call failed"
                    );
                    if can_retry_same_model {
                        match wait_retry_backoff(
                            agent,
                            attempt,
                            &mut retry_window,
                            run_cancellation_token,
                        )
                        .await
                        {
                            RetryBackoffOutcome::Completed => continue,
                            RetryBackoffOutcome::Cancelled => {
                                return LlmAttemptOutcome::Cancelled;
                            }
                            RetryBackoffOutcome::BudgetExhausted => {
                                tracing::warn!(
                                    model = %model,
                                    attempt = attempt,
                                    "LLM retry budget exhausted"
                                );
                                last_llm_error =
                                    format!("{last_llm_error} (retry budget exhausted)");
                                // Time budget is global, so skip remaining models too.
                                break 'models;
                            }
                        }
                    }
                    // Non-retryable error (or attempts spent): next model.
                    break;
                }
            }
        }
    }

    LlmAttemptOutcome::Exhausted {
        last_error: last_llm_error,
        last_error_class,
        attempts: total_attempts,
    }
}
907
908pub(super) async fn run_step_prepare_phases(
909    run_ctx: &RunContext,
910    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
911    agent: &dyn Agent,
912) -> Result<
913    (
914        Vec<Message>,
915        Vec<String>,
916        RunAction,
917        Vec<std::sync::Arc<dyn tirea_contract::runtime::inference::InferenceRequestTransform>>,
918        Vec<TrackedPatch>,
919        Vec<tirea_contract::SerializedAction>,
920    ),
921    AgentLoopError,
922> {
923    let system_prompt = agent.system_prompt().to_string();
924    let ((messages, filtered_tools, run_action, transforms), pending, actions) =
925        plugin_runtime::run_phase_block(
926            run_ctx,
927            tool_descriptors,
928            agent,
929            &[Phase::StepStart, Phase::BeforeInference],
930            |_| {},
931            |step| inference_inputs_from_step(step, &system_prompt),
932        )
933        .await?;
934    Ok((
935        messages,
936        filtered_tools,
937        run_action,
938        transforms,
939        pending,
940        actions,
941    ))
942}
943
/// Everything produced by the step-preparation phases, bundled for the
/// upcoming inference call.
pub(super) struct PreparedStep {
    /// Conversation messages to send to the model.
    pub(super) messages: Vec<Message>,
    /// Names of tools that survived plugin filtering for this step.
    pub(super) filtered_tools: Vec<String>,
    /// Run-level action decided by plugins during preparation.
    pub(super) run_action: RunAction,
    /// State patches emitted during preparation, not yet committed.
    pub(super) pending_patches: Vec<TrackedPatch>,
    /// Plugin actions captured in serialized form during preparation.
    pub(super) serialized_actions: Vec<tirea_contract::SerializedAction>,
    /// Transforms to apply to the outgoing inference request.
    pub(super) request_transforms:
        Vec<std::sync::Arc<dyn tirea_contract::runtime::inference::InferenceRequestTransform>>,
}
953
954pub(super) async fn prepare_step_execution(
955    run_ctx: &RunContext,
956    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
957    agent: &dyn Agent,
958) -> Result<PreparedStep, AgentLoopError> {
959    let (messages, filtered_tools, run_action, transforms, pending, actions) =
960        run_step_prepare_phases(run_ctx, tool_descriptors, agent).await?;
961    Ok(PreparedStep {
962        messages,
963        filtered_tools,
964        run_action,
965        pending_patches: pending,
966        serialized_actions: actions,
967        request_transforms: transforms,
968    })
969}
970
971pub(super) async fn apply_llm_error_cleanup(
972    run_ctx: &mut RunContext,
973    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
974    agent: &dyn Agent,
975    error_type: &'static str,
976    message: String,
977    error_class: Option<&str>,
978) -> Result<(), AgentLoopError> {
979    plugin_runtime::emit_cleanup_phases(
980        run_ctx,
981        tool_descriptors,
982        agent,
983        error_type,
984        message,
985        error_class,
986    )
987    .await
988}
989
990pub(super) async fn complete_step_after_inference(
991    run_ctx: &mut RunContext,
992    result: &StreamResult,
993    step_meta: MessageMetadata,
994    assistant_message_id: Option<String>,
995    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
996    agent: &dyn Agent,
997) -> Result<RunAction, AgentLoopError> {
998    let (run_action, pending, actions) = plugin_runtime::run_phase_block(
999        run_ctx,
1000        tool_descriptors,
1001        agent,
1002        &[Phase::AfterInference],
1003        |step| {
1004            use crate::contracts::runtime::inference::LLMResponse;
1005            step.llm_response = Some(LLMResponse::success(result.clone()));
1006        },
1007        |step| step.run_action(),
1008    )
1009    .await?;
1010    run_ctx.add_thread_patches(pending);
1011    run_ctx.add_serialized_actions(actions);
1012
1013    let assistant = assistant_turn_message(result, step_meta, assistant_message_id);
1014    run_ctx.add_message(Arc::new(assistant));
1015
1016    let (pending, actions) =
1017        plugin_runtime::emit_phase_block(Phase::StepEnd, run_ctx, tool_descriptors, agent, |_| {})
1018            .await?;
1019    run_ctx.add_thread_patches(pending);
1020    run_ctx.add_serialized_actions(actions);
1021    Ok(run_action)
1022}
1023
1024/// Emit events for a pending tool-call projection.
1025pub(super) fn pending_tool_events(call: &SuspendedCall) -> Vec<AgentEvent> {
1026    vec![
1027        AgentEvent::ToolCallStart {
1028            id: call.ticket.pending.id.clone(),
1029            name: call.ticket.pending.name.clone(),
1030        },
1031        AgentEvent::ToolCallReady {
1032            id: call.ticket.pending.id.clone(),
1033            name: call.ticket.pending.name.clone(),
1034            arguments: call.ticket.pending.arguments.clone(),
1035        },
1036    ]
1037}
1038
1039pub(super) fn has_suspended_calls(run_ctx: &RunContext) -> bool {
1040    !suspended_calls_from_ctx(run_ctx).is_empty()
1041}
1042
1043pub(super) fn suspended_call_ids(run_ctx: &RunContext) -> HashSet<String> {
1044    suspended_calls_from_ctx(run_ctx).into_keys().collect()
1045}
1046
1047pub(super) fn newly_suspended_call_ids(
1048    run_ctx: &RunContext,
1049    baseline_ids: &HashSet<String>,
1050) -> HashSet<String> {
1051    suspended_calls_from_ctx(run_ctx)
1052        .into_keys()
1053        .filter(|id| !baseline_ids.contains(id))
1054        .collect()
1055}
1056
1057pub(super) fn suspended_call_pending_events(run_ctx: &RunContext) -> Vec<AgentEvent> {
1058    let mut calls: Vec<SuspendedCall> = suspended_calls_from_ctx(run_ctx).into_values().collect();
1059    calls.sort_by(|left, right| left.call_id.cmp(&right.call_id));
1060    calls
1061        .into_iter()
1062        .flat_map(|call| pending_tool_events(&call))
1063        .collect()
1064}
1065
1066pub(super) fn suspended_call_pending_events_for_ids(
1067    run_ctx: &RunContext,
1068    call_ids: &HashSet<String>,
1069) -> Vec<AgentEvent> {
1070    if call_ids.is_empty() {
1071        return Vec::new();
1072    }
1073    let mut calls: Vec<SuspendedCall> = suspended_calls_from_ctx(run_ctx)
1074        .into_iter()
1075        .filter_map(|(call_id, call)| call_ids.contains(&call_id).then_some(call))
1076        .collect();
1077    calls.sort_by(|left, right| left.call_id.cmp(&right.call_id));
1078    calls
1079        .into_iter()
1080        .flat_map(|call| pending_tool_events(&call))
1081        .collect()
1082}
1083
/// State snapshot plus run configuration captured for a round of tool
/// execution.
pub(super) struct ToolExecutionContext {
    /// JSON snapshot of the run state taken just before tool execution.
    pub(super) state: serde_json::Value,
    /// Run configuration scoped with the tool-caller context.
    pub(super) run_config: tirea_contract::RunConfig,
}
1088
1089pub(super) fn prepare_tool_execution_context(
1090    run_ctx: &RunContext,
1091) -> Result<ToolExecutionContext, AgentLoopError> {
1092    let state = run_ctx
1093        .snapshot()
1094        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
1095    let run_config = scope_with_tool_caller_context(run_ctx, &state)?;
1096    Ok(ToolExecutionContext { state, run_config })
1097}
1098
/// Runs the `RunEnd` phase for all plugins. Returns no error: the phase
/// driver's signature is infallible, so any failure handling happens inside
/// the plugin runtime.
pub(super) async fn finalize_run_end(
    run_ctx: &mut RunContext,
    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
    agent: &dyn Agent,
) {
    plugin_runtime::emit_run_end_phase(run_ctx, tool_descriptors, agent).await
}
1106
/// How a failure to commit the run-finished delta is handled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RunFinishedCommitPolicy {
    /// Propagate the commit error to the caller.
    Required,
    /// Log the commit error and continue.
    BestEffort,
}
1112
1113fn normalize_termination_for_suspended_calls(
1114    run_ctx: &RunContext,
1115    termination: TerminationReason,
1116    response: Option<String>,
1117) -> (TerminationReason, Option<String>) {
1118    let final_termination = if !matches!(
1119        termination,
1120        TerminationReason::Error(_) | TerminationReason::Cancelled
1121    ) && has_suspended_calls(run_ctx)
1122    {
1123        TerminationReason::Suspended
1124    } else {
1125        termination
1126    };
1127    let final_response = if final_termination == TerminationReason::Suspended {
1128        None
1129    } else {
1130        response
1131    };
1132    (final_termination, final_response)
1133}
1134
1135async fn persist_run_termination(
1136    run_ctx: &mut RunContext,
1137    termination: &TerminationReason,
1138    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
1139    agent: &dyn Agent,
1140    execution_ctx: &RunExecutionContext,
1141    pending_delta_commit: &PendingDeltaCommitContext<'_>,
1142    run_finished_commit_policy: RunFinishedCommitPolicy,
1143) -> Result<(), AgentLoopError> {
1144    sync_run_lifecycle_for_termination_with_context(run_ctx, execution_ctx, termination)?;
1145    finalize_run_end(run_ctx, tool_descriptors, agent).await;
1146    if let Err(error) = pending_delta_commit
1147        .commit_run_finished(run_ctx, termination)
1148        .await
1149    {
1150        match run_finished_commit_policy {
1151            RunFinishedCommitPolicy::Required => return Err(error),
1152            RunFinishedCommitPolicy::BestEffort => {
1153                tracing::warn!(error = %error, "failed to commit run-finished delta");
1154            }
1155        }
1156    }
1157    Ok(())
1158}
1159
1160fn stream_result_from_chat_response(response: &genai::chat::ChatResponse) -> StreamResult {
1161    let text = response
1162        .first_text()
1163        .map(|s| s.to_string())
1164        .unwrap_or_default();
1165    let tool_calls: Vec<crate::contracts::thread::ToolCall> = response
1166        .tool_calls()
1167        .into_iter()
1168        .map(|tc| {
1169            crate::contracts::thread::ToolCall::new(
1170                &tc.call_id,
1171                &tc.fn_name,
1172                tc.fn_arguments.clone(),
1173            )
1174        })
1175        .collect();
1176
1177    let usage = Some(crate::runtime::streaming::token_usage_from_genai(
1178        &response.usage,
1179    ));
1180    let stop_reason = response
1181        .stop_reason
1182        .as_ref()
1183        .and_then(crate::runtime::streaming::map_genai_stop_reason)
1184        .or({
1185            if !tool_calls.is_empty() {
1186                Some(tirea_contract::runtime::inference::StopReason::ToolUse)
1187            } else {
1188                Some(tirea_contract::runtime::inference::StopReason::EndTurn)
1189            }
1190        });
1191    StreamResult {
1192        text,
1193        tool_calls,
1194        usage,
1195        stop_reason,
1196    }
1197}
1198
1199fn assistant_turn_message(
1200    result: &StreamResult,
1201    step_meta: MessageMetadata,
1202    message_id: Option<String>,
1203) -> Message {
1204    let mut msg = if result.tool_calls.is_empty() {
1205        assistant_message(&result.text)
1206    } else {
1207        assistant_tool_calls(&result.text, result.tool_calls.clone())
1208    }
1209    .with_metadata(step_meta);
1210    if let Some(message_id) = message_id {
1211        msg = msg.with_id(message_id);
1212    }
1213    msg
1214}
1215
/// Outcome of draining resume decisions at run start.
struct RunStartDrainOutcome {
    /// Events produced while settling and replaying resumed tool calls.
    events: Vec<AgentEvent>,
    /// True when at least one suspended call was actually replayed.
    replayed: bool,
}
1220
1221fn decision_result_value(action: &ResumeDecisionAction, result: &Value) -> serde_json::Value {
1222    if result.is_null() {
1223        serde_json::Value::Bool(matches!(action, ResumeDecisionAction::Resume))
1224    } else {
1225        result.clone()
1226    }
1227}
1228
1229fn runtime_resume_inputs(run_ctx: &RunContext) -> HashMap<String, ToolCallResume> {
1230    let mut decisions = HashMap::new();
1231    for (call_id, state) in tool_call_states_from_ctx(run_ctx) {
1232        if !matches!(state.status, ToolCallStatus::Resuming) {
1233            continue;
1234        }
1235        let Some(mut resume) = state.resume else {
1236            continue;
1237        };
1238        if resume.decision_id.trim().is_empty() {
1239            resume.decision_id = call_id.clone();
1240        }
1241        decisions.insert(call_id, resume);
1242    }
1243    decisions
1244}
1245
/// Settles `Resuming` tool-call states whose suspended-call entry is gone
/// ("orphans"): with no suspended call to replay, each is moved to a terminal
/// status matching its decision — `Cancelled` for cancel decisions, `Failed`
/// for resume decisions.
///
/// Returns `Ok(true)` when at least one non-empty state patch was added to
/// the run context.
fn settle_orphan_resuming_tool_states(
    run_ctx: &mut RunContext,
    suspended: &HashMap<String, SuspendedCall>,
    resumes: &HashMap<String, ToolCallResume>,
) -> Result<bool, AgentLoopError> {
    let states = tool_call_states_from_ctx(run_ctx);
    let mut changed = false;

    for (call_id, resume) in resumes {
        // Still suspended: handled by the normal replay path, not here.
        if suspended.contains_key(call_id) {
            continue;
        }
        // No recorded state for this decision; nothing to settle.
        let Some(state) = states.get(call_id).cloned() else {
            continue;
        };
        let target_status = match &resume.action {
            ResumeDecisionAction::Cancel => ToolCallStatus::Cancelled,
            ResumeDecisionAction::Resume => ToolCallStatus::Failed,
        };
        // Already settled with this exact decision — avoid a redundant patch.
        if state.status == target_status && state.resume.as_ref() == Some(resume) {
            continue;
        }

        // If the transition helper declines (returns None), skip this call.
        let Some(next_state) = transition_tool_call_state(
            Some(state.clone()),
            ToolCallStateSeed {
                call_id: call_id.as_str(),
                tool_name: state.tool_name.as_str(),
                arguments: &state.arguments,
                status: state.status,
                resume_token: state.resume_token.clone(),
            },
            ToolCallStateTransition {
                status: target_status,
                resume_token: state.resume_token.clone(),
                resume: Some(resume.clone()),
                updated_at: current_unix_millis(),
            },
        ) else {
            continue;
        };

        // Build the patch against a fresh snapshot so earlier patches in this
        // loop are reflected.
        let base_state = run_ctx
            .snapshot()
            .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
        let patch = upsert_tool_call_state(&base_state, call_id, next_state)?;
        if patch.patch().is_empty() {
            continue;
        }
        run_ctx.add_thread_patch(patch);
        changed = true;
    }

    Ok(changed)
}
1301
1302fn all_suspended_calls_have_resume(
1303    suspended: &HashMap<String, SuspendedCall>,
1304    resumes: &HashMap<String, ToolCallResume>,
1305) -> bool {
1306    suspended
1307        .keys()
1308        .all(|call_id| resumes.contains_key(call_id))
1309}
1310
/// Drains pending resume decisions at run start and replays the matching
/// suspended tool calls.
///
/// Flow: collect `Resuming` decisions → settle orphans (decisions whose
/// suspended call is gone) → for each (decision, suspended call) pair either
/// append a denied tool result (Cancel) or re-execute the tool (Resume),
/// recording lifecycle patches, messages, reminders, and events as it goes.
/// Under `BatchAllSuspended` replay policy, replay only proceeds once every
/// suspended call has a decision.
///
/// Returns the emitted events (ending with a `StateSnapshot` when any state
/// changed) and whether any call was actually replayed.
async fn drain_resuming_tool_calls_and_replay(
    run_ctx: &mut RunContext,
    tools: &HashMap<String, Arc<dyn Tool>>,
    agent: &dyn Agent,
    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
) -> Result<RunStartDrainOutcome, AgentLoopError> {
    // Nothing is marked Resuming: fast exit with no events.
    let decisions = runtime_resume_inputs(run_ctx);
    if decisions.is_empty() {
        return Ok(RunStartDrainOutcome {
            events: Vec::new(),
            replayed: false,
        });
    }

    let suspended = suspended_calls_from_ctx(run_ctx);
    let mut state_changed = false;
    // First settle decisions whose suspended-call entry no longer exists.
    if settle_orphan_resuming_tool_states(run_ctx, &suspended, &decisions)? {
        state_changed = true;
    }
    // No suspended calls remain: report only the settlement snapshot, if any.
    if suspended.is_empty() {
        if state_changed {
            let snapshot = run_ctx
                .snapshot()
                .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
            return Ok(RunStartDrainOutcome {
                events: vec![AgentEvent::StateSnapshot { snapshot }],
                replayed: false,
            });
        }
        return Ok(RunStartDrainOutcome {
            events: Vec::new(),
            replayed: false,
        });
    }

    // Batch policy: wait until every suspended call has a decision before
    // replaying any of them.
    if matches!(
        agent.tool_executor().decision_replay_policy(),
        DecisionReplayPolicy::BatchAllSuspended
    ) && !all_suspended_calls_have_resume(&suspended, &decisions)
    {
        if state_changed {
            let snapshot = run_ctx
                .snapshot()
                .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
            return Ok(RunStartDrainOutcome {
                events: vec![AgentEvent::StateSnapshot { snapshot }],
                replayed: false,
            });
        }
        return Ok(RunStartDrainOutcome {
            events: Vec::new(),
            replayed: false,
        });
    }

    let mut events = Vec::new();
    // Sort call ids so replay order is deterministic.
    let mut decision_ids: Vec<String> = decisions.keys().cloned().collect();
    decision_ids.sort();

    let mut replayed = false;

    for call_id in decision_ids {
        // Skip decisions without a live suspended call (already settled above).
        let Some(suspended_call) = suspended.get(&call_id).cloned() else {
            continue;
        };
        let Some(decision) = decisions.get(&call_id).cloned() else {
            continue;
        };
        replayed = true;
        // Null decision results collapse to a Resume/Cancel boolean.
        let decision_result = decision_result_value(&decision.action, &decision.result);
        let resume_payload = ToolCallResume {
            result: decision_result.clone(),
            ..decision.clone()
        };
        events.push(AgentEvent::ToolCallResumed {
            target_id: suspended_call.call_id.clone(),
            result: decision_result.clone(),
        });

        match decision.action {
            ResumeDecisionAction::Cancel => {
                let cancel_reason = resume_payload.reason.clone();
                // Record the Cancelled lifecycle state for the call.
                if upsert_tool_call_lifecycle_state(
                    run_ctx,
                    &suspended_call,
                    ToolCallStatus::Cancelled,
                    Some(resume_payload),
                )? {
                    state_changed = true;
                }
                // The model still needs a tool result message for the call.
                events.push(append_denied_tool_result_message(
                    run_ctx,
                    &suspended_call.call_id,
                    Some(&suspended_call.tool_name),
                    cancel_reason.as_deref(),
                ));
                // Cancel path skips tool execution, so no automatic scope
                // cleanup runs. Delete just the suspended_call entry; keep
                // tool_call_state (Cancelled status) for audit.
                let cleanup_path = format!(
                    "__tool_call_scope.{}.suspended_call",
                    suspended_call.call_id
                );
                let cleanup_patch = tirea_state::Patch::with_ops(vec![tirea_state::Op::delete(
                    tirea_state::parse_path(&cleanup_path),
                )]);
                let tracked = tirea_state::TrackedPatch::new(cleanup_patch)
                    .with_source("framework:scope_cleanup");
                if !tracked.patch().is_empty() {
                    state_changed = true;
                    run_ctx.add_thread_patch(tracked);
                }
            }
            ResumeDecisionAction::Resume => {
                // Mark the call Resuming before re-executing it.
                if upsert_tool_call_lifecycle_state(
                    run_ctx,
                    &suspended_call,
                    ToolCallStatus::Resuming,
                    Some(resume_payload.clone()),
                )? {
                    state_changed = true;
                }
                // Build the replay ToolCall per the ticket's resume mode.
                let Some(tool_call) = replay_tool_call_for_resolution(
                    run_ctx,
                    &suspended_call,
                    &ToolCallDecision {
                        target_id: suspended_call.call_id.clone(),
                        resume: resume_payload.clone(),
                    },
                ) else {
                    continue;
                };
                let state = run_ctx
                    .snapshot()
                    .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
                let tool = tools.get(&tool_call.name).cloned();
                let rt_for_replay = scope_with_tool_caller_context(run_ctx, &state)?;
                // Replay runs with a no-op activity manager and no
                // cancellation token.
                let replay_phase_ctx = ToolPhaseContext {
                    tool_descriptors,
                    agent_behavior: Some(agent.behavior()),
                    activity_manager: tirea_contract::runtime::activity::NoOpActivityManager::arc(),
                    run_config: &rt_for_replay,
                    thread_id: run_ctx.thread_id(),
                    thread_messages: run_ctx.messages(),
                    cancellation_token: None,
                };
                let replay_result = execute_single_tool_with_phases_deferred(
                    tool.as_deref(),
                    &tool_call,
                    &state,
                    &replay_phase_ctx,
                )
                .await?;

                // Append the tool's result message to the thread.
                let replay_msg_id = gen_message_id();
                let replay_msg = tool_response(&tool_call.id, &replay_result.execution.result)
                    .with_id(replay_msg_id.clone());
                run_ctx.add_message(Arc::new(replay_msg));

                // Surface plugin reminders as internal system messages.
                if !replay_result.reminders.is_empty() {
                    let msgs: Vec<Arc<Message>> = replay_result
                        .reminders
                        .iter()
                        .map(|reminder| {
                            Arc::new(Message::internal_system(format!(
                                "<system-reminder>{}</system-reminder>",
                                reminder
                            )))
                        })
                        .collect();
                    run_ctx.add_messages(msgs);
                }

                // Record any state patches produced by the tool execution.
                if let Some(patch) = replay_result.execution.patch.clone() {
                    state_changed = true;
                    run_ctx.add_thread_patch(patch);
                }
                if !replay_result.pending_patches.is_empty() {
                    state_changed = true;
                    run_ctx.add_thread_patches(replay_result.pending_patches.clone());
                }
                events.push(AgentEvent::ToolCallDone {
                    id: tool_call.id.clone(),
                    result: replay_result.execution.result,
                    patch: replay_result.execution.patch,
                    message_id: replay_msg_id,
                });

                // The replayed tool may suspend again; persist the new
                // suspended call and announce it.
                if let Some(next_suspended_call) = replay_result.suspended_call.clone() {
                    let state = run_ctx
                        .snapshot()
                        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
                    let action = next_suspended_call.clone().into_state_action();
                    let patches = reduce_state_actions(
                        vec![action],
                        &state,
                        "agent_loop",
                        &ScopeContext::run(),
                    )
                    .map_err(|e| {
                        AgentLoopError::StateError(format!(
                            "failed to reduce suspended call action: {e}"
                        ))
                    })?;
                    for patch in patches {
                        if !patch.patch().is_empty() {
                            state_changed = true;
                            run_ctx.add_thread_patch(patch);
                        }
                    }
                    for event in pending_tool_events(&next_suspended_call) {
                        events.push(event);
                    }
                }
            }
        }
    }

    // No explicit clear_suspended_call needed: terminal-outcome tool calls
    // have their entire `__tool_call_scope.<call_id>` subtree deleted by
    // automatic scope cleanup in execute_single_tool_with_phases_impl.

    if state_changed {
        let snapshot = run_ctx
            .snapshot()
            .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
        events.push(AgentEvent::StateSnapshot { snapshot });
    }

    Ok(RunStartDrainOutcome { events, replayed })
}
1542
/// Run-start entry point for draining resume decisions and replaying the
/// corresponding suspended tool calls; delegates directly to
/// `drain_resuming_tool_calls_and_replay`.
async fn drain_run_start_resume_replay(
    run_ctx: &mut RunContext,
    tools: &HashMap<String, Arc<dyn Tool>>,
    agent: &dyn Agent,
    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
) -> Result<RunStartDrainOutcome, AgentLoopError> {
    drain_resuming_tool_calls_and_replay(run_ctx, tools, agent, tool_descriptors).await
}
1551
1552async fn commit_run_start_and_drain_replay(
1553    run_ctx: &mut RunContext,
1554    tools: &HashMap<String, Arc<dyn Tool>>,
1555    agent: &dyn Agent,
1556    active_tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
1557    pending_delta_commit: &PendingDeltaCommitContext<'_>,
1558) -> Result<RunStartDrainOutcome, AgentLoopError> {
1559    pending_delta_commit
1560        .commit(run_ctx, CheckpointReason::UserMessage, false)
1561        .await?;
1562
1563    let run_start_drain =
1564        drain_run_start_resume_replay(run_ctx, tools, agent, active_tool_descriptors).await?;
1565
1566    if run_start_drain.replayed {
1567        pending_delta_commit
1568            .commit(run_ctx, CheckpointReason::ToolResultsCommitted, false)
1569            .await?;
1570    }
1571
1572    Ok(run_start_drain)
1573}
1574
1575fn normalize_decision_tool_result(
1576    response: &serde_json::Value,
1577    fallback_arguments: &serde_json::Value,
1578) -> serde_json::Value {
1579    match response {
1580        serde_json::Value::Bool(_) => fallback_arguments.clone(),
1581        value => value.clone(),
1582    }
1583}
1584
1585fn denied_tool_result_for_call(
1586    run_ctx: &RunContext,
1587    call_id: &str,
1588    fallback_tool_name: Option<&str>,
1589    decision_reason: Option<&str>,
1590) -> ToolResult {
1591    let tool_name = fallback_tool_name
1592        .filter(|name| !name.is_empty())
1593        .map(str::to_string)
1594        .or_else(|| find_tool_call_in_messages(run_ctx, call_id).map(|call| call.name))
1595        .unwrap_or_else(|| "tool".to_string());
1596    let reason = decision_reason
1597        .map(str::to_string)
1598        .filter(|reason| !reason.trim().is_empty())
1599        .unwrap_or_else(|| "User denied the action".to_string());
1600    ToolResult::error(tool_name, reason)
1601}
1602
1603fn append_denied_tool_result_message(
1604    run_ctx: &mut RunContext,
1605    call_id: &str,
1606    fallback_tool_name: Option<&str>,
1607    decision_reason: Option<&str>,
1608) -> AgentEvent {
1609    let denied_result =
1610        denied_tool_result_for_call(run_ctx, call_id, fallback_tool_name, decision_reason);
1611    let message_id = gen_message_id();
1612    let denied_message = tool_response(call_id, &denied_result).with_id(message_id.clone());
1613    run_ctx.add_message(Arc::new(denied_message));
1614    AgentEvent::ToolCallDone {
1615        id: call_id.to_string(),
1616        result: denied_result,
1617        patch: None,
1618        message_id,
1619    }
1620}
1621
1622fn find_tool_call_in_messages(run_ctx: &RunContext, call_id: &str) -> Option<ToolCall> {
1623    run_ctx.messages().iter().rev().find_map(|message| {
1624        message
1625            .tool_calls
1626            .as_ref()
1627            .and_then(|calls| calls.iter().find(|call| call.id == call_id).cloned())
1628    })
1629}
1630
1631fn replay_tool_call_for_resolution(
1632    _run_ctx: &RunContext,
1633    suspended_call: &SuspendedCall,
1634    decision: &ToolCallDecision,
1635) -> Option<ToolCall> {
1636    if matches!(decision.resume.action, ResumeDecisionAction::Cancel) {
1637        return None;
1638    }
1639
1640    match suspended_call.ticket.resume_mode {
1641        ToolCallResumeMode::ReplayToolCall => Some(ToolCall::new(
1642            suspended_call.call_id.clone(),
1643            suspended_call.tool_name.clone(),
1644            suspended_call.arguments.clone(),
1645        )),
1646        ToolCallResumeMode::UseDecisionAsToolResult | ToolCallResumeMode::PassDecisionToTool => {
1647            Some(ToolCall::new(
1648                suspended_call.call_id.clone(),
1649                suspended_call.tool_name.clone(),
1650                normalize_decision_tool_result(&decision.resume.result, &suspended_call.arguments),
1651            ))
1652        }
1653    }
1654}
1655
1656fn upsert_tool_call_lifecycle_state(
1657    run_ctx: &mut RunContext,
1658    suspended_call: &SuspendedCall,
1659    status: ToolCallStatus,
1660    resume: Option<ToolCallResume>,
1661) -> Result<bool, AgentLoopError> {
1662    let current_state = tool_call_states_from_ctx(run_ctx).remove(&suspended_call.call_id);
1663    let Some(tool_state) = transition_tool_call_state(
1664        current_state,
1665        ToolCallStateSeed {
1666            call_id: &suspended_call.call_id,
1667            tool_name: &suspended_call.tool_name,
1668            arguments: &suspended_call.arguments,
1669            status: ToolCallStatus::Suspended,
1670            resume_token: Some(suspended_call.ticket.pending.id.clone()),
1671        },
1672        ToolCallStateTransition {
1673            status,
1674            resume_token: Some(suspended_call.ticket.pending.id.clone()),
1675            resume,
1676            updated_at: current_unix_millis(),
1677        },
1678    ) else {
1679        return Ok(false);
1680    };
1681
1682    let base_state = run_ctx
1683        .snapshot()
1684        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
1685    let patch = upsert_tool_call_state(&base_state, &suspended_call.call_id, tool_state)?;
1686    if patch.patch().is_empty() {
1687        return Ok(false);
1688    }
1689    run_ctx.add_thread_patch(patch);
1690    Ok(true)
1691}
1692
1693pub(super) fn resolve_suspended_call(
1694    run_ctx: &mut RunContext,
1695    response: &ToolCallDecision,
1696) -> Result<Option<DecisionReplayOutcome>, AgentLoopError> {
1697    let suspended_calls = suspended_calls_from_ctx(run_ctx);
1698    if suspended_calls.is_empty() {
1699        return Ok(None);
1700    }
1701
1702    let suspended_call = suspended_calls
1703        .get(&response.target_id)
1704        .cloned()
1705        .or_else(|| {
1706            suspended_calls
1707                .values()
1708                .find(|call| {
1709                    call.ticket.suspension.id == response.target_id
1710                        || call.ticket.pending.id == response.target_id
1711                        || call.call_id == response.target_id
1712                })
1713                .cloned()
1714        });
1715    let Some(suspended_call) = suspended_call else {
1716        return Ok(None);
1717    };
1718
1719    let _ = upsert_tool_call_lifecycle_state(
1720        run_ctx,
1721        &suspended_call,
1722        ToolCallStatus::Resuming,
1723        Some(response.resume.clone()),
1724    )?;
1725
1726    Ok(Some(DecisionReplayOutcome {
1727        events: Vec::new(),
1728        resolved_call_ids: vec![suspended_call.call_id],
1729    }))
1730}
1731
1732pub(super) fn drain_decision_channel(
1733    run_ctx: &mut RunContext,
1734    decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1735    pending_decisions: &mut VecDeque<ToolCallDecision>,
1736) -> Result<DecisionReplayOutcome, AgentLoopError> {
1737    let mut disconnected = false;
1738    if let Some(rx) = decision_rx.as_mut() {
1739        loop {
1740            match rx.try_recv() {
1741                Ok(response) => pending_decisions.push_back(response),
1742                Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
1743                Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1744                    disconnected = true;
1745                    break;
1746                }
1747            }
1748        }
1749    }
1750    if disconnected {
1751        *decision_rx = None;
1752    }
1753
1754    if pending_decisions.is_empty() {
1755        return Ok(DecisionReplayOutcome {
1756            events: Vec::new(),
1757            resolved_call_ids: Vec::new(),
1758        });
1759    }
1760
1761    let mut unresolved = VecDeque::new();
1762    let mut events = Vec::new();
1763    let mut resolved_call_ids = Vec::new();
1764    let mut seen = HashSet::new();
1765
1766    while let Some(response) = pending_decisions.pop_front() {
1767        if let Some(outcome) = resolve_suspended_call(run_ctx, &response)? {
1768            for call_id in outcome.resolved_call_ids {
1769                if seen.insert(call_id.clone()) {
1770                    resolved_call_ids.push(call_id);
1771                }
1772            }
1773            events.extend(outcome.events);
1774        } else {
1775            unresolved.push_back(response);
1776        }
1777    }
1778    *pending_decisions = unresolved;
1779
1780    Ok(DecisionReplayOutcome {
1781        events,
1782        resolved_call_ids,
1783    })
1784}
1785
1786async fn replay_after_decisions(
1787    run_ctx: &mut RunContext,
1788    decisions_applied: bool,
1789    step_tool_provider: &Arc<dyn StepToolProvider>,
1790    agent: &dyn Agent,
1791    active_tool_descriptors: &mut Vec<crate::contracts::runtime::tool_call::ToolDescriptor>,
1792    pending_delta_commit: &PendingDeltaCommitContext<'_>,
1793) -> Result<Vec<AgentEvent>, AgentLoopError> {
1794    if !decisions_applied {
1795        return Ok(Vec::new());
1796    }
1797
1798    let decision_tools = resolve_step_tool_snapshot(step_tool_provider, run_ctx).await?;
1799    *active_tool_descriptors = decision_tools.descriptors.clone();
1800
1801    let decision_drain = drain_run_start_resume_replay(
1802        run_ctx,
1803        &decision_tools.tools,
1804        agent,
1805        active_tool_descriptors,
1806    )
1807    .await?;
1808
1809    pending_delta_commit
1810        .commit(run_ctx, CheckpointReason::ToolResultsCommitted, false)
1811        .await?;
1812
1813    Ok(decision_drain.events)
1814}
1815
1816async fn apply_decisions_and_replay(
1817    run_ctx: &mut RunContext,
1818    decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1819    pending_decisions: &mut VecDeque<ToolCallDecision>,
1820    step_tool_provider: &Arc<dyn StepToolProvider>,
1821    agent: &dyn Agent,
1822    active_tool_descriptors: &mut Vec<crate::contracts::runtime::tool_call::ToolDescriptor>,
1823    pending_delta_commit: &PendingDeltaCommitContext<'_>,
1824) -> Result<Vec<AgentEvent>, AgentLoopError> {
1825    Ok(drain_and_replay_decisions(
1826        run_ctx,
1827        decision_rx,
1828        pending_decisions,
1829        None,
1830        DecisionReplayInputs {
1831            step_tool_provider,
1832            agent,
1833            active_tool_descriptors,
1834        },
1835        pending_delta_commit,
1836    )
1837    .await?
1838    .events)
1839}
1840
/// Aggregated result of applying tool-call decisions and replaying the loop.
pub(super) struct DecisionReplayOutcome {
    // Events emitted while draining decisions and replaying the resume path.
    events: Vec<AgentEvent>,
    // Call ids of suspended tool calls that the decisions resolved,
    // de-duplicated in first-seen order.
    resolved_call_ids: Vec<String>,
}
1845
/// Borrowed collaborators needed to replay the loop after decisions apply.
struct DecisionReplayInputs<'a> {
    // Provider used to re-resolve the per-step tool snapshot for the replay.
    step_tool_provider: &'a Arc<dyn StepToolProvider>,
    // Agent whose hooks run during the replay.
    agent: &'a dyn Agent,
    // Descriptor list refreshed in place when the tool snapshot changes.
    active_tool_descriptors: &'a mut Vec<crate::contracts::runtime::tool_call::ToolDescriptor>,
}
1851
1852async fn drain_and_replay_decisions(
1853    run_ctx: &mut RunContext,
1854    decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1855    pending_decisions: &mut VecDeque<ToolCallDecision>,
1856    decision: Option<ToolCallDecision>,
1857    replay_inputs: DecisionReplayInputs<'_>,
1858    pending_delta_commit: &PendingDeltaCommitContext<'_>,
1859) -> Result<DecisionReplayOutcome, AgentLoopError> {
1860    if let Some(decision) = decision {
1861        pending_decisions.push_back(decision);
1862    }
1863    let decision_drain = drain_decision_channel(run_ctx, decision_rx, pending_decisions)?;
1864    let mut events = decision_drain.events;
1865    let replay_events = replay_after_decisions(
1866        run_ctx,
1867        !decision_drain.resolved_call_ids.is_empty(),
1868        replay_inputs.step_tool_provider,
1869        replay_inputs.agent,
1870        replay_inputs.active_tool_descriptors,
1871        pending_delta_commit,
1872    )
1873    .await?;
1874    events.extend(replay_events);
1875
1876    Ok(DecisionReplayOutcome {
1877        events,
1878        resolved_call_ids: decision_drain.resolved_call_ids,
1879    })
1880}
1881
1882async fn apply_decision_and_replay(
1883    run_ctx: &mut RunContext,
1884    response: ToolCallDecision,
1885    decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1886    pending_decisions: &mut VecDeque<ToolCallDecision>,
1887    replay_inputs: DecisionReplayInputs<'_>,
1888    pending_delta_commit: &PendingDeltaCommitContext<'_>,
1889) -> Result<DecisionReplayOutcome, AgentLoopError> {
1890    drain_and_replay_decisions(
1891        run_ctx,
1892        decision_rx,
1893        pending_decisions,
1894        Some(response),
1895        replay_inputs,
1896        pending_delta_commit,
1897    )
1898    .await
1899}
1900
1901async fn recv_decision(
1902    decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1903) -> Option<ToolCallDecision> {
1904    let rx = decision_rx.as_mut()?;
1905    rx.recv().await
1906}
1907
1908/// Run the full agent loop until completion, suspension, cancellation, or error.
1909///
1910/// This is the primary non-streaming entry point. Tools are passed directly
1911/// and used as the default tool set unless the agent's step_tool_provider is set
1912/// (for dynamic per-step tool resolution).
1913pub async fn run_loop(
1914    agent: &dyn Agent,
1915    tools: HashMap<String, Arc<dyn Tool>>,
1916    run_ctx: RunContext,
1917    cancellation_token: Option<RunCancellationToken>,
1918    state_committer: Option<Arc<dyn StateCommitter>>,
1919    decision_rx: Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1920) -> LoopOutcome {
1921    let execution_ctx = RunExecutionContext::from_run_config(&run_ctx);
1922    run_loop_with_context(
1923        agent,
1924        tools,
1925        run_ctx,
1926        execution_ctx,
1927        cancellation_token,
1928        state_committer,
1929        decision_rx,
1930    )
1931    .await
1932}
1933
/// Run the full agent loop until completion, suspension, cancellation, or error.
///
/// This is the primary non-streaming entry point. Tools are passed directly
/// and used as the default tool set unless the agent's step_tool_provider is set
/// (for dynamic per-step tool resolution).
///
/// High-level flow (see the module docs for the phase diagram):
/// 1. Bind `execution_ctx` into the run config and resolve the initial tool
///    snapshot.
/// 2. Emit the `RunStart` phase, apply its patches/actions, and commit.
/// 3. Loop: apply queued decisions, resolve per-step tools, prepare the step,
///    call the LLM (with retry + fallback), commit the assistant turn,
///    execute requested tools, commit results, and re-check
///    termination/suspension.
///
/// Every exit path goes through the local `terminate_run!` macro, which
/// persists termination state before building the final `LoopOutcome`.
pub async fn run_loop_with_context(
    agent: &dyn Agent,
    tools: HashMap<String, Arc<dyn Tool>>,
    mut run_ctx: RunContext,
    execution_ctx: RunExecutionContext,
    cancellation_token: Option<RunCancellationToken>,
    state_committer: Option<Arc<dyn StateCommitter>>,
    mut decision_rx: Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
) -> LoopOutcome {
    bind_execution_context_to_run_config(&mut run_ctx, &execution_ctx);

    // Per-run collaborators and mutable loop state.
    let executor = llm_executor_for_run(agent);
    let tool_executor = agent.tool_executor();
    let mut run_state = LoopRunState::new();
    let mut pending_decisions = VecDeque::new();
    let run_cancellation_token = cancellation_token;
    // Most recent assistant text; used as the response on termination paths.
    let mut last_text = String::new();
    // Accumulates truncated assistant text across truncation-recovery retries.
    let mut continued_response_prefix = String::new();
    let step_tool_provider = step_tool_provider_for_run(agent, tools);
    let run_id = execution_ctx.run_id.clone();
    // Snapshot of calls already suspended before this run, so newly suspended
    // calls introduced by RunStart can be detected below.
    let baseline_suspended_call_ids = suspended_call_ids(&run_ctx);
    let pending_delta_commit =
        PendingDeltaCommitContext::new(&execution_ctx, state_committer.as_ref());
    // Resolve the initial tool snapshot; failure here terminates before the
    // RunStart phase (note: this path bypasses `terminate_run!`, which is not
    // yet defined, and builds the outcome directly).
    let initial_step_tools = match resolve_step_tool_snapshot(&step_tool_provider, &run_ctx).await {
        Ok(snapshot) => snapshot,
        Err(error) => {
            let msg = error.to_string();
            return build_loop_outcome(
                run_ctx,
                TerminationReason::Error(msg.clone()),
                None,
                &run_state,
                Some(outcome::LoopFailure::State(msg)),
            );
        }
    };
    let StepToolSnapshot {
        tools: initial_tools,
        descriptors: initial_descriptors,
    } = initial_step_tools;
    let mut active_tool_descriptors = initial_descriptors;

    // Shared exit path: normalize the termination against any suspended
    // calls, persist the final state (required commit), and return the built
    // outcome. A persistence failure downgrades the exit to an Error outcome.
    macro_rules! terminate_run {
        ($termination:expr, $response:expr, $failure:expr) => {{
            let reason: TerminationReason = $termination;
            let (final_termination, final_response) =
                normalize_termination_for_suspended_calls(&run_ctx, reason, $response);
            if let Err(error) = persist_run_termination(
                &mut run_ctx,
                &final_termination,
                &active_tool_descriptors,
                agent,
                &execution_ctx,
                &pending_delta_commit,
                RunFinishedCommitPolicy::Required,
            )
            .await
            {
                let msg = error.to_string();
                return build_loop_outcome(
                    run_ctx,
                    TerminationReason::Error(msg.clone()),
                    None,
                    &run_state,
                    Some(outcome::LoopFailure::State(msg)),
                );
            }
            return build_loop_outcome(
                run_ctx,
                final_termination,
                final_response,
                &run_state,
                $failure,
            );
        }};
    }

    // Phase: RunStart
    let (pending, actions) = match plugin_runtime::emit_phase_block(
        Phase::RunStart,
        &run_ctx,
        &active_tool_descriptors,
        agent,
        |_| {},
    )
    .await
    {
        Ok(result) => result,
        Err(error) => {
            let msg = error.to_string();
            terminate_run!(
                TerminationReason::Error(msg.clone()),
                None,
                Some(outcome::LoopFailure::State(msg))
            );
        }
    };
    // Apply RunStart plugin output, then commit and drain any resume replay.
    run_ctx.add_thread_patches(pending);
    run_ctx.add_serialized_actions(actions);
    if let Err(error) = commit_run_start_and_drain_replay(
        &mut run_ctx,
        &initial_tools,
        agent,
        &active_tool_descriptors,
        &pending_delta_commit,
    )
    .await
    {
        let msg = error.to_string();
        terminate_run!(
            TerminationReason::Error(msg.clone()),
            None,
            Some(outcome::LoopFailure::State(msg))
        );
    }
    // If RunStart itself suspended new calls, stop before the first step.
    let run_start_new_suspended = newly_suspended_call_ids(&run_ctx, &baseline_suspended_call_ids);
    if !run_start_new_suspended.is_empty() {
        terminate_run!(TerminationReason::Suspended, None, None);
    }
    // Main step loop: one iteration = one inference (plus tool round).
    loop {
        // Apply any decisions buffered on the channel before starting a step.
        if let Err(error) = apply_decisions_and_replay(
            &mut run_ctx,
            &mut decision_rx,
            &mut pending_decisions,
            &step_tool_provider,
            agent,
            &mut active_tool_descriptors,
            &pending_delta_commit,
        )
        .await
        {
            let msg = error.to_string();
            terminate_run!(
                TerminationReason::Error(msg.clone()),
                None,
                Some(outcome::LoopFailure::State(msg))
            );
        }

        if is_run_cancelled(run_cancellation_token.as_ref()) {
            terminate_run!(TerminationReason::Cancelled, None, None);
        }

        // Re-resolve tools each step (the provider may change the set).
        let step_tools = match resolve_step_tool_snapshot(&step_tool_provider, &run_ctx).await {
            Ok(snapshot) => snapshot,
            Err(e) => {
                let msg = e.to_string();
                terminate_run!(
                    TerminationReason::Error(msg.clone()),
                    None,
                    Some(outcome::LoopFailure::State(msg))
                );
            }
        };
        active_tool_descriptors = step_tools.descriptors.clone();

        // Prepare the step (messages, filtered tools, transforms) and let it
        // veto the run via its RunAction.
        let prepared = match prepare_step_execution(&run_ctx, &active_tool_descriptors, agent).await
        {
            Ok(v) => v,
            Err(e) => {
                let msg = e.to_string();
                terminate_run!(
                    TerminationReason::Error(msg.clone()),
                    None,
                    Some(outcome::LoopFailure::State(msg))
                );
            }
        };
        run_ctx.add_thread_patches(prepared.pending_patches);

        match prepared.run_action {
            RunAction::Continue => {}
            RunAction::Terminate(reason) => {
                // BehaviorRequested termination carries the last assistant
                // text as the response; other reasons carry none.
                let response = if matches!(reason, TerminationReason::BehaviorRequested) {
                    Some(last_text.clone())
                } else {
                    None
                };
                terminate_run!(reason, response, None);
            }
        }

        // Call LLM with unified retry + fallback model strategy.
        let messages = prepared.messages;
        let filtered_tools = prepared.filtered_tools;
        let request_transforms = prepared.request_transforms;
        let chat_options = agent.chat_options().cloned();
        let attempt_outcome = run_llm_with_retry_and_fallback(
            agent,
            run_cancellation_token.as_ref(),
            true,
            None,
            "unknown llm error",
            |model| {
                // Build a fresh request per attempt so each (possibly
                // fallback) model gets the same prepared inputs.
                let request = build_request_for_filtered_tools(
                    &messages,
                    &step_tools.tools,
                    &filtered_tools,
                    &request_transforms,
                );
                let executor = executor.clone();
                let chat_options = chat_options.clone();
                async move {
                    executor
                        .exec_chat_response(&model, request, chat_options.as_ref())
                        .await
                }
            },
        )
        .await;

        let response = match attempt_outcome {
            LlmAttemptOutcome::Success {
                value, attempts, ..
            } => {
                run_state.record_llm_attempts(attempts);
                value
            }
            LlmAttemptOutcome::Cancelled => {
                // Record the cancellation in the thread before exiting.
                append_cancellation_user_message(&mut run_ctx, CancellationStage::Inference);
                terminate_run!(TerminationReason::Cancelled, None, None);
            }
            LlmAttemptOutcome::Exhausted {
                last_error,
                last_error_class,
                attempts,
            } => {
                run_state.record_llm_attempts(attempts);
                // Give plugins a chance to clean up after the failed call;
                // a cleanup failure takes precedence as the reported error.
                if let Err(phase_error) = apply_llm_error_cleanup(
                    &mut run_ctx,
                    &active_tool_descriptors,
                    agent,
                    "llm_exec_error",
                    last_error.clone(),
                    last_error_class,
                )
                .await
                {
                    let msg = phase_error.to_string();
                    terminate_run!(
                        TerminationReason::Error(msg.clone()),
                        None,
                        Some(outcome::LoopFailure::State(msg))
                    );
                }
                terminate_run!(
                    TerminationReason::Error(last_error.clone()),
                    None,
                    Some(outcome::LoopFailure::Llm(last_error))
                );
            }
        };

        let result = stream_result_from_chat_response(&response);
        run_state.update_from_response(&result);
        // Stitch any earlier truncated text onto this response's text.
        last_text = stitch_response_text(&continued_response_prefix, &result.text);

        // Add assistant message
        let assistant_msg_id = gen_message_id();
        let step_meta = step_metadata(Some(run_id.clone()), run_state.completed_steps as u32);
        let post_inference_action = match complete_step_after_inference(
            &mut run_ctx,
            &result,
            step_meta.clone(),
            Some(assistant_msg_id.clone()),
            &active_tool_descriptors,
            agent,
        )
        .await
        {
            Ok(action) => action,
            Err(e) => {
                let msg = e.to_string();
                terminate_run!(
                    TerminationReason::Error(msg.clone()),
                    None,
                    Some(outcome::LoopFailure::State(msg))
                );
            }
        };
        // Checkpoint the assistant turn before tool execution.
        if let Err(error) = pending_delta_commit
            .commit(
                &mut run_ctx,
                CheckpointReason::AssistantTurnCommitted,
                false,
            )
            .await
        {
            let msg = error.to_string();
            terminate_run!(
                TerminationReason::Error(msg.clone()),
                None,
                Some(outcome::LoopFailure::State(msg))
            );
        }

        mark_step_completed(&mut run_state);

        // Truncation recovery: if the model hit max_tokens with no tool
        // calls, inject a continuation prompt and re-enter inference.
        if truncation_recovery::should_retry(&result, &mut run_state) {
            extend_response_prefix(&mut continued_response_prefix, &result.text);
            let prompt = truncation_recovery::continuation_message();
            run_ctx.add_message(Arc::new(prompt));
            continue;
        }
        continued_response_prefix.clear();

        let post_inference_termination = match &post_inference_action {
            RunAction::Terminate(reason) => Some(reason.clone()),
            _ => None,
        };

        // Only `Stopped` termination is deferred past tool execution so the
        // current round's tools complete (e.g. MaxRounds lets tools finish).
        // All other reasons terminate immediately before tool execution.
        if let Some(reason) = &post_inference_termination {
            if !matches!(reason, TerminationReason::Stopped(_)) {
                terminate_run!(reason.clone(), Some(last_text.clone()), None);
            }
        }

        // No tool calls requested: this is the natural end of the run (or
        // the deferred post-inference reason, if one was set).
        if !result.needs_tools() {
            run_state.record_step_without_tools();
            let reason = post_inference_termination.unwrap_or(TerminationReason::NaturalEnd);
            terminate_run!(reason, Some(last_text.clone()), None);
        }

        // Execute tools with phase hooks using configured execution strategy.
        let tool_context = match prepare_tool_execution_context(&run_ctx) {
            Ok(ctx) => ctx,
            Err(e) => {
                let msg = e.to_string();
                terminate_run!(
                    TerminationReason::Error(msg.clone()),
                    None,
                    Some(outcome::LoopFailure::State(msg))
                );
            }
        };
        // Snapshot messages/version so the tool executor sees a stable view.
        let thread_messages_for_tools = run_ctx.messages().to_vec();
        let thread_version_for_tools = run_ctx.version();

        let tool_exec_future = tool_executor.execute(ToolExecutionRequest {
            tools: &step_tools.tools,
            calls: &result.tool_calls,
            state: &tool_context.state,
            tool_descriptors: &active_tool_descriptors,
            agent_behavior: Some(agent.behavior()),
            // No activity reporting in the non-streaming path.
            activity_manager: tirea_contract::runtime::activity::NoOpActivityManager::arc(),
            run_config: &tool_context.run_config,
            thread_id: run_ctx.thread_id(),
            thread_messages: &thread_messages_for_tools,
            state_version: thread_version_for_tools,
            cancellation_token: run_cancellation_token.as_ref(),
        });
        let results = tool_exec_future.await.map_err(AgentLoopError::from);

        let results = match results {
            Ok(r) => r,
            Err(AgentLoopError::Cancelled) => {
                append_cancellation_user_message(&mut run_ctx, CancellationStage::ToolExecution);
                terminate_run!(TerminationReason::Cancelled, None, None);
            }
            Err(e) => {
                let msg = e.to_string();
                terminate_run!(
                    TerminationReason::Error(msg.clone()),
                    None,
                    Some(outcome::LoopFailure::State(msg))
                );
            }
        };

        // Fold tool results (messages + state patches) into the session.
        if let Err(_e) = apply_tool_results_to_session(
            &mut run_ctx,
            &results,
            Some(step_meta),
            tool_executor.requires_parallel_patch_conflict_check(),
        ) {
            // On error, we can't easily rollback RunContext, so just terminate
            let msg = _e.to_string();
            terminate_run!(
                TerminationReason::Error(msg.clone()),
                None,
                Some(outcome::LoopFailure::State(msg))
            );
        }
        // Checkpoint the committed tool results.
        if let Err(error) = pending_delta_commit
            .commit(&mut run_ctx, CheckpointReason::ToolResultsCommitted, false)
            .await
        {
            let msg = error.to_string();
            terminate_run!(
                TerminationReason::Error(msg.clone()),
                None,
                Some(outcome::LoopFailure::State(msg))
            );
        }

        // Apply decisions that may have arrived during tool execution — a
        // quick decision could already resume a call suspended above.
        if let Err(error) = apply_decisions_and_replay(
            &mut run_ctx,
            &mut decision_rx,
            &mut pending_decisions,
            &step_tool_provider,
            agent,
            &mut active_tool_descriptors,
            &pending_delta_commit,
        )
        .await
        {
            let msg = error.to_string();
            terminate_run!(
                TerminationReason::Error(msg.clone()),
                None,
                Some(outcome::LoopFailure::State(msg))
            );
        }

        // If ALL tools are suspended (no completed results), terminate immediately.
        if has_suspended_calls(&run_ctx) {
            let has_completed = results
                .iter()
                .any(|r| !matches!(r.outcome, crate::contracts::ToolCallOutcome::Suspended));
            if !has_completed {
                terminate_run!(TerminationReason::Suspended, None, None);
            }
        }

        // Deferred post-inference termination: tools from the current round
        // have completed; stop the loop before the next inference.
        if let Some(reason) = post_inference_termination {
            terminate_run!(reason, Some(last_text.clone()), None);
        }

        // Track tool-step metrics for loop stats and plugin consumers.
        let error_count = results
            .iter()
            .filter(|r| r.execution.result.is_error())
            .count();
        run_state.record_tool_step(&result.tool_calls, error_count);
    }
}
2382
2383/// Run the agent loop with streaming output.
2384///
2385/// Returns a stream of AgentEvent for real-time updates. Tools are passed
2386/// directly and used as the default tool set unless the agent's step_tool_provider
2387/// is set (for dynamic per-step tool resolution).
2388pub fn run_loop_stream(
2389    agent: Arc<dyn Agent>,
2390    tools: HashMap<String, Arc<dyn Tool>>,
2391    run_ctx: RunContext,
2392    cancellation_token: Option<RunCancellationToken>,
2393    state_committer: Option<Arc<dyn StateCommitter>>,
2394    decision_rx: Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
2395) -> Pin<Box<dyn Stream<Item = AgentEvent> + Send>> {
2396    let execution_ctx = RunExecutionContext::from_run_config(&run_ctx);
2397    run_loop_stream_with_context(
2398        agent,
2399        tools,
2400        run_ctx,
2401        execution_ctx,
2402        cancellation_token,
2403        state_committer,
2404        decision_rx,
2405    )
2406}
2407
2408pub fn run_loop_stream_with_context(
2409    agent: Arc<dyn Agent>,
2410    tools: HashMap<String, Arc<dyn Tool>>,
2411    mut run_ctx: RunContext,
2412    execution_ctx: RunExecutionContext,
2413    cancellation_token: Option<RunCancellationToken>,
2414    state_committer: Option<Arc<dyn StateCommitter>>,
2415    decision_rx: Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
2416) -> Pin<Box<dyn Stream<Item = AgentEvent> + Send>> {
2417    bind_execution_context_to_run_config(&mut run_ctx, &execution_ctx);
2418
2419    stream_runner::run_stream(
2420        agent,
2421        tools,
2422        run_ctx,
2423        execution_ctx,
2424        cancellation_token,
2425        state_committer,
2426        decision_rx,
2427    )
2428}
2429
2430#[cfg(test)]
2431mod tests;