1mod config;
42mod core;
43mod event_envelope_meta;
44mod outcome;
45mod parallel_state_merge;
46mod plugin_runtime;
47mod run_state;
48mod state_commit;
49mod stream_core;
50mod stream_runner;
51mod tool_exec;
52mod truncation_recovery;
53
54use crate::contracts::io::ResumeDecisionAction;
55use crate::contracts::runtime::phase::Phase;
56use crate::contracts::runtime::state::{reduce_state_actions, AnyStateAction, ScopeContext};
57use crate::contracts::runtime::tool_call::{Tool, ToolResult};
58use crate::contracts::runtime::ActivityManager;
59use crate::contracts::runtime::{
60 DecisionReplayPolicy, RunIdentity, RunLifecycleAction, RunLifecycleState, StreamResult,
61 SuspendedCall, ToolCallResume, ToolCallResumeMode, ToolCallStatus, ToolExecutionRequest,
62 ToolExecutionResult,
63};
64use crate::contracts::thread::CheckpointReason;
65use crate::contracts::thread::{gen_message_id, Message, MessageMetadata, ToolCall};
66use crate::contracts::RunContext;
67use crate::contracts::{AgentEvent, RunAction, TerminationReason, ToolCallDecision};
68use crate::engine::convert::{assistant_message, assistant_tool_calls, tool_response};
69use crate::runtime::activity::ActivityHub;
70
71use crate::runtime::streaming::StreamCollector;
72use async_stream::stream;
73use futures::{Stream, StreamExt};
74use genai::Client;
75use serde_json::Value;
76use std::collections::{HashMap, HashSet, VecDeque};
77use std::future::Future;
78use std::pin::Pin;
79use std::sync::atomic::{AtomicU64, Ordering};
80use std::sync::Arc;
81use std::time::{Instant, SystemTime, UNIX_EPOCH};
82use uuid::Uuid;
83
84pub use crate::contracts::runtime::ToolExecutor;
85pub use crate::runtime::run_context::{
86 await_or_cancel, is_cancelled, CancelAware, RunCancellationToken, StateCommitError,
87 StateCommitter,
88};
89use config::StaticStepToolProvider;
90pub use config::{Agent, BaseAgent, GenaiLlmExecutor, LlmRetryPolicy};
91pub use config::{LlmEventStream, LlmExecutor};
92pub use config::{StepToolInput, StepToolProvider, StepToolSnapshot};
93#[cfg(test)]
94use core::build_messages;
95use core::{
96 build_request_for_filtered_tools, inference_inputs_from_step, suspended_calls_from_ctx,
97 tool_call_states_from_ctx, transition_tool_call_state, upsert_tool_call_state,
98 ToolCallStateSeed, ToolCallStateTransition,
99};
100pub use outcome::{tool_map, tool_map_from_arc, AgentLoopError};
101pub use outcome::{LoopOutcome, LoopStats, LoopUsage};
102#[cfg(test)]
103use plugin_runtime::emit_agent_phase;
104#[cfg(test)]
105use plugin_runtime::emit_cleanup_phases;
106use run_state::LoopRunState;
107pub use state_commit::ChannelStateCommitter;
108use state_commit::PendingDeltaCommitContext;
109use tirea_state::TrackedPatch;
110#[cfg(test)]
111use tokio_util::sync::CancellationToken;
112#[cfg(test)]
113use tool_exec::execute_single_tool_with_phases;
114#[cfg(test)]
115use tool_exec::execute_tools_parallel_with_phases;
116pub use tool_exec::ExecuteToolsOutcome;
117use tool_exec::{
118 apply_tool_results_impl, apply_tool_results_to_session, caller_context_for_tool_execution,
119 execute_single_tool_with_phases_deferred, step_metadata, ToolPhaseContext,
120};
121pub use tool_exec::{
122 execute_tools, execute_tools_with_behaviors, execute_tools_with_config,
123 ParallelToolExecutionMode, ParallelToolExecutor, SequentialToolExecutor,
124};
125
/// Fully-resolved inputs for a single agent run: the agent definition,
/// the tool registry available to it, the run policy, and (when spawned
/// by a tool call) the parent call id.
pub struct ResolvedRun {
    /// Agent definition driving this run.
    pub agent: BaseAgent,
    /// Tools available to the run, keyed by tool id.
    pub tools: HashMap<String, Arc<dyn Tool>>,
    /// Policy applied to this run.
    pub run_policy: crate::contracts::RunPolicy,
    /// Id of the tool call that spawned this run, if any.
    pub parent_tool_call_id: Option<String>,
}
145
146impl ResolvedRun {
147 #[must_use]
149 pub fn with_tool(mut self, id: String, tool: Arc<dyn Tool>) -> Self {
150 self.tools.insert(id, tool);
151 self
152 }
153
154 pub fn overlay_tools(&mut self, tools: HashMap<String, Arc<dyn Tool>>) {
156 for (id, tool) in tools {
157 self.tools.entry(id).or_insert(tool);
158 }
159 }
160}
161
/// Current wall-clock time as milliseconds since the Unix epoch,
/// saturating at `u64::MAX` and falling back to 0 when the system clock
/// is before the epoch.
pub(crate) fn current_unix_millis() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
167
168fn ensure_run_identity(
169 agent: &dyn Agent,
170 run_ctx: &mut RunContext,
171 mut run_identity: RunIdentity,
172) -> RunIdentity {
173 if run_identity.run_id_opt().is_none() {
174 run_identity.run_id = Uuid::now_v7().to_string();
175 }
176 if run_identity.agent_id_opt().is_none() {
177 run_identity.agent_id = agent.id().to_string();
178 }
179 if run_identity.thread_id_opt().is_none() {
180 run_identity.thread_id = run_ctx.thread_id().to_string();
181 }
182 run_ctx.set_run_identity(run_identity.clone());
183 run_identity
184}
185
/// Test-only alias for [`sync_run_lifecycle_for_termination_with_context`].
#[cfg(test)]
pub(super) fn sync_run_lifecycle_for_termination(
    run_ctx: &mut RunContext,
    run_identity: &RunIdentity,
    termination: &TerminationReason,
) -> Result<(), AgentLoopError> {
    sync_run_lifecycle_for_termination_with_context(run_ctx, run_identity, termination)
}
194
195fn sync_run_lifecycle_for_termination_with_context(
196 run_ctx: &mut RunContext,
197 run_identity: &RunIdentity,
198 termination: &TerminationReason,
199) -> Result<(), AgentLoopError> {
200 if run_identity.run_id.trim().is_empty() {
201 return Ok(());
202 };
203
204 let (status, done_reason) = termination.to_run_status();
205
206 let base_state = run_ctx
207 .snapshot()
208 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
209 let actions = vec![AnyStateAction::new::<RunLifecycleState>(
210 RunLifecycleAction::Set {
211 id: run_identity.run_id.clone(),
212 status,
213 done_reason,
214 updated_at: current_unix_millis(),
215 },
216 )];
217 let patches = reduce_state_actions(actions, &base_state, "agent_loop", &ScopeContext::run())
218 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
219 run_ctx.add_thread_patches(patches);
220 Ok(())
221}
222
/// Which stage of the loop a cancellation interrupted; selects the
/// follow-up user message appended when the run resumes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum CancellationStage {
    /// Cancelled while waiting on LLM inference.
    Inference,
    /// Cancelled while executing tool calls.
    ToolExecution,
}
228
/// User message appended when a run resumes after cancellation during inference.
pub(super) const CANCELLATION_INFERENCE_USER_MESSAGE: &str =
    "The previous run was interrupted during inference. Please continue from the current context.";
/// User message appended when a run resumes after cancellation during tool execution.
pub(super) const CANCELLATION_TOOL_USER_MESSAGE: &str =
    "The previous run was interrupted while using tools. Please continue from the current context.";
233
234pub(super) fn append_cancellation_user_message(run_ctx: &mut RunContext, stage: CancellationStage) {
235 let content = match stage {
236 CancellationStage::Inference => CANCELLATION_INFERENCE_USER_MESSAGE,
237 CancellationStage::ToolExecution => CANCELLATION_TOOL_USER_MESSAGE,
238 };
239 run_ctx.add_message(Arc::new(Message::user(content)));
240}
241
242pub(super) fn effective_llm_models(agent: &dyn Agent) -> Vec<String> {
243 let mut models = Vec::with_capacity(1 + agent.fallback_models().len());
244 models.push(agent.model().to_string());
245 for model in agent.fallback_models() {
246 if model.trim().is_empty() {
247 continue;
248 }
249 if !models.iter().any(|m| m == model) {
250 models.push(model.clone());
251 }
252 }
253 models
254}
255
256pub(super) fn effective_llm_models_from(
257 agent: &dyn Agent,
258 start_model: Option<&str>,
259) -> Vec<String> {
260 let models = effective_llm_models(agent);
261 let Some(start_model) = start_model.map(str::trim).filter(|model| !model.is_empty()) else {
262 return models;
263 };
264 let Some(index) = models.iter().position(|model| model == start_model) else {
265 return models;
266 };
267 models.into_iter().skip(index).collect()
268}
269
270pub(super) fn next_llm_model_after(agent: &dyn Agent, current_model: &str) -> Option<String> {
271 let models = effective_llm_models(agent);
272 let current_index = models.iter().position(|model| model == current_model)?;
273 models.into_iter().nth(current_index + 1)
274}
275
/// Per-model attempt budget from the agent's retry policy, clamped to a
/// minimum of one attempt.
pub(super) fn llm_retry_attempts(agent: &dyn Agent) -> usize {
    agent.llm_retry_policy().max_attempts_per_model.max(1)
}
279
/// Coarse classification of an LLM provider error, used to decide
/// whether to retry the same model, fall back, or give up.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum LlmErrorClass {
    /// HTTP 429 / provider rate limiting.
    RateLimit,
    /// Request or connection timed out.
    Timeout,
    /// Network/transport failure (reset, refused, EOF, ...).
    Connection,
    /// HTTP 503 / provider temporarily overloaded.
    ServerUnavailable,
    /// Other 5xx server-side failure.
    ServerError,
    /// Authentication/authorization failure (401/403, bad key).
    Auth,
    /// Request rejected by the provider (other 4xx).
    ClientRequest,
    /// Anything that could not be classified.
    Unknown,
}
291
292impl LlmErrorClass {
293 pub(super) fn is_retryable(self) -> bool {
294 matches!(
295 self,
296 LlmErrorClass::RateLimit
297 | LlmErrorClass::Timeout
298 | LlmErrorClass::Connection
299 | LlmErrorClass::ServerUnavailable
300 | LlmErrorClass::ServerError
301 )
302 }
303
304 pub(super) fn as_str(self) -> &'static str {
306 match self {
307 LlmErrorClass::RateLimit => "rate_limit",
308 LlmErrorClass::Timeout => "timeout",
309 LlmErrorClass::Connection => "connection",
310 LlmErrorClass::ServerUnavailable => "server_unavailable",
311 LlmErrorClass::ServerError => "server_error",
312 LlmErrorClass::Auth => "auth",
313 LlmErrorClass::ClientRequest => "client_request",
314 LlmErrorClass::Unknown => "unknown",
315 }
316 }
317}
318
319fn classify_llm_error_message(message: &str) -> LlmErrorClass {
320 let lower = message.to_ascii_lowercase();
321 if ["429", "too many requests", "rate limit"]
322 .iter()
323 .any(|p| lower.contains(p))
324 {
325 return LlmErrorClass::RateLimit;
326 }
327 if ["timeout", "timed out"].iter().any(|p| lower.contains(p)) {
328 return LlmErrorClass::Timeout;
329 }
330 if [
331 "connection",
332 "network",
333 "reset by peer",
334 "broken pipe",
335 "eof",
336 "connection refused",
337 "error sending request for url",
338 ]
339 .iter()
340 .any(|p| lower.contains(p))
341 {
342 return LlmErrorClass::Connection;
343 }
344 if ["503", "service unavailable", "unavailable", "temporar"]
345 .iter()
346 .any(|p| lower.contains(p))
347 {
348 return LlmErrorClass::ServerUnavailable;
349 }
350 if [
351 "500",
352 "502",
353 "504",
354 "server error",
355 "bad gateway",
356 "gateway timeout",
357 ]
358 .iter()
359 .any(|p| lower.contains(p))
360 {
361 return LlmErrorClass::ServerError;
362 }
363 if ["401", "403", "unauthorized", "forbidden", "invalid api key"]
364 .iter()
365 .any(|p| lower.contains(p))
366 {
367 return LlmErrorClass::Auth;
368 }
369 if ["400", "404", "422", "invalid_request", "bad request"]
370 .iter()
371 .any(|p| lower.contains(p))
372 {
373 return LlmErrorClass::ClientRequest;
374 }
375 LlmErrorClass::Unknown
376}
377
378fn classify_status_code(status_code: u16) -> LlmErrorClass {
379 match status_code {
380 408 => LlmErrorClass::Timeout,
381 429 => LlmErrorClass::RateLimit,
382 401 | 403 => LlmErrorClass::Auth,
383 400 | 404 | 422 => LlmErrorClass::ClientRequest,
384 503 => LlmErrorClass::ServerUnavailable,
385 500..=599 => LlmErrorClass::ServerError,
386 400..=499 => LlmErrorClass::ClientRequest,
387 _ => LlmErrorClass::Unknown,
388 }
389}
390
391fn classify_error_chain(error: &(dyn std::error::Error + 'static)) -> Option<LlmErrorClass> {
392 let mut current = Some(error);
393 while let Some(err) = current {
394 if let Some(io_error) = err.downcast_ref::<std::io::Error>() {
395 use std::io::ErrorKind;
396
397 let class = match io_error.kind() {
398 ErrorKind::TimedOut => Some(LlmErrorClass::Timeout),
399 ErrorKind::ConnectionAborted
400 | ErrorKind::ConnectionRefused
401 | ErrorKind::ConnectionReset
402 | ErrorKind::BrokenPipe
403 | ErrorKind::NotConnected
404 | ErrorKind::UnexpectedEof => Some(LlmErrorClass::Connection),
405 _ => None,
406 };
407 if class.is_some() {
408 return class;
409 }
410 }
411 current = err.source();
412 }
413 None
414}
415
/// Classifies a `genai` web-client error: failed HTTP statuses by their
/// code, reqwest errors by their I/O source chain (falling back to
/// message text), and anything else by message text alone.
fn classify_webc_error(error: &genai::webc::Error) -> LlmErrorClass {
    match error {
        genai::webc::Error::ResponseFailedStatus { status, .. } => {
            classify_status_code(status.as_u16())
        }
        genai::webc::Error::Reqwest(err) => classify_error_chain(err)
            .unwrap_or_else(|| classify_llm_error_message(&err.to_string())),
        _ => classify_llm_error_message(&error.to_string()),
    }
}
426
427fn classify_chat_response_body(body: &serde_json::Value) -> LlmErrorClass {
438 let top_type = body.get("type").and_then(|v| v.as_str());
444 let nested_type = body
445 .get("error")
446 .and_then(|e| e.get("type"))
447 .and_then(|v| v.as_str());
448
449 let candidates: &[&str] = match (top_type, nested_type) {
452 (_, Some(inner)) => &[inner, top_type.unwrap_or("")],
453 (Some(top), None) => &[top],
454 (None, None) => &[],
455 };
456
457 for t in candidates {
458 let lower = t.to_ascii_lowercase();
459 if lower.contains("rate_limit") {
460 return LlmErrorClass::RateLimit;
461 }
462 if lower.contains("overloaded") || lower.contains("unavailable") {
463 return LlmErrorClass::ServerUnavailable;
464 }
465 if lower.contains("timeout") {
466 return LlmErrorClass::Timeout;
467 }
468 if lower.contains("server_error") || lower.contains("api_error") {
469 return LlmErrorClass::ServerError;
470 }
471 if lower.contains("authentication") || lower.contains("permission") {
472 return LlmErrorClass::Auth;
473 }
474 if lower.contains("invalid_request") || lower.contains("not_found") {
475 return LlmErrorClass::ClientRequest;
476 }
477 }
478
479 let status_code = body
482 .get("status")
483 .or_else(|| body.get("code"))
484 .and_then(|v| v.as_u64())
485 .and_then(|v| u16::try_from(v).ok());
486
487 if let Some(code) = status_code {
488 let class = classify_status_code(code);
489 if class != LlmErrorClass::Unknown {
490 return class;
491 }
492 }
493
494 LlmErrorClass::Unknown
495}
496
/// Top-level classification of a `genai::Error` into an
/// [`LlmErrorClass`], dispatching each variant to the most specific
/// classifier available (HTTP status, source chain, webc error,
/// response body, or raw message text).
pub(super) fn classify_llm_error(error: &genai::Error) -> LlmErrorClass {
    match error {
        genai::Error::HttpError { status, .. } => classify_status_code(status.as_u16()),
        genai::Error::WebStream { cause, error, .. } => classify_error_chain(error.as_ref())
            .unwrap_or_else(|| classify_llm_error_message(cause)),
        genai::Error::WebAdapterCall { webc_error, .. }
        | genai::Error::WebModelCall { webc_error, .. } => classify_webc_error(webc_error),
        genai::Error::Internal(message) => classify_llm_error_message(message),

        // A stream that fails to parse mid-flight is treated as a broken
        // connection, which keeps it retryable.
        genai::Error::StreamParse { .. } => LlmErrorClass::Connection,

        // No response at all is classed as a server-side failure.
        genai::Error::NoChatResponse { .. } => LlmErrorClass::ServerError,

        genai::Error::ChatResponse { body, .. } => classify_chat_response_body(body),

        genai::Error::RequiresApiKey { .. }
        | genai::Error::NoAuthResolver { .. }
        | genai::Error::NoAuthData { .. } => LlmErrorClass::Auth,

        other => classify_llm_error_message(&other.to_string()),
    }
}
524
/// Test-only convenience: classify `error` and report whether it is retryable.
#[cfg(test)]
pub(super) fn is_retryable_llm_error(error: &genai::Error) -> bool {
    classify_llm_error(error).is_retryable()
}
529
/// Process-wide entropy accumulator for retry-backoff jitter, seeded
/// with the 64-bit golden-ratio constant (same increment SplitMix64 uses).
static RETRY_BACKOFF_ENTROPY: AtomicU64 = AtomicU64::new(0x9e37_79b9_7f4a_7c15);
531
/// Tracks how long a retry sequence has been running so backoff can be
/// bounded by a wall-clock budget. The window starts lazily on first use.
#[derive(Debug, Clone, Default)]
pub(super) struct RetryBackoffWindow {
    // Set on the first `elapsed_ms` call; cleared by `reset`.
    started_at: Option<Instant>,
}
536
537impl RetryBackoffWindow {
538 pub(super) fn elapsed_ms(&mut self) -> u64 {
539 self.started_at
540 .get_or_insert_with(Instant::now)
541 .elapsed()
542 .as_millis()
543 .try_into()
544 .unwrap_or(u64::MAX)
545 }
546
547 pub(super) fn reset(&mut self) {
548 self.started_at = None;
549 }
550}
551
/// Result of waiting out a retry backoff delay.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum RetryBackoffOutcome {
    /// The full delay elapsed; the caller may retry.
    Completed,
    /// The run was cancelled while waiting.
    Cancelled,
    /// The delay would exceed the retry window budget; stop retrying.
    BudgetExhausted,
}
558
/// SplitMix64-style finalizer: advances `entropy` by the golden-ratio
/// increment, then applies two xor-shift-multiply rounds and a final
/// xor-shift to decorrelate the bits.
fn mix_retry_entropy(entropy: u64) -> u64 {
    let z = entropy.wrapping_add(0x9e37_79b9_7f4a_7c15);
    let z = (z ^ (z >> 30)).wrapping_mul(0xbf58_476d_1ce4_e5b9);
    let z = (z ^ (z >> 27)).wrapping_mul(0x94d0_49bb_1331_11eb);
    z ^ (z >> 31)
}
565
566fn next_retry_entropy() -> u64 {
567 let counter = RETRY_BACKOFF_ENTROPY.fetch_add(0x9e37_79b9_7f4a_7c15, Ordering::Relaxed);
568 let now = SystemTime::now()
569 .duration_since(UNIX_EPOCH)
570 .map(|duration| duration.as_nanos() as u64)
571 .unwrap_or(counter);
572 mix_retry_entropy(counter ^ now)
573}
574
575pub(super) fn retry_base_backoff_ms(policy: &LlmRetryPolicy, retry_attempt: usize) -> u64 {
576 let initial = policy.initial_backoff_ms;
577 let cap = policy.max_backoff_ms.max(policy.initial_backoff_ms);
578 let attempt = retry_attempt.max(1);
579 if attempt == 1 {
580 return initial.min(cap);
581 }
582 let shift = (attempt - 1).min(20) as u32;
583 let factor = 1u64.checked_shl(shift).unwrap_or(u64::MAX);
584 initial.saturating_mul(factor).min(cap)
585}
586
587fn jittered_backoff_ms(policy: &LlmRetryPolicy, retry_attempt: usize, entropy: u64) -> u64 {
588 let cap = policy.max_backoff_ms.max(policy.initial_backoff_ms);
589 let base_ms = retry_base_backoff_ms(policy, retry_attempt).min(cap);
590 let jitter_percent = policy.backoff_jitter_percent.min(100) as u64;
591 if jitter_percent == 0 || base_ms == 0 {
592 return base_ms;
593 }
594
595 let jitter_window = base_ms.saturating_mul(jitter_percent) / 100;
596 let lower = base_ms.saturating_sub(jitter_window);
597 let upper = base_ms.saturating_add(jitter_window).min(cap);
598 if upper <= lower {
599 return lower;
600 }
601
602 let span = upper - lower;
603 lower + (mix_retry_entropy(entropy) % (span.saturating_add(1)))
604}
605
606pub(super) fn retry_backoff_plan_ms(
607 policy: &LlmRetryPolicy,
608 retry_attempt: usize,
609 elapsed_ms: u64,
610 entropy: u64,
611) -> Option<u64> {
612 let wait_ms = jittered_backoff_ms(policy, retry_attempt, entropy);
613 match policy.max_retry_window_ms {
614 Some(budget_ms) if elapsed_ms.saturating_add(wait_ms) > budget_ms => None,
615 _ => Some(wait_ms),
616 }
617}
618
/// Sleeps for the planned backoff before the next LLM retry, honouring
/// run cancellation. Returns `BudgetExhausted` without sleeping when
/// the retry window budget would be exceeded, `Cancelled` when the
/// cancellation token fires mid-sleep, and `Completed` otherwise.
pub(super) async fn wait_retry_backoff(
    agent: &dyn Agent,
    retry_attempt: usize,
    retry_window: &mut RetryBackoffWindow,
    run_cancellation_token: Option<&RunCancellationToken>,
) -> RetryBackoffOutcome {
    let elapsed_ms = retry_window.elapsed_ms();
    let Some(wait_ms) = retry_backoff_plan_ms(
        agent.llm_retry_policy(),
        retry_attempt,
        elapsed_ms,
        next_retry_entropy(),
    ) else {
        return RetryBackoffOutcome::BudgetExhausted;
    };
    tracing::debug!(
        attempt = retry_attempt,
        backoff_ms = wait_ms,
        elapsed_ms = elapsed_ms,
        "waiting before LLM retry"
    );
    match await_or_cancel(
        run_cancellation_token,
        tokio::time::sleep(std::time::Duration::from_millis(wait_ms)),
    )
    .await
    {
        CancelAware::Cancelled => RetryBackoffOutcome::Cancelled,
        CancelAware::Value(_) => RetryBackoffOutcome::Completed,
    }
}
650
/// Final outcome of the retry/fallback loop around an LLM call.
pub(super) enum LlmAttemptOutcome<T> {
    /// The call succeeded on `model` after `attempts` total attempts.
    Success {
        value: T,
        model: String,
        attempts: usize,
    },
    /// The run was cancelled mid-call or mid-backoff.
    Cancelled,
    /// All models/attempts failed; carries the last error seen.
    Exhausted {
        last_error: String,
        last_error_class: Option<&'static str>,
        attempts: usize,
    },
}
664
/// Thin alias over [`is_cancelled`] for readability at call sites.
fn is_run_cancelled(token: Option<&RunCancellationToken>) -> bool {
    is_cancelled(token)
}
668
/// The agent's own step-tool provider when it has one, otherwise a
/// static provider that always serves the given `tools` map.
pub(super) fn step_tool_provider_for_run(
    agent: &dyn Agent,
    tools: HashMap<String, Arc<dyn Tool>>,
) -> Arc<dyn StepToolProvider> {
    agent.step_tool_provider().unwrap_or_else(|| {
        Arc::new(StaticStepToolProvider::new(tools)) as Arc<dyn StepToolProvider>
    })
}
677
/// The agent's own LLM executor when it has one, otherwise a
/// genai-backed executor built from a default client.
pub(super) fn llm_executor_for_run(agent: &dyn Agent) -> Arc<dyn LlmExecutor> {
    agent
        .llm_executor()
        .unwrap_or_else(|| Arc::new(GenaiLlmExecutor::new(Client::default())))
}
683
/// Asks the step-tool provider for the tool snapshot to use for the
/// next step, given the current run state.
///
/// # Errors
/// Propagates the provider's `AgentLoopError`.
pub(super) async fn resolve_step_tool_snapshot(
    step_tool_provider: &Arc<dyn StepToolProvider>,
    run_ctx: &RunContext,
) -> Result<StepToolSnapshot, AgentLoopError> {
    step_tool_provider
        .provide(StepToolInput { state: run_ctx })
        .await
}
692
/// Bumps the run's completed-step counter by one.
fn mark_step_completed(run_state: &mut LoopRunState) {
    run_state.completed_steps += 1;
}
696
/// Concatenates a previously-streamed text prefix with a new segment,
/// avoiding the extra allocation when either side is empty.
fn stitch_response_text(prefix: &str, segment: &str) -> String {
    match (prefix.is_empty(), segment.is_empty()) {
        (true, _) => segment.to_string(),
        (_, true) => prefix.to_string(),
        _ => {
            let mut stitched = String::with_capacity(prefix.len() + segment.len());
            stitched.push_str(prefix);
            stitched.push_str(segment);
            stitched
        }
    }
}
709
/// Appends `segment` to the accumulated response prefix; appending an
/// empty segment leaves the prefix untouched.
fn extend_response_prefix(prefix: &mut String, segment: &str) {
    // `push_str` with "" is already a no-op, so no emptiness guard needed.
    prefix.push_str(segment);
}
715
/// Assembles the final `LoopOutcome`, normalising an empty response
/// string to `None` and snapshotting usage/stats from the run state.
fn build_loop_outcome(
    run_ctx: RunContext,
    termination: TerminationReason,
    response: Option<String>,
    run_state: &LoopRunState,
    failure: Option<outcome::LoopFailure>,
) -> LoopOutcome {
    LoopOutcome {
        run_ctx,
        termination,
        // An empty response is treated as no response at all.
        response: response.filter(|text| !text.is_empty()),
        usage: run_state.usage(),
        stats: run_state.stats(),
        failure,
    }
}
732
/// Drives an LLM call through per-model retries and model fallback.
///
/// Models are tried in the order given by [`effective_llm_models_from`]
/// (optionally resuming at `start_model`); each model gets up to the
/// policy's attempt budget, with jittered backoff between retryable
/// failures when `retry_current_model` is set. A non-retryable error
/// (or running out of per-model attempts) advances to the next model;
/// exhausting the backoff time budget ends the whole loop. Returns
/// `Success` (value, winning model, total attempts), `Cancelled` when
/// the run is cancelled mid-call or mid-backoff, or `Exhausted` with
/// the last error seen (seeded from `unknown_error`).
pub(super) async fn run_llm_with_retry_and_fallback<T, Invoke, Fut>(
    agent: &dyn Agent,
    run_cancellation_token: Option<&RunCancellationToken>,
    retry_current_model: bool,
    start_model: Option<&str>,
    unknown_error: &str,
    mut invoke: Invoke,
) -> LlmAttemptOutcome<T>
where
    Invoke: FnMut(String) -> Fut,
    Fut: std::future::Future<Output = genai::Result<T>>,
{
    let mut last_llm_error = unknown_error.to_string();
    let mut last_error_class: Option<&'static str> = None;
    let model_candidates = effective_llm_models_from(agent, start_model);
    let max_attempts = llm_retry_attempts(agent);
    let mut total_attempts = 0usize;
    let mut retry_window = RetryBackoffWindow::default();

    'models: for model in model_candidates {
        for attempt in 1..=max_attempts {
            total_attempts = total_attempts.saturating_add(1);
            let response_res =
                match await_or_cancel(run_cancellation_token, invoke(model.clone())).await {
                    CancelAware::Cancelled => return LlmAttemptOutcome::Cancelled,
                    CancelAware::Value(resp) => resp,
                };

            match response_res {
                Ok(value) => {
                    if total_attempts > 1 {
                        tracing::info!(
                            model = %model,
                            attempts = total_attempts,
                            "LLM call succeeded after retries"
                        );
                    }
                    return LlmAttemptOutcome::Success {
                        value,
                        model,
                        attempts: total_attempts,
                    };
                }
                Err(e) => {
                    let error_class = classify_llm_error(&e);
                    last_error_class = Some(error_class.as_str());
                    let message = e.to_string();
                    last_llm_error =
                        format!("model='{model}' attempt={attempt}/{max_attempts}: {message}");
                    // Same-model retry only for transient classes while
                    // attempts remain (and when the caller allows it).
                    let can_retry_same_model =
                        retry_current_model && attempt < max_attempts && error_class.is_retryable();
                    tracing::warn!(
                        model = %model,
                        attempt = attempt,
                        max_attempts = max_attempts,
                        error_class = error_class.as_str(),
                        retryable = can_retry_same_model,
                        error = %message,
                        "LLM call failed"
                    );
                    if can_retry_same_model {
                        match wait_retry_backoff(
                            agent,
                            attempt,
                            &mut retry_window,
                            run_cancellation_token,
                        )
                        .await
                        {
                            RetryBackoffOutcome::Completed => continue,
                            RetryBackoffOutcome::Cancelled => {
                                return LlmAttemptOutcome::Cancelled;
                            }
                            RetryBackoffOutcome::BudgetExhausted => {
                                tracing::warn!(
                                    model = %model,
                                    attempt = attempt,
                                    "LLM retry budget exhausted"
                                );
                                last_llm_error =
                                    format!("{last_llm_error} (retry budget exhausted)");
                                // The time budget spans the whole retry
                                // sequence, so skip remaining models too.
                                break 'models;
                            }
                        }
                    }
                    // Non-retryable or out of attempts: next model.
                    break;
                }
            }
        }
    }

    LlmAttemptOutcome::Exhausted {
        last_error: last_llm_error,
        last_error_class,
        attempts: total_attempts,
    }
}
830
/// Runs the StepStart and BeforeInference plugin phases and returns the
/// inference inputs they produce: the message list, filtered tool
/// names, the plugin-decided run action, request transforms, plus
/// pending thread patches and serialized state actions to apply later.
pub(super) async fn run_step_prepare_phases(
    run_ctx: &RunContext,
    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
    agent: &dyn Agent,
) -> Result<
    (
        Vec<Message>,
        Vec<String>,
        RunAction,
        Vec<std::sync::Arc<dyn tirea_contract::runtime::inference::InferenceRequestTransform>>,
        Vec<TrackedPatch>,
        Vec<tirea_contract::SerializedStateAction>,
    ),
    AgentLoopError,
> {
    let system_prompt = agent.system_prompt().to_string();
    let ((messages, filtered_tools, run_action, transforms), pending, actions) =
        plugin_runtime::run_phase_block(
            run_ctx,
            tool_descriptors,
            agent,
            &[Phase::StepStart, Phase::BeforeInference],
            |_| {},
            |step| inference_inputs_from_step(step, &system_prompt),
        )
        .await?;
    Ok((
        messages,
        filtered_tools,
        run_action,
        transforms,
        pending,
        actions,
    ))
}
866
/// Everything produced by the step-preparation phases, ready for the
/// inference call.
pub(super) struct PreparedStep {
    /// Messages to send to the LLM.
    pub(super) messages: Vec<Message>,
    /// Tool names allowed for this step after plugin filtering.
    pub(super) filtered_tools: Vec<String>,
    /// Run action decided by the plugins.
    pub(super) run_action: RunAction,
    /// Thread patches accumulated during preparation, not yet applied.
    pub(super) pending_patches: Vec<TrackedPatch>,
    /// Serialized state actions accumulated during preparation.
    pub(super) serialized_state_actions: Vec<tirea_contract::SerializedStateAction>,
    /// Transforms to apply to the inference request before sending.
    pub(super) request_transforms:
        Vec<std::sync::Arc<dyn tirea_contract::runtime::inference::InferenceRequestTransform>>,
}
876
877pub(super) async fn prepare_step_execution(
878 run_ctx: &RunContext,
879 tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
880 agent: &dyn Agent,
881) -> Result<PreparedStep, AgentLoopError> {
882 let (messages, filtered_tools, run_action, transforms, pending, actions) =
883 run_step_prepare_phases(run_ctx, tool_descriptors, agent).await?;
884 Ok(PreparedStep {
885 messages,
886 filtered_tools,
887 run_action,
888 pending_patches: pending,
889 serialized_state_actions: actions,
890 request_transforms: transforms,
891 })
892}
893
/// Runs the plugin cleanup phases after an LLM error so plugins can
/// observe the failure (error type, message, and optional class).
///
/// # Errors
/// Propagates any `AgentLoopError` raised by the cleanup phases.
pub(super) async fn apply_llm_error_cleanup(
    run_ctx: &mut RunContext,
    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
    agent: &dyn Agent,
    error_type: &'static str,
    message: String,
    error_class: Option<&str>,
) -> Result<(), AgentLoopError> {
    plugin_runtime::emit_cleanup_phases(
        run_ctx,
        tool_descriptors,
        agent,
        error_type,
        message,
        error_class,
    )
    .await
}
912
/// Finishes a step after inference: runs the AfterInference phase with
/// the LLM response attached, applies its patches/actions, appends the
/// assistant message built from `result`, then runs the StepEnd phase
/// and applies its patches/actions. Returns the run action decided
/// during AfterInference.
pub(super) async fn complete_step_after_inference(
    run_ctx: &mut RunContext,
    result: &StreamResult,
    step_meta: MessageMetadata,
    assistant_message_id: Option<String>,
    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
    agent: &dyn Agent,
) -> Result<RunAction, AgentLoopError> {
    let (run_action, pending, actions) = plugin_runtime::run_phase_block(
        run_ctx,
        tool_descriptors,
        agent,
        &[Phase::AfterInference],
        |step| {
            use crate::contracts::runtime::inference::LLMResponse;
            // Expose the inference result to AfterInference plugins.
            step.llm_response = Some(LLMResponse::success(result.clone()));
        },
        |step| step.run_action(),
    )
    .await?;
    run_ctx.add_thread_patches(pending);
    run_ctx.add_serialized_state_actions(actions);

    // Record the assistant turn before StepEnd runs.
    let assistant = assistant_turn_message(result, step_meta, assistant_message_id);
    run_ctx.add_message(Arc::new(assistant));

    let (pending, actions) =
        plugin_runtime::emit_phase_block(Phase::StepEnd, run_ctx, tool_descriptors, agent, |_| {})
            .await?;
    run_ctx.add_thread_patches(pending);
    run_ctx.add_serialized_state_actions(actions);
    Ok(run_action)
}
946
947pub(super) fn pending_tool_events(call: &SuspendedCall) -> Vec<AgentEvent> {
949 vec![
950 AgentEvent::ToolCallStart {
951 id: call.ticket.pending.id.clone(),
952 name: call.ticket.pending.name.clone(),
953 },
954 AgentEvent::ToolCallReady {
955 id: call.ticket.pending.id.clone(),
956 name: call.ticket.pending.name.clone(),
957 arguments: call.ticket.pending.arguments.clone(),
958 },
959 ]
960}
961
/// Whether the run context currently has any suspended tool calls.
pub(super) fn has_suspended_calls(run_ctx: &RunContext) -> bool {
    !suspended_calls_from_ctx(run_ctx).is_empty()
}
965
/// Ids of all currently-suspended tool calls.
pub(super) fn suspended_call_ids(run_ctx: &RunContext) -> HashSet<String> {
    suspended_calls_from_ctx(run_ctx).into_keys().collect()
}
969
970pub(super) fn newly_suspended_call_ids(
971 run_ctx: &RunContext,
972 baseline_ids: &HashSet<String>,
973) -> HashSet<String> {
974 suspended_calls_from_ctx(run_ctx)
975 .into_keys()
976 .filter(|id| !baseline_ids.contains(id))
977 .collect()
978}
979
980pub(super) fn suspended_call_pending_events(run_ctx: &RunContext) -> Vec<AgentEvent> {
981 let mut calls: Vec<SuspendedCall> = suspended_calls_from_ctx(run_ctx).into_values().collect();
982 calls.sort_by(|left, right| left.call_id.cmp(&right.call_id));
983 calls
984 .into_iter()
985 .flat_map(|call| pending_tool_events(&call))
986 .collect()
987}
988
989pub(super) fn suspended_call_pending_events_for_ids(
990 run_ctx: &RunContext,
991 call_ids: &HashSet<String>,
992) -> Vec<AgentEvent> {
993 if call_ids.is_empty() {
994 return Vec::new();
995 }
996 let mut calls: Vec<SuspendedCall> = suspended_calls_from_ctx(run_ctx)
997 .into_iter()
998 .filter_map(|(call_id, call)| call_ids.contains(&call_id).then_some(call))
999 .collect();
1000 calls.sort_by(|left, right| left.call_id.cmp(&right.call_id));
1001 calls
1002 .into_iter()
1003 .flat_map(|call| pending_tool_events(&call))
1004 .collect()
1005}
1006
/// Bundle of run data captured before executing tools.
pub(super) struct ToolExecutionContext {
    /// Snapshot of the run state at capture time.
    pub(super) state: serde_json::Value,
    /// Run policy in effect for the execution.
    pub(super) run_policy: tirea_contract::RunPolicy,
    /// Identity of the run issuing the tool calls.
    pub(super) run_identity: RunIdentity,
    /// Caller context derived from the run context and state snapshot.
    pub(super) caller_context: crate::contracts::runtime::tool_call::CallerContext,
}
1013
1014pub(super) fn prepare_tool_execution_context(
1015 run_ctx: &RunContext,
1016) -> Result<ToolExecutionContext, AgentLoopError> {
1017 let state = run_ctx
1018 .snapshot()
1019 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
1020 let caller_context = caller_context_for_tool_execution(run_ctx, &state);
1021 Ok(ToolExecutionContext {
1022 state,
1023 run_policy: run_ctx.run_policy().clone(),
1024 run_identity: run_ctx.run_identity().clone(),
1025 caller_context,
1026 })
1027}
1028
/// Emits the run-end plugin phase on the context; infallible from the
/// caller's perspective (no `Result` is returned).
pub(super) async fn finalize_run_end(
    run_ctx: &mut RunContext,
    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
    agent: &dyn Agent,
) {
    plugin_runtime::emit_run_end_phase(run_ctx, tool_descriptors, agent).await
}
1036
1037fn normalize_termination_for_suspended_calls(
1038 run_ctx: &RunContext,
1039 termination: TerminationReason,
1040 response: Option<String>,
1041) -> (TerminationReason, Option<String>) {
1042 let final_termination = if !matches!(
1043 termination,
1044 TerminationReason::Error(_) | TerminationReason::Cancelled
1045 ) && has_suspended_calls(run_ctx)
1046 {
1047 TerminationReason::Suspended
1048 } else {
1049 termination
1050 };
1051 let final_response = if final_termination == TerminationReason::Suspended {
1052 None
1053 } else {
1054 response
1055 };
1056 (final_termination, final_response)
1057}
1058
/// Persists a run's terminal state: records the lifecycle status patch,
/// emits the run-end plugin phase, then commits pending deltas with the
/// final termination reason.
///
/// # Errors
/// Propagates failures from the lifecycle sync or the delta commit.
async fn persist_run_termination(
    run_ctx: &mut RunContext,
    termination: &TerminationReason,
    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
    agent: &dyn Agent,
    run_identity: &RunIdentity,
    pending_delta_commit: &PendingDeltaCommitContext<'_>,
) -> Result<(), AgentLoopError> {
    sync_run_lifecycle_for_termination_with_context(run_ctx, run_identity, termination)?;
    finalize_run_end(run_ctx, tool_descriptors, agent).await;
    pending_delta_commit
        .commit_run_finished(run_ctx, termination)
        .await?;
    Ok(())
}
1074
1075fn stream_result_from_chat_response(response: &genai::chat::ChatResponse) -> StreamResult {
1076 let text = response
1077 .first_text()
1078 .map(|s| s.to_string())
1079 .unwrap_or_default();
1080 let tool_calls: Vec<crate::contracts::thread::ToolCall> = response
1081 .tool_calls()
1082 .into_iter()
1083 .map(|tc| {
1084 crate::contracts::thread::ToolCall::new(
1085 &tc.call_id,
1086 &tc.fn_name,
1087 tc.fn_arguments.clone(),
1088 )
1089 })
1090 .collect();
1091
1092 let usage = Some(crate::runtime::streaming::token_usage_from_genai(
1093 &response.usage,
1094 ));
1095 let stop_reason = response
1096 .stop_reason
1097 .as_ref()
1098 .and_then(crate::runtime::streaming::map_genai_stop_reason)
1099 .or({
1100 if !tool_calls.is_empty() {
1101 Some(tirea_contract::runtime::inference::StopReason::ToolUse)
1102 } else {
1103 Some(tirea_contract::runtime::inference::StopReason::EndTurn)
1104 }
1105 });
1106 StreamResult {
1107 text,
1108 tool_calls,
1109 usage,
1110 stop_reason,
1111 }
1112}
1113
1114fn assistant_turn_message(
1115 result: &StreamResult,
1116 step_meta: MessageMetadata,
1117 message_id: Option<String>,
1118) -> Message {
1119 let mut msg = if result.tool_calls.is_empty() {
1120 assistant_message(&result.text)
1121 } else {
1122 assistant_tool_calls(&result.text, result.tool_calls.clone())
1123 }
1124 .with_metadata(step_meta);
1125 if let Some(message_id) = message_id {
1126 msg = msg.with_id(message_id);
1127 }
1128 msg
1129}
1130
/// Outcome of draining resume decisions at run start.
struct RunStartDrainOutcome {
    /// Events produced while settling and replaying resumed tool calls.
    events: Vec<AgentEvent>,
    /// True when at least one suspended call was actually re-executed.
    replayed: bool,
}
1135
1136fn decision_result_value(action: &ResumeDecisionAction, result: &Value) -> serde_json::Value {
1137 if result.is_null() {
1138 serde_json::Value::Bool(matches!(action, ResumeDecisionAction::Resume))
1139 } else {
1140 result.clone()
1141 }
1142}
1143
1144fn runtime_resume_inputs(run_ctx: &RunContext) -> HashMap<String, ToolCallResume> {
1145 let mut decisions = HashMap::new();
1146 for (call_id, state) in tool_call_states_from_ctx(run_ctx) {
1147 if !matches!(state.status, ToolCallStatus::Resuming) {
1148 continue;
1149 }
1150 let Some(mut resume) = state.resume else {
1151 continue;
1152 };
1153 if resume.decision_id.trim().is_empty() {
1154 resume.decision_id = call_id.clone();
1155 }
1156 decisions.insert(call_id, resume);
1157 }
1158 decisions
1159}
1160
/// Settles `Resuming` tool-call states whose decision no longer has a live
/// suspended call to replay: a `Cancel` decision lands them in `Cancelled`,
/// while a `Resume` decision — which has nothing left to re-execute — lands
/// them in `Failed`. Returns whether any state patch was queued.
fn settle_orphan_resuming_tool_states(
    run_ctx: &mut RunContext,
    suspended: &HashMap<String, SuspendedCall>,
    resumes: &HashMap<String, ToolCallResume>,
) -> Result<bool, AgentLoopError> {
    let states = tool_call_states_from_ctx(run_ctx);
    let mut changed = false;

    for (call_id, resume) in resumes {
        // Calls that are still suspended are handled by the replay path.
        if suspended.contains_key(call_id) {
            continue;
        }
        // No persisted state for this decision: nothing to settle.
        let Some(state) = states.get(call_id).cloned() else {
            continue;
        };
        let target_status = match &resume.action {
            ResumeDecisionAction::Cancel => ToolCallStatus::Cancelled,
            ResumeDecisionAction::Resume => ToolCallStatus::Failed,
        };
        // Already settled with this exact resume payload: skip.
        if state.status == target_status && state.resume.as_ref() == Some(resume) {
            continue;
        }

        // The transition helper returns `None` when the transition is not
        // applicable; in that case leave the state as-is.
        let Some(next_state) = transition_tool_call_state(
            Some(state.clone()),
            ToolCallStateSeed {
                call_id: call_id.as_str(),
                tool_name: state.tool_name.as_str(),
                arguments: &state.arguments,
                status: state.status,
                resume_token: state.resume_token.clone(),
            },
            ToolCallStateTransition {
                status: target_status,
                resume_token: state.resume_token.clone(),
                resume: Some(resume.clone()),
                updated_at: current_unix_millis(),
            },
        ) else {
            continue;
        };

        // Re-snapshot inside the loop so each upsert builds on patches added
        // by previous iterations.
        let base_state = run_ctx
            .snapshot()
            .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
        let patch = upsert_tool_call_state(&base_state, call_id, next_state)?;
        if patch.patch().is_empty() {
            continue;
        }
        run_ctx.add_thread_patch(patch);
        changed = true;
    }

    Ok(changed)
}
1216
1217fn all_suspended_calls_have_resume(
1218 suspended: &HashMap<String, SuspendedCall>,
1219 resumes: &HashMap<String, ToolCallResume>,
1220) -> bool {
1221 suspended
1222 .keys()
1223 .all(|call_id| resumes.contains_key(call_id))
1224}
1225
/// Drains persisted resume decisions and replays the corresponding suspended
/// tool calls, recording all thread-state mutations on `run_ctx`.
///
/// Returns the events to surface and whether any call was actually replayed.
/// Under `DecisionReplayPolicy::BatchAllSuspended`, replay only starts once
/// every suspended call has a decision.
async fn drain_resuming_tool_calls_and_replay(
    run_ctx: &mut RunContext,
    tools: &HashMap<String, Arc<dyn Tool>>,
    agent: &dyn Agent,
    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
) -> Result<RunStartDrainOutcome, AgentLoopError> {
    // Nothing to do unless some tool-call state carries a resume decision.
    let decisions = runtime_resume_inputs(run_ctx);
    if decisions.is_empty() {
        return Ok(RunStartDrainOutcome {
            events: Vec::new(),
            replayed: false,
        });
    }

    let suspended = suspended_calls_from_ctx(run_ctx);
    let mut state_changed = false;
    // Settle decisions whose suspended call is gone before replaying.
    if settle_orphan_resuming_tool_states(run_ctx, &suspended, &decisions)? {
        state_changed = true;
    }
    // No suspended calls left at all: surface a snapshot only if the orphan
    // settlement above actually patched state.
    if suspended.is_empty() {
        if state_changed {
            let snapshot = run_ctx
                .snapshot()
                .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
            return Ok(RunStartDrainOutcome {
                events: vec![AgentEvent::StateSnapshot { snapshot }],
                replayed: false,
            });
        }
        return Ok(RunStartDrainOutcome {
            events: Vec::new(),
            replayed: false,
        });
    }

    // Batch policy: hold off until every suspended call has a decision.
    if matches!(
        agent.tool_executor().decision_replay_policy(),
        DecisionReplayPolicy::BatchAllSuspended
    ) && !all_suspended_calls_have_resume(&suspended, &decisions)
    {
        if state_changed {
            let snapshot = run_ctx
                .snapshot()
                .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
            return Ok(RunStartDrainOutcome {
                events: vec![AgentEvent::StateSnapshot { snapshot }],
                replayed: false,
            });
        }
        return Ok(RunStartDrainOutcome {
            events: Vec::new(),
            replayed: false,
        });
    }

    let mut events = Vec::new();
    // Sort ids for a deterministic replay order (map iteration order is not).
    let mut decision_ids: Vec<String> = decisions.keys().cloned().collect();
    decision_ids.sort();

    let mut replayed = false;

    for call_id in decision_ids {
        let Some(suspended_call) = suspended.get(&call_id).cloned() else {
            continue;
        };
        let Some(decision) = decisions.get(&call_id).cloned() else {
            continue;
        };
        replayed = true;
        // Null decision payloads are normalized into a boolean approve/deny.
        let decision_result = decision_result_value(&decision.action, &decision.result);
        let resume_payload = ToolCallResume {
            result: decision_result.clone(),
            ..decision.clone()
        };
        events.push(AgentEvent::ToolCallResumed {
            target_id: suspended_call.call_id.clone(),
            result: decision_result.clone(),
        });

        match decision.action {
            ResumeDecisionAction::Cancel => {
                // Mark the call cancelled, append a denial tool-result
                // message, and delete its suspended-call scope entry.
                let cancel_reason = resume_payload.reason.clone();
                if upsert_tool_call_lifecycle_state(
                    run_ctx,
                    &suspended_call,
                    ToolCallStatus::Cancelled,
                    Some(resume_payload),
                )? {
                    state_changed = true;
                }
                events.push(append_denied_tool_result_message(
                    run_ctx,
                    &suspended_call.call_id,
                    Some(&suspended_call.tool_name),
                    cancel_reason.as_deref(),
                ));
                let cleanup_path = format!(
                    "__tool_call_scope.{}.suspended_call",
                    suspended_call.call_id
                );
                let cleanup_patch = tirea_state::Patch::with_ops(vec![tirea_state::Op::delete(
                    tirea_state::parse_path(&cleanup_path),
                )]);
                let tracked = tirea_state::TrackedPatch::new(cleanup_patch)
                    .with_source("framework:scope_cleanup");
                if !tracked.patch().is_empty() {
                    state_changed = true;
                    run_ctx.add_thread_patch(tracked);
                }
            }
            ResumeDecisionAction::Resume => {
                // Flag the call as resuming before re-executing the tool.
                if upsert_tool_call_lifecycle_state(
                    run_ctx,
                    &suspended_call,
                    ToolCallStatus::Resuming,
                    Some(resume_payload.clone()),
                )? {
                    state_changed = true;
                }
                // Rebuild the tool call per the ticket's resume mode; `None`
                // means there is nothing to replay for this decision.
                let Some(tool_call) = replay_tool_call_for_resolution(
                    run_ctx,
                    &suspended_call,
                    &ToolCallDecision {
                        target_id: suspended_call.call_id.clone(),
                        resume: resume_payload.clone(),
                    },
                ) else {
                    continue;
                };
                let state = run_ctx
                    .snapshot()
                    .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
                let tool = tools.get(&tool_call.name).cloned();
                let caller_context = caller_context_for_tool_execution(run_ctx, &state);
                // Replays run outside a live stream: no activity manager and
                // no cancellation token are wired in.
                let replay_phase_ctx = ToolPhaseContext {
                    tool_descriptors,
                    agent_behavior: Some(agent.behavior()),
                    activity_manager: tirea_contract::runtime::activity::NoOpActivityManager::arc(),
                    run_policy: run_ctx.run_policy(),
                    run_identity: run_ctx.run_identity().clone(),
                    caller_context,
                    thread_id: run_ctx.thread_id(),
                    thread_messages: run_ctx.messages(),
                    cancellation_token: None,
                };
                let replay_result = execute_single_tool_with_phases_deferred(
                    tool.as_deref(),
                    &tool_call,
                    &state,
                    &replay_phase_ctx,
                )
                .await?;

                // Record the tool result as a thread message.
                let replay_msg_id = gen_message_id();
                let replay_msg = tool_response(&tool_call.id, &replay_result.execution.result)
                    .with_id(replay_msg_id.clone());
                run_ctx.add_message(Arc::new(replay_msg));

                // Surface reminders as internal system messages.
                if !replay_result.reminders.is_empty() {
                    let msgs: Vec<Arc<Message>> = replay_result
                        .reminders
                        .iter()
                        .map(|reminder| {
                            Arc::new(Message::internal_system(format!(
                                "<system-reminder>{}</system-reminder>",
                                reminder
                            )))
                        })
                        .collect();
                    run_ctx.add_messages(msgs);
                }

                // Apply state patches produced by the replayed execution.
                if let Some(patch) = replay_result.execution.patch.clone() {
                    state_changed = true;
                    run_ctx.add_thread_patch(patch);
                }
                if !replay_result.pending_patches.is_empty() {
                    state_changed = true;
                    run_ctx.add_thread_patches(replay_result.pending_patches.clone());
                }
                events.push(AgentEvent::ToolCallDone {
                    id: tool_call.id.clone(),
                    result: replay_result.execution.result,
                    patch: replay_result.execution.patch,
                    message_id: replay_msg_id,
                });

                // The replayed tool may suspend again: persist the new
                // suspended call and emit its pending events.
                if let Some(next_suspended_call) = replay_result.suspended_call.clone() {
                    let state = run_ctx
                        .snapshot()
                        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
                    let action = next_suspended_call.clone().into_state_action();
                    let patches = reduce_state_actions(
                        vec![action],
                        &state,
                        "agent_loop",
                        &ScopeContext::run(),
                    )
                    .map_err(|e| {
                        AgentLoopError::StateError(format!(
                            "failed to reduce suspended call action: {e}"
                        ))
                    })?;
                    for patch in patches {
                        if !patch.patch().is_empty() {
                            state_changed = true;
                            run_ctx.add_thread_patch(patch);
                        }
                    }
                    for event in pending_tool_events(&next_suspended_call) {
                        events.push(event);
                    }
                }
            }
        }
    }

    // Close with a single snapshot if anything mutated state during replay.
    if state_changed {
        let snapshot = run_ctx
            .snapshot()
            .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
        events.push(AgentEvent::StateSnapshot { snapshot });
    }

    Ok(RunStartDrainOutcome { events, replayed })
}
1459
/// Run-start entry point for draining resume decisions; currently a thin
/// alias over `drain_resuming_tool_calls_and_replay`, kept as a separate
/// name for the run-start call sites.
async fn drain_run_start_resume_replay(
    run_ctx: &mut RunContext,
    tools: &HashMap<String, Arc<dyn Tool>>,
    agent: &dyn Agent,
    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
) -> Result<RunStartDrainOutcome, AgentLoopError> {
    drain_resuming_tool_calls_and_replay(run_ctx, tools, agent, tool_descriptors).await
}
1468
/// Commits the run-start checkpoint, drains any pending resume replays, and
/// — only when something was actually replayed — commits a follow-up
/// checkpoint covering the produced tool results.
async fn commit_run_start_and_drain_replay(
    run_ctx: &mut RunContext,
    tools: &HashMap<String, Arc<dyn Tool>>,
    agent: &dyn Agent,
    active_tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
    pending_delta_commit: &PendingDeltaCommitContext<'_>,
) -> Result<RunStartDrainOutcome, AgentLoopError> {
    // Persist the user-message checkpoint before any replay mutates state.
    pending_delta_commit
        .commit(run_ctx, CheckpointReason::UserMessage, false)
        .await?;

    let run_start_drain =
        drain_run_start_resume_replay(run_ctx, tools, agent, active_tool_descriptors).await?;

    if run_start_drain.replayed {
        pending_delta_commit
            .commit(run_ctx, CheckpointReason::ToolResultsCommitted, false)
            .await?;
    }

    Ok(run_start_drain)
}
1491
1492fn normalize_decision_tool_result(
1493 response: &serde_json::Value,
1494 fallback_arguments: &serde_json::Value,
1495) -> serde_json::Value {
1496 match response {
1497 serde_json::Value::Bool(_) => fallback_arguments.clone(),
1498 value => value.clone(),
1499 }
1500}
1501
1502fn denied_tool_result_for_call(
1503 run_ctx: &RunContext,
1504 call_id: &str,
1505 fallback_tool_name: Option<&str>,
1506 decision_reason: Option<&str>,
1507) -> ToolResult {
1508 let tool_name = fallback_tool_name
1509 .filter(|name| !name.is_empty())
1510 .map(str::to_string)
1511 .or_else(|| find_tool_call_in_messages(run_ctx, call_id).map(|call| call.name))
1512 .unwrap_or_else(|| "tool".to_string());
1513 let reason = decision_reason
1514 .map(str::to_string)
1515 .filter(|reason| !reason.trim().is_empty())
1516 .unwrap_or_else(|| "User denied the action".to_string());
1517 ToolResult::error(tool_name, reason)
1518}
1519
1520fn append_denied_tool_result_message(
1521 run_ctx: &mut RunContext,
1522 call_id: &str,
1523 fallback_tool_name: Option<&str>,
1524 decision_reason: Option<&str>,
1525) -> AgentEvent {
1526 let denied_result =
1527 denied_tool_result_for_call(run_ctx, call_id, fallback_tool_name, decision_reason);
1528 let message_id = gen_message_id();
1529 let denied_message = tool_response(call_id, &denied_result).with_id(message_id.clone());
1530 run_ctx.add_message(Arc::new(denied_message));
1531 AgentEvent::ToolCallDone {
1532 id: call_id.to_string(),
1533 result: denied_result,
1534 patch: None,
1535 message_id,
1536 }
1537}
1538
1539fn find_tool_call_in_messages(run_ctx: &RunContext, call_id: &str) -> Option<ToolCall> {
1540 run_ctx.messages().iter().rev().find_map(|message| {
1541 message
1542 .tool_calls
1543 .as_ref()
1544 .and_then(|calls| calls.iter().find(|call| call.id == call_id).cloned())
1545 })
1546}
1547
1548fn replay_tool_call_for_resolution(
1549 _run_ctx: &RunContext,
1550 suspended_call: &SuspendedCall,
1551 decision: &ToolCallDecision,
1552) -> Option<ToolCall> {
1553 if matches!(decision.resume.action, ResumeDecisionAction::Cancel) {
1554 return None;
1555 }
1556
1557 match suspended_call.ticket.resume_mode {
1558 ToolCallResumeMode::ReplayToolCall => Some(ToolCall::new(
1559 suspended_call.call_id.clone(),
1560 suspended_call.tool_name.clone(),
1561 suspended_call.arguments.clone(),
1562 )),
1563 ToolCallResumeMode::UseDecisionAsToolResult | ToolCallResumeMode::PassDecisionToTool => {
1564 Some(ToolCall::new(
1565 suspended_call.call_id.clone(),
1566 suspended_call.tool_name.clone(),
1567 normalize_decision_tool_result(&decision.resume.result, &suspended_call.arguments),
1568 ))
1569 }
1570 }
1571}
1572
1573fn upsert_tool_call_lifecycle_state(
1574 run_ctx: &mut RunContext,
1575 suspended_call: &SuspendedCall,
1576 status: ToolCallStatus,
1577 resume: Option<ToolCallResume>,
1578) -> Result<bool, AgentLoopError> {
1579 let current_state = tool_call_states_from_ctx(run_ctx).remove(&suspended_call.call_id);
1580 let Some(tool_state) = transition_tool_call_state(
1581 current_state,
1582 ToolCallStateSeed {
1583 call_id: &suspended_call.call_id,
1584 tool_name: &suspended_call.tool_name,
1585 arguments: &suspended_call.arguments,
1586 status: ToolCallStatus::Suspended,
1587 resume_token: Some(suspended_call.ticket.pending.id.clone()),
1588 },
1589 ToolCallStateTransition {
1590 status,
1591 resume_token: Some(suspended_call.ticket.pending.id.clone()),
1592 resume,
1593 updated_at: current_unix_millis(),
1594 },
1595 ) else {
1596 return Ok(false);
1597 };
1598
1599 let base_state = run_ctx
1600 .snapshot()
1601 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
1602 let patch = upsert_tool_call_state(&base_state, &suspended_call.call_id, tool_state)?;
1603 if patch.patch().is_empty() {
1604 return Ok(false);
1605 }
1606 run_ctx.add_thread_patch(patch);
1607 Ok(true)
1608}
1609
1610pub(super) fn resolve_suspended_call(
1611 run_ctx: &mut RunContext,
1612 response: &ToolCallDecision,
1613) -> Result<Option<DecisionReplayOutcome>, AgentLoopError> {
1614 let suspended_calls = suspended_calls_from_ctx(run_ctx);
1615 if suspended_calls.is_empty() {
1616 return Ok(None);
1617 }
1618
1619 let suspended_call = suspended_calls
1620 .get(&response.target_id)
1621 .cloned()
1622 .or_else(|| {
1623 suspended_calls
1624 .values()
1625 .find(|call| {
1626 call.ticket.suspension.id == response.target_id
1627 || call.ticket.pending.id == response.target_id
1628 || call.call_id == response.target_id
1629 })
1630 .cloned()
1631 });
1632 let Some(suspended_call) = suspended_call else {
1633 return Ok(None);
1634 };
1635
1636 let _ = upsert_tool_call_lifecycle_state(
1637 run_ctx,
1638 &suspended_call,
1639 ToolCallStatus::Resuming,
1640 Some(response.resume.clone()),
1641 )?;
1642
1643 Ok(Some(DecisionReplayOutcome {
1644 events: Vec::new(),
1645 resolved_call_ids: vec![suspended_call.call_id],
1646 }))
1647}
1648
1649pub(super) fn drain_decision_channel(
1650 run_ctx: &mut RunContext,
1651 decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1652 pending_decisions: &mut VecDeque<ToolCallDecision>,
1653) -> Result<DecisionReplayOutcome, AgentLoopError> {
1654 let mut disconnected = false;
1655 if let Some(rx) = decision_rx.as_mut() {
1656 loop {
1657 match rx.try_recv() {
1658 Ok(response) => pending_decisions.push_back(response),
1659 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
1660 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1661 disconnected = true;
1662 break;
1663 }
1664 }
1665 }
1666 }
1667 if disconnected {
1668 *decision_rx = None;
1669 }
1670
1671 if pending_decisions.is_empty() {
1672 return Ok(DecisionReplayOutcome {
1673 events: Vec::new(),
1674 resolved_call_ids: Vec::new(),
1675 });
1676 }
1677
1678 let mut unresolved = VecDeque::new();
1679 let mut events = Vec::new();
1680 let mut resolved_call_ids = Vec::new();
1681 let mut seen = HashSet::new();
1682
1683 while let Some(response) = pending_decisions.pop_front() {
1684 if let Some(outcome) = resolve_suspended_call(run_ctx, &response)? {
1685 for call_id in outcome.resolved_call_ids {
1686 if seen.insert(call_id.clone()) {
1687 resolved_call_ids.push(call_id);
1688 }
1689 }
1690 events.extend(outcome.events);
1691 } else {
1692 unresolved.push_back(response);
1693 }
1694 }
1695 *pending_decisions = unresolved;
1696
1697 Ok(DecisionReplayOutcome {
1698 events,
1699 resolved_call_ids,
1700 })
1701}
1702
/// After decisions were applied, refreshes the step tool snapshot, replays
/// the resuming calls against the refreshed tools, and commits the resulting
/// tool results. No-op when `decisions_applied` is false.
async fn replay_after_decisions(
    run_ctx: &mut RunContext,
    decisions_applied: bool,
    step_tool_provider: &Arc<dyn StepToolProvider>,
    agent: &dyn Agent,
    active_tool_descriptors: &mut Vec<crate::contracts::runtime::tool_call::ToolDescriptor>,
    pending_delta_commit: &PendingDeltaCommitContext<'_>,
) -> Result<Vec<AgentEvent>, AgentLoopError> {
    if !decisions_applied {
        return Ok(Vec::new());
    }

    // Re-resolve tools: applied decisions may have changed the tool set, and
    // the caller's descriptor list is updated in place to match.
    let decision_tools = resolve_step_tool_snapshot(step_tool_provider, run_ctx).await?;
    *active_tool_descriptors = decision_tools.descriptors.clone();

    let decision_drain = drain_run_start_resume_replay(
        run_ctx,
        &decision_tools.tools,
        agent,
        active_tool_descriptors,
    )
    .await?;

    // Persist whatever the replay produced before returning its events.
    pending_delta_commit
        .commit(run_ctx, CheckpointReason::ToolResultsCommitted, false)
        .await?;

    Ok(decision_drain.events)
}
1732
1733async fn apply_decisions_and_replay(
1734 run_ctx: &mut RunContext,
1735 decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1736 pending_decisions: &mut VecDeque<ToolCallDecision>,
1737 step_tool_provider: &Arc<dyn StepToolProvider>,
1738 agent: &dyn Agent,
1739 active_tool_descriptors: &mut Vec<crate::contracts::runtime::tool_call::ToolDescriptor>,
1740 pending_delta_commit: &PendingDeltaCommitContext<'_>,
1741) -> Result<Vec<AgentEvent>, AgentLoopError> {
1742 Ok(drain_and_replay_decisions(
1743 run_ctx,
1744 decision_rx,
1745 pending_decisions,
1746 None,
1747 DecisionReplayInputs {
1748 step_tool_provider,
1749 agent,
1750 active_tool_descriptors,
1751 },
1752 pending_delta_commit,
1753 )
1754 .await?
1755 .events)
1756}
1757
/// Result of draining and applying tool-call decisions.
pub(super) struct DecisionReplayOutcome {
    /// Events produced while resolving and replaying decisions.
    events: Vec<AgentEvent>,
    /// Ids of calls that were matched by a decision.
    resolved_call_ids: Vec<String>,
}
1762
/// Borrowed inputs needed to replay tool calls after decisions are applied.
struct DecisionReplayInputs<'a> {
    step_tool_provider: &'a Arc<dyn StepToolProvider>,
    agent: &'a dyn Agent,
    /// Updated in place when the step tool snapshot is refreshed.
    active_tool_descriptors: &'a mut Vec<crate::contracts::runtime::tool_call::ToolDescriptor>,
}
1768
1769async fn drain_and_replay_decisions(
1770 run_ctx: &mut RunContext,
1771 decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1772 pending_decisions: &mut VecDeque<ToolCallDecision>,
1773 decision: Option<ToolCallDecision>,
1774 replay_inputs: DecisionReplayInputs<'_>,
1775 pending_delta_commit: &PendingDeltaCommitContext<'_>,
1776) -> Result<DecisionReplayOutcome, AgentLoopError> {
1777 if let Some(decision) = decision {
1778 pending_decisions.push_back(decision);
1779 }
1780 let decision_drain = drain_decision_channel(run_ctx, decision_rx, pending_decisions)?;
1781 let mut events = decision_drain.events;
1782 let replay_events = replay_after_decisions(
1783 run_ctx,
1784 !decision_drain.resolved_call_ids.is_empty(),
1785 replay_inputs.step_tool_provider,
1786 replay_inputs.agent,
1787 replay_inputs.active_tool_descriptors,
1788 pending_delta_commit,
1789 )
1790 .await?;
1791 events.extend(replay_events);
1792
1793 Ok(DecisionReplayOutcome {
1794 events,
1795 resolved_call_ids: decision_drain.resolved_call_ids,
1796 })
1797}
1798
/// Applies a single explicit decision (plus anything already queued on the
/// channel) and replays the affected tool calls.
async fn apply_decision_and_replay(
    run_ctx: &mut RunContext,
    response: ToolCallDecision,
    decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
    pending_decisions: &mut VecDeque<ToolCallDecision>,
    replay_inputs: DecisionReplayInputs<'_>,
    pending_delta_commit: &PendingDeltaCommitContext<'_>,
) -> Result<DecisionReplayOutcome, AgentLoopError> {
    drain_and_replay_decisions(
        run_ctx,
        decision_rx,
        pending_decisions,
        Some(response),
        replay_inputs,
        pending_delta_commit,
    )
    .await
}
1817
1818async fn recv_decision(
1819 decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1820) -> Option<ToolCallDecision> {
1821 let rx = decision_rx.as_mut()?;
1822 rx.recv().await
1823}
1824
1825pub async fn run_loop(
1831 agent: &dyn Agent,
1832 tools: HashMap<String, Arc<dyn Tool>>,
1833 run_ctx: RunContext,
1834 cancellation_token: Option<RunCancellationToken>,
1835 state_committer: Option<Arc<dyn StateCommitter>>,
1836 decision_rx: Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1837) -> LoopOutcome {
1838 let run_identity = run_ctx.run_identity().clone();
1839 run_loop_with_context(
1840 agent,
1841 tools,
1842 run_ctx,
1843 run_identity,
1844 cancellation_token,
1845 state_committer,
1846 decision_rx,
1847 )
1848 .await
1849}
1850
1851pub async fn run_loop_with_context(
1857 agent: &dyn Agent,
1858 tools: HashMap<String, Arc<dyn Tool>>,
1859 mut run_ctx: RunContext,
1860 run_identity: RunIdentity,
1861 cancellation_token: Option<RunCancellationToken>,
1862 state_committer: Option<Arc<dyn StateCommitter>>,
1863 mut decision_rx: Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1864) -> LoopOutcome {
1865 let run_identity = ensure_run_identity(agent, &mut run_ctx, run_identity);
1866 let executor = llm_executor_for_run(agent);
1867 let tool_executor = agent.tool_executor();
1868 let mut run_state = LoopRunState::new();
1869 let mut pending_decisions = VecDeque::new();
1870 let run_cancellation_token = cancellation_token;
1871 let mut last_text = String::new();
1872 let mut continued_response_prefix = String::new();
1873 let step_tool_provider = step_tool_provider_for_run(agent, tools);
1874 let run_id = run_identity.run_id.clone();
1875 let baseline_suspended_call_ids = suspended_call_ids(&run_ctx);
1876 let pending_delta_commit =
1877 PendingDeltaCommitContext::new(&run_identity, state_committer.as_ref());
1878 let initial_step_tools = match resolve_step_tool_snapshot(&step_tool_provider, &run_ctx).await {
1879 Ok(snapshot) => snapshot,
1880 Err(error) => {
1881 let msg = error.to_string();
1882 return build_loop_outcome(
1883 run_ctx,
1884 TerminationReason::Error(msg.clone()),
1885 None,
1886 &run_state,
1887 Some(outcome::LoopFailure::State(msg)),
1888 );
1889 }
1890 };
1891 let StepToolSnapshot {
1892 tools: initial_tools,
1893 descriptors: initial_descriptors,
1894 } = initial_step_tools;
1895 let mut active_tool_descriptors = initial_descriptors;
1896
1897 macro_rules! terminate_run {
1898 ($termination:expr, $response:expr, $failure:expr) => {{
1899 let reason: TerminationReason = $termination;
1900 let (final_termination, final_response) =
1901 normalize_termination_for_suspended_calls(&run_ctx, reason, $response);
1902 if let Err(error) = persist_run_termination(
1903 &mut run_ctx,
1904 &final_termination,
1905 &active_tool_descriptors,
1906 agent,
1907 &run_identity,
1908 &pending_delta_commit,
1909 )
1910 .await
1911 {
1912 let msg = error.to_string();
1913 return build_loop_outcome(
1914 run_ctx,
1915 TerminationReason::Error(msg.clone()),
1916 None,
1917 &run_state,
1918 Some(outcome::LoopFailure::State(msg)),
1919 );
1920 }
1921 return build_loop_outcome(
1922 run_ctx,
1923 final_termination,
1924 final_response,
1925 &run_state,
1926 $failure,
1927 );
1928 }};
1929 }
1930
1931 let (pending, actions) = match plugin_runtime::emit_phase_block(
1933 Phase::RunStart,
1934 &run_ctx,
1935 &active_tool_descriptors,
1936 agent,
1937 |_| {},
1938 )
1939 .await
1940 {
1941 Ok(result) => result,
1942 Err(error) => {
1943 let msg = error.to_string();
1944 terminate_run!(
1945 TerminationReason::Error(msg.clone()),
1946 None,
1947 Some(outcome::LoopFailure::State(msg))
1948 );
1949 }
1950 };
1951 run_ctx.add_thread_patches(pending);
1952 run_ctx.add_serialized_state_actions(actions);
1953 if let Err(error) = commit_run_start_and_drain_replay(
1954 &mut run_ctx,
1955 &initial_tools,
1956 agent,
1957 &active_tool_descriptors,
1958 &pending_delta_commit,
1959 )
1960 .await
1961 {
1962 let msg = error.to_string();
1963 terminate_run!(
1964 TerminationReason::Error(msg.clone()),
1965 None,
1966 Some(outcome::LoopFailure::State(msg))
1967 );
1968 }
1969 let run_start_new_suspended = newly_suspended_call_ids(&run_ctx, &baseline_suspended_call_ids);
1970 if !run_start_new_suspended.is_empty() {
1971 terminate_run!(TerminationReason::Suspended, None, None);
1972 }
1973 loop {
1974 if let Err(error) = apply_decisions_and_replay(
1975 &mut run_ctx,
1976 &mut decision_rx,
1977 &mut pending_decisions,
1978 &step_tool_provider,
1979 agent,
1980 &mut active_tool_descriptors,
1981 &pending_delta_commit,
1982 )
1983 .await
1984 {
1985 let msg = error.to_string();
1986 terminate_run!(
1987 TerminationReason::Error(msg.clone()),
1988 None,
1989 Some(outcome::LoopFailure::State(msg))
1990 );
1991 }
1992
1993 if is_run_cancelled(run_cancellation_token.as_ref()) {
1994 terminate_run!(TerminationReason::Cancelled, None, None);
1995 }
1996
1997 let step_tools = match resolve_step_tool_snapshot(&step_tool_provider, &run_ctx).await {
1998 Ok(snapshot) => snapshot,
1999 Err(e) => {
2000 let msg = e.to_string();
2001 terminate_run!(
2002 TerminationReason::Error(msg.clone()),
2003 None,
2004 Some(outcome::LoopFailure::State(msg))
2005 );
2006 }
2007 };
2008 active_tool_descriptors = step_tools.descriptors.clone();
2009
2010 let prepared = match prepare_step_execution(&run_ctx, &active_tool_descriptors, agent).await
2011 {
2012 Ok(v) => v,
2013 Err(e) => {
2014 let msg = e.to_string();
2015 terminate_run!(
2016 TerminationReason::Error(msg.clone()),
2017 None,
2018 Some(outcome::LoopFailure::State(msg))
2019 );
2020 }
2021 };
2022 run_ctx.add_thread_patches(prepared.pending_patches);
2023
2024 match prepared.run_action {
2025 RunAction::Continue => {}
2026 RunAction::Terminate(reason) => {
2027 let response = if matches!(reason, TerminationReason::BehaviorRequested) {
2028 Some(last_text.clone())
2029 } else {
2030 None
2031 };
2032 terminate_run!(reason, response, None);
2033 }
2034 }
2035
2036 let messages = prepared.messages;
2038 let filtered_tools = prepared.filtered_tools;
2039 let request_transforms = prepared.request_transforms;
2040 let chat_options = agent.chat_options().cloned();
2041 let attempt_outcome = run_llm_with_retry_and_fallback(
2042 agent,
2043 run_cancellation_token.as_ref(),
2044 true,
2045 None,
2046 "unknown llm error",
2047 |model| {
2048 let request = build_request_for_filtered_tools(
2049 &messages,
2050 &step_tools.tools,
2051 &filtered_tools,
2052 &request_transforms,
2053 );
2054 let executor = executor.clone();
2055 let chat_options = chat_options.clone();
2056 async move {
2057 executor
2058 .exec_chat_response(&model, request, chat_options.as_ref())
2059 .await
2060 }
2061 },
2062 )
2063 .await;
2064
2065 let response = match attempt_outcome {
2066 LlmAttemptOutcome::Success {
2067 value, attempts, ..
2068 } => {
2069 run_state.record_llm_attempts(attempts);
2070 value
2071 }
2072 LlmAttemptOutcome::Cancelled => {
2073 append_cancellation_user_message(&mut run_ctx, CancellationStage::Inference);
2074 terminate_run!(TerminationReason::Cancelled, None, None);
2075 }
2076 LlmAttemptOutcome::Exhausted {
2077 last_error,
2078 last_error_class,
2079 attempts,
2080 } => {
2081 run_state.record_llm_attempts(attempts);
2082 if let Err(phase_error) = apply_llm_error_cleanup(
2083 &mut run_ctx,
2084 &active_tool_descriptors,
2085 agent,
2086 "llm_exec_error",
2087 last_error.clone(),
2088 last_error_class,
2089 )
2090 .await
2091 {
2092 let msg = phase_error.to_string();
2093 terminate_run!(
2094 TerminationReason::Error(msg.clone()),
2095 None,
2096 Some(outcome::LoopFailure::State(msg))
2097 );
2098 }
2099 terminate_run!(
2100 TerminationReason::Error(last_error.clone()),
2101 None,
2102 Some(outcome::LoopFailure::Llm(last_error))
2103 );
2104 }
2105 };
2106
2107 let result = stream_result_from_chat_response(&response);
2108 run_state.update_from_response(&result);
2109 last_text = stitch_response_text(&continued_response_prefix, &result.text);
2110
2111 let assistant_msg_id = gen_message_id();
2113 let step_meta = step_metadata(Some(run_id.clone()), run_state.completed_steps as u32);
2114 let post_inference_action = match complete_step_after_inference(
2115 &mut run_ctx,
2116 &result,
2117 step_meta.clone(),
2118 Some(assistant_msg_id.clone()),
2119 &active_tool_descriptors,
2120 agent,
2121 )
2122 .await
2123 {
2124 Ok(action) => action,
2125 Err(e) => {
2126 let msg = e.to_string();
2127 terminate_run!(
2128 TerminationReason::Error(msg.clone()),
2129 None,
2130 Some(outcome::LoopFailure::State(msg))
2131 );
2132 }
2133 };
2134 if let Err(error) = pending_delta_commit
2135 .commit(
2136 &mut run_ctx,
2137 CheckpointReason::AssistantTurnCommitted,
2138 false,
2139 )
2140 .await
2141 {
2142 let msg = error.to_string();
2143 terminate_run!(
2144 TerminationReason::Error(msg.clone()),
2145 None,
2146 Some(outcome::LoopFailure::State(msg))
2147 );
2148 }
2149
2150 mark_step_completed(&mut run_state);
2151
2152 if truncation_recovery::should_retry(&result, &mut run_state) {
2155 extend_response_prefix(&mut continued_response_prefix, &result.text);
2156 let prompt = truncation_recovery::continuation_message();
2157 run_ctx.add_message(Arc::new(prompt));
2158 continue;
2159 }
2160 continued_response_prefix.clear();
2161
2162 let post_inference_termination = match &post_inference_action {
2163 RunAction::Terminate(reason) => Some(reason.clone()),
2164 _ => None,
2165 };
2166
2167 if let Some(reason) = &post_inference_termination {
2171 if !matches!(reason, TerminationReason::Stopped(_)) {
2172 terminate_run!(reason.clone(), Some(last_text.clone()), None);
2173 }
2174 }
2175
2176 if !result.needs_tools() {
2177 run_state.record_step_without_tools();
2178 let reason = post_inference_termination.unwrap_or(TerminationReason::NaturalEnd);
2179 terminate_run!(reason, Some(last_text.clone()), None);
2180 }
2181
2182 let tool_context = match prepare_tool_execution_context(&run_ctx) {
2184 Ok(ctx) => ctx,
2185 Err(e) => {
2186 let msg = e.to_string();
2187 terminate_run!(
2188 TerminationReason::Error(msg.clone()),
2189 None,
2190 Some(outcome::LoopFailure::State(msg))
2191 );
2192 }
2193 };
2194 let thread_messages_for_tools = run_ctx.messages().to_vec();
2195 let thread_version_for_tools = run_ctx.version();
2196
2197 let tool_exec_future = tool_executor.execute(ToolExecutionRequest {
2198 tools: &step_tools.tools,
2199 calls: &result.tool_calls,
2200 state: &tool_context.state,
2201 tool_descriptors: &active_tool_descriptors,
2202 agent_behavior: Some(agent.behavior()),
2203 activity_manager: tirea_contract::runtime::activity::NoOpActivityManager::arc(),
2204 run_policy: &tool_context.run_policy,
2205 run_identity: tool_context.run_identity.clone(),
2206 caller_context: tool_context.caller_context.clone(),
2207 thread_id: run_ctx.thread_id(),
2208 thread_messages: &thread_messages_for_tools,
2209 state_version: thread_version_for_tools,
2210 cancellation_token: run_cancellation_token.as_ref(),
2211 });
2212 let results = tool_exec_future.await.map_err(AgentLoopError::from);
2213
2214 let results = match results {
2215 Ok(r) => r,
2216 Err(AgentLoopError::Cancelled) => {
2217 append_cancellation_user_message(&mut run_ctx, CancellationStage::ToolExecution);
2218 terminate_run!(TerminationReason::Cancelled, None, None);
2219 }
2220 Err(e) => {
2221 let msg = e.to_string();
2222 terminate_run!(
2223 TerminationReason::Error(msg.clone()),
2224 None,
2225 Some(outcome::LoopFailure::State(msg))
2226 );
2227 }
2228 };
2229
2230 if let Err(_e) = apply_tool_results_to_session(
2231 &mut run_ctx,
2232 &results,
2233 Some(step_meta),
2234 tool_executor.requires_parallel_patch_conflict_check(),
2235 ) {
2236 let msg = _e.to_string();
2238 terminate_run!(
2239 TerminationReason::Error(msg.clone()),
2240 None,
2241 Some(outcome::LoopFailure::State(msg))
2242 );
2243 }
2244 if let Err(error) = pending_delta_commit
2245 .commit(&mut run_ctx, CheckpointReason::ToolResultsCommitted, false)
2246 .await
2247 {
2248 let msg = error.to_string();
2249 terminate_run!(
2250 TerminationReason::Error(msg.clone()),
2251 None,
2252 Some(outcome::LoopFailure::State(msg))
2253 );
2254 }
2255
2256 if let Err(error) = apply_decisions_and_replay(
2257 &mut run_ctx,
2258 &mut decision_rx,
2259 &mut pending_decisions,
2260 &step_tool_provider,
2261 agent,
2262 &mut active_tool_descriptors,
2263 &pending_delta_commit,
2264 )
2265 .await
2266 {
2267 let msg = error.to_string();
2268 terminate_run!(
2269 TerminationReason::Error(msg.clone()),
2270 None,
2271 Some(outcome::LoopFailure::State(msg))
2272 );
2273 }
2274
2275 if has_suspended_calls(&run_ctx) {
2277 let has_completed = results
2278 .iter()
2279 .any(|r| !matches!(r.outcome, crate::contracts::ToolCallOutcome::Suspended));
2280 if !has_completed {
2281 terminate_run!(TerminationReason::Suspended, None, None);
2282 }
2283 }
2284
2285 if let Some(reason) = post_inference_termination {
2288 terminate_run!(reason, Some(last_text.clone()), None);
2289 }
2290
2291 let error_count = results
2293 .iter()
2294 .filter(|r| r.execution.result.is_error())
2295 .count();
2296 run_state.record_tool_step(&result.tool_calls, error_count);
2297 }
2298}
2299
2300pub fn run_loop_stream(
2306 agent: Arc<dyn Agent>,
2307 tools: HashMap<String, Arc<dyn Tool>>,
2308 run_ctx: RunContext,
2309 cancellation_token: Option<RunCancellationToken>,
2310 state_committer: Option<Arc<dyn StateCommitter>>,
2311 decision_rx: Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
2312) -> Pin<Box<dyn Stream<Item = AgentEvent> + Send>> {
2313 let run_identity = run_ctx.run_identity().clone();
2314 run_loop_stream_with_context(
2315 agent,
2316 tools,
2317 run_ctx,
2318 run_identity,
2319 cancellation_token,
2320 state_committer,
2321 decision_rx,
2322 )
2323}
2324
2325pub fn run_loop_stream_with_context(
2326 agent: Arc<dyn Agent>,
2327 tools: HashMap<String, Arc<dyn Tool>>,
2328 mut run_ctx: RunContext,
2329 run_identity: RunIdentity,
2330 cancellation_token: Option<RunCancellationToken>,
2331 state_committer: Option<Arc<dyn StateCommitter>>,
2332 decision_rx: Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
2333) -> Pin<Box<dyn Stream<Item = AgentEvent> + Send>> {
2334 let run_identity = ensure_run_identity(agent.as_ref(), &mut run_ctx, run_identity);
2335 stream_runner::run_stream(
2336 agent,
2337 tools,
2338 run_ctx,
2339 run_identity,
2340 cancellation_token,
2341 state_committer,
2342 decision_rx,
2343 )
2344}
2345
2346#[cfg(test)]
2347mod tests;