1mod config;
42mod core;
43mod event_envelope_meta;
44mod outcome;
45mod parallel_state_merge;
46mod plugin_runtime;
47mod run_state;
48mod state_commit;
49mod stream_core;
50mod stream_runner;
51mod tool_exec;
52mod truncation_recovery;
53
54use crate::contracts::io::ResumeDecisionAction;
55use crate::contracts::runtime::phase::Phase;
56use crate::contracts::runtime::state::{reduce_state_actions, AnyStateAction, ScopeContext};
57use crate::contracts::runtime::tool_call::{Tool, ToolResult};
58use crate::contracts::runtime::ActivityManager;
59use crate::contracts::runtime::{
60 DecisionReplayPolicy, RunLifecycleAction, RunLifecycleState, StreamResult, SuspendedCall,
61 ToolCallResume, ToolCallResumeMode, ToolCallStatus, ToolExecutionRequest, ToolExecutionResult,
62};
63use crate::contracts::thread::CheckpointReason;
64use crate::contracts::thread::{gen_message_id, Message, MessageMetadata, ToolCall};
65use crate::contracts::RunContext;
66use crate::contracts::{AgentEvent, RunAction, RunOrigin, TerminationReason, ToolCallDecision};
67use crate::engine::convert::{assistant_message, assistant_tool_calls, tool_response};
68use crate::runtime::activity::ActivityHub;
69
70use crate::runtime::streaming::StreamCollector;
71use async_stream::stream;
72use futures::{Stream, StreamExt};
73use genai::Client;
74use serde_json::Value;
75use std::collections::{HashMap, HashSet, VecDeque};
76use std::future::Future;
77use std::pin::Pin;
78use std::sync::atomic::{AtomicU64, Ordering};
79use std::sync::Arc;
80use std::time::{Instant, SystemTime, UNIX_EPOCH};
81use uuid::Uuid;
82
83pub use crate::contracts::runtime::ToolExecutor;
84pub use crate::runtime::run_context::{
85 await_or_cancel, is_cancelled, CancelAware, RunCancellationToken, StateCommitError,
86 StateCommitter, TOOL_SCOPE_CALLER_AGENT_ID_KEY, TOOL_SCOPE_CALLER_MESSAGES_KEY,
87 TOOL_SCOPE_CALLER_STATE_KEY, TOOL_SCOPE_CALLER_THREAD_ID_KEY,
88};
89use config::StaticStepToolProvider;
90pub use config::{Agent, BaseAgent, GenaiLlmExecutor, LlmRetryPolicy};
91pub use config::{LlmEventStream, LlmExecutor};
92pub use config::{StepToolInput, StepToolProvider, StepToolSnapshot};
93#[cfg(test)]
94use core::build_messages;
95use core::{
96 build_request_for_filtered_tools, inference_inputs_from_step, suspended_calls_from_ctx,
97 tool_call_states_from_ctx, transition_tool_call_state, upsert_tool_call_state,
98 ToolCallStateSeed, ToolCallStateTransition,
99};
100pub use outcome::{tool_map, tool_map_from_arc, AgentLoopError};
101pub use outcome::{LoopOutcome, LoopStats, LoopUsage};
102#[cfg(test)]
103use plugin_runtime::emit_agent_phase;
104#[cfg(test)]
105use plugin_runtime::emit_cleanup_phases;
106use run_state::LoopRunState;
107pub use state_commit::ChannelStateCommitter;
108use state_commit::PendingDeltaCommitContext;
109use tirea_state::TrackedPatch;
110#[cfg(test)]
111use tokio_util::sync::CancellationToken;
112#[cfg(test)]
113use tool_exec::execute_single_tool_with_phases;
114#[cfg(test)]
115use tool_exec::execute_tools_parallel_with_phases;
116pub use tool_exec::ExecuteToolsOutcome;
117use tool_exec::{
118 apply_tool_results_impl, apply_tool_results_to_session,
119 execute_single_tool_with_phases_deferred, scope_with_tool_caller_context, step_metadata,
120 ToolPhaseContext,
121};
122pub use tool_exec::{
123 execute_tools, execute_tools_with_behaviors, execute_tools_with_config,
124 ParallelToolExecutionMode, ParallelToolExecutor, SequentialToolExecutor,
125};
126
/// Bundle of everything resolved for one run: the agent, its tools, and the
/// run configuration.
pub struct ResolvedRun {
    /// Agent definition used for the run.
    pub agent: BaseAgent,
    /// Tools available to the run, keyed by tool id.
    pub tools: HashMap<String, Arc<dyn Tool>>,
    /// Run configuration carried alongside the agent and tools.
    pub run_config: crate::contracts::RunConfig,
}
144
145impl ResolvedRun {
146 #[must_use]
148 pub fn with_tool(mut self, id: String, tool: Arc<dyn Tool>) -> Self {
149 self.tools.insert(id, tool);
150 self
151 }
152
153 pub fn overlay_tools(&mut self, tools: HashMap<String, Arc<dyn Tool>>) {
155 for (id, tool) in tools {
156 self.tools.entry(id).or_insert(tool);
157 }
158 }
159}
160
/// Identity of a run: its own id, an optional parent run, the agent that
/// executes it, and where the run originated.
#[derive(Debug, Clone)]
pub struct RunExecutionContext {
    /// Identifier of this run.
    pub run_id: String,
    /// Identifier of the parent run, when there is one.
    pub parent_run_id: Option<String>,
    /// Identifier of the agent executing the run.
    pub agent_id: String,
    /// Origin of the run.
    pub origin: RunOrigin,
}
168
169impl RunExecutionContext {
170 pub const RUN_ID_KEY: &'static str = "run_id";
171 pub const PARENT_RUN_ID_KEY: &'static str = "parent_run_id";
172 pub const AGENT_ID_KEY: &'static str = "agent_id";
173 pub const ORIGIN_KEY: &'static str = "origin";
174
175 #[must_use]
176 pub const fn new(
177 run_id: String,
178 parent_run_id: Option<String>,
179 agent_id: String,
180 origin: RunOrigin,
181 ) -> Self {
182 Self {
183 run_id,
184 parent_run_id,
185 agent_id,
186 origin,
187 }
188 }
189
190 #[must_use]
191 pub fn from_run_config(run_ctx: &RunContext) -> Self {
192 let run_id = run_ctx
193 .run_config
194 .value(Self::RUN_ID_KEY)
195 .and_then(Value::as_str)
196 .map(str::trim)
197 .filter(|id| !id.is_empty())
198 .map(ToOwned::to_owned)
199 .unwrap_or_else(uuid_v7);
200
201 let parent_run_id = run_ctx
202 .run_config
203 .value(Self::PARENT_RUN_ID_KEY)
204 .and_then(Value::as_str)
205 .map(str::trim)
206 .filter(|id| !id.is_empty())
207 .map(ToOwned::to_owned);
208
209 let agent_id = run_ctx
210 .run_config
211 .value(Self::AGENT_ID_KEY)
212 .and_then(Value::as_str)
213 .map(str::trim)
214 .filter(|id| !id.is_empty())
215 .map(ToOwned::to_owned)
216 .unwrap_or_else(|| run_ctx.thread_id().to_string());
217
218 let origin = run_ctx
219 .run_config
220 .value(Self::ORIGIN_KEY)
221 .and_then(|value| serde_json::from_value::<RunOrigin>(value.clone()).ok())
222 .unwrap_or_default();
223
224 Self::new(run_id, parent_run_id, agent_id, origin)
225 }
226}
227
228fn uuid_v7() -> String {
229 Uuid::now_v7().simple().to_string()
230}
231
232fn bind_execution_context_to_run_config(
233 run_ctx: &mut RunContext,
234 execution_ctx: &RunExecutionContext,
235) {
236 let _ = run_ctx.run_config.set(
237 RunExecutionContext::RUN_ID_KEY,
238 execution_ctx.run_id.clone(),
239 );
240 if let Some(parent_run_id) = execution_ctx.parent_run_id.as_deref() {
241 let _ = run_ctx.run_config.set(
242 RunExecutionContext::PARENT_RUN_ID_KEY,
243 parent_run_id.to_string(),
244 );
245 }
246 let _ = run_ctx.run_config.set(
247 RunExecutionContext::AGENT_ID_KEY,
248 execution_ctx.agent_id.clone(),
249 );
250 if let Ok(origin_value) = serde_json::to_value(execution_ctx.origin) {
251 let _ = run_ctx
252 .run_config
253 .set(RunExecutionContext::ORIGIN_KEY, origin_value);
254 }
255}
256
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reads earlier than the epoch and
/// saturates at `u64::MAX` if the millisecond count does not fit in a `u64`.
pub(crate) fn current_unix_millis() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
262
/// Test-only entry point that forwards to
/// `sync_run_lifecycle_for_termination_with_context` with the same arguments.
#[cfg(test)]
pub(super) fn sync_run_lifecycle_for_termination(
    run_ctx: &mut RunContext,
    execution_ctx: &RunExecutionContext,
    termination: &TerminationReason,
) -> Result<(), AgentLoopError> {
    sync_run_lifecycle_for_termination_with_context(run_ctx, execution_ctx, termination)
}
271
272fn sync_run_lifecycle_for_termination_with_context(
273 run_ctx: &mut RunContext,
274 execution_ctx: &RunExecutionContext,
275 termination: &TerminationReason,
276) -> Result<(), AgentLoopError> {
277 if execution_ctx.run_id.trim().is_empty() {
278 return Ok(());
279 };
280
281 let (status, done_reason) = termination.to_run_status();
282
283 let base_state = run_ctx
284 .snapshot()
285 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
286 let actions = vec![AnyStateAction::new::<RunLifecycleState>(
287 RunLifecycleAction::Set {
288 id: execution_ctx.run_id.clone(),
289 status,
290 done_reason,
291 updated_at: current_unix_millis(),
292 },
293 )];
294 let patches = reduce_state_actions(actions, &base_state, "agent_loop", &ScopeContext::run())
295 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
296 run_ctx.add_thread_patches(patches);
297 Ok(())
298}
299
/// Stage of the loop that was running when a cancellation was observed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum CancellationStage {
    /// Cancelled during inference.
    Inference,
    /// Cancelled while tools were being used.
    ToolExecution,
}
305
/// User message appended after a cancellation at `CancellationStage::Inference`.
pub(super) const CANCELLATION_INFERENCE_USER_MESSAGE: &str =
    "The previous run was interrupted during inference. Please continue from the current context.";
/// User message appended after a cancellation at `CancellationStage::ToolExecution`.
pub(super) const CANCELLATION_TOOL_USER_MESSAGE: &str =
    "The previous run was interrupted while using tools. Please continue from the current context.";
310
311pub(super) fn append_cancellation_user_message(run_ctx: &mut RunContext, stage: CancellationStage) {
312 let content = match stage {
313 CancellationStage::Inference => CANCELLATION_INFERENCE_USER_MESSAGE,
314 CancellationStage::ToolExecution => CANCELLATION_TOOL_USER_MESSAGE,
315 };
316 run_ctx.add_message(Arc::new(Message::user(content)));
317}
318
319pub(super) fn effective_llm_models(agent: &dyn Agent) -> Vec<String> {
320 let mut models = Vec::with_capacity(1 + agent.fallback_models().len());
321 models.push(agent.model().to_string());
322 for model in agent.fallback_models() {
323 if model.trim().is_empty() {
324 continue;
325 }
326 if !models.iter().any(|m| m == model) {
327 models.push(model.clone());
328 }
329 }
330 models
331}
332
333pub(super) fn effective_llm_models_from(
334 agent: &dyn Agent,
335 start_model: Option<&str>,
336) -> Vec<String> {
337 let models = effective_llm_models(agent);
338 let Some(start_model) = start_model.map(str::trim).filter(|model| !model.is_empty()) else {
339 return models;
340 };
341 let Some(index) = models.iter().position(|model| model == start_model) else {
342 return models;
343 };
344 models.into_iter().skip(index).collect()
345}
346
347pub(super) fn next_llm_model_after(agent: &dyn Agent, current_model: &str) -> Option<String> {
348 let models = effective_llm_models(agent);
349 let current_index = models.iter().position(|model| model == current_model)?;
350 models.into_iter().nth(current_index + 1)
351}
352
353pub(super) fn llm_retry_attempts(agent: &dyn Agent) -> usize {
354 agent.llm_retry_policy().max_attempts_per_model.max(1)
355}
356
/// Coarse category assigned to an LLM call failure; used to decide whether
/// the call may be retried (see `is_retryable`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum LlmErrorClass {
    /// Rate limiting (e.g. HTTP 429).
    RateLimit,
    /// The request timed out (e.g. HTTP 408).
    Timeout,
    /// Transport-level connection failure.
    Connection,
    /// The server reported itself unavailable (e.g. HTTP 503).
    ServerUnavailable,
    /// Any other server-side failure (other HTTP 5xx).
    ServerError,
    /// Authentication or authorization failure (e.g. HTTP 401 / 403).
    Auth,
    /// The request itself was rejected (other HTTP 4xx).
    ClientRequest,
    /// No category could be determined.
    Unknown,
}
368
369impl LlmErrorClass {
370 pub(super) fn is_retryable(self) -> bool {
371 matches!(
372 self,
373 LlmErrorClass::RateLimit
374 | LlmErrorClass::Timeout
375 | LlmErrorClass::Connection
376 | LlmErrorClass::ServerUnavailable
377 | LlmErrorClass::ServerError
378 )
379 }
380
381 pub(super) fn as_str(self) -> &'static str {
383 match self {
384 LlmErrorClass::RateLimit => "rate_limit",
385 LlmErrorClass::Timeout => "timeout",
386 LlmErrorClass::Connection => "connection",
387 LlmErrorClass::ServerUnavailable => "server_unavailable",
388 LlmErrorClass::ServerError => "server_error",
389 LlmErrorClass::Auth => "auth",
390 LlmErrorClass::ClientRequest => "client_request",
391 LlmErrorClass::Unknown => "unknown",
392 }
393 }
394}
395
396fn classify_llm_error_message(message: &str) -> LlmErrorClass {
397 let lower = message.to_ascii_lowercase();
398 if ["429", "too many requests", "rate limit"]
399 .iter()
400 .any(|p| lower.contains(p))
401 {
402 return LlmErrorClass::RateLimit;
403 }
404 if ["timeout", "timed out"].iter().any(|p| lower.contains(p)) {
405 return LlmErrorClass::Timeout;
406 }
407 if [
408 "connection",
409 "network",
410 "reset by peer",
411 "broken pipe",
412 "eof",
413 "connection refused",
414 "error sending request for url",
415 ]
416 .iter()
417 .any(|p| lower.contains(p))
418 {
419 return LlmErrorClass::Connection;
420 }
421 if ["503", "service unavailable", "unavailable", "temporar"]
422 .iter()
423 .any(|p| lower.contains(p))
424 {
425 return LlmErrorClass::ServerUnavailable;
426 }
427 if [
428 "500",
429 "502",
430 "504",
431 "server error",
432 "bad gateway",
433 "gateway timeout",
434 ]
435 .iter()
436 .any(|p| lower.contains(p))
437 {
438 return LlmErrorClass::ServerError;
439 }
440 if ["401", "403", "unauthorized", "forbidden", "invalid api key"]
441 .iter()
442 .any(|p| lower.contains(p))
443 {
444 return LlmErrorClass::Auth;
445 }
446 if ["400", "404", "422", "invalid_request", "bad request"]
447 .iter()
448 .any(|p| lower.contains(p))
449 {
450 return LlmErrorClass::ClientRequest;
451 }
452 LlmErrorClass::Unknown
453}
454
455fn classify_status_code(status_code: u16) -> LlmErrorClass {
456 match status_code {
457 408 => LlmErrorClass::Timeout,
458 429 => LlmErrorClass::RateLimit,
459 401 | 403 => LlmErrorClass::Auth,
460 400 | 404 | 422 => LlmErrorClass::ClientRequest,
461 503 => LlmErrorClass::ServerUnavailable,
462 500..=599 => LlmErrorClass::ServerError,
463 400..=499 => LlmErrorClass::ClientRequest,
464 _ => LlmErrorClass::Unknown,
465 }
466}
467
468fn classify_error_chain(error: &(dyn std::error::Error + 'static)) -> Option<LlmErrorClass> {
469 let mut current = Some(error);
470 while let Some(err) = current {
471 if let Some(io_error) = err.downcast_ref::<std::io::Error>() {
472 use std::io::ErrorKind;
473
474 let class = match io_error.kind() {
475 ErrorKind::TimedOut => Some(LlmErrorClass::Timeout),
476 ErrorKind::ConnectionAborted
477 | ErrorKind::ConnectionRefused
478 | ErrorKind::ConnectionReset
479 | ErrorKind::BrokenPipe
480 | ErrorKind::NotConnected
481 | ErrorKind::UnexpectedEof => Some(LlmErrorClass::Connection),
482 _ => None,
483 };
484 if class.is_some() {
485 return class;
486 }
487 }
488 current = err.source();
489 }
490 None
491}
492
493fn classify_webc_error(error: &genai::webc::Error) -> LlmErrorClass {
494 match error {
495 genai::webc::Error::ResponseFailedStatus { status, .. } => {
496 classify_status_code(status.as_u16())
497 }
498 genai::webc::Error::Reqwest(err) => classify_error_chain(err)
499 .unwrap_or_else(|| classify_llm_error_message(&err.to_string())),
500 _ => classify_llm_error_message(&error.to_string()),
501 }
502}
503
504fn classify_chat_response_body(body: &serde_json::Value) -> LlmErrorClass {
515 let top_type = body.get("type").and_then(|v| v.as_str());
521 let nested_type = body
522 .get("error")
523 .and_then(|e| e.get("type"))
524 .and_then(|v| v.as_str());
525
526 let candidates: &[&str] = match (top_type, nested_type) {
529 (_, Some(inner)) => &[inner, top_type.unwrap_or("")],
530 (Some(top), None) => &[top],
531 (None, None) => &[],
532 };
533
534 for t in candidates {
535 let lower = t.to_ascii_lowercase();
536 if lower.contains("rate_limit") {
537 return LlmErrorClass::RateLimit;
538 }
539 if lower.contains("overloaded") || lower.contains("unavailable") {
540 return LlmErrorClass::ServerUnavailable;
541 }
542 if lower.contains("timeout") {
543 return LlmErrorClass::Timeout;
544 }
545 if lower.contains("server_error") || lower.contains("api_error") {
546 return LlmErrorClass::ServerError;
547 }
548 if lower.contains("authentication") || lower.contains("permission") {
549 return LlmErrorClass::Auth;
550 }
551 if lower.contains("invalid_request") || lower.contains("not_found") {
552 return LlmErrorClass::ClientRequest;
553 }
554 }
555
556 let status_code = body
559 .get("status")
560 .or_else(|| body.get("code"))
561 .and_then(|v| v.as_u64())
562 .and_then(|v| u16::try_from(v).ok());
563
564 if let Some(code) = status_code {
565 let class = classify_status_code(code);
566 if class != LlmErrorClass::Unknown {
567 return class;
568 }
569 }
570
571 LlmErrorClass::Unknown
572}
573
574pub(super) fn classify_llm_error(error: &genai::Error) -> LlmErrorClass {
575 match error {
576 genai::Error::HttpError { status, .. } => classify_status_code(status.as_u16()),
577 genai::Error::WebStream { cause, error, .. } => classify_error_chain(error.as_ref())
578 .unwrap_or_else(|| classify_llm_error_message(cause)),
579 genai::Error::WebAdapterCall { webc_error, .. }
580 | genai::Error::WebModelCall { webc_error, .. } => classify_webc_error(webc_error),
581 genai::Error::Internal(message) => classify_llm_error_message(message),
582
583 genai::Error::StreamParse { .. } => LlmErrorClass::Connection,
586
587 genai::Error::NoChatResponse { .. } => LlmErrorClass::ServerError,
589
590 genai::Error::ChatResponse { body, .. } => classify_chat_response_body(body),
592
593 genai::Error::RequiresApiKey { .. }
595 | genai::Error::NoAuthResolver { .. }
596 | genai::Error::NoAuthData { .. } => LlmErrorClass::Auth,
597
598 other => classify_llm_error_message(&other.to_string()),
599 }
600}
601
/// Test-only shortcut: classifies `error` and reports whether that class is
/// retryable.
#[cfg(test)]
pub(super) fn is_retryable_llm_error(error: &genai::Error) -> bool {
    let class = classify_llm_error(error);
    class.is_retryable()
}
606
/// Process-wide counter that `next_retry_entropy` advances on every call and
/// mixes with the clock to derive backoff jitter.
static RETRY_BACKOFF_ENTROPY: AtomicU64 = AtomicU64::new(0x9e37_79b9_7f4a_7c15);
608
/// Tracks how long a retry sequence has been running.
#[derive(Debug, Clone, Default)]
pub(super) struct RetryBackoffWindow {
    /// Instant of the first `elapsed_ms` call since creation or `reset`;
    /// `None` until then.
    started_at: Option<Instant>,
}
613
614impl RetryBackoffWindow {
615 pub(super) fn elapsed_ms(&mut self) -> u64 {
616 self.started_at
617 .get_or_insert_with(Instant::now)
618 .elapsed()
619 .as_millis()
620 .try_into()
621 .unwrap_or(u64::MAX)
622 }
623
624 pub(super) fn reset(&mut self) {
625 self.started_at = None;
626 }
627}
628
/// Result of waiting out a retry backoff (see `wait_retry_backoff`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum RetryBackoffOutcome {
    /// The backoff sleep ran to completion.
    Completed,
    /// The run was cancelled while waiting.
    Cancelled,
    /// Waiting would exceed the policy's retry window; no sleep happened.
    BudgetExhausted,
}
635
/// Scrambles `entropy` into a well-distributed 64-bit value: one additive
/// step followed by two xor-shift / multiply rounds and a final xor-shift.
/// All arithmetic wraps.
fn mix_retry_entropy(entropy: u64) -> u64 {
    const INCREMENT: u64 = 0x9e37_79b9_7f4a_7c15;
    const FIRST_MULTIPLIER: u64 = 0xbf58_476d_1ce4_e5b9;
    const SECOND_MULTIPLIER: u64 = 0x94d0_49bb_1331_11eb;

    let seeded = entropy.wrapping_add(INCREMENT);
    let round_one = (seeded ^ (seeded >> 30)).wrapping_mul(FIRST_MULTIPLIER);
    let round_two = (round_one ^ (round_one >> 27)).wrapping_mul(SECOND_MULTIPLIER);
    round_two ^ (round_two >> 31)
}
642
643fn next_retry_entropy() -> u64 {
644 let counter = RETRY_BACKOFF_ENTROPY.fetch_add(0x9e37_79b9_7f4a_7c15, Ordering::Relaxed);
645 let now = SystemTime::now()
646 .duration_since(UNIX_EPOCH)
647 .map(|duration| duration.as_nanos() as u64)
648 .unwrap_or(counter);
649 mix_retry_entropy(counter ^ now)
650}
651
652pub(super) fn retry_base_backoff_ms(policy: &LlmRetryPolicy, retry_attempt: usize) -> u64 {
653 let initial = policy.initial_backoff_ms;
654 let cap = policy.max_backoff_ms.max(policy.initial_backoff_ms);
655 let attempt = retry_attempt.max(1);
656 if attempt == 1 {
657 return initial.min(cap);
658 }
659 let shift = (attempt - 1).min(20) as u32;
660 let factor = 1u64.checked_shl(shift).unwrap_or(u64::MAX);
661 initial.saturating_mul(factor).min(cap)
662}
663
664fn jittered_backoff_ms(policy: &LlmRetryPolicy, retry_attempt: usize, entropy: u64) -> u64 {
665 let cap = policy.max_backoff_ms.max(policy.initial_backoff_ms);
666 let base_ms = retry_base_backoff_ms(policy, retry_attempt).min(cap);
667 let jitter_percent = policy.backoff_jitter_percent.min(100) as u64;
668 if jitter_percent == 0 || base_ms == 0 {
669 return base_ms;
670 }
671
672 let jitter_window = base_ms.saturating_mul(jitter_percent) / 100;
673 let lower = base_ms.saturating_sub(jitter_window);
674 let upper = base_ms.saturating_add(jitter_window).min(cap);
675 if upper <= lower {
676 return lower;
677 }
678
679 let span = upper - lower;
680 lower + (mix_retry_entropy(entropy) % (span.saturating_add(1)))
681}
682
683pub(super) fn retry_backoff_plan_ms(
684 policy: &LlmRetryPolicy,
685 retry_attempt: usize,
686 elapsed_ms: u64,
687 entropy: u64,
688) -> Option<u64> {
689 let wait_ms = jittered_backoff_ms(policy, retry_attempt, entropy);
690 match policy.max_retry_window_ms {
691 Some(budget_ms) if elapsed_ms.saturating_add(wait_ms) > budget_ms => None,
692 _ => Some(wait_ms),
693 }
694}
695
696pub(super) async fn wait_retry_backoff(
697 agent: &dyn Agent,
698 retry_attempt: usize,
699 retry_window: &mut RetryBackoffWindow,
700 run_cancellation_token: Option<&RunCancellationToken>,
701) -> RetryBackoffOutcome {
702 let elapsed_ms = retry_window.elapsed_ms();
703 let Some(wait_ms) = retry_backoff_plan_ms(
704 agent.llm_retry_policy(),
705 retry_attempt,
706 elapsed_ms,
707 next_retry_entropy(),
708 ) else {
709 return RetryBackoffOutcome::BudgetExhausted;
710 };
711 tracing::debug!(
712 attempt = retry_attempt,
713 backoff_ms = wait_ms,
714 elapsed_ms = elapsed_ms,
715 "waiting before LLM retry"
716 );
717 match await_or_cancel(
718 run_cancellation_token,
719 tokio::time::sleep(std::time::Duration::from_millis(wait_ms)),
720 )
721 .await
722 {
723 CancelAware::Cancelled => RetryBackoffOutcome::Cancelled,
724 CancelAware::Value(_) => RetryBackoffOutcome::Completed,
725 }
726}
727
/// Final result of `run_llm_with_retry_and_fallback`.
pub(super) enum LlmAttemptOutcome<T> {
    /// A call succeeded.
    Success {
        /// Value returned by the successful call.
        value: T,
        /// Model that produced the value.
        model: String,
        /// Total number of calls made, across all models, including this one.
        attempts: usize,
    },
    /// The run was cancelled during a call or a backoff wait.
    Cancelled,
    /// Every permitted attempt failed.
    Exhausted {
        /// Description of the last failure (model, attempt counter, message).
        last_error: String,
        /// `LlmErrorClass::as_str` label of the last failure, if any call failed.
        last_error_class: Option<&'static str>,
        /// Total number of calls made across all models.
        attempts: usize,
    },
}
741
/// Reports whether the run has been cancelled; forwards to `is_cancelled`.
fn is_run_cancelled(token: Option<&RunCancellationToken>) -> bool {
    is_cancelled(token)
}
745
746pub(super) fn step_tool_provider_for_run(
747 agent: &dyn Agent,
748 tools: HashMap<String, Arc<dyn Tool>>,
749) -> Arc<dyn StepToolProvider> {
750 agent.step_tool_provider().unwrap_or_else(|| {
751 Arc::new(StaticStepToolProvider::new(tools)) as Arc<dyn StepToolProvider>
752 })
753}
754
755pub(super) fn llm_executor_for_run(agent: &dyn Agent) -> Arc<dyn LlmExecutor> {
756 agent
757 .llm_executor()
758 .unwrap_or_else(|| Arc::new(GenaiLlmExecutor::new(Client::default())))
759}
760
761pub(super) async fn resolve_step_tool_snapshot(
762 step_tool_provider: &Arc<dyn StepToolProvider>,
763 run_ctx: &RunContext,
764) -> Result<StepToolSnapshot, AgentLoopError> {
765 step_tool_provider
766 .provide(StepToolInput { state: run_ctx })
767 .await
768}
769
/// Increments the run's completed-step counter by one.
fn mark_step_completed(run_state: &mut LoopRunState) {
    run_state.completed_steps += 1;
}
773
/// Returns `prefix` immediately followed by `segment` as a new string; no
/// separator is inserted.
fn stitch_response_text(prefix: &str, segment: &str) -> String {
    [prefix, segment].concat()
}
786
/// Appends `segment` to `prefix` in place; an empty segment leaves `prefix`
/// unchanged.
fn extend_response_prefix(prefix: &mut String, segment: &str) {
    *prefix += segment;
}
792
793fn build_loop_outcome(
794 run_ctx: RunContext,
795 termination: TerminationReason,
796 response: Option<String>,
797 run_state: &LoopRunState,
798 failure: Option<outcome::LoopFailure>,
799) -> LoopOutcome {
800 LoopOutcome {
801 run_ctx,
802 termination,
803 response: response.filter(|text| !text.is_empty()),
804 usage: run_state.usage(),
805 stats: run_state.stats(),
806 failure,
807 }
808}
809
/// Calls the LLM through `invoke`, retrying the same model and falling back
/// to further models as the agent's configuration allows.
///
/// Candidate models come from `effective_llm_models_from(agent, start_model)`.
/// Each model gets up to `llm_retry_attempts(agent)` attempts; a failed
/// attempt is retried on the same model only when `retry_current_model` is
/// set, attempts remain, and the error class is retryable. Otherwise the
/// next model is tried.
///
/// Returns:
/// - `Success` with the value, the model that produced it and the total
///   number of calls made;
/// - `Cancelled` when the cancellation token fires during a call or backoff;
/// - `Exhausted` with the last error once no candidates remain or the retry
///   budget runs out. `unknown_error` is reported when no call failed at all
///   (i.e. there were no candidates).
pub(super) async fn run_llm_with_retry_and_fallback<T, Invoke, Fut>(
    agent: &dyn Agent,
    run_cancellation_token: Option<&RunCancellationToken>,
    retry_current_model: bool,
    start_model: Option<&str>,
    unknown_error: &str,
    mut invoke: Invoke,
) -> LlmAttemptOutcome<T>
where
    Invoke: FnMut(String) -> Fut,
    Fut: std::future::Future<Output = genai::Result<T>>,
{
    let mut last_llm_error = unknown_error.to_string();
    let mut last_error_class: Option<&'static str> = None;
    let model_candidates = effective_llm_models_from(agent, start_model);
    let max_attempts = llm_retry_attempts(agent);
    // Counts every call across all models; `attempt` below is per model.
    let mut total_attempts = 0usize;
    // Shared across models: the retry budget is not reset on fallback.
    let mut retry_window = RetryBackoffWindow::default();

    'models: for model in model_candidates {
        for attempt in 1..=max_attempts {
            total_attempts = total_attempts.saturating_add(1);
            let response_res =
                match await_or_cancel(run_cancellation_token, invoke(model.clone())).await {
                    CancelAware::Cancelled => return LlmAttemptOutcome::Cancelled,
                    CancelAware::Value(resp) => resp,
                };

            match response_res {
                Ok(value) => {
                    // Only worth logging when at least one earlier call failed.
                    if total_attempts > 1 {
                        tracing::info!(
                            model = %model,
                            attempts = total_attempts,
                            "LLM call succeeded after retries"
                        );
                    }
                    return LlmAttemptOutcome::Success {
                        value,
                        model,
                        attempts: total_attempts,
                    };
                }
                Err(e) => {
                    let error_class = classify_llm_error(&e);
                    last_error_class = Some(error_class.as_str());
                    let message = e.to_string();
                    last_llm_error =
                        format!("model='{model}' attempt={attempt}/{max_attempts}: {message}");
                    let can_retry_same_model =
                        retry_current_model && attempt < max_attempts && error_class.is_retryable();
                    tracing::warn!(
                        model = %model,
                        attempt = attempt,
                        max_attempts = max_attempts,
                        error_class = error_class.as_str(),
                        retryable = can_retry_same_model,
                        error = %message,
                        "LLM call failed"
                    );
                    if can_retry_same_model {
                        match wait_retry_backoff(
                            agent,
                            attempt,
                            &mut retry_window,
                            run_cancellation_token,
                        )
                        .await
                        {
                            // Backoff done: next attempt on the same model.
                            RetryBackoffOutcome::Completed => continue,
                            RetryBackoffOutcome::Cancelled => {
                                return LlmAttemptOutcome::Cancelled;
                            }
                            // Out of budget: stop entirely, skipping remaining models.
                            RetryBackoffOutcome::BudgetExhausted => {
                                tracing::warn!(
                                    model = %model,
                                    attempt = attempt,
                                    "LLM retry budget exhausted"
                                );
                                last_llm_error =
                                    format!("{last_llm_error} (retry budget exhausted)");
                                break 'models;
                            }
                        }
                    }
                    // Not retryable on this model: move on to the next candidate.
                    break;
                }
            }
        }
    }

    LlmAttemptOutcome::Exhausted {
        last_error: last_llm_error,
        last_error_class,
        attempts: total_attempts,
    }
}
907
908pub(super) async fn run_step_prepare_phases(
909 run_ctx: &RunContext,
910 tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
911 agent: &dyn Agent,
912) -> Result<
913 (
914 Vec<Message>,
915 Vec<String>,
916 RunAction,
917 Vec<std::sync::Arc<dyn tirea_contract::runtime::inference::InferenceRequestTransform>>,
918 Vec<TrackedPatch>,
919 Vec<tirea_contract::SerializedAction>,
920 ),
921 AgentLoopError,
922> {
923 let system_prompt = agent.system_prompt().to_string();
924 let ((messages, filtered_tools, run_action, transforms), pending, actions) =
925 plugin_runtime::run_phase_block(
926 run_ctx,
927 tool_descriptors,
928 agent,
929 &[Phase::StepStart, Phase::BeforeInference],
930 |_| {},
931 |step| inference_inputs_from_step(step, &system_prompt),
932 )
933 .await?;
934 Ok((
935 messages,
936 filtered_tools,
937 run_action,
938 transforms,
939 pending,
940 actions,
941 ))
942}
943
/// Named-field form of the tuple returned by `run_step_prepare_phases`.
pub(super) struct PreparedStep {
    /// Messages to send to the model.
    pub(super) messages: Vec<Message>,
    /// Tool ids selected for this step.
    pub(super) filtered_tools: Vec<String>,
    /// Run action produced by the prepare phases.
    pub(super) run_action: RunAction,
    /// State patches produced by the prepare phases.
    pub(super) pending_patches: Vec<TrackedPatch>,
    /// Serialized actions produced by the prepare phases.
    pub(super) serialized_actions: Vec<tirea_contract::SerializedAction>,
    /// Transforms to apply to the inference request.
    pub(super) request_transforms:
        Vec<std::sync::Arc<dyn tirea_contract::runtime::inference::InferenceRequestTransform>>,
}
953
954pub(super) async fn prepare_step_execution(
955 run_ctx: &RunContext,
956 tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
957 agent: &dyn Agent,
958) -> Result<PreparedStep, AgentLoopError> {
959 let (messages, filtered_tools, run_action, transforms, pending, actions) =
960 run_step_prepare_phases(run_ctx, tool_descriptors, agent).await?;
961 Ok(PreparedStep {
962 messages,
963 filtered_tools,
964 run_action,
965 pending_patches: pending,
966 serialized_actions: actions,
967 request_transforms: transforms,
968 })
969}
970
/// Emits the cleanup phases after an LLM error by forwarding every argument
/// unchanged to `plugin_runtime::emit_cleanup_phases`.
pub(super) async fn apply_llm_error_cleanup(
    run_ctx: &mut RunContext,
    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
    agent: &dyn Agent,
    error_type: &'static str,
    message: String,
    error_class: Option<&str>,
) -> Result<(), AgentLoopError> {
    plugin_runtime::emit_cleanup_phases(
        run_ctx,
        tool_descriptors,
        agent,
        error_type,
        message,
        error_class,
    )
    .await
}
989
/// Finishes a step once inference has produced `result`.
///
/// In order: runs the `AfterInference` phase with the response attached to
/// the step, records the resulting patches and actions on `run_ctx`, appends
/// the assistant message built from `result`, then emits `StepEnd` and
/// records its patches and actions as well.
///
/// Returns the run action taken from the `AfterInference` phase.
pub(super) async fn complete_step_after_inference(
    run_ctx: &mut RunContext,
    result: &StreamResult,
    step_meta: MessageMetadata,
    assistant_message_id: Option<String>,
    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
    agent: &dyn Agent,
) -> Result<RunAction, AgentLoopError> {
    let (run_action, pending, actions) = plugin_runtime::run_phase_block(
        run_ctx,
        tool_descriptors,
        agent,
        &[Phase::AfterInference],
        |step| {
            use crate::contracts::runtime::inference::LLMResponse;
            // Expose the inference result to the phase as a successful response.
            step.llm_response = Some(LLMResponse::success(result.clone()));
        },
        |step| step.run_action(),
    )
    .await?;
    run_ctx.add_thread_patches(pending);
    run_ctx.add_serialized_actions(actions);

    // The assistant message is added before `StepEnd` is emitted.
    let assistant = assistant_turn_message(result, step_meta, assistant_message_id);
    run_ctx.add_message(Arc::new(assistant));

    let (pending, actions) =
        plugin_runtime::emit_phase_block(Phase::StepEnd, run_ctx, tool_descriptors, agent, |_| {})
            .await?;
    run_ctx.add_thread_patches(pending);
    run_ctx.add_serialized_actions(actions);
    Ok(run_action)
}
1023
1024pub(super) fn pending_tool_events(call: &SuspendedCall) -> Vec<AgentEvent> {
1026 vec![
1027 AgentEvent::ToolCallStart {
1028 id: call.ticket.pending.id.clone(),
1029 name: call.ticket.pending.name.clone(),
1030 },
1031 AgentEvent::ToolCallReady {
1032 id: call.ticket.pending.id.clone(),
1033 name: call.ticket.pending.name.clone(),
1034 arguments: call.ticket.pending.arguments.clone(),
1035 },
1036 ]
1037}
1038
1039pub(super) fn has_suspended_calls(run_ctx: &RunContext) -> bool {
1040 !suspended_calls_from_ctx(run_ctx).is_empty()
1041}
1042
1043pub(super) fn suspended_call_ids(run_ctx: &RunContext) -> HashSet<String> {
1044 suspended_calls_from_ctx(run_ctx).into_keys().collect()
1045}
1046
1047pub(super) fn newly_suspended_call_ids(
1048 run_ctx: &RunContext,
1049 baseline_ids: &HashSet<String>,
1050) -> HashSet<String> {
1051 suspended_calls_from_ctx(run_ctx)
1052 .into_keys()
1053 .filter(|id| !baseline_ids.contains(id))
1054 .collect()
1055}
1056
1057pub(super) fn suspended_call_pending_events(run_ctx: &RunContext) -> Vec<AgentEvent> {
1058 let mut calls: Vec<SuspendedCall> = suspended_calls_from_ctx(run_ctx).into_values().collect();
1059 calls.sort_by(|left, right| left.call_id.cmp(&right.call_id));
1060 calls
1061 .into_iter()
1062 .flat_map(|call| pending_tool_events(&call))
1063 .collect()
1064}
1065
1066pub(super) fn suspended_call_pending_events_for_ids(
1067 run_ctx: &RunContext,
1068 call_ids: &HashSet<String>,
1069) -> Vec<AgentEvent> {
1070 if call_ids.is_empty() {
1071 return Vec::new();
1072 }
1073 let mut calls: Vec<SuspendedCall> = suspended_calls_from_ctx(run_ctx)
1074 .into_iter()
1075 .filter_map(|(call_id, call)| call_ids.contains(&call_id).then_some(call))
1076 .collect();
1077 calls.sort_by(|left, right| left.call_id.cmp(&right.call_id));
1078 calls
1079 .into_iter()
1080 .flat_map(|call| pending_tool_events(&call))
1081 .collect()
1082}
1083
/// Inputs prepared for tool execution (see `prepare_tool_execution_context`).
pub(super) struct ToolExecutionContext {
    /// Snapshot of the run state.
    pub(super) state: serde_json::Value,
    /// Run configuration returned by `scope_with_tool_caller_context`.
    pub(super) run_config: tirea_contract::RunConfig,
}
1088
1089pub(super) fn prepare_tool_execution_context(
1090 run_ctx: &RunContext,
1091) -> Result<ToolExecutionContext, AgentLoopError> {
1092 let state = run_ctx
1093 .snapshot()
1094 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
1095 let run_config = scope_with_tool_caller_context(run_ctx, &state)?;
1096 Ok(ToolExecutionContext { state, run_config })
1097}
1098
/// Emits the run-end phase by forwarding to
/// `plugin_runtime::emit_run_end_phase`.
pub(super) async fn finalize_run_end(
    run_ctx: &mut RunContext,
    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
    agent: &dyn Agent,
) {
    plugin_runtime::emit_run_end_phase(run_ctx, tool_descriptors, agent).await
}
1106
/// How `persist_run_termination` treats a failure to commit the run-finished
/// delta.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RunFinishedCommitPolicy {
    /// A commit failure is returned to the caller as an error.
    Required,
    /// A commit failure is logged as a warning and otherwise ignored.
    BestEffort,
}
1112
1113fn normalize_termination_for_suspended_calls(
1114 run_ctx: &RunContext,
1115 termination: TerminationReason,
1116 response: Option<String>,
1117) -> (TerminationReason, Option<String>) {
1118 let final_termination = if !matches!(
1119 termination,
1120 TerminationReason::Error(_) | TerminationReason::Cancelled
1121 ) && has_suspended_calls(run_ctx)
1122 {
1123 TerminationReason::Suspended
1124 } else {
1125 termination
1126 };
1127 let final_response = if final_termination == TerminationReason::Suspended {
1128 None
1129 } else {
1130 response
1131 };
1132 (final_termination, final_response)
1133}
1134
1135async fn persist_run_termination(
1136 run_ctx: &mut RunContext,
1137 termination: &TerminationReason,
1138 tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
1139 agent: &dyn Agent,
1140 execution_ctx: &RunExecutionContext,
1141 pending_delta_commit: &PendingDeltaCommitContext<'_>,
1142 run_finished_commit_policy: RunFinishedCommitPolicy,
1143) -> Result<(), AgentLoopError> {
1144 sync_run_lifecycle_for_termination_with_context(run_ctx, execution_ctx, termination)?;
1145 finalize_run_end(run_ctx, tool_descriptors, agent).await;
1146 if let Err(error) = pending_delta_commit
1147 .commit_run_finished(run_ctx, termination)
1148 .await
1149 {
1150 match run_finished_commit_policy {
1151 RunFinishedCommitPolicy::Required => return Err(error),
1152 RunFinishedCommitPolicy::BestEffort => {
1153 tracing::warn!(error = %error, "failed to commit run-finished delta");
1154 }
1155 }
1156 }
1157 Ok(())
1158}
1159
1160fn stream_result_from_chat_response(response: &genai::chat::ChatResponse) -> StreamResult {
1161 let text = response
1162 .first_text()
1163 .map(|s| s.to_string())
1164 .unwrap_or_default();
1165 let tool_calls: Vec<crate::contracts::thread::ToolCall> = response
1166 .tool_calls()
1167 .into_iter()
1168 .map(|tc| {
1169 crate::contracts::thread::ToolCall::new(
1170 &tc.call_id,
1171 &tc.fn_name,
1172 tc.fn_arguments.clone(),
1173 )
1174 })
1175 .collect();
1176
1177 let usage = Some(crate::runtime::streaming::token_usage_from_genai(
1178 &response.usage,
1179 ));
1180 let stop_reason = response
1181 .stop_reason
1182 .as_ref()
1183 .and_then(crate::runtime::streaming::map_genai_stop_reason)
1184 .or({
1185 if !tool_calls.is_empty() {
1186 Some(tirea_contract::runtime::inference::StopReason::ToolUse)
1187 } else {
1188 Some(tirea_contract::runtime::inference::StopReason::EndTurn)
1189 }
1190 });
1191 StreamResult {
1192 text,
1193 tool_calls,
1194 usage,
1195 stop_reason,
1196 }
1197}
1198
1199fn assistant_turn_message(
1200 result: &StreamResult,
1201 step_meta: MessageMetadata,
1202 message_id: Option<String>,
1203) -> Message {
1204 let mut msg = if result.tool_calls.is_empty() {
1205 assistant_message(&result.text)
1206 } else {
1207 assistant_tool_calls(&result.text, result.tool_calls.clone())
1208 }
1209 .with_metadata(step_meta);
1210 if let Some(message_id) = message_id {
1211 msg = msg.with_id(message_id);
1212 }
1213 msg
1214}
1215
1216struct RunStartDrainOutcome {
1217 events: Vec<AgentEvent>,
1218 replayed: bool,
1219}
1220
1221fn decision_result_value(action: &ResumeDecisionAction, result: &Value) -> serde_json::Value {
1222 if result.is_null() {
1223 serde_json::Value::Bool(matches!(action, ResumeDecisionAction::Resume))
1224 } else {
1225 result.clone()
1226 }
1227}
1228
1229fn runtime_resume_inputs(run_ctx: &RunContext) -> HashMap<String, ToolCallResume> {
1230 let mut decisions = HashMap::new();
1231 for (call_id, state) in tool_call_states_from_ctx(run_ctx) {
1232 if !matches!(state.status, ToolCallStatus::Resuming) {
1233 continue;
1234 }
1235 let Some(mut resume) = state.resume else {
1236 continue;
1237 };
1238 if resume.decision_id.trim().is_empty() {
1239 resume.decision_id = call_id.clone();
1240 }
1241 decisions.insert(call_id, resume);
1242 }
1243 decisions
1244}
1245
1246fn settle_orphan_resuming_tool_states(
1247 run_ctx: &mut RunContext,
1248 suspended: &HashMap<String, SuspendedCall>,
1249 resumes: &HashMap<String, ToolCallResume>,
1250) -> Result<bool, AgentLoopError> {
1251 let states = tool_call_states_from_ctx(run_ctx);
1252 let mut changed = false;
1253
1254 for (call_id, resume) in resumes {
1255 if suspended.contains_key(call_id) {
1256 continue;
1257 }
1258 let Some(state) = states.get(call_id).cloned() else {
1259 continue;
1260 };
1261 let target_status = match &resume.action {
1262 ResumeDecisionAction::Cancel => ToolCallStatus::Cancelled,
1263 ResumeDecisionAction::Resume => ToolCallStatus::Failed,
1264 };
1265 if state.status == target_status && state.resume.as_ref() == Some(resume) {
1266 continue;
1267 }
1268
1269 let Some(next_state) = transition_tool_call_state(
1270 Some(state.clone()),
1271 ToolCallStateSeed {
1272 call_id: call_id.as_str(),
1273 tool_name: state.tool_name.as_str(),
1274 arguments: &state.arguments,
1275 status: state.status,
1276 resume_token: state.resume_token.clone(),
1277 },
1278 ToolCallStateTransition {
1279 status: target_status,
1280 resume_token: state.resume_token.clone(),
1281 resume: Some(resume.clone()),
1282 updated_at: current_unix_millis(),
1283 },
1284 ) else {
1285 continue;
1286 };
1287
1288 let base_state = run_ctx
1289 .snapshot()
1290 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
1291 let patch = upsert_tool_call_state(&base_state, call_id, next_state)?;
1292 if patch.patch().is_empty() {
1293 continue;
1294 }
1295 run_ctx.add_thread_patch(patch);
1296 changed = true;
1297 }
1298
1299 Ok(changed)
1300}
1301
1302fn all_suspended_calls_have_resume(
1303 suspended: &HashMap<String, SuspendedCall>,
1304 resumes: &HashMap<String, ToolCallResume>,
1305) -> bool {
1306 suspended
1307 .keys()
1308 .all(|call_id| resumes.contains_key(call_id))
1309}
1310
1311async fn drain_resuming_tool_calls_and_replay(
1312 run_ctx: &mut RunContext,
1313 tools: &HashMap<String, Arc<dyn Tool>>,
1314 agent: &dyn Agent,
1315 tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
1316) -> Result<RunStartDrainOutcome, AgentLoopError> {
1317 let decisions = runtime_resume_inputs(run_ctx);
1318 if decisions.is_empty() {
1319 return Ok(RunStartDrainOutcome {
1320 events: Vec::new(),
1321 replayed: false,
1322 });
1323 }
1324
1325 let suspended = suspended_calls_from_ctx(run_ctx);
1326 let mut state_changed = false;
1327 if settle_orphan_resuming_tool_states(run_ctx, &suspended, &decisions)? {
1328 state_changed = true;
1329 }
1330 if suspended.is_empty() {
1331 if state_changed {
1332 let snapshot = run_ctx
1333 .snapshot()
1334 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
1335 return Ok(RunStartDrainOutcome {
1336 events: vec![AgentEvent::StateSnapshot { snapshot }],
1337 replayed: false,
1338 });
1339 }
1340 return Ok(RunStartDrainOutcome {
1341 events: Vec::new(),
1342 replayed: false,
1343 });
1344 }
1345
1346 if matches!(
1347 agent.tool_executor().decision_replay_policy(),
1348 DecisionReplayPolicy::BatchAllSuspended
1349 ) && !all_suspended_calls_have_resume(&suspended, &decisions)
1350 {
1351 if state_changed {
1352 let snapshot = run_ctx
1353 .snapshot()
1354 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
1355 return Ok(RunStartDrainOutcome {
1356 events: vec![AgentEvent::StateSnapshot { snapshot }],
1357 replayed: false,
1358 });
1359 }
1360 return Ok(RunStartDrainOutcome {
1361 events: Vec::new(),
1362 replayed: false,
1363 });
1364 }
1365
1366 let mut events = Vec::new();
1367 let mut decision_ids: Vec<String> = decisions.keys().cloned().collect();
1368 decision_ids.sort();
1369
1370 let mut replayed = false;
1371
1372 for call_id in decision_ids {
1373 let Some(suspended_call) = suspended.get(&call_id).cloned() else {
1374 continue;
1375 };
1376 let Some(decision) = decisions.get(&call_id).cloned() else {
1377 continue;
1378 };
1379 replayed = true;
1380 let decision_result = decision_result_value(&decision.action, &decision.result);
1381 let resume_payload = ToolCallResume {
1382 result: decision_result.clone(),
1383 ..decision.clone()
1384 };
1385 events.push(AgentEvent::ToolCallResumed {
1386 target_id: suspended_call.call_id.clone(),
1387 result: decision_result.clone(),
1388 });
1389
1390 match decision.action {
1391 ResumeDecisionAction::Cancel => {
1392 let cancel_reason = resume_payload.reason.clone();
1393 if upsert_tool_call_lifecycle_state(
1394 run_ctx,
1395 &suspended_call,
1396 ToolCallStatus::Cancelled,
1397 Some(resume_payload),
1398 )? {
1399 state_changed = true;
1400 }
1401 events.push(append_denied_tool_result_message(
1402 run_ctx,
1403 &suspended_call.call_id,
1404 Some(&suspended_call.tool_name),
1405 cancel_reason.as_deref(),
1406 ));
1407 let cleanup_path = format!(
1411 "__tool_call_scope.{}.suspended_call",
1412 suspended_call.call_id
1413 );
1414 let cleanup_patch = tirea_state::Patch::with_ops(vec![tirea_state::Op::delete(
1415 tirea_state::parse_path(&cleanup_path),
1416 )]);
1417 let tracked = tirea_state::TrackedPatch::new(cleanup_patch)
1418 .with_source("framework:scope_cleanup");
1419 if !tracked.patch().is_empty() {
1420 state_changed = true;
1421 run_ctx.add_thread_patch(tracked);
1422 }
1423 }
1424 ResumeDecisionAction::Resume => {
1425 if upsert_tool_call_lifecycle_state(
1426 run_ctx,
1427 &suspended_call,
1428 ToolCallStatus::Resuming,
1429 Some(resume_payload.clone()),
1430 )? {
1431 state_changed = true;
1432 }
1433 let Some(tool_call) = replay_tool_call_for_resolution(
1434 run_ctx,
1435 &suspended_call,
1436 &ToolCallDecision {
1437 target_id: suspended_call.call_id.clone(),
1438 resume: resume_payload.clone(),
1439 },
1440 ) else {
1441 continue;
1442 };
1443 let state = run_ctx
1444 .snapshot()
1445 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
1446 let tool = tools.get(&tool_call.name).cloned();
1447 let rt_for_replay = scope_with_tool_caller_context(run_ctx, &state)?;
1448 let replay_phase_ctx = ToolPhaseContext {
1449 tool_descriptors,
1450 agent_behavior: Some(agent.behavior()),
1451 activity_manager: tirea_contract::runtime::activity::NoOpActivityManager::arc(),
1452 run_config: &rt_for_replay,
1453 thread_id: run_ctx.thread_id(),
1454 thread_messages: run_ctx.messages(),
1455 cancellation_token: None,
1456 };
1457 let replay_result = execute_single_tool_with_phases_deferred(
1458 tool.as_deref(),
1459 &tool_call,
1460 &state,
1461 &replay_phase_ctx,
1462 )
1463 .await?;
1464
1465 let replay_msg_id = gen_message_id();
1466 let replay_msg = tool_response(&tool_call.id, &replay_result.execution.result)
1467 .with_id(replay_msg_id.clone());
1468 run_ctx.add_message(Arc::new(replay_msg));
1469
1470 if !replay_result.reminders.is_empty() {
1471 let msgs: Vec<Arc<Message>> = replay_result
1472 .reminders
1473 .iter()
1474 .map(|reminder| {
1475 Arc::new(Message::internal_system(format!(
1476 "<system-reminder>{}</system-reminder>",
1477 reminder
1478 )))
1479 })
1480 .collect();
1481 run_ctx.add_messages(msgs);
1482 }
1483
1484 if let Some(patch) = replay_result.execution.patch.clone() {
1485 state_changed = true;
1486 run_ctx.add_thread_patch(patch);
1487 }
1488 if !replay_result.pending_patches.is_empty() {
1489 state_changed = true;
1490 run_ctx.add_thread_patches(replay_result.pending_patches.clone());
1491 }
1492 events.push(AgentEvent::ToolCallDone {
1493 id: tool_call.id.clone(),
1494 result: replay_result.execution.result,
1495 patch: replay_result.execution.patch,
1496 message_id: replay_msg_id,
1497 });
1498
1499 if let Some(next_suspended_call) = replay_result.suspended_call.clone() {
1500 let state = run_ctx
1501 .snapshot()
1502 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
1503 let action = next_suspended_call.clone().into_state_action();
1504 let patches = reduce_state_actions(
1505 vec![action],
1506 &state,
1507 "agent_loop",
1508 &ScopeContext::run(),
1509 )
1510 .map_err(|e| {
1511 AgentLoopError::StateError(format!(
1512 "failed to reduce suspended call action: {e}"
1513 ))
1514 })?;
1515 for patch in patches {
1516 if !patch.patch().is_empty() {
1517 state_changed = true;
1518 run_ctx.add_thread_patch(patch);
1519 }
1520 }
1521 for event in pending_tool_events(&next_suspended_call) {
1522 events.push(event);
1523 }
1524 }
1525 }
1526 }
1527 }
1528
1529 if state_changed {
1534 let snapshot = run_ctx
1535 .snapshot()
1536 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
1537 events.push(AgentEvent::StateSnapshot { snapshot });
1538 }
1539
1540 Ok(RunStartDrainOutcome { events, replayed })
1541}
1542
1543async fn drain_run_start_resume_replay(
1544 run_ctx: &mut RunContext,
1545 tools: &HashMap<String, Arc<dyn Tool>>,
1546 agent: &dyn Agent,
1547 tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
1548) -> Result<RunStartDrainOutcome, AgentLoopError> {
1549 drain_resuming_tool_calls_and_replay(run_ctx, tools, agent, tool_descriptors).await
1550}
1551
1552async fn commit_run_start_and_drain_replay(
1553 run_ctx: &mut RunContext,
1554 tools: &HashMap<String, Arc<dyn Tool>>,
1555 agent: &dyn Agent,
1556 active_tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
1557 pending_delta_commit: &PendingDeltaCommitContext<'_>,
1558) -> Result<RunStartDrainOutcome, AgentLoopError> {
1559 pending_delta_commit
1560 .commit(run_ctx, CheckpointReason::UserMessage, false)
1561 .await?;
1562
1563 let run_start_drain =
1564 drain_run_start_resume_replay(run_ctx, tools, agent, active_tool_descriptors).await?;
1565
1566 if run_start_drain.replayed {
1567 pending_delta_commit
1568 .commit(run_ctx, CheckpointReason::ToolResultsCommitted, false)
1569 .await?;
1570 }
1571
1572 Ok(run_start_drain)
1573}
1574
1575fn normalize_decision_tool_result(
1576 response: &serde_json::Value,
1577 fallback_arguments: &serde_json::Value,
1578) -> serde_json::Value {
1579 match response {
1580 serde_json::Value::Bool(_) => fallback_arguments.clone(),
1581 value => value.clone(),
1582 }
1583}
1584
1585fn denied_tool_result_for_call(
1586 run_ctx: &RunContext,
1587 call_id: &str,
1588 fallback_tool_name: Option<&str>,
1589 decision_reason: Option<&str>,
1590) -> ToolResult {
1591 let tool_name = fallback_tool_name
1592 .filter(|name| !name.is_empty())
1593 .map(str::to_string)
1594 .or_else(|| find_tool_call_in_messages(run_ctx, call_id).map(|call| call.name))
1595 .unwrap_or_else(|| "tool".to_string());
1596 let reason = decision_reason
1597 .map(str::to_string)
1598 .filter(|reason| !reason.trim().is_empty())
1599 .unwrap_or_else(|| "User denied the action".to_string());
1600 ToolResult::error(tool_name, reason)
1601}
1602
1603fn append_denied_tool_result_message(
1604 run_ctx: &mut RunContext,
1605 call_id: &str,
1606 fallback_tool_name: Option<&str>,
1607 decision_reason: Option<&str>,
1608) -> AgentEvent {
1609 let denied_result =
1610 denied_tool_result_for_call(run_ctx, call_id, fallback_tool_name, decision_reason);
1611 let message_id = gen_message_id();
1612 let denied_message = tool_response(call_id, &denied_result).with_id(message_id.clone());
1613 run_ctx.add_message(Arc::new(denied_message));
1614 AgentEvent::ToolCallDone {
1615 id: call_id.to_string(),
1616 result: denied_result,
1617 patch: None,
1618 message_id,
1619 }
1620}
1621
1622fn find_tool_call_in_messages(run_ctx: &RunContext, call_id: &str) -> Option<ToolCall> {
1623 run_ctx.messages().iter().rev().find_map(|message| {
1624 message
1625 .tool_calls
1626 .as_ref()
1627 .and_then(|calls| calls.iter().find(|call| call.id == call_id).cloned())
1628 })
1629}
1630
1631fn replay_tool_call_for_resolution(
1632 _run_ctx: &RunContext,
1633 suspended_call: &SuspendedCall,
1634 decision: &ToolCallDecision,
1635) -> Option<ToolCall> {
1636 if matches!(decision.resume.action, ResumeDecisionAction::Cancel) {
1637 return None;
1638 }
1639
1640 match suspended_call.ticket.resume_mode {
1641 ToolCallResumeMode::ReplayToolCall => Some(ToolCall::new(
1642 suspended_call.call_id.clone(),
1643 suspended_call.tool_name.clone(),
1644 suspended_call.arguments.clone(),
1645 )),
1646 ToolCallResumeMode::UseDecisionAsToolResult | ToolCallResumeMode::PassDecisionToTool => {
1647 Some(ToolCall::new(
1648 suspended_call.call_id.clone(),
1649 suspended_call.tool_name.clone(),
1650 normalize_decision_tool_result(&decision.resume.result, &suspended_call.arguments),
1651 ))
1652 }
1653 }
1654}
1655
1656fn upsert_tool_call_lifecycle_state(
1657 run_ctx: &mut RunContext,
1658 suspended_call: &SuspendedCall,
1659 status: ToolCallStatus,
1660 resume: Option<ToolCallResume>,
1661) -> Result<bool, AgentLoopError> {
1662 let current_state = tool_call_states_from_ctx(run_ctx).remove(&suspended_call.call_id);
1663 let Some(tool_state) = transition_tool_call_state(
1664 current_state,
1665 ToolCallStateSeed {
1666 call_id: &suspended_call.call_id,
1667 tool_name: &suspended_call.tool_name,
1668 arguments: &suspended_call.arguments,
1669 status: ToolCallStatus::Suspended,
1670 resume_token: Some(suspended_call.ticket.pending.id.clone()),
1671 },
1672 ToolCallStateTransition {
1673 status,
1674 resume_token: Some(suspended_call.ticket.pending.id.clone()),
1675 resume,
1676 updated_at: current_unix_millis(),
1677 },
1678 ) else {
1679 return Ok(false);
1680 };
1681
1682 let base_state = run_ctx
1683 .snapshot()
1684 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
1685 let patch = upsert_tool_call_state(&base_state, &suspended_call.call_id, tool_state)?;
1686 if patch.patch().is_empty() {
1687 return Ok(false);
1688 }
1689 run_ctx.add_thread_patch(patch);
1690 Ok(true)
1691}
1692
1693pub(super) fn resolve_suspended_call(
1694 run_ctx: &mut RunContext,
1695 response: &ToolCallDecision,
1696) -> Result<Option<DecisionReplayOutcome>, AgentLoopError> {
1697 let suspended_calls = suspended_calls_from_ctx(run_ctx);
1698 if suspended_calls.is_empty() {
1699 return Ok(None);
1700 }
1701
1702 let suspended_call = suspended_calls
1703 .get(&response.target_id)
1704 .cloned()
1705 .or_else(|| {
1706 suspended_calls
1707 .values()
1708 .find(|call| {
1709 call.ticket.suspension.id == response.target_id
1710 || call.ticket.pending.id == response.target_id
1711 || call.call_id == response.target_id
1712 })
1713 .cloned()
1714 });
1715 let Some(suspended_call) = suspended_call else {
1716 return Ok(None);
1717 };
1718
1719 let _ = upsert_tool_call_lifecycle_state(
1720 run_ctx,
1721 &suspended_call,
1722 ToolCallStatus::Resuming,
1723 Some(response.resume.clone()),
1724 )?;
1725
1726 Ok(Some(DecisionReplayOutcome {
1727 events: Vec::new(),
1728 resolved_call_ids: vec![suspended_call.call_id],
1729 }))
1730}
1731
1732pub(super) fn drain_decision_channel(
1733 run_ctx: &mut RunContext,
1734 decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1735 pending_decisions: &mut VecDeque<ToolCallDecision>,
1736) -> Result<DecisionReplayOutcome, AgentLoopError> {
1737 let mut disconnected = false;
1738 if let Some(rx) = decision_rx.as_mut() {
1739 loop {
1740 match rx.try_recv() {
1741 Ok(response) => pending_decisions.push_back(response),
1742 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
1743 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1744 disconnected = true;
1745 break;
1746 }
1747 }
1748 }
1749 }
1750 if disconnected {
1751 *decision_rx = None;
1752 }
1753
1754 if pending_decisions.is_empty() {
1755 return Ok(DecisionReplayOutcome {
1756 events: Vec::new(),
1757 resolved_call_ids: Vec::new(),
1758 });
1759 }
1760
1761 let mut unresolved = VecDeque::new();
1762 let mut events = Vec::new();
1763 let mut resolved_call_ids = Vec::new();
1764 let mut seen = HashSet::new();
1765
1766 while let Some(response) = pending_decisions.pop_front() {
1767 if let Some(outcome) = resolve_suspended_call(run_ctx, &response)? {
1768 for call_id in outcome.resolved_call_ids {
1769 if seen.insert(call_id.clone()) {
1770 resolved_call_ids.push(call_id);
1771 }
1772 }
1773 events.extend(outcome.events);
1774 } else {
1775 unresolved.push_back(response);
1776 }
1777 }
1778 *pending_decisions = unresolved;
1779
1780 Ok(DecisionReplayOutcome {
1781 events,
1782 resolved_call_ids,
1783 })
1784}
1785
1786async fn replay_after_decisions(
1787 run_ctx: &mut RunContext,
1788 decisions_applied: bool,
1789 step_tool_provider: &Arc<dyn StepToolProvider>,
1790 agent: &dyn Agent,
1791 active_tool_descriptors: &mut Vec<crate::contracts::runtime::tool_call::ToolDescriptor>,
1792 pending_delta_commit: &PendingDeltaCommitContext<'_>,
1793) -> Result<Vec<AgentEvent>, AgentLoopError> {
1794 if !decisions_applied {
1795 return Ok(Vec::new());
1796 }
1797
1798 let decision_tools = resolve_step_tool_snapshot(step_tool_provider, run_ctx).await?;
1799 *active_tool_descriptors = decision_tools.descriptors.clone();
1800
1801 let decision_drain = drain_run_start_resume_replay(
1802 run_ctx,
1803 &decision_tools.tools,
1804 agent,
1805 active_tool_descriptors,
1806 )
1807 .await?;
1808
1809 pending_delta_commit
1810 .commit(run_ctx, CheckpointReason::ToolResultsCommitted, false)
1811 .await?;
1812
1813 Ok(decision_drain.events)
1814}
1815
1816async fn apply_decisions_and_replay(
1817 run_ctx: &mut RunContext,
1818 decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1819 pending_decisions: &mut VecDeque<ToolCallDecision>,
1820 step_tool_provider: &Arc<dyn StepToolProvider>,
1821 agent: &dyn Agent,
1822 active_tool_descriptors: &mut Vec<crate::contracts::runtime::tool_call::ToolDescriptor>,
1823 pending_delta_commit: &PendingDeltaCommitContext<'_>,
1824) -> Result<Vec<AgentEvent>, AgentLoopError> {
1825 Ok(drain_and_replay_decisions(
1826 run_ctx,
1827 decision_rx,
1828 pending_decisions,
1829 None,
1830 DecisionReplayInputs {
1831 step_tool_provider,
1832 agent,
1833 active_tool_descriptors,
1834 },
1835 pending_delta_commit,
1836 )
1837 .await?
1838 .events)
1839}
1840
1841pub(super) struct DecisionReplayOutcome {
1842 events: Vec<AgentEvent>,
1843 resolved_call_ids: Vec<String>,
1844}
1845
1846struct DecisionReplayInputs<'a> {
1847 step_tool_provider: &'a Arc<dyn StepToolProvider>,
1848 agent: &'a dyn Agent,
1849 active_tool_descriptors: &'a mut Vec<crate::contracts::runtime::tool_call::ToolDescriptor>,
1850}
1851
1852async fn drain_and_replay_decisions(
1853 run_ctx: &mut RunContext,
1854 decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1855 pending_decisions: &mut VecDeque<ToolCallDecision>,
1856 decision: Option<ToolCallDecision>,
1857 replay_inputs: DecisionReplayInputs<'_>,
1858 pending_delta_commit: &PendingDeltaCommitContext<'_>,
1859) -> Result<DecisionReplayOutcome, AgentLoopError> {
1860 if let Some(decision) = decision {
1861 pending_decisions.push_back(decision);
1862 }
1863 let decision_drain = drain_decision_channel(run_ctx, decision_rx, pending_decisions)?;
1864 let mut events = decision_drain.events;
1865 let replay_events = replay_after_decisions(
1866 run_ctx,
1867 !decision_drain.resolved_call_ids.is_empty(),
1868 replay_inputs.step_tool_provider,
1869 replay_inputs.agent,
1870 replay_inputs.active_tool_descriptors,
1871 pending_delta_commit,
1872 )
1873 .await?;
1874 events.extend(replay_events);
1875
1876 Ok(DecisionReplayOutcome {
1877 events,
1878 resolved_call_ids: decision_drain.resolved_call_ids,
1879 })
1880}
1881
1882async fn apply_decision_and_replay(
1883 run_ctx: &mut RunContext,
1884 response: ToolCallDecision,
1885 decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1886 pending_decisions: &mut VecDeque<ToolCallDecision>,
1887 replay_inputs: DecisionReplayInputs<'_>,
1888 pending_delta_commit: &PendingDeltaCommitContext<'_>,
1889) -> Result<DecisionReplayOutcome, AgentLoopError> {
1890 drain_and_replay_decisions(
1891 run_ctx,
1892 decision_rx,
1893 pending_decisions,
1894 Some(response),
1895 replay_inputs,
1896 pending_delta_commit,
1897 )
1898 .await
1899}
1900
1901async fn recv_decision(
1902 decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1903) -> Option<ToolCallDecision> {
1904 let rx = decision_rx.as_mut()?;
1905 rx.recv().await
1906}
1907
1908pub async fn run_loop(
1914 agent: &dyn Agent,
1915 tools: HashMap<String, Arc<dyn Tool>>,
1916 run_ctx: RunContext,
1917 cancellation_token: Option<RunCancellationToken>,
1918 state_committer: Option<Arc<dyn StateCommitter>>,
1919 decision_rx: Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1920) -> LoopOutcome {
1921 let execution_ctx = RunExecutionContext::from_run_config(&run_ctx);
1922 run_loop_with_context(
1923 agent,
1924 tools,
1925 run_ctx,
1926 execution_ctx,
1927 cancellation_token,
1928 state_committer,
1929 decision_rx,
1930 )
1931 .await
1932}
1933
1934pub async fn run_loop_with_context(
1940 agent: &dyn Agent,
1941 tools: HashMap<String, Arc<dyn Tool>>,
1942 mut run_ctx: RunContext,
1943 execution_ctx: RunExecutionContext,
1944 cancellation_token: Option<RunCancellationToken>,
1945 state_committer: Option<Arc<dyn StateCommitter>>,
1946 mut decision_rx: Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1947) -> LoopOutcome {
1948 bind_execution_context_to_run_config(&mut run_ctx, &execution_ctx);
1949
1950 let executor = llm_executor_for_run(agent);
1951 let tool_executor = agent.tool_executor();
1952 let mut run_state = LoopRunState::new();
1953 let mut pending_decisions = VecDeque::new();
1954 let run_cancellation_token = cancellation_token;
1955 let mut last_text = String::new();
1956 let mut continued_response_prefix = String::new();
1957 let step_tool_provider = step_tool_provider_for_run(agent, tools);
1958 let run_id = execution_ctx.run_id.clone();
1959 let baseline_suspended_call_ids = suspended_call_ids(&run_ctx);
1960 let pending_delta_commit =
1961 PendingDeltaCommitContext::new(&execution_ctx, state_committer.as_ref());
1962 let initial_step_tools = match resolve_step_tool_snapshot(&step_tool_provider, &run_ctx).await {
1963 Ok(snapshot) => snapshot,
1964 Err(error) => {
1965 let msg = error.to_string();
1966 return build_loop_outcome(
1967 run_ctx,
1968 TerminationReason::Error(msg.clone()),
1969 None,
1970 &run_state,
1971 Some(outcome::LoopFailure::State(msg)),
1972 );
1973 }
1974 };
1975 let StepToolSnapshot {
1976 tools: initial_tools,
1977 descriptors: initial_descriptors,
1978 } = initial_step_tools;
1979 let mut active_tool_descriptors = initial_descriptors;
1980
1981 macro_rules! terminate_run {
1982 ($termination:expr, $response:expr, $failure:expr) => {{
1983 let reason: TerminationReason = $termination;
1984 let (final_termination, final_response) =
1985 normalize_termination_for_suspended_calls(&run_ctx, reason, $response);
1986 if let Err(error) = persist_run_termination(
1987 &mut run_ctx,
1988 &final_termination,
1989 &active_tool_descriptors,
1990 agent,
1991 &execution_ctx,
1992 &pending_delta_commit,
1993 RunFinishedCommitPolicy::Required,
1994 )
1995 .await
1996 {
1997 let msg = error.to_string();
1998 return build_loop_outcome(
1999 run_ctx,
2000 TerminationReason::Error(msg.clone()),
2001 None,
2002 &run_state,
2003 Some(outcome::LoopFailure::State(msg)),
2004 );
2005 }
2006 return build_loop_outcome(
2007 run_ctx,
2008 final_termination,
2009 final_response,
2010 &run_state,
2011 $failure,
2012 );
2013 }};
2014 }
2015
2016 let (pending, actions) = match plugin_runtime::emit_phase_block(
2018 Phase::RunStart,
2019 &run_ctx,
2020 &active_tool_descriptors,
2021 agent,
2022 |_| {},
2023 )
2024 .await
2025 {
2026 Ok(result) => result,
2027 Err(error) => {
2028 let msg = error.to_string();
2029 terminate_run!(
2030 TerminationReason::Error(msg.clone()),
2031 None,
2032 Some(outcome::LoopFailure::State(msg))
2033 );
2034 }
2035 };
2036 run_ctx.add_thread_patches(pending);
2037 run_ctx.add_serialized_actions(actions);
2038 if let Err(error) = commit_run_start_and_drain_replay(
2039 &mut run_ctx,
2040 &initial_tools,
2041 agent,
2042 &active_tool_descriptors,
2043 &pending_delta_commit,
2044 )
2045 .await
2046 {
2047 let msg = error.to_string();
2048 terminate_run!(
2049 TerminationReason::Error(msg.clone()),
2050 None,
2051 Some(outcome::LoopFailure::State(msg))
2052 );
2053 }
2054 let run_start_new_suspended = newly_suspended_call_ids(&run_ctx, &baseline_suspended_call_ids);
2055 if !run_start_new_suspended.is_empty() {
2056 terminate_run!(TerminationReason::Suspended, None, None);
2057 }
2058 loop {
2059 if let Err(error) = apply_decisions_and_replay(
2060 &mut run_ctx,
2061 &mut decision_rx,
2062 &mut pending_decisions,
2063 &step_tool_provider,
2064 agent,
2065 &mut active_tool_descriptors,
2066 &pending_delta_commit,
2067 )
2068 .await
2069 {
2070 let msg = error.to_string();
2071 terminate_run!(
2072 TerminationReason::Error(msg.clone()),
2073 None,
2074 Some(outcome::LoopFailure::State(msg))
2075 );
2076 }
2077
2078 if is_run_cancelled(run_cancellation_token.as_ref()) {
2079 terminate_run!(TerminationReason::Cancelled, None, None);
2080 }
2081
2082 let step_tools = match resolve_step_tool_snapshot(&step_tool_provider, &run_ctx).await {
2083 Ok(snapshot) => snapshot,
2084 Err(e) => {
2085 let msg = e.to_string();
2086 terminate_run!(
2087 TerminationReason::Error(msg.clone()),
2088 None,
2089 Some(outcome::LoopFailure::State(msg))
2090 );
2091 }
2092 };
2093 active_tool_descriptors = step_tools.descriptors.clone();
2094
2095 let prepared = match prepare_step_execution(&run_ctx, &active_tool_descriptors, agent).await
2096 {
2097 Ok(v) => v,
2098 Err(e) => {
2099 let msg = e.to_string();
2100 terminate_run!(
2101 TerminationReason::Error(msg.clone()),
2102 None,
2103 Some(outcome::LoopFailure::State(msg))
2104 );
2105 }
2106 };
2107 run_ctx.add_thread_patches(prepared.pending_patches);
2108
2109 match prepared.run_action {
2110 RunAction::Continue => {}
2111 RunAction::Terminate(reason) => {
2112 let response = if matches!(reason, TerminationReason::BehaviorRequested) {
2113 Some(last_text.clone())
2114 } else {
2115 None
2116 };
2117 terminate_run!(reason, response, None);
2118 }
2119 }
2120
2121 let messages = prepared.messages;
2123 let filtered_tools = prepared.filtered_tools;
2124 let request_transforms = prepared.request_transforms;
2125 let chat_options = agent.chat_options().cloned();
2126 let attempt_outcome = run_llm_with_retry_and_fallback(
2127 agent,
2128 run_cancellation_token.as_ref(),
2129 true,
2130 None,
2131 "unknown llm error",
2132 |model| {
2133 let request = build_request_for_filtered_tools(
2134 &messages,
2135 &step_tools.tools,
2136 &filtered_tools,
2137 &request_transforms,
2138 );
2139 let executor = executor.clone();
2140 let chat_options = chat_options.clone();
2141 async move {
2142 executor
2143 .exec_chat_response(&model, request, chat_options.as_ref())
2144 .await
2145 }
2146 },
2147 )
2148 .await;
2149
2150 let response = match attempt_outcome {
2151 LlmAttemptOutcome::Success {
2152 value, attempts, ..
2153 } => {
2154 run_state.record_llm_attempts(attempts);
2155 value
2156 }
2157 LlmAttemptOutcome::Cancelled => {
2158 append_cancellation_user_message(&mut run_ctx, CancellationStage::Inference);
2159 terminate_run!(TerminationReason::Cancelled, None, None);
2160 }
2161 LlmAttemptOutcome::Exhausted {
2162 last_error,
2163 last_error_class,
2164 attempts,
2165 } => {
2166 run_state.record_llm_attempts(attempts);
2167 if let Err(phase_error) = apply_llm_error_cleanup(
2168 &mut run_ctx,
2169 &active_tool_descriptors,
2170 agent,
2171 "llm_exec_error",
2172 last_error.clone(),
2173 last_error_class,
2174 )
2175 .await
2176 {
2177 let msg = phase_error.to_string();
2178 terminate_run!(
2179 TerminationReason::Error(msg.clone()),
2180 None,
2181 Some(outcome::LoopFailure::State(msg))
2182 );
2183 }
2184 terminate_run!(
2185 TerminationReason::Error(last_error.clone()),
2186 None,
2187 Some(outcome::LoopFailure::Llm(last_error))
2188 );
2189 }
2190 };
2191
2192 let result = stream_result_from_chat_response(&response);
2193 run_state.update_from_response(&result);
2194 last_text = stitch_response_text(&continued_response_prefix, &result.text);
2195
2196 let assistant_msg_id = gen_message_id();
2198 let step_meta = step_metadata(Some(run_id.clone()), run_state.completed_steps as u32);
2199 let post_inference_action = match complete_step_after_inference(
2200 &mut run_ctx,
2201 &result,
2202 step_meta.clone(),
2203 Some(assistant_msg_id.clone()),
2204 &active_tool_descriptors,
2205 agent,
2206 )
2207 .await
2208 {
2209 Ok(action) => action,
2210 Err(e) => {
2211 let msg = e.to_string();
2212 terminate_run!(
2213 TerminationReason::Error(msg.clone()),
2214 None,
2215 Some(outcome::LoopFailure::State(msg))
2216 );
2217 }
2218 };
2219 if let Err(error) = pending_delta_commit
2220 .commit(
2221 &mut run_ctx,
2222 CheckpointReason::AssistantTurnCommitted,
2223 false,
2224 )
2225 .await
2226 {
2227 let msg = error.to_string();
2228 terminate_run!(
2229 TerminationReason::Error(msg.clone()),
2230 None,
2231 Some(outcome::LoopFailure::State(msg))
2232 );
2233 }
2234
2235 mark_step_completed(&mut run_state);
2236
2237 if truncation_recovery::should_retry(&result, &mut run_state) {
2240 extend_response_prefix(&mut continued_response_prefix, &result.text);
2241 let prompt = truncation_recovery::continuation_message();
2242 run_ctx.add_message(Arc::new(prompt));
2243 continue;
2244 }
2245 continued_response_prefix.clear();
2246
2247 let post_inference_termination = match &post_inference_action {
2248 RunAction::Terminate(reason) => Some(reason.clone()),
2249 _ => None,
2250 };
2251
2252 if let Some(reason) = &post_inference_termination {
2256 if !matches!(reason, TerminationReason::Stopped(_)) {
2257 terminate_run!(reason.clone(), Some(last_text.clone()), None);
2258 }
2259 }
2260
2261 if !result.needs_tools() {
2262 run_state.record_step_without_tools();
2263 let reason = post_inference_termination.unwrap_or(TerminationReason::NaturalEnd);
2264 terminate_run!(reason, Some(last_text.clone()), None);
2265 }
2266
2267 let tool_context = match prepare_tool_execution_context(&run_ctx) {
2269 Ok(ctx) => ctx,
2270 Err(e) => {
2271 let msg = e.to_string();
2272 terminate_run!(
2273 TerminationReason::Error(msg.clone()),
2274 None,
2275 Some(outcome::LoopFailure::State(msg))
2276 );
2277 }
2278 };
2279 let thread_messages_for_tools = run_ctx.messages().to_vec();
2280 let thread_version_for_tools = run_ctx.version();
2281
2282 let tool_exec_future = tool_executor.execute(ToolExecutionRequest {
2283 tools: &step_tools.tools,
2284 calls: &result.tool_calls,
2285 state: &tool_context.state,
2286 tool_descriptors: &active_tool_descriptors,
2287 agent_behavior: Some(agent.behavior()),
2288 activity_manager: tirea_contract::runtime::activity::NoOpActivityManager::arc(),
2289 run_config: &tool_context.run_config,
2290 thread_id: run_ctx.thread_id(),
2291 thread_messages: &thread_messages_for_tools,
2292 state_version: thread_version_for_tools,
2293 cancellation_token: run_cancellation_token.as_ref(),
2294 });
2295 let results = tool_exec_future.await.map_err(AgentLoopError::from);
2296
2297 let results = match results {
2298 Ok(r) => r,
2299 Err(AgentLoopError::Cancelled) => {
2300 append_cancellation_user_message(&mut run_ctx, CancellationStage::ToolExecution);
2301 terminate_run!(TerminationReason::Cancelled, None, None);
2302 }
2303 Err(e) => {
2304 let msg = e.to_string();
2305 terminate_run!(
2306 TerminationReason::Error(msg.clone()),
2307 None,
2308 Some(outcome::LoopFailure::State(msg))
2309 );
2310 }
2311 };
2312
2313 if let Err(_e) = apply_tool_results_to_session(
2314 &mut run_ctx,
2315 &results,
2316 Some(step_meta),
2317 tool_executor.requires_parallel_patch_conflict_check(),
2318 ) {
2319 let msg = _e.to_string();
2321 terminate_run!(
2322 TerminationReason::Error(msg.clone()),
2323 None,
2324 Some(outcome::LoopFailure::State(msg))
2325 );
2326 }
2327 if let Err(error) = pending_delta_commit
2328 .commit(&mut run_ctx, CheckpointReason::ToolResultsCommitted, false)
2329 .await
2330 {
2331 let msg = error.to_string();
2332 terminate_run!(
2333 TerminationReason::Error(msg.clone()),
2334 None,
2335 Some(outcome::LoopFailure::State(msg))
2336 );
2337 }
2338
2339 if let Err(error) = apply_decisions_and_replay(
2340 &mut run_ctx,
2341 &mut decision_rx,
2342 &mut pending_decisions,
2343 &step_tool_provider,
2344 agent,
2345 &mut active_tool_descriptors,
2346 &pending_delta_commit,
2347 )
2348 .await
2349 {
2350 let msg = error.to_string();
2351 terminate_run!(
2352 TerminationReason::Error(msg.clone()),
2353 None,
2354 Some(outcome::LoopFailure::State(msg))
2355 );
2356 }
2357
2358 if has_suspended_calls(&run_ctx) {
2360 let has_completed = results
2361 .iter()
2362 .any(|r| !matches!(r.outcome, crate::contracts::ToolCallOutcome::Suspended));
2363 if !has_completed {
2364 terminate_run!(TerminationReason::Suspended, None, None);
2365 }
2366 }
2367
2368 if let Some(reason) = post_inference_termination {
2371 terminate_run!(reason, Some(last_text.clone()), None);
2372 }
2373
2374 let error_count = results
2376 .iter()
2377 .filter(|r| r.execution.result.is_error())
2378 .count();
2379 run_state.record_tool_step(&result.tool_calls, error_count);
2380 }
2381}
2382
2383pub fn run_loop_stream(
2389 agent: Arc<dyn Agent>,
2390 tools: HashMap<String, Arc<dyn Tool>>,
2391 run_ctx: RunContext,
2392 cancellation_token: Option<RunCancellationToken>,
2393 state_committer: Option<Arc<dyn StateCommitter>>,
2394 decision_rx: Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
2395) -> Pin<Box<dyn Stream<Item = AgentEvent> + Send>> {
2396 let execution_ctx = RunExecutionContext::from_run_config(&run_ctx);
2397 run_loop_stream_with_context(
2398 agent,
2399 tools,
2400 run_ctx,
2401 execution_ctx,
2402 cancellation_token,
2403 state_committer,
2404 decision_rx,
2405 )
2406}
2407
2408pub fn run_loop_stream_with_context(
2409 agent: Arc<dyn Agent>,
2410 tools: HashMap<String, Arc<dyn Tool>>,
2411 mut run_ctx: RunContext,
2412 execution_ctx: RunExecutionContext,
2413 cancellation_token: Option<RunCancellationToken>,
2414 state_committer: Option<Arc<dyn StateCommitter>>,
2415 decision_rx: Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
2416) -> Pin<Box<dyn Stream<Item = AgentEvent> + Send>> {
2417 bind_execution_context_to_run_config(&mut run_ctx, &execution_ctx);
2418
2419 stream_runner::run_stream(
2420 agent,
2421 tools,
2422 run_ctx,
2423 execution_ctx,
2424 cancellation_token,
2425 state_committer,
2426 decision_rx,
2427 )
2428}
2429
2430#[cfg(test)]
2431mod tests;