Skip to main content

tirea_agent_loop/runtime/loop_runner/
mod.rs

1//! Agent loop implementation with Phase-based plugin execution.
2//!
3//! The agent loop orchestrates the conversation between user, LLM, and tools:
4//!
5//! ```text
6//! User Input → LLM → Tool Calls? → Execute Tools → LLM → ... → Final Response
7//! ```
8//!
9//! # Phase Execution
10//!
11//! Each phase dispatches to its typed plugin hook:
12//!
13//! ```text
14//! RunStart (once)
15//!     │
16//!     ▼
17//! ┌─────────────────────────┐
18//! │      StepStart          │ ← plugins can apply state patches
19//! ├─────────────────────────┤
20//! │    BeforeInference      │ ← plugins can inject prompt context, filter tools
21//! ├─────────────────────────┤
22//! │      [LLM CALL]         │
23//! ├─────────────────────────┤
24//! │    AfterInference       │
25//! ├─────────────────────────┤
26//! │  ┌───────────────────┐  │
27//! │  │ BeforeToolExecute │  │ ← plugins can block/pending
28//! │  ├───────────────────┤  │
29//! │  │   [TOOL EXEC]     │  │
30//! │  ├───────────────────┤  │
31//! │  │ AfterToolExecute  │  │ ← plugins can add reminders
32//! │  └───────────────────┘  │
33//! ├─────────────────────────┤
34//! │       StepEnd           │
35//! └─────────────────────────┘
36//!     │
37//!     ▼
38//! RunEnd (once)
39//! ```
40
41mod config;
42mod core;
43mod event_envelope_meta;
44mod outcome;
45mod plugin_runtime;
46mod run_state;
47mod state_commit;
48mod stream_core;
49mod stream_runner;
50mod tool_exec;
51
52use crate::contracts::runtime::plugin::phase::Phase;
53use crate::contracts::runtime::ActivityManager;
54use crate::contracts::io::ResumeDecisionAction;
55use crate::contracts::runtime::{
56    state_paths::RUN_LIFECYCLE_STATE_PATH, DecisionReplayPolicy, StreamResult,
57    SuspendedCall, ToolCallResume, ToolCallResumeMode, ToolCallStatus, ToolExecutionRequest,
58    ToolExecutionResult,
59};
60use crate::contracts::thread::CheckpointReason;
61use crate::contracts::thread::{gen_message_id, Message, MessageMetadata, ToolCall};
62use crate::contracts::runtime::tool_call::{Tool, ToolResult};
63use crate::contracts::RunContext;
64use crate::contracts::{AgentEvent, RunAction, TerminationReason, ToolCallDecision};
65use crate::engine::convert::{assistant_message, assistant_tool_calls, tool_response};
66use crate::runtime::activity::ActivityHub;
67
68use crate::runtime::streaming::StreamCollector;
69use async_stream::stream;
70use futures::{Stream, StreamExt};
71use genai::Client;
72use serde_json::Value;
73use std::collections::{HashMap, HashSet, VecDeque};
74use std::future::Future;
75use std::pin::Pin;
76use std::sync::Arc;
77use uuid::Uuid;
78
79#[cfg(test)]
80use crate::contracts::runtime::plugin::AgentPlugin;
81pub use crate::contracts::runtime::ToolExecutor;
82pub use crate::runtime::run_context::{
83    await_or_cancel, is_cancelled, CancelAware, RunCancellationToken, StateCommitError,
84    StateCommitter, TOOL_SCOPE_CALLER_AGENT_ID_KEY, TOOL_SCOPE_CALLER_MESSAGES_KEY,
85    TOOL_SCOPE_CALLER_STATE_KEY, TOOL_SCOPE_CALLER_THREAD_ID_KEY,
86};
87use config::StaticStepToolProvider;
88pub use config::{AgentConfig, GenaiLlmExecutor, LlmRetryPolicy};
89pub use config::{LlmEventStream, LlmExecutor};
90pub use config::{StepToolInput, StepToolProvider, StepToolSnapshot};
91#[cfg(test)]
92use core::build_messages;
93use core::{
94    build_request_for_filtered_tools, clear_suspended_call, inference_inputs_from_step,
95    set_agent_suspended_calls, suspended_calls_from_ctx, tool_call_states_from_ctx,
96    transition_tool_call_state, upsert_tool_call_state, ToolCallStateSeed, ToolCallStateTransition,
97};
98pub use outcome::{tool_map, tool_map_from_arc, AgentLoopError};
99pub use outcome::{LoopOutcome, LoopStats, LoopUsage};
100#[cfg(test)]
101use plugin_runtime::emit_phase_checked;
102use plugin_runtime::{
103    emit_cleanup_phases_and_apply, emit_phase_block, emit_run_end_phase, run_phase_block,
104};
105use run_state::RunState;
106pub use state_commit::ChannelStateCommitter;
107use state_commit::PendingDeltaCommitContext;
108use std::time::{SystemTime, UNIX_EPOCH};
109use tirea_state::{Op, Patch, Path, TrackedPatch};
110#[cfg(test)]
111use tokio_util::sync::CancellationToken;
112#[cfg(test)]
113use tool_exec::execute_tools_parallel_with_phases;
114pub use tool_exec::ExecuteToolsOutcome;
115use tool_exec::{
116    apply_tool_results_impl, apply_tool_results_to_session, execute_single_tool_with_phases,
117    scope_with_tool_caller_context, step_metadata, ToolPhaseContext,
118};
119pub use tool_exec::{
120    execute_tools, execute_tools_with_config, execute_tools_with_plugins,
121    execute_tools_with_plugins_and_executor, ParallelToolExecutionMode, ParallelToolExecutor,
122    SequentialToolExecutor,
123};
124
125/// Fully resolved agent wiring ready for execution.
126///
127/// Contains everything needed to run an agent loop: the loop configuration,
128/// the resolved tool map, and the runtime config. This is a pure data struct
129/// that can be inspected, mutated, and tested independently.
130pub struct ResolvedRun {
131    /// Loop configuration (model, plugins, ...).
132    pub config: AgentConfig,
133    /// Resolved tool map after filtering and wiring.
134    pub tools: HashMap<String, Arc<dyn Tool>>,
135    /// Runtime configuration (user_id, run_id, ...).
136    pub run_config: crate::contracts::RunConfig,
137}
138
139impl ResolvedRun {
140    /// Add or replace a tool in the resolved tool map.
141    #[must_use]
142    pub fn with_tool(mut self, id: String, tool: Arc<dyn Tool>) -> Self {
143        self.tools.insert(id, tool);
144        self
145    }
146
147    /// Add a plugin to the resolved config.
148    #[must_use]
149    pub fn with_plugin(mut self, plugin: Arc<dyn crate::contracts::runtime::plugin::AgentPlugin>) -> Self {
150        self.config.plugins.push(plugin);
151        self
152    }
153
154    /// Overlay tools from a map (insert-if-absent semantics).
155    pub fn overlay_tools(&mut self, tools: HashMap<String, Arc<dyn Tool>>) {
156        for (id, tool) in tools {
157            self.tools.entry(id).or_insert(tool);
158        }
159    }
160}
161
162fn uuid_v7() -> String {
163    Uuid::now_v7().simple().to_string()
164}
165
/// Milliseconds elapsed since the Unix epoch according to the system clock.
///
/// Returns `0` when the clock reads earlier than the epoch, and saturates at
/// `u64::MAX` if the millisecond count does not fit in a `u64`.
pub(crate) fn current_unix_millis() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => {
            let millis = elapsed.as_millis();
            if millis > u128::from(u64::MAX) {
                u64::MAX
            } else {
                millis as u64
            }
        }
        // The clock is set before the epoch: report zero rather than failing.
        Err(_) => 0,
    }
}
171
172pub(super) fn sync_run_lifecycle_for_termination(
173    run_ctx: &mut RunContext,
174    termination: &TerminationReason,
175) -> Result<(), AgentLoopError> {
176    let run_id = run_ctx
177        .run_config
178        .value("run_id")
179        .and_then(Value::as_str)
180        .map(str::trim)
181        .filter(|id| !id.is_empty());
182    let Some(run_id) = run_id else {
183        return Ok(());
184    };
185
186    let (status, done_reason) = termination.to_run_status();
187
188    let patch = TrackedPatch::new(Patch::with_ops(vec![Op::set(
189        Path::root().key(RUN_LIFECYCLE_STATE_PATH),
190        serde_json::json!({
191            "id": run_id,
192            "status": status,
193            "done_reason": done_reason,
194            "updated_at": current_unix_millis(),
195        }),
196    )]))
197    .with_source("agent_loop");
198
199    run_ctx.add_thread_patch(patch);
200    Ok(())
201}
202
203#[derive(Debug, Clone, Copy, PartialEq, Eq)]
204pub(super) enum CancellationStage {
205    Inference,
206    ToolExecution,
207}
208
209pub(super) const CANCELLATION_INFERENCE_USER_MESSAGE: &str =
210    "The previous run was interrupted during inference. Please continue from the current context.";
211pub(super) const CANCELLATION_TOOL_USER_MESSAGE: &str =
212    "The previous run was interrupted while using tools. Please continue from the current context.";
213
214pub(super) fn append_cancellation_user_message(run_ctx: &mut RunContext, stage: CancellationStage) {
215    let content = match stage {
216        CancellationStage::Inference => CANCELLATION_INFERENCE_USER_MESSAGE,
217        CancellationStage::ToolExecution => CANCELLATION_TOOL_USER_MESSAGE,
218    };
219    run_ctx.add_message(Arc::new(Message::user(content)));
220}
221
222pub(super) fn effective_llm_models(config: &AgentConfig) -> Vec<String> {
223    let mut models = Vec::with_capacity(1 + config.fallback_models.len());
224    models.push(config.model.clone());
225    for model in &config.fallback_models {
226        if model.trim().is_empty() {
227            continue;
228        }
229        if !models.iter().any(|m| m == model) {
230            models.push(model.clone());
231        }
232    }
233    models
234}
235
236pub(super) fn llm_retry_attempts(config: &AgentConfig) -> usize {
237    config.llm_retry_policy.max_attempts_per_model.max(1)
238}
239
240pub(super) fn is_retryable_llm_error(message: &str) -> bool {
241    let lower = message.to_ascii_lowercase();
242    let non_retryable = [
243        "401",
244        "403",
245        "404",
246        "400",
247        "422",
248        "unauthorized",
249        "forbidden",
250        "invalid api key",
251        "invalid_request",
252        "bad request",
253    ];
254    if non_retryable.iter().any(|p| lower.contains(p)) {
255        return false;
256    }
257    let retryable = [
258        "429",
259        "too many requests",
260        "rate limit",
261        "timeout",
262        "timed out",
263        "temporar",
264        "connection",
265        "network",
266        "unavailable",
267        "server error",
268        "502",
269        "503",
270        "504",
271        "reset by peer",
272        "eof",
273    ];
274    retryable.iter().any(|p| lower.contains(p))
275}
276
277pub(super) fn retry_backoff_ms(config: &AgentConfig, retry_index: usize) -> u64 {
278    let initial = config.llm_retry_policy.initial_backoff_ms;
279    let cap = config
280        .llm_retry_policy
281        .max_backoff_ms
282        .max(config.llm_retry_policy.initial_backoff_ms);
283    if retry_index == 0 {
284        return initial.min(cap);
285    }
286    let shift = (retry_index - 1).min(20) as u32;
287    let factor = 1u64.checked_shl(shift).unwrap_or(u64::MAX);
288    initial.saturating_mul(factor).min(cap)
289}
290
291pub(super) async fn wait_retry_backoff(
292    config: &AgentConfig,
293    retry_index: usize,
294    run_cancellation_token: Option<&RunCancellationToken>,
295) -> bool {
296    let wait_ms = retry_backoff_ms(config, retry_index);
297    match await_or_cancel(
298        run_cancellation_token,
299        tokio::time::sleep(std::time::Duration::from_millis(wait_ms)),
300    )
301    .await
302    {
303        CancelAware::Cancelled => true,
304        CancelAware::Value(_) => false,
305    }
306}
307
308pub(super) enum LlmAttemptOutcome<T> {
309    Success {
310        value: T,
311        model: String,
312        attempts: usize,
313    },
314    Cancelled,
315    Exhausted {
316        last_error: String,
317        attempts: usize,
318    },
319}
320
321fn is_run_cancelled(token: Option<&RunCancellationToken>) -> bool {
322    is_cancelled(token)
323}
324
325pub(super) fn step_tool_provider_for_run(
326    config: &AgentConfig,
327    tools: HashMap<String, Arc<dyn Tool>>,
328) -> Arc<dyn StepToolProvider> {
329    config.step_tool_provider.clone().unwrap_or_else(|| {
330        Arc::new(StaticStepToolProvider::new(tools)) as Arc<dyn StepToolProvider>
331    })
332}
333
334pub(super) fn llm_executor_for_run(config: &AgentConfig) -> Arc<dyn LlmExecutor> {
335    config
336        .llm_executor
337        .clone()
338        .unwrap_or_else(|| Arc::new(GenaiLlmExecutor::new(Client::default())))
339}
340
341pub(super) async fn resolve_step_tool_snapshot(
342    step_tool_provider: &Arc<dyn StepToolProvider>,
343    run_ctx: &RunContext,
344) -> Result<StepToolSnapshot, AgentLoopError> {
345    step_tool_provider
346        .provide(StepToolInput { state: run_ctx })
347        .await
348}
349
350fn mark_step_completed(run_state: &mut RunState) {
351    run_state.completed_steps += 1;
352}
353
354fn build_loop_outcome(
355    run_ctx: RunContext,
356    termination: TerminationReason,
357    response: Option<String>,
358    run_state: &RunState,
359    failure: Option<outcome::LoopFailure>,
360) -> LoopOutcome {
361    LoopOutcome {
362        run_ctx,
363        termination,
364        response: response.filter(|text| !text.is_empty()),
365        usage: run_state.usage(),
366        stats: run_state.stats(),
367        failure,
368    }
369}
370
371pub(super) async fn run_llm_with_retry_and_fallback<T, Invoke, Fut>(
372    config: &AgentConfig,
373    run_cancellation_token: Option<&RunCancellationToken>,
374    retry_current_model: bool,
375    unknown_error: &str,
376    mut invoke: Invoke,
377) -> LlmAttemptOutcome<T>
378where
379    Invoke: FnMut(String) -> Fut,
380    Fut: std::future::Future<Output = genai::Result<T>>,
381{
382    let mut last_llm_error = unknown_error.to_string();
383    let model_candidates = effective_llm_models(config);
384    let max_attempts = llm_retry_attempts(config);
385    let mut total_attempts = 0usize;
386
387    for model in model_candidates {
388        for attempt in 1..=max_attempts {
389            total_attempts = total_attempts.saturating_add(1);
390            let response_res =
391                match await_or_cancel(run_cancellation_token, invoke(model.clone())).await {
392                    CancelAware::Cancelled => return LlmAttemptOutcome::Cancelled,
393                    CancelAware::Value(resp) => resp,
394                };
395
396            match response_res {
397                Ok(value) => {
398                    return LlmAttemptOutcome::Success {
399                        value,
400                        model,
401                        attempts: total_attempts,
402                    };
403                }
404                Err(e) => {
405                    let message = e.to_string();
406                    last_llm_error =
407                        format!("model='{model}' attempt={attempt}/{max_attempts}: {message}");
408                    let can_retry_same_model = retry_current_model
409                        && attempt < max_attempts
410                        && is_retryable_llm_error(&message);
411                    if can_retry_same_model {
412                        let cancelled =
413                            wait_retry_backoff(config, attempt, run_cancellation_token).await;
414                        if cancelled {
415                            return LlmAttemptOutcome::Cancelled;
416                        }
417                        continue;
418                    }
419                    break;
420                }
421            }
422        }
423    }
424
425    LlmAttemptOutcome::Exhausted {
426        last_error: last_llm_error,
427        attempts: total_attempts,
428    }
429}
430
431pub(super) async fn run_step_prepare_phases(
432    run_ctx: &RunContext,
433    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
434    config: &AgentConfig,
435) -> Result<
436    (
437        Vec<Message>,
438        Vec<String>,
439        RunAction,
440        Vec<TrackedPatch>,
441    ),
442    AgentLoopError,
443> {
444    let ((messages, filtered_tools, run_action), pending) = run_phase_block(
445        run_ctx,
446        tool_descriptors,
447        &config.plugins,
448        &[Phase::StepStart, Phase::BeforeInference],
449        |_| {},
450        |step| inference_inputs_from_step(step, &config.system_prompt),
451    )
452    .await?;
453    Ok((messages, filtered_tools, run_action, pending))
454}
455
456pub(super) struct PreparedStep {
457    pub(super) messages: Vec<Message>,
458    pub(super) filtered_tools: Vec<String>,
459    pub(super) run_action: RunAction,
460    pub(super) pending_patches: Vec<TrackedPatch>,
461}
462
463pub(super) async fn prepare_step_execution(
464    run_ctx: &RunContext,
465    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
466    config: &AgentConfig,
467) -> Result<PreparedStep, AgentLoopError> {
468    let (messages, filtered_tools, run_action, pending) =
469        run_step_prepare_phases(run_ctx, tool_descriptors, config).await?;
470    Ok(PreparedStep {
471        messages,
472        filtered_tools,
473        run_action,
474        pending_patches: pending,
475    })
476}
477
478pub(super) async fn apply_llm_error_cleanup(
479    run_ctx: &mut RunContext,
480    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
481    plugins: &[Arc<dyn crate::contracts::runtime::plugin::AgentPlugin>],
482    error_type: &'static str,
483    message: String,
484) -> Result<(), AgentLoopError> {
485    emit_cleanup_phases_and_apply(run_ctx, tool_descriptors, plugins, error_type, message).await
486}
487
488pub(super) async fn complete_step_after_inference(
489    run_ctx: &mut RunContext,
490    result: &StreamResult,
491    step_meta: MessageMetadata,
492    assistant_message_id: Option<String>,
493    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
494    plugins: &[Arc<dyn crate::contracts::runtime::plugin::AgentPlugin>],
495) -> Result<Option<TerminationReason>, AgentLoopError> {
496    let (run_action, pending) = run_phase_block(
497        run_ctx,
498        tool_descriptors,
499        plugins,
500        &[Phase::AfterInference],
501        |step| {
502            step.response = Some(result.clone());
503        },
504        |step| step.run_action(),
505    )
506    .await?;
507    run_ctx.add_thread_patches(pending);
508
509    let assistant = assistant_turn_message(result, step_meta, assistant_message_id);
510    run_ctx.add_message(Arc::new(assistant));
511
512    let pending =
513        emit_phase_block(Phase::StepEnd, run_ctx, tool_descriptors, plugins, |_| {}).await?;
514    run_ctx.add_thread_patches(pending);
515    Ok(match run_action {
516        RunAction::Terminate(reason) => Some(reason),
517        RunAction::Continue => None,
518    })
519}
520
521/// Emit events for a pending tool-call projection.
522pub(super) fn pending_tool_events(call: &SuspendedCall) -> Vec<AgentEvent> {
523    vec![
524        AgentEvent::ToolCallStart {
525            id: call.ticket.pending.id.clone(),
526            name: call.ticket.pending.name.clone(),
527        },
528        AgentEvent::ToolCallReady {
529            id: call.ticket.pending.id.clone(),
530            name: call.ticket.pending.name.clone(),
531            arguments: call.ticket.pending.arguments.clone(),
532        },
533    ]
534}
535
536pub(super) fn has_suspended_calls(run_ctx: &RunContext) -> bool {
537    !suspended_calls_from_ctx(run_ctx).is_empty()
538}
539
540pub(super) fn suspended_call_ids(run_ctx: &RunContext) -> HashSet<String> {
541    suspended_calls_from_ctx(run_ctx).into_keys().collect()
542}
543
544pub(super) fn newly_suspended_call_ids(
545    run_ctx: &RunContext,
546    baseline_ids: &HashSet<String>,
547) -> HashSet<String> {
548    suspended_calls_from_ctx(run_ctx)
549        .into_keys()
550        .filter(|id| !baseline_ids.contains(id))
551        .collect()
552}
553
554pub(super) fn suspended_call_pending_events(run_ctx: &RunContext) -> Vec<AgentEvent> {
555    let mut calls: Vec<SuspendedCall> = suspended_calls_from_ctx(run_ctx).into_values().collect();
556    calls.sort_by(|left, right| left.call_id.cmp(&right.call_id));
557    calls
558        .into_iter()
559        .flat_map(|call| pending_tool_events(&call))
560        .collect()
561}
562
563pub(super) fn suspended_call_pending_events_for_ids(
564    run_ctx: &RunContext,
565    call_ids: &HashSet<String>,
566) -> Vec<AgentEvent> {
567    if call_ids.is_empty() {
568        return Vec::new();
569    }
570    let mut calls: Vec<SuspendedCall> = suspended_calls_from_ctx(run_ctx)
571        .into_iter()
572        .filter_map(|(call_id, call)| call_ids.contains(&call_id).then_some(call))
573        .collect();
574    calls.sort_by(|left, right| left.call_id.cmp(&right.call_id));
575    calls
576        .into_iter()
577        .flat_map(|call| pending_tool_events(&call))
578        .collect()
579}
580
581pub(super) struct ToolExecutionContext {
582    pub(super) state: serde_json::Value,
583    pub(super) run_config: tirea_contract::RunConfig,
584}
585
586pub(super) fn prepare_tool_execution_context(
587    run_ctx: &RunContext,
588    config: Option<&AgentConfig>,
589) -> Result<ToolExecutionContext, AgentLoopError> {
590    let state = run_ctx
591        .snapshot()
592        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
593    let run_config = scope_with_tool_caller_context(run_ctx, &state, config)?;
594    Ok(ToolExecutionContext { state, run_config })
595}
596
597pub(super) async fn finalize_run_end(
598    run_ctx: &mut RunContext,
599    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
600    plugins: &[Arc<dyn crate::contracts::runtime::plugin::AgentPlugin>],
601) {
602    emit_run_end_phase(run_ctx, tool_descriptors, plugins).await
603}
604
/// How a failure of the run-finished state commit is handled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RunFinishedCommitPolicy {
    /// A commit failure is returned to the caller as an error.
    Required,
    /// A commit failure is logged as a warning and otherwise ignored.
    BestEffort,
}
610
611fn normalize_termination_for_suspended_calls(
612    run_ctx: &RunContext,
613    termination: TerminationReason,
614    response: Option<String>,
615) -> (TerminationReason, Option<String>) {
616    let final_termination = if !matches!(
617        termination,
618        TerminationReason::Error | TerminationReason::Cancelled
619    ) && has_suspended_calls(run_ctx)
620    {
621        TerminationReason::Suspended
622    } else {
623        termination
624    };
625    let final_response = if final_termination == TerminationReason::Suspended {
626        None
627    } else {
628        response
629    };
630    (final_termination, final_response)
631}
632
633async fn persist_run_termination(
634    run_ctx: &mut RunContext,
635    termination: &TerminationReason,
636    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
637    plugins: &[Arc<dyn crate::contracts::runtime::plugin::AgentPlugin>],
638    pending_delta_commit: &PendingDeltaCommitContext<'_>,
639    run_finished_commit_policy: RunFinishedCommitPolicy,
640) -> Result<(), AgentLoopError> {
641    sync_run_lifecycle_for_termination(run_ctx, termination)?;
642    finalize_run_end(run_ctx, tool_descriptors, plugins).await;
643    if let Err(error) = pending_delta_commit
644        .commit(run_ctx, CheckpointReason::RunFinished, true)
645        .await
646    {
647        match run_finished_commit_policy {
648            RunFinishedCommitPolicy::Required => return Err(error),
649            RunFinishedCommitPolicy::BestEffort => {
650                tracing::warn!(error = %error, "failed to commit run-finished delta");
651            }
652        }
653    }
654    Ok(())
655}
656
657fn stream_result_from_chat_response(response: &genai::chat::ChatResponse) -> StreamResult {
658    let text = response
659        .first_text()
660        .map(|s| s.to_string())
661        .unwrap_or_default();
662    let tool_calls: Vec<crate::contracts::thread::ToolCall> = response
663        .tool_calls()
664        .into_iter()
665        .map(|tc| {
666            crate::contracts::thread::ToolCall::new(
667                &tc.call_id,
668                &tc.fn_name,
669                tc.fn_arguments.clone(),
670            )
671        })
672        .collect();
673
674    StreamResult {
675        text,
676        tool_calls,
677        usage: Some(crate::runtime::streaming::token_usage_from_genai(
678            &response.usage,
679        )),
680    }
681}
682
683fn assistant_turn_message(
684    result: &StreamResult,
685    step_meta: MessageMetadata,
686    message_id: Option<String>,
687) -> Message {
688    let mut msg = if result.tool_calls.is_empty() {
689        assistant_message(&result.text)
690    } else {
691        assistant_tool_calls(&result.text, result.tool_calls.clone())
692    }
693    .with_metadata(step_meta);
694    if let Some(message_id) = message_id {
695        msg = msg.with_id(message_id);
696    }
697    msg
698}
699
700struct RunStartDrainOutcome {
701    events: Vec<AgentEvent>,
702    replayed: bool,
703}
704
705fn decision_result_value(action: &ResumeDecisionAction, result: &Value) -> serde_json::Value {
706    if result.is_null() {
707        serde_json::Value::Bool(matches!(action, ResumeDecisionAction::Resume))
708    } else {
709        result.clone()
710    }
711}
712
713fn runtime_resume_inputs(run_ctx: &RunContext) -> HashMap<String, ToolCallResume> {
714    let mut decisions = HashMap::new();
715    for (call_id, state) in tool_call_states_from_ctx(run_ctx) {
716        if !matches!(state.status, ToolCallStatus::Resuming) {
717            continue;
718        }
719        let Some(mut resume) = state.resume else {
720            continue;
721        };
722        if resume.decision_id.trim().is_empty() {
723            resume.decision_id = call_id.clone();
724        }
725        decisions.insert(call_id, resume);
726    }
727    decisions
728}
729
730fn settle_orphan_resuming_tool_states(
731    run_ctx: &mut RunContext,
732    suspended: &HashMap<String, SuspendedCall>,
733    resumes: &HashMap<String, ToolCallResume>,
734) -> Result<bool, AgentLoopError> {
735    let states = tool_call_states_from_ctx(run_ctx);
736    let mut changed = false;
737
738    for (call_id, resume) in resumes {
739        if suspended.contains_key(call_id) {
740            continue;
741        }
742        let Some(state) = states.get(call_id).cloned() else {
743            continue;
744        };
745        let target_status = match &resume.action {
746            ResumeDecisionAction::Cancel => ToolCallStatus::Cancelled,
747            ResumeDecisionAction::Resume => ToolCallStatus::Failed,
748        };
749        if state.status == target_status && state.resume.as_ref() == Some(resume) {
750            continue;
751        }
752
753        let Some(next_state) = transition_tool_call_state(
754            Some(state.clone()),
755            ToolCallStateSeed {
756                call_id: call_id.as_str(),
757                tool_name: state.tool_name.as_str(),
758                arguments: &state.arguments,
759                status: state.status,
760                resume_token: state.resume_token.clone(),
761            },
762            ToolCallStateTransition {
763                status: target_status,
764                resume_token: state.resume_token.clone(),
765                resume: Some(resume.clone()),
766                updated_at: current_unix_millis(),
767            },
768        ) else {
769            continue;
770        };
771
772        let patch = upsert_tool_call_state(call_id, next_state)?;
773        if patch.patch().is_empty() {
774            continue;
775        }
776        run_ctx.add_thread_patch(patch);
777        changed = true;
778    }
779
780    Ok(changed)
781}
782
783fn all_suspended_calls_have_resume(
784    suspended: &HashMap<String, SuspendedCall>,
785    resumes: &HashMap<String, ToolCallResume>,
786) -> bool {
787    suspended
788        .keys()
789        .all(|call_id| resumes.contains_key(call_id))
790}
791
792async fn drain_resuming_tool_calls_and_replay(
793    run_ctx: &mut RunContext,
794    tools: &HashMap<String, Arc<dyn Tool>>,
795    config: &AgentConfig,
796    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
797) -> Result<RunStartDrainOutcome, AgentLoopError> {
798    let decisions = runtime_resume_inputs(run_ctx);
799    if decisions.is_empty() {
800        return Ok(RunStartDrainOutcome {
801            events: Vec::new(),
802            replayed: false,
803        });
804    }
805
806    let suspended = suspended_calls_from_ctx(run_ctx);
807    let mut state_changed = false;
808    if settle_orphan_resuming_tool_states(run_ctx, &suspended, &decisions)? {
809        state_changed = true;
810    }
811    if suspended.is_empty() {
812        if state_changed {
813            let snapshot = run_ctx
814                .snapshot()
815                .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
816            return Ok(RunStartDrainOutcome {
817                events: vec![AgentEvent::StateSnapshot { snapshot }],
818                replayed: false,
819            });
820        }
821        return Ok(RunStartDrainOutcome {
822            events: Vec::new(),
823            replayed: false,
824        });
825    }
826
827    if matches!(
828        config.tool_executor.decision_replay_policy(),
829        DecisionReplayPolicy::BatchAllSuspended
830    ) && !all_suspended_calls_have_resume(&suspended, &decisions)
831    {
832        if state_changed {
833            let snapshot = run_ctx
834                .snapshot()
835                .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
836            return Ok(RunStartDrainOutcome {
837                events: vec![AgentEvent::StateSnapshot { snapshot }],
838                replayed: false,
839            });
840        }
841        return Ok(RunStartDrainOutcome {
842            events: Vec::new(),
843            replayed: false,
844        });
845    }
846
847    let mut events = Vec::new();
848    let mut decision_ids: Vec<String> = decisions.keys().cloned().collect();
849    decision_ids.sort();
850
851    let mut replayed = false;
852    let mut suspended_to_clear = Vec::new();
853
854    for call_id in decision_ids {
855        let Some(suspended_call) = suspended.get(&call_id).cloned() else {
856            continue;
857        };
858        let Some(decision) = decisions.get(&call_id).cloned() else {
859            continue;
860        };
861        replayed = true;
862        let decision_result = decision_result_value(&decision.action, &decision.result);
863        let resume_payload = ToolCallResume {
864            result: decision_result.clone(),
865            ..decision.clone()
866        };
867        events.push(AgentEvent::ToolCallResumed {
868            target_id: suspended_call.call_id.clone(),
869            result: decision_result.clone(),
870        });
871
872        match decision.action {
873            ResumeDecisionAction::Cancel => {
874                let cancel_reason = resume_payload.reason.clone();
875                if upsert_tool_call_lifecycle_state(
876                    run_ctx,
877                    &suspended_call,
878                    ToolCallStatus::Cancelled,
879                    Some(resume_payload),
880                )? {
881                    state_changed = true;
882                }
883                events.push(append_denied_tool_result_message(
884                    run_ctx,
885                    &suspended_call.call_id,
886                    Some(&suspended_call.tool_name),
887                    cancel_reason.as_deref(),
888                ));
889                suspended_to_clear.push(call_id);
890            }
891            ResumeDecisionAction::Resume => {
892                if upsert_tool_call_lifecycle_state(
893                    run_ctx,
894                    &suspended_call,
895                    ToolCallStatus::Resuming,
896                    Some(resume_payload.clone()),
897                )? {
898                    state_changed = true;
899                }
900                let Some(tool_call) = replay_tool_call_for_resolution(
901                    run_ctx,
902                    &suspended_call,
903                    &ToolCallDecision {
904                        target_id: suspended_call.call_id.clone(),
905                        resume: resume_payload.clone(),
906                    },
907                ) else {
908                    continue;
909                };
910                let state = run_ctx
911                    .snapshot()
912                    .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
913                let tool = tools.get(&tool_call.name).cloned();
914                let rt_for_replay = scope_with_tool_caller_context(run_ctx, &state, Some(config))?;
915                let replay_phase_ctx = ToolPhaseContext {
916                    tool_descriptors,
917                    plugins: &config.plugins,
918                    activity_manager: tirea_contract::runtime::activity::NoOpActivityManager::arc(),
919                    run_config: &rt_for_replay,
920                    thread_id: run_ctx.thread_id(),
921                    thread_messages: run_ctx.messages(),
922                    cancellation_token: None,
923                };
924                let replay_result = execute_single_tool_with_phases(
925                    tool.as_deref(),
926                    &tool_call,
927                    &state,
928                    &replay_phase_ctx,
929                )
930                .await?;
931
932                let replay_msg_id = gen_message_id();
933                let replay_msg = tool_response(&tool_call.id, &replay_result.execution.result)
934                    .with_id(replay_msg_id.clone());
935                run_ctx.add_message(Arc::new(replay_msg));
936
937                if !replay_result.reminders.is_empty() {
938                    let msgs: Vec<Arc<Message>> = replay_result
939                        .reminders
940                        .iter()
941                        .map(|reminder| {
942                            Arc::new(Message::internal_system(format!(
943                                "<system-reminder>{}</system-reminder>",
944                                reminder
945                            )))
946                        })
947                        .collect();
948                    run_ctx.add_messages(msgs);
949                }
950
951                if let Some(patch) = replay_result.execution.patch.clone() {
952                    state_changed = true;
953                    run_ctx.add_thread_patch(patch);
954                }
955                if !replay_result.pending_patches.is_empty() {
956                    state_changed = true;
957                    run_ctx.add_thread_patches(replay_result.pending_patches.clone());
958                }
959
960                events.push(AgentEvent::ToolCallDone {
961                    id: tool_call.id.clone(),
962                    result: replay_result.execution.result,
963                    patch: replay_result.execution.patch,
964                    message_id: replay_msg_id,
965                });
966
967                if let Some(next_suspended_call) = replay_result.suspended_call.clone() {
968                    let state = run_ctx
969                        .snapshot()
970                        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
971                    let mut merged = run_ctx.suspended_calls();
972                    merged.insert(
973                        next_suspended_call.call_id.clone(),
974                        next_suspended_call.clone(),
975                    );
976                    let patch = set_agent_suspended_calls(
977                        &state,
978                        merged.into_values().collect::<Vec<_>>(),
979                    )?;
980                    if !patch.patch().is_empty() {
981                        state_changed = true;
982                        run_ctx.add_thread_patch(patch);
983                    }
984                    for event in pending_tool_events(&next_suspended_call) {
985                        events.push(event);
986                    }
987                    if next_suspended_call.call_id != call_id {
988                        suspended_to_clear.push(call_id);
989                    }
990                } else {
991                    suspended_to_clear.push(call_id);
992                }
993            }
994        }
995    }
996
997    if !suspended_to_clear.is_empty() {
998        let mut unique = suspended_to_clear;
999        unique.sort();
1000        unique.dedup();
1001        for call_id in &unique {
1002            let state = run_ctx
1003                .snapshot()
1004                .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
1005            let clear_patch = clear_suspended_call(&state, call_id)?;
1006            if !clear_patch.patch().is_empty() {
1007                state_changed = true;
1008                run_ctx.add_thread_patch(clear_patch);
1009            }
1010        }
1011    }
1012
1013    if state_changed {
1014        let snapshot = run_ctx
1015            .snapshot()
1016            .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
1017        events.push(AgentEvent::StateSnapshot { snapshot });
1018    }
1019
1020    Ok(RunStartDrainOutcome { events, replayed })
1021}
1022
1023async fn drain_run_start_resume_replay(
1024    run_ctx: &mut RunContext,
1025    tools: &HashMap<String, Arc<dyn Tool>>,
1026    config: &AgentConfig,
1027    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
1028) -> Result<RunStartDrainOutcome, AgentLoopError> {
1029    drain_resuming_tool_calls_and_replay(run_ctx, tools, config, tool_descriptors).await
1030}
1031
1032async fn commit_run_start_and_drain_replay(
1033    run_ctx: &mut RunContext,
1034    tools: &HashMap<String, Arc<dyn Tool>>,
1035    config: &AgentConfig,
1036    active_tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
1037    pending_delta_commit: &PendingDeltaCommitContext<'_>,
1038) -> Result<RunStartDrainOutcome, AgentLoopError> {
1039    pending_delta_commit
1040        .commit(run_ctx, CheckpointReason::UserMessage, false)
1041        .await?;
1042
1043    let run_start_drain =
1044        drain_run_start_resume_replay(run_ctx, tools, config, active_tool_descriptors).await?;
1045
1046    if run_start_drain.replayed {
1047        pending_delta_commit
1048            .commit(run_ctx, CheckpointReason::ToolResultsCommitted, false)
1049            .await?;
1050    }
1051
1052    Ok(run_start_drain)
1053}
1054
1055fn normalize_decision_tool_result(
1056    response: &serde_json::Value,
1057    fallback_arguments: &serde_json::Value,
1058) -> serde_json::Value {
1059    match response {
1060        serde_json::Value::Bool(_) => fallback_arguments.clone(),
1061        value => value.clone(),
1062    }
1063}
1064
1065fn denied_tool_result_for_call(
1066    run_ctx: &RunContext,
1067    call_id: &str,
1068    fallback_tool_name: Option<&str>,
1069    decision_reason: Option<&str>,
1070) -> ToolResult {
1071    let tool_name = fallback_tool_name
1072        .filter(|name| !name.is_empty())
1073        .map(str::to_string)
1074        .or_else(|| find_tool_call_in_messages(run_ctx, call_id).map(|call| call.name))
1075        .unwrap_or_else(|| "tool".to_string());
1076    let reason = decision_reason
1077        .map(str::to_string)
1078        .filter(|reason| !reason.trim().is_empty())
1079        .unwrap_or_else(|| "User denied the action".to_string());
1080    ToolResult::error(tool_name, reason)
1081}
1082
1083fn append_denied_tool_result_message(
1084    run_ctx: &mut RunContext,
1085    call_id: &str,
1086    fallback_tool_name: Option<&str>,
1087    decision_reason: Option<&str>,
1088) -> AgentEvent {
1089    let denied_result =
1090        denied_tool_result_for_call(run_ctx, call_id, fallback_tool_name, decision_reason);
1091    let message_id = gen_message_id();
1092    let denied_message = tool_response(call_id, &denied_result).with_id(message_id.clone());
1093    run_ctx.add_message(Arc::new(denied_message));
1094    AgentEvent::ToolCallDone {
1095        id: call_id.to_string(),
1096        result: denied_result,
1097        patch: None,
1098        message_id,
1099    }
1100}
1101
1102fn find_tool_call_in_messages(run_ctx: &RunContext, call_id: &str) -> Option<ToolCall> {
1103    run_ctx.messages().iter().rev().find_map(|message| {
1104        message
1105            .tool_calls
1106            .as_ref()
1107            .and_then(|calls| calls.iter().find(|call| call.id == call_id).cloned())
1108    })
1109}
1110
1111fn replay_tool_call_for_resolution(
1112    _run_ctx: &RunContext,
1113    suspended_call: &SuspendedCall,
1114    decision: &ToolCallDecision,
1115) -> Option<ToolCall> {
1116    if matches!(decision.resume.action, ResumeDecisionAction::Cancel) {
1117        return None;
1118    }
1119
1120    match suspended_call.ticket.resume_mode {
1121        ToolCallResumeMode::ReplayToolCall => Some(ToolCall::new(
1122            suspended_call.call_id.clone(),
1123            suspended_call.tool_name.clone(),
1124            suspended_call.arguments.clone(),
1125        )),
1126        ToolCallResumeMode::UseDecisionAsToolResult | ToolCallResumeMode::PassDecisionToTool => {
1127            Some(ToolCall::new(
1128                suspended_call.call_id.clone(),
1129                suspended_call.tool_name.clone(),
1130                normalize_decision_tool_result(&decision.resume.result, &suspended_call.arguments),
1131            ))
1132        }
1133    }
1134}
1135
1136fn upsert_tool_call_lifecycle_state(
1137    run_ctx: &mut RunContext,
1138    suspended_call: &SuspendedCall,
1139    status: ToolCallStatus,
1140    resume: Option<ToolCallResume>,
1141) -> Result<bool, AgentLoopError> {
1142    let current_state = tool_call_states_from_ctx(run_ctx).remove(&suspended_call.call_id);
1143    let Some(tool_state) = transition_tool_call_state(
1144        current_state,
1145        ToolCallStateSeed {
1146            call_id: &suspended_call.call_id,
1147            tool_name: &suspended_call.tool_name,
1148            arguments: &suspended_call.arguments,
1149            status: ToolCallStatus::Suspended,
1150            resume_token: Some(suspended_call.ticket.pending.id.clone()),
1151        },
1152        ToolCallStateTransition {
1153            status,
1154            resume_token: Some(suspended_call.ticket.pending.id.clone()),
1155            resume,
1156            updated_at: current_unix_millis(),
1157        },
1158    ) else {
1159        return Ok(false);
1160    };
1161
1162    let patch = upsert_tool_call_state(&suspended_call.call_id, tool_state)?;
1163    if patch.patch().is_empty() {
1164        return Ok(false);
1165    }
1166    run_ctx.add_thread_patch(patch);
1167    Ok(true)
1168}
1169
1170pub(super) fn resolve_suspended_call(
1171    run_ctx: &mut RunContext,
1172    response: &ToolCallDecision,
1173) -> Result<Option<DecisionReplayOutcome>, AgentLoopError> {
1174    let suspended_calls = suspended_calls_from_ctx(run_ctx);
1175    if suspended_calls.is_empty() {
1176        return Ok(None);
1177    }
1178
1179    let suspended_call = suspended_calls
1180        .get(&response.target_id)
1181        .cloned()
1182        .or_else(|| {
1183            suspended_calls
1184                .values()
1185                .find(|call| {
1186                    call.ticket.suspension.id == response.target_id
1187                        || call.ticket.pending.id == response.target_id
1188                        || call.call_id == response.target_id
1189                })
1190                .cloned()
1191        });
1192    let Some(suspended_call) = suspended_call else {
1193        return Ok(None);
1194    };
1195
1196    let _ = upsert_tool_call_lifecycle_state(
1197        run_ctx,
1198        &suspended_call,
1199        ToolCallStatus::Resuming,
1200        Some(response.resume.clone()),
1201    )?;
1202
1203    Ok(Some(DecisionReplayOutcome {
1204        events: Vec::new(),
1205        resolved_call_ids: vec![suspended_call.call_id],
1206    }))
1207}
1208
1209pub(super) fn drain_decision_channel(
1210    run_ctx: &mut RunContext,
1211    decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1212    pending_decisions: &mut VecDeque<ToolCallDecision>,
1213) -> Result<DecisionReplayOutcome, AgentLoopError> {
1214    let mut disconnected = false;
1215    if let Some(rx) = decision_rx.as_mut() {
1216        loop {
1217            match rx.try_recv() {
1218                Ok(response) => pending_decisions.push_back(response),
1219                Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
1220                Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1221                    disconnected = true;
1222                    break;
1223                }
1224            }
1225        }
1226    }
1227    if disconnected {
1228        *decision_rx = None;
1229    }
1230
1231    if pending_decisions.is_empty() {
1232        return Ok(DecisionReplayOutcome {
1233            events: Vec::new(),
1234            resolved_call_ids: Vec::new(),
1235        });
1236    }
1237
1238    let mut unresolved = VecDeque::new();
1239    let mut events = Vec::new();
1240    let mut resolved_call_ids = Vec::new();
1241    let mut seen = HashSet::new();
1242
1243    while let Some(response) = pending_decisions.pop_front() {
1244        if let Some(outcome) = resolve_suspended_call(run_ctx, &response)? {
1245            for call_id in outcome.resolved_call_ids {
1246                if seen.insert(call_id.clone()) {
1247                    resolved_call_ids.push(call_id);
1248                }
1249            }
1250            events.extend(outcome.events);
1251        } else {
1252            unresolved.push_back(response);
1253        }
1254    }
1255    *pending_decisions = unresolved;
1256
1257    Ok(DecisionReplayOutcome {
1258        events,
1259        resolved_call_ids,
1260    })
1261}
1262
1263async fn replay_after_decisions(
1264    run_ctx: &mut RunContext,
1265    decisions_applied: bool,
1266    step_tool_provider: &Arc<dyn StepToolProvider>,
1267    config: &AgentConfig,
1268    active_tool_descriptors: &mut Vec<crate::contracts::runtime::tool_call::ToolDescriptor>,
1269    pending_delta_commit: &PendingDeltaCommitContext<'_>,
1270) -> Result<Vec<AgentEvent>, AgentLoopError> {
1271    if !decisions_applied {
1272        return Ok(Vec::new());
1273    }
1274
1275    let decision_tools = resolve_step_tool_snapshot(step_tool_provider, run_ctx).await?;
1276    *active_tool_descriptors = decision_tools.descriptors.clone();
1277
1278    let decision_drain = drain_run_start_resume_replay(
1279        run_ctx,
1280        &decision_tools.tools,
1281        config,
1282        active_tool_descriptors,
1283    )
1284    .await?;
1285
1286    pending_delta_commit
1287        .commit(run_ctx, CheckpointReason::ToolResultsCommitted, false)
1288        .await?;
1289
1290    Ok(decision_drain.events)
1291}
1292
1293async fn apply_decisions_and_replay(
1294    run_ctx: &mut RunContext,
1295    decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1296    pending_decisions: &mut VecDeque<ToolCallDecision>,
1297    step_tool_provider: &Arc<dyn StepToolProvider>,
1298    config: &AgentConfig,
1299    active_tool_descriptors: &mut Vec<crate::contracts::runtime::tool_call::ToolDescriptor>,
1300    pending_delta_commit: &PendingDeltaCommitContext<'_>,
1301) -> Result<Vec<AgentEvent>, AgentLoopError> {
1302    Ok(drain_and_replay_decisions(
1303        run_ctx,
1304        decision_rx,
1305        pending_decisions,
1306        None,
1307        step_tool_provider,
1308        config,
1309        active_tool_descriptors,
1310        pending_delta_commit,
1311    )
1312    .await?
1313    .events)
1314}
1315
/// Result of applying tool-call decisions (and, where applicable, replaying
/// the calls they resolved).
pub(super) struct DecisionReplayOutcome {
    /// Agent events produced while applying the decisions / replaying calls.
    events: Vec<AgentEvent>,
    /// Call ids of the suspended calls that a decision was matched to.
    resolved_call_ids: Vec<String>,
}
1320
1321async fn drain_and_replay_decisions(
1322    run_ctx: &mut RunContext,
1323    decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1324    pending_decisions: &mut VecDeque<ToolCallDecision>,
1325    decision: Option<ToolCallDecision>,
1326    step_tool_provider: &Arc<dyn StepToolProvider>,
1327    config: &AgentConfig,
1328    active_tool_descriptors: &mut Vec<crate::contracts::runtime::tool_call::ToolDescriptor>,
1329    pending_delta_commit: &PendingDeltaCommitContext<'_>,
1330) -> Result<DecisionReplayOutcome, AgentLoopError> {
1331    if let Some(decision) = decision {
1332        pending_decisions.push_back(decision);
1333    }
1334    let decision_drain = drain_decision_channel(run_ctx, decision_rx, pending_decisions)?;
1335    let mut events = decision_drain.events;
1336    let replay_events = replay_after_decisions(
1337        run_ctx,
1338        !decision_drain.resolved_call_ids.is_empty(),
1339        step_tool_provider,
1340        config,
1341        active_tool_descriptors,
1342        pending_delta_commit,
1343    )
1344    .await?;
1345    events.extend(replay_events);
1346
1347    Ok(DecisionReplayOutcome {
1348        events,
1349        resolved_call_ids: decision_drain.resolved_call_ids,
1350    })
1351}
1352
1353async fn apply_decision_and_replay(
1354    run_ctx: &mut RunContext,
1355    response: ToolCallDecision,
1356    decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1357    pending_decisions: &mut VecDeque<ToolCallDecision>,
1358    step_tool_provider: &Arc<dyn StepToolProvider>,
1359    config: &AgentConfig,
1360    active_tool_descriptors: &mut Vec<crate::contracts::runtime::tool_call::ToolDescriptor>,
1361    pending_delta_commit: &PendingDeltaCommitContext<'_>,
1362) -> Result<DecisionReplayOutcome, AgentLoopError> {
1363    drain_and_replay_decisions(
1364        run_ctx,
1365        decision_rx,
1366        pending_decisions,
1367        Some(response),
1368        step_tool_provider,
1369        config,
1370        active_tool_descriptors,
1371        pending_delta_commit,
1372    )
1373    .await
1374}
1375
1376async fn recv_decision(
1377    decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1378) -> Option<ToolCallDecision> {
1379    let rx = decision_rx.as_mut()?;
1380    rx.recv().await
1381}
1382
1383/// Run the full agent loop until completion, suspension, cancellation, or error.
1384///
1385/// This is the primary non-streaming entry point. Tools are passed directly
1386/// and used as the default tool set unless `config.step_tool_provider` is set
1387/// (for dynamic per-step tool resolution).
1388pub async fn run_loop(
1389    config: &AgentConfig,
1390    tools: HashMap<String, Arc<dyn Tool>>,
1391    mut run_ctx: RunContext,
1392    cancellation_token: Option<RunCancellationToken>,
1393    state_committer: Option<Arc<dyn StateCommitter>>,
1394    mut decision_rx: Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1395) -> LoopOutcome {
1396    let executor = llm_executor_for_run(config);
1397    let mut run_state = RunState::new();
1398    let mut pending_decisions = VecDeque::new();
1399    let run_cancellation_token = cancellation_token;
1400    let mut last_text = String::new();
1401    let step_tool_provider = step_tool_provider_for_run(config, tools);
1402    let run_identity = stream_core::resolve_stream_run_identity(&mut run_ctx);
1403    let run_id = run_identity.run_id;
1404    let parent_run_id = run_identity.parent_run_id;
1405    let baseline_suspended_call_ids = suspended_call_ids(&run_ctx);
1406    let pending_delta_commit =
1407        PendingDeltaCommitContext::new(&run_id, parent_run_id.as_deref(), state_committer.as_ref());
1408    let initial_step_tools = match resolve_step_tool_snapshot(&step_tool_provider, &run_ctx).await {
1409        Ok(snapshot) => snapshot,
1410        Err(error) => {
1411            return build_loop_outcome(
1412                run_ctx,
1413                TerminationReason::Error,
1414                None,
1415                &run_state,
1416                Some(outcome::LoopFailure::State(error.to_string())),
1417            );
1418        }
1419    };
1420    let StepToolSnapshot {
1421        tools: initial_tools,
1422        descriptors: initial_descriptors,
1423    } = initial_step_tools;
1424    let mut active_tool_descriptors = initial_descriptors;
1425
1426    macro_rules! terminate_run {
1427        ($termination:expr, $response:expr, $failure:expr) => {{
1428            let reason: TerminationReason = $termination;
1429            let (final_termination, final_response) =
1430                normalize_termination_for_suspended_calls(&run_ctx, reason, $response);
1431            if let Err(error) = persist_run_termination(
1432                &mut run_ctx,
1433                &final_termination,
1434                &active_tool_descriptors,
1435                &config.plugins,
1436                &pending_delta_commit,
1437                RunFinishedCommitPolicy::Required,
1438            )
1439            .await
1440            {
1441                return build_loop_outcome(
1442                    run_ctx,
1443                    TerminationReason::Error,
1444                    None,
1445                    &run_state,
1446                    Some(outcome::LoopFailure::State(error.to_string())),
1447                );
1448            }
1449            return build_loop_outcome(
1450                run_ctx,
1451                final_termination,
1452                final_response,
1453                &run_state,
1454                $failure,
1455            );
1456        }};
1457    }
1458
1459    // Phase: RunStart
1460    let pending = match emit_phase_block(
1461        Phase::RunStart,
1462        &run_ctx,
1463        &active_tool_descriptors,
1464        &config.plugins,
1465        |_| {},
1466    )
1467    .await
1468    {
1469        Ok(pending) => pending,
1470        Err(error) => {
1471            terminate_run!(
1472                TerminationReason::Error,
1473                None,
1474                Some(outcome::LoopFailure::State(error.to_string()))
1475            );
1476        }
1477    };
1478    run_ctx.add_thread_patches(pending);
1479    if let Err(error) = commit_run_start_and_drain_replay(
1480        &mut run_ctx,
1481        &initial_tools,
1482        config,
1483        &active_tool_descriptors,
1484        &pending_delta_commit,
1485    )
1486    .await
1487    {
1488        terminate_run!(
1489            TerminationReason::Error,
1490            None,
1491            Some(outcome::LoopFailure::State(error.to_string()))
1492        );
1493    }
1494    let run_start_new_suspended = newly_suspended_call_ids(&run_ctx, &baseline_suspended_call_ids);
1495    if !run_start_new_suspended.is_empty() {
1496        terminate_run!(TerminationReason::Suspended, None, None);
1497    }
1498    loop {
1499        if let Err(error) = apply_decisions_and_replay(
1500            &mut run_ctx,
1501            &mut decision_rx,
1502            &mut pending_decisions,
1503            &step_tool_provider,
1504            config,
1505            &mut active_tool_descriptors,
1506            &pending_delta_commit,
1507        )
1508        .await
1509        {
1510            terminate_run!(
1511                TerminationReason::Error,
1512                None,
1513                Some(outcome::LoopFailure::State(error.to_string()))
1514            );
1515        }
1516
1517        if is_run_cancelled(run_cancellation_token.as_ref()) {
1518            terminate_run!(TerminationReason::Cancelled, None, None);
1519        }
1520
1521        let step_tools = match resolve_step_tool_snapshot(&step_tool_provider, &run_ctx).await {
1522            Ok(snapshot) => snapshot,
1523            Err(e) => {
1524                terminate_run!(
1525                    TerminationReason::Error,
1526                    None,
1527                    Some(outcome::LoopFailure::State(e.to_string()))
1528                );
1529            }
1530        };
1531        active_tool_descriptors = step_tools.descriptors.clone();
1532
1533        let prepared =
1534            match prepare_step_execution(&run_ctx, &active_tool_descriptors, config).await {
1535                Ok(v) => v,
1536                Err(e) => {
1537                    terminate_run!(
1538                        TerminationReason::Error,
1539                        None,
1540                        Some(outcome::LoopFailure::State(e.to_string()))
1541                    );
1542                }
1543            };
1544        run_ctx.add_thread_patches(prepared.pending_patches);
1545
1546        match prepared.run_action {
1547            RunAction::Continue => {}
1548            RunAction::Terminate(reason) => {
1549                let response = if matches!(reason, TerminationReason::PluginRequested) {
1550                    Some(last_text.clone())
1551                } else {
1552                    None
1553                };
1554                terminate_run!(reason, response, None);
1555            }
1556        }
1557
1558        // Call LLM with unified retry + fallback model strategy.
1559        let messages = prepared.messages;
1560        let filtered_tools = prepared.filtered_tools;
1561        let attempt_outcome = run_llm_with_retry_and_fallback(
1562            config,
1563            run_cancellation_token.as_ref(),
1564            true,
1565            "unknown llm error",
1566            |model| {
1567                let request =
1568                    build_request_for_filtered_tools(&messages, &step_tools.tools, &filtered_tools);
1569                let executor = executor.clone();
1570                async move {
1571                    executor
1572                        .exec_chat_response(&model, request, config.chat_options.as_ref())
1573                        .await
1574                }
1575            },
1576        )
1577        .await;
1578
1579        let response = match attempt_outcome {
1580            LlmAttemptOutcome::Success {
1581                value, attempts, ..
1582            } => {
1583                run_state.record_llm_attempts(attempts);
1584                value
1585            }
1586            LlmAttemptOutcome::Cancelled => {
1587                append_cancellation_user_message(&mut run_ctx, CancellationStage::Inference);
1588                terminate_run!(TerminationReason::Cancelled, None, None);
1589            }
1590            LlmAttemptOutcome::Exhausted {
1591                last_error,
1592                attempts,
1593            } => {
1594                run_state.record_llm_attempts(attempts);
1595                if let Err(phase_error) = apply_llm_error_cleanup(
1596                    &mut run_ctx,
1597                    &active_tool_descriptors,
1598                    &config.plugins,
1599                    "llm_exec_error",
1600                    last_error.clone(),
1601                )
1602                .await
1603                {
1604                    terminate_run!(
1605                        TerminationReason::Error,
1606                        None,
1607                        Some(outcome::LoopFailure::State(phase_error.to_string()))
1608                    );
1609                }
1610                terminate_run!(
1611                    TerminationReason::Error,
1612                    None,
1613                    Some(outcome::LoopFailure::Llm(last_error))
1614                );
1615            }
1616        };
1617
1618        let result = stream_result_from_chat_response(&response);
1619        run_state.update_from_response(&result);
1620        last_text = result.text.clone();
1621
1622        // Add assistant message
1623        let assistant_msg_id = gen_message_id();
1624        let step_meta = step_metadata(Some(run_id.clone()), run_state.completed_steps as u32);
1625        let post_inference_termination = match complete_step_after_inference(
1626            &mut run_ctx,
1627            &result,
1628            step_meta.clone(),
1629            Some(assistant_msg_id.clone()),
1630            &active_tool_descriptors,
1631            &config.plugins,
1632        )
1633        .await
1634        {
1635            Ok(reason) => reason,
1636            Err(e) => {
1637                terminate_run!(
1638                    TerminationReason::Error,
1639                    None,
1640                    Some(outcome::LoopFailure::State(e.to_string()))
1641                );
1642            }
1643        };
1644        if let Err(error) = pending_delta_commit
1645            .commit(
1646                &mut run_ctx,
1647                CheckpointReason::AssistantTurnCommitted,
1648                false,
1649            )
1650            .await
1651        {
1652            terminate_run!(
1653                TerminationReason::Error,
1654                None,
1655                Some(outcome::LoopFailure::State(error.to_string()))
1656            );
1657        }
1658
1659        mark_step_completed(&mut run_state);
1660
1661        // Only `Stopped` termination is deferred past tool execution so the
1662        // current round's tools complete (e.g. MaxRounds lets tools finish).
1663        // All other reasons terminate immediately before tool execution.
1664        if let Some(reason) = &post_inference_termination {
1665            if !matches!(reason, TerminationReason::Stopped(_)) {
1666                terminate_run!(reason.clone(), Some(last_text.clone()), None);
1667            }
1668        }
1669
1670        if !result.needs_tools() {
1671            run_state.record_step_without_tools();
1672            let reason = post_inference_termination.unwrap_or(TerminationReason::NaturalEnd);
1673            terminate_run!(reason, Some(last_text.clone()), None);
1674        }
1675
1676        // Execute tools with phase hooks using configured execution strategy.
1677        let tool_context = match prepare_tool_execution_context(&run_ctx, Some(config)) {
1678            Ok(ctx) => ctx,
1679            Err(e) => {
1680                terminate_run!(
1681                    TerminationReason::Error,
1682                    None,
1683                    Some(outcome::LoopFailure::State(e.to_string()))
1684                );
1685            }
1686        };
1687        let thread_messages_for_tools = run_ctx.messages().to_vec();
1688        let thread_version_for_tools = run_ctx.version();
1689
1690        let tool_exec_future = config.tool_executor.execute(ToolExecutionRequest {
1691            tools: &step_tools.tools,
1692            calls: &result.tool_calls,
1693            state: &tool_context.state,
1694            tool_descriptors: &active_tool_descriptors,
1695            plugins: &config.plugins,
1696            activity_manager: tirea_contract::runtime::activity::NoOpActivityManager::arc(),
1697            run_config: &tool_context.run_config,
1698            thread_id: run_ctx.thread_id(),
1699            thread_messages: &thread_messages_for_tools,
1700            state_version: thread_version_for_tools,
1701            cancellation_token: run_cancellation_token.as_ref(),
1702        });
1703        let results = tool_exec_future.await.map_err(AgentLoopError::from);
1704
1705        let results = match results {
1706            Ok(r) => r,
1707            Err(AgentLoopError::Cancelled) => {
1708                append_cancellation_user_message(&mut run_ctx, CancellationStage::ToolExecution);
1709                terminate_run!(TerminationReason::Cancelled, None, None);
1710            }
1711            Err(e) => {
1712                terminate_run!(
1713                    TerminationReason::Error,
1714                    None,
1715                    Some(outcome::LoopFailure::State(e.to_string()))
1716                );
1717            }
1718        };
1719
1720        if let Err(_e) = apply_tool_results_to_session(
1721            &mut run_ctx,
1722            &results,
1723            Some(step_meta),
1724            config
1725                .tool_executor
1726                .requires_parallel_patch_conflict_check(),
1727        ) {
1728            // On error, we can't easily rollback RunContext, so just terminate
1729            terminate_run!(
1730                TerminationReason::Error,
1731                None,
1732                Some(outcome::LoopFailure::State(_e.to_string()))
1733            );
1734        }
1735        if let Err(error) = pending_delta_commit
1736            .commit(&mut run_ctx, CheckpointReason::ToolResultsCommitted, false)
1737            .await
1738        {
1739            terminate_run!(
1740                TerminationReason::Error,
1741                None,
1742                Some(outcome::LoopFailure::State(error.to_string()))
1743            );
1744        }
1745
1746        if let Err(error) = apply_decisions_and_replay(
1747            &mut run_ctx,
1748            &mut decision_rx,
1749            &mut pending_decisions,
1750            &step_tool_provider,
1751            config,
1752            &mut active_tool_descriptors,
1753            &pending_delta_commit,
1754        )
1755        .await
1756        {
1757            terminate_run!(
1758                TerminationReason::Error,
1759                None,
1760                Some(outcome::LoopFailure::State(error.to_string()))
1761            );
1762        }
1763
1764        // If ALL tools are suspended (no completed results), terminate immediately.
1765        if has_suspended_calls(&run_ctx) {
1766            let has_completed = results
1767                .iter()
1768                .any(|r| !matches!(r.outcome, crate::contracts::ToolCallOutcome::Suspended));
1769            if !has_completed {
1770                terminate_run!(TerminationReason::Suspended, None, None);
1771            }
1772        }
1773
1774        // Deferred post-inference termination: tools from the current round
1775        // have completed; stop the loop before the next inference.
1776        if let Some(reason) = post_inference_termination {
1777            terminate_run!(reason, Some(last_text.clone()), None);
1778        }
1779
1780        // Track tool-step metrics for loop stats and plugin consumers.
1781        let error_count = results
1782            .iter()
1783            .filter(|r| r.execution.result.is_error())
1784            .count();
1785        run_state.record_tool_step(&result.tool_calls, error_count);
1786    }
1787}
1788
1789/// Run the agent loop with streaming output.
1790///
1791/// Returns a stream of AgentEvent for real-time updates. Tools are passed
1792/// directly and used as the default tool set unless `config.step_tool_provider`
1793/// is set (for dynamic per-step tool resolution).
1794pub fn run_loop_stream(
1795    config: AgentConfig,
1796    tools: HashMap<String, Arc<dyn Tool>>,
1797    run_ctx: RunContext,
1798    cancellation_token: Option<RunCancellationToken>,
1799    state_committer: Option<Arc<dyn StateCommitter>>,
1800    decision_rx: Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1801) -> Pin<Box<dyn Stream<Item = AgentEvent> + Send>> {
1802    stream_runner::run_stream(
1803        config,
1804        tools,
1805        run_ctx,
1806        cancellation_token,
1807        state_committer,
1808        decision_rx,
1809    )
1810}
1811
1812#[cfg(test)]
1813mod tests;