1mod config;
42mod core;
43mod event_envelope_meta;
44mod outcome;
45mod plugin_runtime;
46mod run_state;
47mod state_commit;
48mod stream_core;
49mod stream_runner;
50mod tool_exec;
51
52use crate::contracts::runtime::plugin::phase::Phase;
53use crate::contracts::runtime::ActivityManager;
54use crate::contracts::io::ResumeDecisionAction;
55use crate::contracts::runtime::{
56 state_paths::RUN_LIFECYCLE_STATE_PATH, DecisionReplayPolicy, StreamResult,
57 SuspendedCall, ToolCallResume, ToolCallResumeMode, ToolCallStatus, ToolExecutionRequest,
58 ToolExecutionResult,
59};
60use crate::contracts::thread::CheckpointReason;
61use crate::contracts::thread::{gen_message_id, Message, MessageMetadata, ToolCall};
62use crate::contracts::runtime::tool_call::{Tool, ToolResult};
63use crate::contracts::RunContext;
64use crate::contracts::{AgentEvent, RunAction, TerminationReason, ToolCallDecision};
65use crate::engine::convert::{assistant_message, assistant_tool_calls, tool_response};
66use crate::runtime::activity::ActivityHub;
67
68use crate::runtime::streaming::StreamCollector;
69use async_stream::stream;
70use futures::{Stream, StreamExt};
71use genai::Client;
72use serde_json::Value;
73use std::collections::{HashMap, HashSet, VecDeque};
74use std::future::Future;
75use std::pin::Pin;
76use std::sync::Arc;
77use uuid::Uuid;
78
79#[cfg(test)]
80use crate::contracts::runtime::plugin::AgentPlugin;
81pub use crate::contracts::runtime::ToolExecutor;
82pub use crate::runtime::run_context::{
83 await_or_cancel, is_cancelled, CancelAware, RunCancellationToken, StateCommitError,
84 StateCommitter, TOOL_SCOPE_CALLER_AGENT_ID_KEY, TOOL_SCOPE_CALLER_MESSAGES_KEY,
85 TOOL_SCOPE_CALLER_STATE_KEY, TOOL_SCOPE_CALLER_THREAD_ID_KEY,
86};
87use config::StaticStepToolProvider;
88pub use config::{AgentConfig, GenaiLlmExecutor, LlmRetryPolicy};
89pub use config::{LlmEventStream, LlmExecutor};
90pub use config::{StepToolInput, StepToolProvider, StepToolSnapshot};
91#[cfg(test)]
92use core::build_messages;
93use core::{
94 build_request_for_filtered_tools, clear_suspended_call, inference_inputs_from_step,
95 set_agent_suspended_calls, suspended_calls_from_ctx, tool_call_states_from_ctx,
96 transition_tool_call_state, upsert_tool_call_state, ToolCallStateSeed, ToolCallStateTransition,
97};
98pub use outcome::{tool_map, tool_map_from_arc, AgentLoopError};
99pub use outcome::{LoopOutcome, LoopStats, LoopUsage};
100#[cfg(test)]
101use plugin_runtime::emit_phase_checked;
102use plugin_runtime::{
103 emit_cleanup_phases_and_apply, emit_phase_block, emit_run_end_phase, run_phase_block,
104};
105use run_state::RunState;
106pub use state_commit::ChannelStateCommitter;
107use state_commit::PendingDeltaCommitContext;
108use std::time::{SystemTime, UNIX_EPOCH};
109use tirea_state::{Op, Patch, Path, TrackedPatch};
110#[cfg(test)]
111use tokio_util::sync::CancellationToken;
112#[cfg(test)]
113use tool_exec::execute_tools_parallel_with_phases;
114pub use tool_exec::ExecuteToolsOutcome;
115use tool_exec::{
116 apply_tool_results_impl, apply_tool_results_to_session, execute_single_tool_with_phases,
117 scope_with_tool_caller_context, step_metadata, ToolPhaseContext,
118};
119pub use tool_exec::{
120 execute_tools, execute_tools_with_config, execute_tools_with_plugins,
121 execute_tools_with_plugins_and_executor, ParallelToolExecutionMode, ParallelToolExecutor,
122 SequentialToolExecutor,
123};
124
125pub struct ResolvedRun {
131 pub config: AgentConfig,
133 pub tools: HashMap<String, Arc<dyn Tool>>,
135 pub run_config: crate::contracts::RunConfig,
137}
138
139impl ResolvedRun {
140 #[must_use]
142 pub fn with_tool(mut self, id: String, tool: Arc<dyn Tool>) -> Self {
143 self.tools.insert(id, tool);
144 self
145 }
146
147 #[must_use]
149 pub fn with_plugin(mut self, plugin: Arc<dyn crate::contracts::runtime::plugin::AgentPlugin>) -> Self {
150 self.config.plugins.push(plugin);
151 self
152 }
153
154 pub fn overlay_tools(&mut self, tools: HashMap<String, Arc<dyn Tool>>) {
156 for (id, tool) in tools {
157 self.tools.entry(id).or_insert(tool);
158 }
159 }
160}
161
162fn uuid_v7() -> String {
163 Uuid::now_v7().simple().to_string()
164}
165
/// Milliseconds elapsed since the Unix epoch according to the system clock.
///
/// Returns `0` when the clock reads earlier than the epoch. Saturates at
/// `u64::MAX` rather than truncating the `u128` millisecond count.
pub(crate) fn current_unix_millis() -> u64 {
    let Ok(elapsed) = SystemTime::now().duration_since(UNIX_EPOCH) else {
        return 0;
    };
    let millis = elapsed.as_millis();
    // Clamp before narrowing so the cast can never wrap.
    millis.min(u128::from(u64::MAX)) as u64
}
171
172pub(super) fn sync_run_lifecycle_for_termination(
173 run_ctx: &mut RunContext,
174 termination: &TerminationReason,
175) -> Result<(), AgentLoopError> {
176 let run_id = run_ctx
177 .run_config
178 .value("run_id")
179 .and_then(Value::as_str)
180 .map(str::trim)
181 .filter(|id| !id.is_empty());
182 let Some(run_id) = run_id else {
183 return Ok(());
184 };
185
186 let (status, done_reason) = termination.to_run_status();
187
188 let patch = TrackedPatch::new(Patch::with_ops(vec![Op::set(
189 Path::root().key(RUN_LIFECYCLE_STATE_PATH),
190 serde_json::json!({
191 "id": run_id,
192 "status": status,
193 "done_reason": done_reason,
194 "updated_at": current_unix_millis(),
195 }),
196 )]))
197 .with_source("agent_loop");
198
199 run_ctx.add_thread_patch(patch);
200 Ok(())
201}
202
203#[derive(Debug, Clone, Copy, PartialEq, Eq)]
204pub(super) enum CancellationStage {
205 Inference,
206 ToolExecution,
207}
208
209pub(super) const CANCELLATION_INFERENCE_USER_MESSAGE: &str =
210 "The previous run was interrupted during inference. Please continue from the current context.";
211pub(super) const CANCELLATION_TOOL_USER_MESSAGE: &str =
212 "The previous run was interrupted while using tools. Please continue from the current context.";
213
214pub(super) fn append_cancellation_user_message(run_ctx: &mut RunContext, stage: CancellationStage) {
215 let content = match stage {
216 CancellationStage::Inference => CANCELLATION_INFERENCE_USER_MESSAGE,
217 CancellationStage::ToolExecution => CANCELLATION_TOOL_USER_MESSAGE,
218 };
219 run_ctx.add_message(Arc::new(Message::user(content)));
220}
221
222pub(super) fn effective_llm_models(config: &AgentConfig) -> Vec<String> {
223 let mut models = Vec::with_capacity(1 + config.fallback_models.len());
224 models.push(config.model.clone());
225 for model in &config.fallback_models {
226 if model.trim().is_empty() {
227 continue;
228 }
229 if !models.iter().any(|m| m == model) {
230 models.push(model.clone());
231 }
232 }
233 models
234}
235
236pub(super) fn llm_retry_attempts(config: &AgentConfig) -> usize {
237 config.llm_retry_policy.max_attempts_per_model.max(1)
238}
239
240pub(super) fn is_retryable_llm_error(message: &str) -> bool {
241 let lower = message.to_ascii_lowercase();
242 let non_retryable = [
243 "401",
244 "403",
245 "404",
246 "400",
247 "422",
248 "unauthorized",
249 "forbidden",
250 "invalid api key",
251 "invalid_request",
252 "bad request",
253 ];
254 if non_retryable.iter().any(|p| lower.contains(p)) {
255 return false;
256 }
257 let retryable = [
258 "429",
259 "too many requests",
260 "rate limit",
261 "timeout",
262 "timed out",
263 "temporar",
264 "connection",
265 "network",
266 "unavailable",
267 "server error",
268 "502",
269 "503",
270 "504",
271 "reset by peer",
272 "eof",
273 ];
274 retryable.iter().any(|p| lower.contains(p))
275}
276
277pub(super) fn retry_backoff_ms(config: &AgentConfig, retry_index: usize) -> u64 {
278 let initial = config.llm_retry_policy.initial_backoff_ms;
279 let cap = config
280 .llm_retry_policy
281 .max_backoff_ms
282 .max(config.llm_retry_policy.initial_backoff_ms);
283 if retry_index == 0 {
284 return initial.min(cap);
285 }
286 let shift = (retry_index - 1).min(20) as u32;
287 let factor = 1u64.checked_shl(shift).unwrap_or(u64::MAX);
288 initial.saturating_mul(factor).min(cap)
289}
290
291pub(super) async fn wait_retry_backoff(
292 config: &AgentConfig,
293 retry_index: usize,
294 run_cancellation_token: Option<&RunCancellationToken>,
295) -> bool {
296 let wait_ms = retry_backoff_ms(config, retry_index);
297 match await_or_cancel(
298 run_cancellation_token,
299 tokio::time::sleep(std::time::Duration::from_millis(wait_ms)),
300 )
301 .await
302 {
303 CancelAware::Cancelled => true,
304 CancelAware::Value(_) => false,
305 }
306}
307
/// Result of `run_llm_with_retry_and_fallback`.
pub(super) enum LlmAttemptOutcome<T> {
    /// An invocation succeeded.
    Success {
        /// Value returned by the successful invocation.
        value: T,
        /// Model that produced `value`.
        model: String,
        /// Total invocations made across all models, including this one.
        attempts: usize,
    },
    /// Cancellation was observed during an invocation or a retry backoff.
    Cancelled,
    /// Every model candidate was tried without success.
    Exhausted {
        /// Description of the last error recorded.
        last_error: String,
        /// Total invocations made across all models.
        attempts: usize,
    },
}
320
/// Local alias for `is_cancelled`, applied to the run's optional cancellation
/// token.
fn is_run_cancelled(token: Option<&RunCancellationToken>) -> bool {
    is_cancelled(token)
}
324
325pub(super) fn step_tool_provider_for_run(
326 config: &AgentConfig,
327 tools: HashMap<String, Arc<dyn Tool>>,
328) -> Arc<dyn StepToolProvider> {
329 config.step_tool_provider.clone().unwrap_or_else(|| {
330 Arc::new(StaticStepToolProvider::new(tools)) as Arc<dyn StepToolProvider>
331 })
332}
333
334pub(super) fn llm_executor_for_run(config: &AgentConfig) -> Arc<dyn LlmExecutor> {
335 config
336 .llm_executor
337 .clone()
338 .unwrap_or_else(|| Arc::new(GenaiLlmExecutor::new(Client::default())))
339}
340
341pub(super) async fn resolve_step_tool_snapshot(
342 step_tool_provider: &Arc<dyn StepToolProvider>,
343 run_ctx: &RunContext,
344) -> Result<StepToolSnapshot, AgentLoopError> {
345 step_tool_provider
346 .provide(StepToolInput { state: run_ctx })
347 .await
348}
349
/// Increments the run's completed-step counter.
fn mark_step_completed(run_state: &mut RunState) {
    run_state.completed_steps += 1;
}
353
354fn build_loop_outcome(
355 run_ctx: RunContext,
356 termination: TerminationReason,
357 response: Option<String>,
358 run_state: &RunState,
359 failure: Option<outcome::LoopFailure>,
360) -> LoopOutcome {
361 LoopOutcome {
362 run_ctx,
363 termination,
364 response: response.filter(|text| !text.is_empty()),
365 usage: run_state.usage(),
366 stats: run_state.stats(),
367 failure,
368 }
369}
370
371pub(super) async fn run_llm_with_retry_and_fallback<T, Invoke, Fut>(
372 config: &AgentConfig,
373 run_cancellation_token: Option<&RunCancellationToken>,
374 retry_current_model: bool,
375 unknown_error: &str,
376 mut invoke: Invoke,
377) -> LlmAttemptOutcome<T>
378where
379 Invoke: FnMut(String) -> Fut,
380 Fut: std::future::Future<Output = genai::Result<T>>,
381{
382 let mut last_llm_error = unknown_error.to_string();
383 let model_candidates = effective_llm_models(config);
384 let max_attempts = llm_retry_attempts(config);
385 let mut total_attempts = 0usize;
386
387 for model in model_candidates {
388 for attempt in 1..=max_attempts {
389 total_attempts = total_attempts.saturating_add(1);
390 let response_res =
391 match await_or_cancel(run_cancellation_token, invoke(model.clone())).await {
392 CancelAware::Cancelled => return LlmAttemptOutcome::Cancelled,
393 CancelAware::Value(resp) => resp,
394 };
395
396 match response_res {
397 Ok(value) => {
398 return LlmAttemptOutcome::Success {
399 value,
400 model,
401 attempts: total_attempts,
402 };
403 }
404 Err(e) => {
405 let message = e.to_string();
406 last_llm_error =
407 format!("model='{model}' attempt={attempt}/{max_attempts}: {message}");
408 let can_retry_same_model = retry_current_model
409 && attempt < max_attempts
410 && is_retryable_llm_error(&message);
411 if can_retry_same_model {
412 let cancelled =
413 wait_retry_backoff(config, attempt, run_cancellation_token).await;
414 if cancelled {
415 return LlmAttemptOutcome::Cancelled;
416 }
417 continue;
418 }
419 break;
420 }
421 }
422 }
423 }
424
425 LlmAttemptOutcome::Exhausted {
426 last_error: last_llm_error,
427 attempts: total_attempts,
428 }
429}
430
431pub(super) async fn run_step_prepare_phases(
432 run_ctx: &RunContext,
433 tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
434 config: &AgentConfig,
435) -> Result<
436 (
437 Vec<Message>,
438 Vec<String>,
439 RunAction,
440 Vec<TrackedPatch>,
441 ),
442 AgentLoopError,
443> {
444 let ((messages, filtered_tools, run_action), pending) = run_phase_block(
445 run_ctx,
446 tool_descriptors,
447 &config.plugins,
448 &[Phase::StepStart, Phase::BeforeInference],
449 |_| {},
450 |step| inference_inputs_from_step(step, &config.system_prompt),
451 )
452 .await?;
453 Ok((messages, filtered_tools, run_action, pending))
454}
455
/// Output of `prepare_step_execution`: the inputs for one inference step.
pub(super) struct PreparedStep {
    /// Messages to send to the model.
    pub(super) messages: Vec<Message>,
    /// Ids of the tools left after plugin filtering.
    pub(super) filtered_tools: Vec<String>,
    /// Run action requested by the `StepStart`/`BeforeInference` phases.
    pub(super) run_action: RunAction,
    /// Patches produced by those phases that have not yet been applied.
    pub(super) pending_patches: Vec<TrackedPatch>,
}
462
463pub(super) async fn prepare_step_execution(
464 run_ctx: &RunContext,
465 tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
466 config: &AgentConfig,
467) -> Result<PreparedStep, AgentLoopError> {
468 let (messages, filtered_tools, run_action, pending) =
469 run_step_prepare_phases(run_ctx, tool_descriptors, config).await?;
470 Ok(PreparedStep {
471 messages,
472 filtered_tools,
473 run_action,
474 pending_patches: pending,
475 })
476}
477
/// Runs the cleanup phases after an LLM failure and applies their results.
///
/// Thin wrapper over `emit_cleanup_phases_and_apply`. `error_type` and
/// `message` are forwarded unchanged.
pub(super) async fn apply_llm_error_cleanup(
    run_ctx: &mut RunContext,
    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
    plugins: &[Arc<dyn crate::contracts::runtime::plugin::AgentPlugin>],
    error_type: &'static str,
    message: String,
) -> Result<(), AgentLoopError> {
    emit_cleanup_phases_and_apply(run_ctx, tool_descriptors, plugins, error_type, message).await
}
487
488pub(super) async fn complete_step_after_inference(
489 run_ctx: &mut RunContext,
490 result: &StreamResult,
491 step_meta: MessageMetadata,
492 assistant_message_id: Option<String>,
493 tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
494 plugins: &[Arc<dyn crate::contracts::runtime::plugin::AgentPlugin>],
495) -> Result<Option<TerminationReason>, AgentLoopError> {
496 let (run_action, pending) = run_phase_block(
497 run_ctx,
498 tool_descriptors,
499 plugins,
500 &[Phase::AfterInference],
501 |step| {
502 step.response = Some(result.clone());
503 },
504 |step| step.run_action(),
505 )
506 .await?;
507 run_ctx.add_thread_patches(pending);
508
509 let assistant = assistant_turn_message(result, step_meta, assistant_message_id);
510 run_ctx.add_message(Arc::new(assistant));
511
512 let pending =
513 emit_phase_block(Phase::StepEnd, run_ctx, tool_descriptors, plugins, |_| {}).await?;
514 run_ctx.add_thread_patches(pending);
515 Ok(match run_action {
516 RunAction::Terminate(reason) => Some(reason),
517 RunAction::Continue => None,
518 })
519}
520
521pub(super) fn pending_tool_events(call: &SuspendedCall) -> Vec<AgentEvent> {
523 vec![
524 AgentEvent::ToolCallStart {
525 id: call.ticket.pending.id.clone(),
526 name: call.ticket.pending.name.clone(),
527 },
528 AgentEvent::ToolCallReady {
529 id: call.ticket.pending.id.clone(),
530 name: call.ticket.pending.name.clone(),
531 arguments: call.ticket.pending.arguments.clone(),
532 },
533 ]
534}
535
536pub(super) fn has_suspended_calls(run_ctx: &RunContext) -> bool {
537 !suspended_calls_from_ctx(run_ctx).is_empty()
538}
539
540pub(super) fn suspended_call_ids(run_ctx: &RunContext) -> HashSet<String> {
541 suspended_calls_from_ctx(run_ctx).into_keys().collect()
542}
543
544pub(super) fn newly_suspended_call_ids(
545 run_ctx: &RunContext,
546 baseline_ids: &HashSet<String>,
547) -> HashSet<String> {
548 suspended_calls_from_ctx(run_ctx)
549 .into_keys()
550 .filter(|id| !baseline_ids.contains(id))
551 .collect()
552}
553
554pub(super) fn suspended_call_pending_events(run_ctx: &RunContext) -> Vec<AgentEvent> {
555 let mut calls: Vec<SuspendedCall> = suspended_calls_from_ctx(run_ctx).into_values().collect();
556 calls.sort_by(|left, right| left.call_id.cmp(&right.call_id));
557 calls
558 .into_iter()
559 .flat_map(|call| pending_tool_events(&call))
560 .collect()
561}
562
563pub(super) fn suspended_call_pending_events_for_ids(
564 run_ctx: &RunContext,
565 call_ids: &HashSet<String>,
566) -> Vec<AgentEvent> {
567 if call_ids.is_empty() {
568 return Vec::new();
569 }
570 let mut calls: Vec<SuspendedCall> = suspended_calls_from_ctx(run_ctx)
571 .into_iter()
572 .filter_map(|(call_id, call)| call_ids.contains(&call_id).then_some(call))
573 .collect();
574 calls.sort_by(|left, right| left.call_id.cmp(&right.call_id));
575 calls
576 .into_iter()
577 .flat_map(|call| pending_tool_events(&call))
578 .collect()
579}
580
/// State snapshot and scoped run config passed to tool execution.
pub(super) struct ToolExecutionContext {
    /// Snapshot of the run state taken before the tools run.
    pub(super) state: serde_json::Value,
    /// Run config scoped with the caller context (see
    /// `scope_with_tool_caller_context`).
    pub(super) run_config: tirea_contract::RunConfig,
}
585
586pub(super) fn prepare_tool_execution_context(
587 run_ctx: &RunContext,
588 config: Option<&AgentConfig>,
589) -> Result<ToolExecutionContext, AgentLoopError> {
590 let state = run_ctx
591 .snapshot()
592 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
593 let run_config = scope_with_tool_caller_context(run_ctx, &state, config)?;
594 Ok(ToolExecutionContext { state, run_config })
595}
596
/// Emits the run-end plugin phase (thin wrapper over `emit_run_end_phase`).
///
/// Returns nothing. Any failure handling is internal to
/// `emit_run_end_phase`.
pub(super) async fn finalize_run_end(
    run_ctx: &mut RunContext,
    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
    plugins: &[Arc<dyn crate::contracts::runtime::plugin::AgentPlugin>],
) {
    emit_run_end_phase(run_ctx, tool_descriptors, plugins).await
}
604
/// How `persist_run_termination` treats a failed run-finished commit.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RunFinishedCommitPolicy {
    /// Propagate the commit error to the caller.
    Required,
    /// Log the commit error as a warning and continue.
    BestEffort,
}
610
611fn normalize_termination_for_suspended_calls(
612 run_ctx: &RunContext,
613 termination: TerminationReason,
614 response: Option<String>,
615) -> (TerminationReason, Option<String>) {
616 let final_termination = if !matches!(
617 termination,
618 TerminationReason::Error | TerminationReason::Cancelled
619 ) && has_suspended_calls(run_ctx)
620 {
621 TerminationReason::Suspended
622 } else {
623 termination
624 };
625 let final_response = if final_termination == TerminationReason::Suspended {
626 None
627 } else {
628 response
629 };
630 (final_termination, final_response)
631}
632
633async fn persist_run_termination(
634 run_ctx: &mut RunContext,
635 termination: &TerminationReason,
636 tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
637 plugins: &[Arc<dyn crate::contracts::runtime::plugin::AgentPlugin>],
638 pending_delta_commit: &PendingDeltaCommitContext<'_>,
639 run_finished_commit_policy: RunFinishedCommitPolicy,
640) -> Result<(), AgentLoopError> {
641 sync_run_lifecycle_for_termination(run_ctx, termination)?;
642 finalize_run_end(run_ctx, tool_descriptors, plugins).await;
643 if let Err(error) = pending_delta_commit
644 .commit(run_ctx, CheckpointReason::RunFinished, true)
645 .await
646 {
647 match run_finished_commit_policy {
648 RunFinishedCommitPolicy::Required => return Err(error),
649 RunFinishedCommitPolicy::BestEffort => {
650 tracing::warn!(error = %error, "failed to commit run-finished delta");
651 }
652 }
653 }
654 Ok(())
655}
656
657fn stream_result_from_chat_response(response: &genai::chat::ChatResponse) -> StreamResult {
658 let text = response
659 .first_text()
660 .map(|s| s.to_string())
661 .unwrap_or_default();
662 let tool_calls: Vec<crate::contracts::thread::ToolCall> = response
663 .tool_calls()
664 .into_iter()
665 .map(|tc| {
666 crate::contracts::thread::ToolCall::new(
667 &tc.call_id,
668 &tc.fn_name,
669 tc.fn_arguments.clone(),
670 )
671 })
672 .collect();
673
674 StreamResult {
675 text,
676 tool_calls,
677 usage: Some(crate::runtime::streaming::token_usage_from_genai(
678 &response.usage,
679 )),
680 }
681}
682
683fn assistant_turn_message(
684 result: &StreamResult,
685 step_meta: MessageMetadata,
686 message_id: Option<String>,
687) -> Message {
688 let mut msg = if result.tool_calls.is_empty() {
689 assistant_message(&result.text)
690 } else {
691 assistant_tool_calls(&result.text, result.tool_calls.clone())
692 }
693 .with_metadata(step_meta);
694 if let Some(message_id) = message_id {
695 msg = msg.with_id(message_id);
696 }
697 msg
698}
699
/// Result of draining resume decisions at run start.
struct RunStartDrainOutcome {
    /// Events produced while draining, to be emitted to the caller.
    events: Vec<AgentEvent>,
    /// `true` when at least one decision matched a suspended call.
    replayed: bool,
}
704
705fn decision_result_value(action: &ResumeDecisionAction, result: &Value) -> serde_json::Value {
706 if result.is_null() {
707 serde_json::Value::Bool(matches!(action, ResumeDecisionAction::Resume))
708 } else {
709 result.clone()
710 }
711}
712
713fn runtime_resume_inputs(run_ctx: &RunContext) -> HashMap<String, ToolCallResume> {
714 let mut decisions = HashMap::new();
715 for (call_id, state) in tool_call_states_from_ctx(run_ctx) {
716 if !matches!(state.status, ToolCallStatus::Resuming) {
717 continue;
718 }
719 let Some(mut resume) = state.resume else {
720 continue;
721 };
722 if resume.decision_id.trim().is_empty() {
723 resume.decision_id = call_id.clone();
724 }
725 decisions.insert(call_id, resume);
726 }
727 decisions
728}
729
730fn settle_orphan_resuming_tool_states(
731 run_ctx: &mut RunContext,
732 suspended: &HashMap<String, SuspendedCall>,
733 resumes: &HashMap<String, ToolCallResume>,
734) -> Result<bool, AgentLoopError> {
735 let states = tool_call_states_from_ctx(run_ctx);
736 let mut changed = false;
737
738 for (call_id, resume) in resumes {
739 if suspended.contains_key(call_id) {
740 continue;
741 }
742 let Some(state) = states.get(call_id).cloned() else {
743 continue;
744 };
745 let target_status = match &resume.action {
746 ResumeDecisionAction::Cancel => ToolCallStatus::Cancelled,
747 ResumeDecisionAction::Resume => ToolCallStatus::Failed,
748 };
749 if state.status == target_status && state.resume.as_ref() == Some(resume) {
750 continue;
751 }
752
753 let Some(next_state) = transition_tool_call_state(
754 Some(state.clone()),
755 ToolCallStateSeed {
756 call_id: call_id.as_str(),
757 tool_name: state.tool_name.as_str(),
758 arguments: &state.arguments,
759 status: state.status,
760 resume_token: state.resume_token.clone(),
761 },
762 ToolCallStateTransition {
763 status: target_status,
764 resume_token: state.resume_token.clone(),
765 resume: Some(resume.clone()),
766 updated_at: current_unix_millis(),
767 },
768 ) else {
769 continue;
770 };
771
772 let patch = upsert_tool_call_state(call_id, next_state)?;
773 if patch.patch().is_empty() {
774 continue;
775 }
776 run_ctx.add_thread_patch(patch);
777 changed = true;
778 }
779
780 Ok(changed)
781}
782
783fn all_suspended_calls_have_resume(
784 suspended: &HashMap<String, SuspendedCall>,
785 resumes: &HashMap<String, ToolCallResume>,
786) -> bool {
787 suspended
788 .keys()
789 .all(|call_id| resumes.contains_key(call_id))
790}
791
/// Processes resume decisions recorded on tool-call states at run start.
///
/// Flow:
/// 1. Collect decisions for tool calls whose status is `Resuming`
///    (`runtime_resume_inputs`). With no decisions, return with no events.
/// 2. Settle decisions whose call is no longer suspended
///    (`settle_orphan_resuming_tool_states`).
/// 3. Stop early when no call is suspended, or when the executor's replay
///    policy is `BatchAllSuspended` and some suspended call still lacks a
///    decision. In that case emit only a `StateSnapshot`, and only if state
///    changed.
/// 4. Otherwise handle each decision in sorted call-id order:
///    - `Cancel`: mark the call `Cancelled` and append a denied tool result.
///    - `Resume`: mark the call `Resuming` and re-run the tool through the
///      tool phases. Then append its result message, reminders and patches,
///      and record a new suspension if the tool suspended again.
/// 5. Remove resolved calls from the suspended set. Append a final
///    `StateSnapshot` event if anything changed.
///
/// The returned `replayed` flag is `true` when at least one decision matched
/// a suspended call, cancellations included.
///
/// # Errors
///
/// Propagates snapshot failures (as `AgentLoopError::StateError`), errors from
/// building state patches, and errors from tool-phase execution.
async fn drain_resuming_tool_calls_and_replay(
    run_ctx: &mut RunContext,
    tools: &HashMap<String, Arc<dyn Tool>>,
    config: &AgentConfig,
    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
) -> Result<RunStartDrainOutcome, AgentLoopError> {
    let decisions = runtime_resume_inputs(run_ctx);
    if decisions.is_empty() {
        return Ok(RunStartDrainOutcome {
            events: Vec::new(),
            replayed: false,
        });
    }

    let suspended = suspended_calls_from_ctx(run_ctx);
    let mut state_changed = false;
    // Decisions for calls that are no longer suspended are settled first.
    if settle_orphan_resuming_tool_states(run_ctx, &suspended, &decisions)? {
        state_changed = true;
    }
    if suspended.is_empty() {
        if state_changed {
            let snapshot = run_ctx
                .snapshot()
                .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
            return Ok(RunStartDrainOutcome {
                events: vec![AgentEvent::StateSnapshot { snapshot }],
                replayed: false,
            });
        }
        return Ok(RunStartDrainOutcome {
            events: Vec::new(),
            replayed: false,
        });
    }

    // Under `BatchAllSuspended`, wait until every suspended call has a decision.
    if matches!(
        config.tool_executor.decision_replay_policy(),
        DecisionReplayPolicy::BatchAllSuspended
    ) && !all_suspended_calls_have_resume(&suspended, &decisions)
    {
        if state_changed {
            let snapshot = run_ctx
                .snapshot()
                .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
            return Ok(RunStartDrainOutcome {
                events: vec![AgentEvent::StateSnapshot { snapshot }],
                replayed: false,
            });
        }
        return Ok(RunStartDrainOutcome {
            events: Vec::new(),
            replayed: false,
        });
    }

    let mut events = Vec::new();
    // Sorted so events and patches are produced in a deterministic order.
    let mut decision_ids: Vec<String> = decisions.keys().cloned().collect();
    decision_ids.sort();

    let mut replayed = false;
    let mut suspended_to_clear = Vec::new();

    for call_id in decision_ids {
        // Only decisions that match a currently suspended call are replayed.
        let Some(suspended_call) = suspended.get(&call_id).cloned() else {
            continue;
        };
        let Some(decision) = decisions.get(&call_id).cloned() else {
            continue;
        };
        replayed = true;
        let decision_result = decision_result_value(&decision.action, &decision.result);
        let resume_payload = ToolCallResume {
            result: decision_result.clone(),
            ..decision.clone()
        };
        events.push(AgentEvent::ToolCallResumed {
            target_id: suspended_call.call_id.clone(),
            result: decision_result.clone(),
        });

        match decision.action {
            ResumeDecisionAction::Cancel => {
                let cancel_reason = resume_payload.reason.clone();
                if upsert_tool_call_lifecycle_state(
                    run_ctx,
                    &suspended_call,
                    ToolCallStatus::Cancelled,
                    Some(resume_payload),
                )? {
                    state_changed = true;
                }
                // The cancelled call still gets a tool result message (a denial).
                events.push(append_denied_tool_result_message(
                    run_ctx,
                    &suspended_call.call_id,
                    Some(&suspended_call.tool_name),
                    cancel_reason.as_deref(),
                ));
                suspended_to_clear.push(call_id);
            }
            ResumeDecisionAction::Resume => {
                if upsert_tool_call_lifecycle_state(
                    run_ctx,
                    &suspended_call,
                    ToolCallStatus::Resuming,
                    Some(resume_payload.clone()),
                )? {
                    state_changed = true;
                }
                // `replay_tool_call_for_resolution` only returns `None` for a
                // `Cancel` decision, so this `else` is defensive here.
                let Some(tool_call) = replay_tool_call_for_resolution(
                    run_ctx,
                    &suspended_call,
                    &ToolCallDecision {
                        target_id: suspended_call.call_id.clone(),
                        resume: resume_payload.clone(),
                    },
                ) else {
                    continue;
                };
                let state = run_ctx
                    .snapshot()
                    .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
                let tool = tools.get(&tool_call.name).cloned();
                let rt_for_replay = scope_with_tool_caller_context(run_ctx, &state, Some(config))?;
                // Replay uses a no-op activity manager and no cancellation token.
                let replay_phase_ctx = ToolPhaseContext {
                    tool_descriptors,
                    plugins: &config.plugins,
                    activity_manager: tirea_contract::runtime::activity::NoOpActivityManager::arc(),
                    run_config: &rt_for_replay,
                    thread_id: run_ctx.thread_id(),
                    thread_messages: run_ctx.messages(),
                    cancellation_token: None,
                };
                let replay_result = execute_single_tool_with_phases(
                    tool.as_deref(),
                    &tool_call,
                    &state,
                    &replay_phase_ctx,
                )
                .await?;

                let replay_msg_id = gen_message_id();
                let replay_msg = tool_response(&tool_call.id, &replay_result.execution.result)
                    .with_id(replay_msg_id.clone());
                run_ctx.add_message(Arc::new(replay_msg));

                // Reminders are appended as internal system messages.
                if !replay_result.reminders.is_empty() {
                    let msgs: Vec<Arc<Message>> = replay_result
                        .reminders
                        .iter()
                        .map(|reminder| {
                            Arc::new(Message::internal_system(format!(
                                "<system-reminder>{}</system-reminder>",
                                reminder
                            )))
                        })
                        .collect();
                    run_ctx.add_messages(msgs);
                }

                if let Some(patch) = replay_result.execution.patch.clone() {
                    state_changed = true;
                    run_ctx.add_thread_patch(patch);
                }
                if !replay_result.pending_patches.is_empty() {
                    state_changed = true;
                    run_ctx.add_thread_patches(replay_result.pending_patches.clone());
                }

                events.push(AgentEvent::ToolCallDone {
                    id: tool_call.id.clone(),
                    result: replay_result.execution.result,
                    patch: replay_result.execution.patch,
                    message_id: replay_msg_id,
                });

                // The replayed tool suspended again: merge the new suspension
                // into the suspended set and emit its pending events. The
                // original entry is cleared only when the new suspension uses a
                // different call id (otherwise it was just overwritten).
                if let Some(next_suspended_call) = replay_result.suspended_call.clone() {
                    let state = run_ctx
                        .snapshot()
                        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
                    let mut merged = run_ctx.suspended_calls();
                    merged.insert(
                        next_suspended_call.call_id.clone(),
                        next_suspended_call.clone(),
                    );
                    let patch = set_agent_suspended_calls(
                        &state,
                        merged.into_values().collect::<Vec<_>>(),
                    )?;
                    if !patch.patch().is_empty() {
                        state_changed = true;
                        run_ctx.add_thread_patch(patch);
                    }
                    for event in pending_tool_events(&next_suspended_call) {
                        events.push(event);
                    }
                    if next_suspended_call.call_id != call_id {
                        suspended_to_clear.push(call_id);
                    }
                } else {
                    suspended_to_clear.push(call_id);
                }
            }
        }
    }

    // Clear each resolved call once. A fresh snapshot is taken per call,
    // presumably so that earlier clear patches are reflected.
    if !suspended_to_clear.is_empty() {
        let mut unique = suspended_to_clear;
        unique.sort();
        unique.dedup();
        for call_id in &unique {
            let state = run_ctx
                .snapshot()
                .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
            let clear_patch = clear_suspended_call(&state, call_id)?;
            if !clear_patch.patch().is_empty() {
                state_changed = true;
                run_ctx.add_thread_patch(clear_patch);
            }
        }
    }

    if state_changed {
        let snapshot = run_ctx
            .snapshot()
            .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
        events.push(AgentEvent::StateSnapshot { snapshot });
    }

    Ok(RunStartDrainOutcome { events, replayed })
}
1022
/// Run-start entry point for resume replay. Delegates directly to
/// `drain_resuming_tool_calls_and_replay`.
async fn drain_run_start_resume_replay(
    run_ctx: &mut RunContext,
    tools: &HashMap<String, Arc<dyn Tool>>,
    config: &AgentConfig,
    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
) -> Result<RunStartDrainOutcome, AgentLoopError> {
    drain_resuming_tool_calls_and_replay(run_ctx, tools, config, tool_descriptors).await
}
1031
1032async fn commit_run_start_and_drain_replay(
1033 run_ctx: &mut RunContext,
1034 tools: &HashMap<String, Arc<dyn Tool>>,
1035 config: &AgentConfig,
1036 active_tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
1037 pending_delta_commit: &PendingDeltaCommitContext<'_>,
1038) -> Result<RunStartDrainOutcome, AgentLoopError> {
1039 pending_delta_commit
1040 .commit(run_ctx, CheckpointReason::UserMessage, false)
1041 .await?;
1042
1043 let run_start_drain =
1044 drain_run_start_resume_replay(run_ctx, tools, config, active_tool_descriptors).await?;
1045
1046 if run_start_drain.replayed {
1047 pending_delta_commit
1048 .commit(run_ctx, CheckpointReason::ToolResultsCommitted, false)
1049 .await?;
1050 }
1051
1052 Ok(run_start_drain)
1053}
1054
1055fn normalize_decision_tool_result(
1056 response: &serde_json::Value,
1057 fallback_arguments: &serde_json::Value,
1058) -> serde_json::Value {
1059 match response {
1060 serde_json::Value::Bool(_) => fallback_arguments.clone(),
1061 value => value.clone(),
1062 }
1063}
1064
1065fn denied_tool_result_for_call(
1066 run_ctx: &RunContext,
1067 call_id: &str,
1068 fallback_tool_name: Option<&str>,
1069 decision_reason: Option<&str>,
1070) -> ToolResult {
1071 let tool_name = fallback_tool_name
1072 .filter(|name| !name.is_empty())
1073 .map(str::to_string)
1074 .or_else(|| find_tool_call_in_messages(run_ctx, call_id).map(|call| call.name))
1075 .unwrap_or_else(|| "tool".to_string());
1076 let reason = decision_reason
1077 .map(str::to_string)
1078 .filter(|reason| !reason.trim().is_empty())
1079 .unwrap_or_else(|| "User denied the action".to_string());
1080 ToolResult::error(tool_name, reason)
1081}
1082
1083fn append_denied_tool_result_message(
1084 run_ctx: &mut RunContext,
1085 call_id: &str,
1086 fallback_tool_name: Option<&str>,
1087 decision_reason: Option<&str>,
1088) -> AgentEvent {
1089 let denied_result =
1090 denied_tool_result_for_call(run_ctx, call_id, fallback_tool_name, decision_reason);
1091 let message_id = gen_message_id();
1092 let denied_message = tool_response(call_id, &denied_result).with_id(message_id.clone());
1093 run_ctx.add_message(Arc::new(denied_message));
1094 AgentEvent::ToolCallDone {
1095 id: call_id.to_string(),
1096 result: denied_result,
1097 patch: None,
1098 message_id,
1099 }
1100}
1101
1102fn find_tool_call_in_messages(run_ctx: &RunContext, call_id: &str) -> Option<ToolCall> {
1103 run_ctx.messages().iter().rev().find_map(|message| {
1104 message
1105 .tool_calls
1106 .as_ref()
1107 .and_then(|calls| calls.iter().find(|call| call.id == call_id).cloned())
1108 })
1109}
1110
1111fn replay_tool_call_for_resolution(
1112 _run_ctx: &RunContext,
1113 suspended_call: &SuspendedCall,
1114 decision: &ToolCallDecision,
1115) -> Option<ToolCall> {
1116 if matches!(decision.resume.action, ResumeDecisionAction::Cancel) {
1117 return None;
1118 }
1119
1120 match suspended_call.ticket.resume_mode {
1121 ToolCallResumeMode::ReplayToolCall => Some(ToolCall::new(
1122 suspended_call.call_id.clone(),
1123 suspended_call.tool_name.clone(),
1124 suspended_call.arguments.clone(),
1125 )),
1126 ToolCallResumeMode::UseDecisionAsToolResult | ToolCallResumeMode::PassDecisionToTool => {
1127 Some(ToolCall::new(
1128 suspended_call.call_id.clone(),
1129 suspended_call.tool_name.clone(),
1130 normalize_decision_tool_result(&decision.resume.result, &suspended_call.arguments),
1131 ))
1132 }
1133 }
1134}
1135
1136fn upsert_tool_call_lifecycle_state(
1137 run_ctx: &mut RunContext,
1138 suspended_call: &SuspendedCall,
1139 status: ToolCallStatus,
1140 resume: Option<ToolCallResume>,
1141) -> Result<bool, AgentLoopError> {
1142 let current_state = tool_call_states_from_ctx(run_ctx).remove(&suspended_call.call_id);
1143 let Some(tool_state) = transition_tool_call_state(
1144 current_state,
1145 ToolCallStateSeed {
1146 call_id: &suspended_call.call_id,
1147 tool_name: &suspended_call.tool_name,
1148 arguments: &suspended_call.arguments,
1149 status: ToolCallStatus::Suspended,
1150 resume_token: Some(suspended_call.ticket.pending.id.clone()),
1151 },
1152 ToolCallStateTransition {
1153 status,
1154 resume_token: Some(suspended_call.ticket.pending.id.clone()),
1155 resume,
1156 updated_at: current_unix_millis(),
1157 },
1158 ) else {
1159 return Ok(false);
1160 };
1161
1162 let patch = upsert_tool_call_state(&suspended_call.call_id, tool_state)?;
1163 if patch.patch().is_empty() {
1164 return Ok(false);
1165 }
1166 run_ctx.add_thread_patch(patch);
1167 Ok(true)
1168}
1169
1170pub(super) fn resolve_suspended_call(
1171 run_ctx: &mut RunContext,
1172 response: &ToolCallDecision,
1173) -> Result<Option<DecisionReplayOutcome>, AgentLoopError> {
1174 let suspended_calls = suspended_calls_from_ctx(run_ctx);
1175 if suspended_calls.is_empty() {
1176 return Ok(None);
1177 }
1178
1179 let suspended_call = suspended_calls
1180 .get(&response.target_id)
1181 .cloned()
1182 .or_else(|| {
1183 suspended_calls
1184 .values()
1185 .find(|call| {
1186 call.ticket.suspension.id == response.target_id
1187 || call.ticket.pending.id == response.target_id
1188 || call.call_id == response.target_id
1189 })
1190 .cloned()
1191 });
1192 let Some(suspended_call) = suspended_call else {
1193 return Ok(None);
1194 };
1195
1196 let _ = upsert_tool_call_lifecycle_state(
1197 run_ctx,
1198 &suspended_call,
1199 ToolCallStatus::Resuming,
1200 Some(response.resume.clone()),
1201 )?;
1202
1203 Ok(Some(DecisionReplayOutcome {
1204 events: Vec::new(),
1205 resolved_call_ids: vec![suspended_call.call_id],
1206 }))
1207}
1208
1209pub(super) fn drain_decision_channel(
1210 run_ctx: &mut RunContext,
1211 decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1212 pending_decisions: &mut VecDeque<ToolCallDecision>,
1213) -> Result<DecisionReplayOutcome, AgentLoopError> {
1214 let mut disconnected = false;
1215 if let Some(rx) = decision_rx.as_mut() {
1216 loop {
1217 match rx.try_recv() {
1218 Ok(response) => pending_decisions.push_back(response),
1219 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
1220 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1221 disconnected = true;
1222 break;
1223 }
1224 }
1225 }
1226 }
1227 if disconnected {
1228 *decision_rx = None;
1229 }
1230
1231 if pending_decisions.is_empty() {
1232 return Ok(DecisionReplayOutcome {
1233 events: Vec::new(),
1234 resolved_call_ids: Vec::new(),
1235 });
1236 }
1237
1238 let mut unresolved = VecDeque::new();
1239 let mut events = Vec::new();
1240 let mut resolved_call_ids = Vec::new();
1241 let mut seen = HashSet::new();
1242
1243 while let Some(response) = pending_decisions.pop_front() {
1244 if let Some(outcome) = resolve_suspended_call(run_ctx, &response)? {
1245 for call_id in outcome.resolved_call_ids {
1246 if seen.insert(call_id.clone()) {
1247 resolved_call_ids.push(call_id);
1248 }
1249 }
1250 events.extend(outcome.events);
1251 } else {
1252 unresolved.push_back(response);
1253 }
1254 }
1255 *pending_decisions = unresolved;
1256
1257 Ok(DecisionReplayOutcome {
1258 events,
1259 resolved_call_ids,
1260 })
1261}
1262
1263async fn replay_after_decisions(
1264 run_ctx: &mut RunContext,
1265 decisions_applied: bool,
1266 step_tool_provider: &Arc<dyn StepToolProvider>,
1267 config: &AgentConfig,
1268 active_tool_descriptors: &mut Vec<crate::contracts::runtime::tool_call::ToolDescriptor>,
1269 pending_delta_commit: &PendingDeltaCommitContext<'_>,
1270) -> Result<Vec<AgentEvent>, AgentLoopError> {
1271 if !decisions_applied {
1272 return Ok(Vec::new());
1273 }
1274
1275 let decision_tools = resolve_step_tool_snapshot(step_tool_provider, run_ctx).await?;
1276 *active_tool_descriptors = decision_tools.descriptors.clone();
1277
1278 let decision_drain = drain_run_start_resume_replay(
1279 run_ctx,
1280 &decision_tools.tools,
1281 config,
1282 active_tool_descriptors,
1283 )
1284 .await?;
1285
1286 pending_delta_commit
1287 .commit(run_ctx, CheckpointReason::ToolResultsCommitted, false)
1288 .await?;
1289
1290 Ok(decision_drain.events)
1291}
1292
1293async fn apply_decisions_and_replay(
1294 run_ctx: &mut RunContext,
1295 decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1296 pending_decisions: &mut VecDeque<ToolCallDecision>,
1297 step_tool_provider: &Arc<dyn StepToolProvider>,
1298 config: &AgentConfig,
1299 active_tool_descriptors: &mut Vec<crate::contracts::runtime::tool_call::ToolDescriptor>,
1300 pending_delta_commit: &PendingDeltaCommitContext<'_>,
1301) -> Result<Vec<AgentEvent>, AgentLoopError> {
1302 Ok(drain_and_replay_decisions(
1303 run_ctx,
1304 decision_rx,
1305 pending_decisions,
1306 None,
1307 step_tool_provider,
1308 config,
1309 active_tool_descriptors,
1310 pending_delta_commit,
1311 )
1312 .await?
1313 .events)
1314}
1315
/// Result of draining and resolving tool-call decisions, plus any replay that followed.
pub(super) struct DecisionReplayOutcome {
    /// Events produced while resolving decisions and replaying calls, in emission order.
    events: Vec<AgentEvent>,
    /// Ids of the suspended calls that were resolved. When built by
    /// `drain_decision_channel`, duplicates are removed in first-seen order.
    resolved_call_ids: Vec<String>,
}
1320
1321async fn drain_and_replay_decisions(
1322 run_ctx: &mut RunContext,
1323 decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1324 pending_decisions: &mut VecDeque<ToolCallDecision>,
1325 decision: Option<ToolCallDecision>,
1326 step_tool_provider: &Arc<dyn StepToolProvider>,
1327 config: &AgentConfig,
1328 active_tool_descriptors: &mut Vec<crate::contracts::runtime::tool_call::ToolDescriptor>,
1329 pending_delta_commit: &PendingDeltaCommitContext<'_>,
1330) -> Result<DecisionReplayOutcome, AgentLoopError> {
1331 if let Some(decision) = decision {
1332 pending_decisions.push_back(decision);
1333 }
1334 let decision_drain = drain_decision_channel(run_ctx, decision_rx, pending_decisions)?;
1335 let mut events = decision_drain.events;
1336 let replay_events = replay_after_decisions(
1337 run_ctx,
1338 !decision_drain.resolved_call_ids.is_empty(),
1339 step_tool_provider,
1340 config,
1341 active_tool_descriptors,
1342 pending_delta_commit,
1343 )
1344 .await?;
1345 events.extend(replay_events);
1346
1347 Ok(DecisionReplayOutcome {
1348 events,
1349 resolved_call_ids: decision_drain.resolved_call_ids,
1350 })
1351}
1352
1353async fn apply_decision_and_replay(
1354 run_ctx: &mut RunContext,
1355 response: ToolCallDecision,
1356 decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1357 pending_decisions: &mut VecDeque<ToolCallDecision>,
1358 step_tool_provider: &Arc<dyn StepToolProvider>,
1359 config: &AgentConfig,
1360 active_tool_descriptors: &mut Vec<crate::contracts::runtime::tool_call::ToolDescriptor>,
1361 pending_delta_commit: &PendingDeltaCommitContext<'_>,
1362) -> Result<DecisionReplayOutcome, AgentLoopError> {
1363 drain_and_replay_decisions(
1364 run_ctx,
1365 decision_rx,
1366 pending_decisions,
1367 Some(response),
1368 step_tool_provider,
1369 config,
1370 active_tool_descriptors,
1371 pending_delta_commit,
1372 )
1373 .await
1374}
1375
1376async fn recv_decision(
1377 decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1378) -> Option<ToolCallDecision> {
1379 let rx = decision_rx.as_mut()?;
1380 rx.recv().await
1381}
1382
/// Runs the agent loop to completion and returns the final [`LoopOutcome`].
///
/// This is the non-streaming entry point. Events produced while applying
/// tool-call decisions are discarded; only the terminal outcome is returned.
///
/// Each iteration of the loop:
/// 1. applies pending and incoming tool-call decisions and replays resolved calls;
/// 2. stops with `Cancelled` if the cancellation token fired;
/// 3. resolves the step's tool snapshot and prepares the step (which may request
///    termination);
/// 4. calls the LLM through `run_llm_with_retry_and_fallback`;
/// 5. completes the step and commits the assistant turn;
/// 6. ends the run if no tools are needed; otherwise executes the tool calls,
///    applies and commits their results, and applies decisions again;
/// 7. suspends, terminates, or records the tool step and continues.
///
/// If the initial tool snapshot cannot be resolved, the outcome is built directly.
/// Every later exit goes through the local `terminate_run!` macro, which persists
/// the run termination first. If persisting fails, the outcome becomes a
/// `TerminationReason::Error`.
pub async fn run_loop(
    config: &AgentConfig,
    tools: HashMap<String, Arc<dyn Tool>>,
    mut run_ctx: RunContext,
    cancellation_token: Option<RunCancellationToken>,
    state_committer: Option<Arc<dyn StateCommitter>>,
    mut decision_rx: Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
) -> LoopOutcome {
    let executor = llm_executor_for_run(config);
    let mut run_state = RunState::new();
    // Decisions that did not yet match a suspended call; retried on every later drain.
    let mut pending_decisions = VecDeque::new();
    let run_cancellation_token = cancellation_token;
    // Text of the latest LLM response. It is reported as the final response on
    // plugin-requested, post-inference and natural-end terminations.
    let mut last_text = String::new();
    let step_tool_provider = step_tool_provider_for_run(config, tools);
    let run_identity = stream_core::resolve_stream_run_identity(&mut run_ctx);
    let run_id = run_identity.run_id;
    let parent_run_id = run_identity.parent_run_id;
    // Calls already suspended before this run starts, so run-start replay can tell
    // which suspensions are new.
    let baseline_suspended_call_ids = suspended_call_ids(&run_ctx);
    let pending_delta_commit =
        PendingDeltaCommitContext::new(&run_id, parent_run_id.as_deref(), state_committer.as_ref());
    // `terminate_run!` is not available yet (it needs the active descriptors), so a
    // failure here builds the outcome directly without persisting a termination.
    let initial_step_tools = match resolve_step_tool_snapshot(&step_tool_provider, &run_ctx).await {
        Ok(snapshot) => snapshot,
        Err(error) => {
            return build_loop_outcome(
                run_ctx,
                TerminationReason::Error,
                None,
                &run_state,
                Some(outcome::LoopFailure::State(error.to_string())),
            );
        }
    };
    let StepToolSnapshot {
        tools: initial_tools,
        descriptors: initial_descriptors,
    } = initial_step_tools;
    let mut active_tool_descriptors = initial_descriptors;

    // Shared exit path for the rest of the run:
    // 1. adjust the termination/response via `normalize_termination_for_suspended_calls`;
    // 2. persist the run termination (commit required);
    // 3. return the outcome.
    // If persisting fails, an `Error` outcome carrying the state error is returned instead.
    macro_rules! terminate_run {
        ($termination:expr, $response:expr, $failure:expr) => {{
            let reason: TerminationReason = $termination;
            let (final_termination, final_response) =
                normalize_termination_for_suspended_calls(&run_ctx, reason, $response);
            if let Err(error) = persist_run_termination(
                &mut run_ctx,
                &final_termination,
                &active_tool_descriptors,
                &config.plugins,
                &pending_delta_commit,
                RunFinishedCommitPolicy::Required,
            )
            .await
            {
                return build_loop_outcome(
                    run_ctx,
                    TerminationReason::Error,
                    None,
                    &run_state,
                    Some(outcome::LoopFailure::State(error.to_string())),
                );
            }
            return build_loop_outcome(
                run_ctx,
                final_termination,
                final_response,
                &run_state,
                $failure,
            );
        }};
    }

    // Run the `RunStart` phase and stage the patches it returns on the context.
    let pending = match emit_phase_block(
        Phase::RunStart,
        &run_ctx,
        &active_tool_descriptors,
        &config.plugins,
        |_| {},
    )
    .await
    {
        Ok(pending) => pending,
        Err(error) => {
            terminate_run!(
                TerminationReason::Error,
                None,
                Some(outcome::LoopFailure::State(error.to_string()))
            );
        }
    };
    run_ctx.add_thread_patches(pending);
    if let Err(error) = commit_run_start_and_drain_replay(
        &mut run_ctx,
        &initial_tools,
        config,
        &active_tool_descriptors,
        &pending_delta_commit,
    )
    .await
    {
        terminate_run!(
            TerminationReason::Error,
            None,
            Some(outcome::LoopFailure::State(error.to_string()))
        );
    }
    // If run-start replay produced suspensions that were not there before, stop
    // before the first inference.
    let run_start_new_suspended = newly_suspended_call_ids(&run_ctx, &baseline_suspended_call_ids);
    if !run_start_new_suspended.is_empty() {
        terminate_run!(TerminationReason::Suspended, None, None);
    }
    loop {
        // Apply decisions received since the last step; the returned events are
        // discarded in this non-streaming loop.
        if let Err(error) = apply_decisions_and_replay(
            &mut run_ctx,
            &mut decision_rx,
            &mut pending_decisions,
            &step_tool_provider,
            config,
            &mut active_tool_descriptors,
            &pending_delta_commit,
        )
        .await
        {
            terminate_run!(
                TerminationReason::Error,
                None,
                Some(outcome::LoopFailure::State(error.to_string()))
            );
        }

        if is_run_cancelled(run_cancellation_token.as_ref()) {
            terminate_run!(TerminationReason::Cancelled, None, None);
        }

        // The tool set is re-resolved every step, so it may differ between steps.
        let step_tools = match resolve_step_tool_snapshot(&step_tool_provider, &run_ctx).await {
            Ok(snapshot) => snapshot,
            Err(e) => {
                terminate_run!(
                    TerminationReason::Error,
                    None,
                    Some(outcome::LoopFailure::State(e.to_string()))
                );
            }
        };
        active_tool_descriptors = step_tools.descriptors.clone();

        let prepared =
            match prepare_step_execution(&run_ctx, &active_tool_descriptors, config).await {
                Ok(v) => v,
                Err(e) => {
                    terminate_run!(
                        TerminationReason::Error,
                        None,
                        Some(outcome::LoopFailure::State(e.to_string()))
                    );
                }
            };
        run_ctx.add_thread_patches(prepared.pending_patches);

        match prepared.run_action {
            RunAction::Continue => {}
            RunAction::Terminate(reason) => {
                // Only a plugin-requested stop reports the previous step's text.
                let response = if matches!(reason, TerminationReason::PluginRequested) {
                    Some(last_text.clone())
                } else {
                    None
                };
                terminate_run!(reason, response, None);
            }
        }

        let messages = prepared.messages;
        let filtered_tools = prepared.filtered_tools;
        // The closure receives the model to try and rebuilds the request from the
        // same messages and tools each time it is called. Presumably it is called
        // once per attempt (retry or fallback) — confirm in
        // `run_llm_with_retry_and_fallback`.
        let attempt_outcome = run_llm_with_retry_and_fallback(
            config,
            run_cancellation_token.as_ref(),
            true,
            "unknown llm error",
            |model| {
                let request =
                    build_request_for_filtered_tools(&messages, &step_tools.tools, &filtered_tools);
                let executor = executor.clone();
                async move {
                    executor
                        .exec_chat_response(&model, request, config.chat_options.as_ref())
                        .await
                }
            },
        )
        .await;

        let response = match attempt_outcome {
            LlmAttemptOutcome::Success {
                value, attempts, ..
            } => {
                run_state.record_llm_attempts(attempts);
                value
            }
            LlmAttemptOutcome::Cancelled => {
                append_cancellation_user_message(&mut run_ctx, CancellationStage::Inference);
                terminate_run!(TerminationReason::Cancelled, None, None);
            }
            LlmAttemptOutcome::Exhausted {
                last_error,
                attempts,
            } => {
                run_state.record_llm_attempts(attempts);
                // Run LLM-error cleanup first; if the cleanup itself fails, that state
                // error is reported instead of the LLM error.
                if let Err(phase_error) = apply_llm_error_cleanup(
                    &mut run_ctx,
                    &active_tool_descriptors,
                    &config.plugins,
                    "llm_exec_error",
                    last_error.clone(),
                )
                .await
                {
                    terminate_run!(
                        TerminationReason::Error,
                        None,
                        Some(outcome::LoopFailure::State(phase_error.to_string()))
                    );
                }
                terminate_run!(
                    TerminationReason::Error,
                    None,
                    Some(outcome::LoopFailure::Llm(last_error))
                );
            }
        };

        let result = stream_result_from_chat_response(&response);
        run_state.update_from_response(&result);
        last_text = result.text.clone();

        let assistant_msg_id = gen_message_id();
        // NOTE(review): `as u32` truncates silently if `completed_steps` ever exceeds
        // u32::MAX — confirm the field's type.
        let step_meta = step_metadata(Some(run_id.clone()), run_state.completed_steps as u32);
        // A termination reason returned here is acted on only after the assistant
        // turn has been committed below.
        let post_inference_termination = match complete_step_after_inference(
            &mut run_ctx,
            &result,
            step_meta.clone(),
            Some(assistant_msg_id.clone()),
            &active_tool_descriptors,
            &config.plugins,
        )
        .await
        {
            Ok(reason) => reason,
            Err(e) => {
                terminate_run!(
                    TerminationReason::Error,
                    None,
                    Some(outcome::LoopFailure::State(e.to_string()))
                );
            }
        };
        if let Err(error) = pending_delta_commit
            .commit(
                &mut run_ctx,
                CheckpointReason::AssistantTurnCommitted,
                false,
            )
            .await
        {
            terminate_run!(
                TerminationReason::Error,
                None,
                Some(outcome::LoopFailure::State(error.to_string()))
            );
        }

        mark_step_completed(&mut run_state);

        // Any reason other than `Stopped` ends the run now. A `Stopped` reason is
        // deferred until this step's tool calls (if any) have executed.
        if let Some(reason) = &post_inference_termination {
            if !matches!(reason, TerminationReason::Stopped(_)) {
                terminate_run!(reason.clone(), Some(last_text.clone()), None);
            }
        }

        if !result.needs_tools() {
            run_state.record_step_without_tools();
            let reason = post_inference_termination.unwrap_or(TerminationReason::NaturalEnd);
            terminate_run!(reason, Some(last_text.clone()), None);
        }

        let tool_context = match prepare_tool_execution_context(&run_ctx, Some(config)) {
            Ok(ctx) => ctx,
            Err(e) => {
                terminate_run!(
                    TerminationReason::Error,
                    None,
                    Some(outcome::LoopFailure::State(e.to_string()))
                );
            }
        };
        // Tools receive a snapshot of the messages and version taken after the
        // assistant turn was committed.
        let thread_messages_for_tools = run_ctx.messages().to_vec();
        let thread_version_for_tools = run_ctx.version();

        // A no-op activity manager is passed, so tool activity updates are
        // presumably not surfaced by this non-streaming loop.
        let tool_exec_future = config.tool_executor.execute(ToolExecutionRequest {
            tools: &step_tools.tools,
            calls: &result.tool_calls,
            state: &tool_context.state,
            tool_descriptors: &active_tool_descriptors,
            plugins: &config.plugins,
            activity_manager: tirea_contract::runtime::activity::NoOpActivityManager::arc(),
            run_config: &tool_context.run_config,
            thread_id: run_ctx.thread_id(),
            thread_messages: &thread_messages_for_tools,
            state_version: thread_version_for_tools,
            cancellation_token: run_cancellation_token.as_ref(),
        });
        let results = tool_exec_future.await.map_err(AgentLoopError::from);

        let results = match results {
            Ok(r) => r,
            Err(AgentLoopError::Cancelled) => {
                append_cancellation_user_message(&mut run_ctx, CancellationStage::ToolExecution);
                terminate_run!(TerminationReason::Cancelled, None, None);
            }
            Err(e) => {
                terminate_run!(
                    TerminationReason::Error,
                    None,
                    Some(outcome::LoopFailure::State(e.to_string()))
                );
            }
        };

        // NOTE(review): `_e` is used below despite its "unused" underscore prefix.
        if let Err(_e) = apply_tool_results_to_session(
            &mut run_ctx,
            &results,
            Some(step_meta),
            config
                .tool_executor
                .requires_parallel_patch_conflict_check(),
        ) {
            terminate_run!(
                TerminationReason::Error,
                None,
                Some(outcome::LoopFailure::State(_e.to_string()))
            );
        }
        if let Err(error) = pending_delta_commit
            .commit(&mut run_ctx, CheckpointReason::ToolResultsCommitted, false)
            .await
        {
            terminate_run!(
                TerminationReason::Error,
                None,
                Some(outcome::LoopFailure::State(error.to_string()))
            );
        }

        // Drain decisions again, e.g. ones received while tools were executing.
        if let Err(error) = apply_decisions_and_replay(
            &mut run_ctx,
            &mut decision_rx,
            &mut pending_decisions,
            &step_tool_provider,
            config,
            &mut active_tool_descriptors,
            &pending_delta_commit,
        )
        .await
        {
            terminate_run!(
                TerminationReason::Error,
                None,
                Some(outcome::LoopFailure::State(error.to_string()))
            );
        }

        // Suspend only when calls are suspended and no tool result from this step
        // has an outcome other than `Suspended`.
        if has_suspended_calls(&run_ctx) {
            let has_completed = results
                .iter()
                .any(|r| !matches!(r.outcome, crate::contracts::ToolCallOutcome::Suspended));
            if !has_completed {
                terminate_run!(TerminationReason::Suspended, None, None);
            }
        }

        // Any remaining post-inference reason is a deferred `Stopped`; it takes effect now.
        if let Some(reason) = post_inference_termination {
            terminate_run!(reason, Some(last_text.clone()), None);
        }

        let error_count = results
            .iter()
            .filter(|r| r.execution.result.is_error())
            .count();
        run_state.record_tool_step(&result.tool_calls, error_count);
    }
}
1788
1789pub fn run_loop_stream(
1795 config: AgentConfig,
1796 tools: HashMap<String, Arc<dyn Tool>>,
1797 run_ctx: RunContext,
1798 cancellation_token: Option<RunCancellationToken>,
1799 state_committer: Option<Arc<dyn StateCommitter>>,
1800 decision_rx: Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1801) -> Pin<Box<dyn Stream<Item = AgentEvent> + Send>> {
1802 stream_runner::run_stream(
1803 config,
1804 tools,
1805 run_ctx,
1806 cancellation_token,
1807 state_committer,
1808 decision_rx,
1809 )
1810}
1811
1812#[cfg(test)]
1813mod tests;