1mod config;
42mod core;
43mod event_envelope_meta;
44mod outcome;
45mod parallel_state_merge;
46mod plugin_runtime;
47mod run_state;
48mod state_commit;
49mod stream_core;
50mod stream_runner;
51mod tool_exec;
52mod truncation_recovery;
53
54use crate::contracts::io::ResumeDecisionAction;
55use crate::contracts::runtime::phase::Phase;
56use crate::contracts::runtime::state::{reduce_state_actions, AnyStateAction, ScopeContext};
57use crate::contracts::runtime::tool_call::{Tool, ToolResult};
58use crate::contracts::runtime::ActivityManager;
59use crate::contracts::runtime::{
60 DecisionReplayPolicy, RunLifecycleAction, RunLifecycleState, StreamResult, SuspendedCall,
61 ToolCallResume, ToolCallResumeMode, ToolCallStatus, ToolExecutionRequest, ToolExecutionResult,
62};
63use crate::contracts::thread::CheckpointReason;
64use crate::contracts::thread::{gen_message_id, Message, MessageMetadata, ToolCall};
65use crate::contracts::RunContext;
66use crate::contracts::{AgentEvent, RunAction, RunOrigin, TerminationReason, ToolCallDecision};
67use crate::engine::convert::{assistant_message, assistant_tool_calls, tool_response};
68use crate::runtime::activity::ActivityHub;
69
70use crate::runtime::streaming::StreamCollector;
71use async_stream::stream;
72use futures::{Stream, StreamExt};
73use genai::Client;
74use serde_json::Value;
75use std::collections::{HashMap, HashSet, VecDeque};
76use std::future::Future;
77use std::pin::Pin;
78use std::sync::atomic::{AtomicU64, Ordering};
79use std::sync::Arc;
80use std::time::{Instant, SystemTime, UNIX_EPOCH};
81use uuid::Uuid;
82
83pub use crate::contracts::runtime::ToolExecutor;
84pub use crate::runtime::run_context::{
85 await_or_cancel, is_cancelled, CancelAware, RunCancellationToken, StateCommitError,
86 StateCommitter, TOOL_SCOPE_CALLER_AGENT_ID_KEY, TOOL_SCOPE_CALLER_MESSAGES_KEY,
87 TOOL_SCOPE_CALLER_STATE_KEY, TOOL_SCOPE_CALLER_THREAD_ID_KEY,
88};
89use config::StaticStepToolProvider;
90pub use config::{Agent, BaseAgent, GenaiLlmExecutor, LlmRetryPolicy};
91pub use config::{LlmEventStream, LlmExecutor};
92pub use config::{StepToolInput, StepToolProvider, StepToolSnapshot};
93#[cfg(test)]
94use core::build_messages;
95use core::{
96 build_request_for_filtered_tools, inference_inputs_from_step, suspended_calls_from_ctx,
97 tool_call_states_from_ctx, transition_tool_call_state, upsert_tool_call_state,
98 ToolCallStateSeed, ToolCallStateTransition,
99};
100pub use outcome::{tool_map, tool_map_from_arc, AgentLoopError};
101pub use outcome::{LoopOutcome, LoopStats, LoopUsage};
102#[cfg(test)]
103use plugin_runtime::emit_agent_phase;
104#[cfg(test)]
105use plugin_runtime::emit_cleanup_phases;
106use run_state::LoopRunState;
107pub use state_commit::ChannelStateCommitter;
108use state_commit::PendingDeltaCommitContext;
109use tirea_state::TrackedPatch;
110#[cfg(test)]
111use tokio_util::sync::CancellationToken;
112#[cfg(test)]
113use tool_exec::execute_single_tool_with_phases;
114#[cfg(test)]
115use tool_exec::execute_tools_parallel_with_phases;
116pub use tool_exec::ExecuteToolsOutcome;
117use tool_exec::{
118 apply_tool_results_impl, apply_tool_results_to_session,
119 execute_single_tool_with_phases_deferred, scope_with_tool_caller_context, step_metadata,
120 ToolPhaseContext,
121};
122pub use tool_exec::{
123 execute_tools, execute_tools_with_behaviors, execute_tools_with_config,
124 ParallelToolExecutionMode, ParallelToolExecutor, SequentialToolExecutor,
125};
126
/// Fully resolved inputs for one agent run: the agent definition, the tool
/// registry it may invoke, and the per-run configuration.
pub struct ResolvedRun {
    /// Agent definition (model, prompts, retry policy) driving the run.
    pub agent: BaseAgent,
    /// Tool registry keyed by tool id; values are shared handles.
    pub tools: HashMap<String, Arc<dyn Tool>>,
    /// Per-run configuration values.
    pub run_config: crate::contracts::RunConfig,
}
144
145impl ResolvedRun {
146 #[must_use]
148 pub fn with_tool(mut self, id: String, tool: Arc<dyn Tool>) -> Self {
149 self.tools.insert(id, tool);
150 self
151 }
152
153 pub fn overlay_tools(&mut self, tools: HashMap<String, Arc<dyn Tool>>) {
155 for (id, tool) in tools {
156 self.tools.entry(id).or_insert(tool);
157 }
158 }
159}
160
/// Identifiers describing a single run execution, persisted in the run
/// config so a resumed run can reconstruct them.
#[derive(Debug, Clone)]
pub struct RunExecutionContext {
    /// Unique id of this run (UUIDv7 when generated here).
    pub run_id: String,
    /// Id of the spawning run, when this run is nested.
    pub parent_run_id: Option<String>,
    /// Id of the agent executing the run.
    pub agent_id: String,
    /// How the run was started (see `RunOrigin`).
    pub origin: RunOrigin,
}
168
169impl RunExecutionContext {
170 pub const RUN_ID_KEY: &'static str = "run_id";
171 pub const PARENT_RUN_ID_KEY: &'static str = "parent_run_id";
172 pub const AGENT_ID_KEY: &'static str = "agent_id";
173 pub const ORIGIN_KEY: &'static str = "origin";
174
175 #[must_use]
176 pub const fn new(
177 run_id: String,
178 parent_run_id: Option<String>,
179 agent_id: String,
180 origin: RunOrigin,
181 ) -> Self {
182 Self {
183 run_id,
184 parent_run_id,
185 agent_id,
186 origin,
187 }
188 }
189
190 #[must_use]
191 pub fn from_run_config(run_ctx: &RunContext) -> Self {
192 let run_id = run_ctx
193 .run_config
194 .value(Self::RUN_ID_KEY)
195 .and_then(Value::as_str)
196 .map(str::trim)
197 .filter(|id| !id.is_empty())
198 .map(ToOwned::to_owned)
199 .unwrap_or_else(uuid_v7);
200
201 let parent_run_id = run_ctx
202 .run_config
203 .value(Self::PARENT_RUN_ID_KEY)
204 .and_then(Value::as_str)
205 .map(str::trim)
206 .filter(|id| !id.is_empty())
207 .map(ToOwned::to_owned);
208
209 let agent_id = run_ctx
210 .run_config
211 .value(Self::AGENT_ID_KEY)
212 .and_then(Value::as_str)
213 .map(str::trim)
214 .filter(|id| !id.is_empty())
215 .map(ToOwned::to_owned)
216 .unwrap_or_else(|| run_ctx.thread_id().to_string());
217
218 let origin = run_ctx
219 .run_config
220 .value(Self::ORIGIN_KEY)
221 .and_then(|value| serde_json::from_value::<RunOrigin>(value.clone()).ok())
222 .unwrap_or_default();
223
224 Self::new(run_id, parent_run_id, agent_id, origin)
225 }
226}
227
228fn uuid_v7() -> String {
229 Uuid::now_v7().simple().to_string()
230}
231
/// Writes the execution context back into the run config so a later
/// `RunExecutionContext::from_run_config` round-trips the same identifiers.
///
/// Set failures are deliberately ignored (`let _`): binding is best-effort
/// and must not abort the run.
fn bind_execution_context_to_run_config(
    run_ctx: &mut RunContext,
    execution_ctx: &RunExecutionContext,
) {
    let _ = run_ctx.run_config.set(
        RunExecutionContext::RUN_ID_KEY,
        execution_ctx.run_id.clone(),
    );
    // Only write the parent key when a parent actually exists.
    if let Some(parent_run_id) = execution_ctx.parent_run_id.as_deref() {
        let _ = run_ctx.run_config.set(
            RunExecutionContext::PARENT_RUN_ID_KEY,
            parent_run_id.to_string(),
        );
    }
    let _ = run_ctx.run_config.set(
        RunExecutionContext::AGENT_ID_KEY,
        execution_ctx.agent_id.clone(),
    );
    // Origin is serialized to JSON; skip it if serialization fails.
    if let Ok(origin_value) = serde_json::to_value(execution_ctx.origin) {
        let _ = run_ctx
            .run_config
            .set(RunExecutionContext::ORIGIN_KEY, origin_value);
    }
}
256
/// Milliseconds since the Unix epoch, saturating at `u64::MAX`; returns 0
/// when the system clock reads before the epoch.
pub(crate) fn current_unix_millis() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .ok()
        .map(|d| u64::try_from(d.as_millis()).unwrap_or(u64::MAX))
        .unwrap_or(0)
}
262
/// Test-only forwarder to the production lifecycle sync, kept so tests can
/// exercise it without naming the longer `_with_context` function.
#[cfg(test)]
pub(super) fn sync_run_lifecycle_for_termination(
    run_ctx: &mut RunContext,
    execution_ctx: &RunExecutionContext,
    termination: &TerminationReason,
) -> Result<(), AgentLoopError> {
    sync_run_lifecycle_for_termination_with_context(run_ctx, execution_ctx, termination)
}
271
/// Records the run's terminal lifecycle status, derived from `termination`,
/// as `RunLifecycleState` patches queued on the run context.
///
/// No-ops when the execution context carries a blank run id, since there is
/// no lifecycle entry to update.
///
/// # Errors
/// Returns `AgentLoopError::StateError` when snapshotting current state or
/// reducing the lifecycle action fails.
fn sync_run_lifecycle_for_termination_with_context(
    run_ctx: &mut RunContext,
    execution_ctx: &RunExecutionContext,
    termination: &TerminationReason,
) -> Result<(), AgentLoopError> {
    if execution_ctx.run_id.trim().is_empty() {
        return Ok(());
    };

    // Map the termination reason onto a run status plus optional done reason.
    let (status, done_reason) = termination.to_run_status();

    let base_state = run_ctx
        .snapshot()
        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
    let actions = vec![AnyStateAction::new::<RunLifecycleState>(
        RunLifecycleAction::Set {
            id: execution_ctx.run_id.clone(),
            status,
            done_reason,
            updated_at: current_unix_millis(),
        },
    )];
    // Reduce against the snapshot to produce patches, then queue them on the
    // context for the next commit rather than applying immediately.
    let patches = reduce_state_actions(actions, &base_state, "agent_loop", &ScopeContext::run())
        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
    run_ctx.add_thread_patches(patches);
    Ok(())
}
299
/// Which phase of the loop a cancellation interrupted; selects the
/// continuation prompt appended when the run resumes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum CancellationStage {
    /// Cancelled while waiting on the LLM.
    Inference,
    /// Cancelled while tools were executing.
    ToolExecution,
}
305
/// Continuation prompt injected as a user turn when a run was cancelled mid-inference.
pub(super) const CANCELLATION_INFERENCE_USER_MESSAGE: &str =
    "The previous run was interrupted during inference. Please continue from the current context.";
/// Continuation prompt injected as a user turn when a run was cancelled during tool execution.
pub(super) const CANCELLATION_TOOL_USER_MESSAGE: &str =
    "The previous run was interrupted while using tools. Please continue from the current context.";
310
311pub(super) fn append_cancellation_user_message(run_ctx: &mut RunContext, stage: CancellationStage) {
312 let content = match stage {
313 CancellationStage::Inference => CANCELLATION_INFERENCE_USER_MESSAGE,
314 CancellationStage::ToolExecution => CANCELLATION_TOOL_USER_MESSAGE,
315 };
316 run_ctx.add_message(Arc::new(Message::user(content)));
317}
318
319pub(super) fn effective_llm_models(agent: &dyn Agent) -> Vec<String> {
320 let mut models = Vec::with_capacity(1 + agent.fallback_models().len());
321 models.push(agent.model().to_string());
322 for model in agent.fallback_models() {
323 if model.trim().is_empty() {
324 continue;
325 }
326 if !models.iter().any(|m| m == model) {
327 models.push(model.clone());
328 }
329 }
330 models
331}
332
333pub(super) fn effective_llm_models_from(
334 agent: &dyn Agent,
335 start_model: Option<&str>,
336) -> Vec<String> {
337 let models = effective_llm_models(agent);
338 let Some(start_model) = start_model.map(str::trim).filter(|model| !model.is_empty()) else {
339 return models;
340 };
341 let Some(index) = models.iter().position(|model| model == start_model) else {
342 return models;
343 };
344 models.into_iter().skip(index).collect()
345}
346
347pub(super) fn next_llm_model_after(agent: &dyn Agent, current_model: &str) -> Option<String> {
348 let models = effective_llm_models(agent);
349 let current_index = models.iter().position(|model| model == current_model)?;
350 models.into_iter().nth(current_index + 1)
351}
352
/// Attempts allowed per model, clamped to at least 1 so every model is
/// tried at least once even with a zeroed policy.
pub(super) fn llm_retry_attempts(agent: &dyn Agent) -> usize {
    agent.llm_retry_policy().max_attempts_per_model.max(1)
}
356
/// Coarse classification of LLM transport/provider errors, used to decide
/// retryability and to label telemetry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum LlmErrorClass {
    /// HTTP 429 / provider rate limiting.
    RateLimit,
    /// Request or connect timeout.
    Timeout,
    /// Network-level failure (reset, refused, EOF, ...).
    Connection,
    /// HTTP 503 / provider overloaded or temporarily unavailable.
    ServerUnavailable,
    /// Other server-side (5xx) failure.
    ServerError,
    /// Authentication/authorization failure (401/403, bad key).
    Auth,
    /// Malformed or rejected request (other 4xx).
    ClientRequest,
    /// Could not be classified.
    Unknown,
}
368
impl LlmErrorClass {
    /// Whether this class of error is worth retrying: transient transport
    /// and server-side failures are; auth and client-request errors are not.
    pub(super) fn is_retryable(self) -> bool {
        matches!(
            self,
            LlmErrorClass::RateLimit
                | LlmErrorClass::Timeout
                | LlmErrorClass::Connection
                | LlmErrorClass::ServerUnavailable
                | LlmErrorClass::ServerError
        )
    }

    /// Stable snake_case label for logs and telemetry fields.
    pub(super) fn as_str(self) -> &'static str {
        match self {
            LlmErrorClass::RateLimit => "rate_limit",
            LlmErrorClass::Timeout => "timeout",
            LlmErrorClass::Connection => "connection",
            LlmErrorClass::ServerUnavailable => "server_unavailable",
            LlmErrorClass::ServerError => "server_error",
            LlmErrorClass::Auth => "auth",
            LlmErrorClass::ClientRequest => "client_request",
            LlmErrorClass::Unknown => "unknown",
        }
    }
}
395
396fn classify_llm_error_message(message: &str) -> LlmErrorClass {
397 let lower = message.to_ascii_lowercase();
398 if ["429", "too many requests", "rate limit"]
399 .iter()
400 .any(|p| lower.contains(p))
401 {
402 return LlmErrorClass::RateLimit;
403 }
404 if ["timeout", "timed out"].iter().any(|p| lower.contains(p)) {
405 return LlmErrorClass::Timeout;
406 }
407 if [
408 "connection",
409 "network",
410 "reset by peer",
411 "broken pipe",
412 "eof",
413 "connection refused",
414 "error sending request for url",
415 ]
416 .iter()
417 .any(|p| lower.contains(p))
418 {
419 return LlmErrorClass::Connection;
420 }
421 if ["503", "service unavailable", "unavailable", "temporar"]
422 .iter()
423 .any(|p| lower.contains(p))
424 {
425 return LlmErrorClass::ServerUnavailable;
426 }
427 if [
428 "500",
429 "502",
430 "504",
431 "server error",
432 "bad gateway",
433 "gateway timeout",
434 ]
435 .iter()
436 .any(|p| lower.contains(p))
437 {
438 return LlmErrorClass::ServerError;
439 }
440 if ["401", "403", "unauthorized", "forbidden", "invalid api key"]
441 .iter()
442 .any(|p| lower.contains(p))
443 {
444 return LlmErrorClass::Auth;
445 }
446 if ["400", "404", "422", "invalid_request", "bad request"]
447 .iter()
448 .any(|p| lower.contains(p))
449 {
450 return LlmErrorClass::ClientRequest;
451 }
452 LlmErrorClass::Unknown
453}
454
/// Maps an HTTP status code onto an error class.
///
/// Arm order is load-bearing: the specific codes (408, 429, 401/403,
/// client 4xx, 503) must precede the broad 5xx/4xx range arms that would
/// otherwise absorb them.
fn classify_status_code(status_code: u16) -> LlmErrorClass {
    match status_code {
        408 => LlmErrorClass::Timeout,
        429 => LlmErrorClass::RateLimit,
        401 | 403 => LlmErrorClass::Auth,
        400 | 404 | 422 => LlmErrorClass::ClientRequest,
        503 => LlmErrorClass::ServerUnavailable,
        500..=599 => LlmErrorClass::ServerError,
        400..=499 => LlmErrorClass::ClientRequest,
        _ => LlmErrorClass::Unknown,
    }
}
467
468fn classify_error_chain(error: &(dyn std::error::Error + 'static)) -> Option<LlmErrorClass> {
469 let mut current = Some(error);
470 while let Some(err) = current {
471 if let Some(io_error) = err.downcast_ref::<std::io::Error>() {
472 use std::io::ErrorKind;
473
474 let class = match io_error.kind() {
475 ErrorKind::TimedOut => Some(LlmErrorClass::Timeout),
476 ErrorKind::ConnectionAborted
477 | ErrorKind::ConnectionRefused
478 | ErrorKind::ConnectionReset
479 | ErrorKind::BrokenPipe
480 | ErrorKind::NotConnected
481 | ErrorKind::UnexpectedEof => Some(LlmErrorClass::Connection),
482 _ => None,
483 };
484 if class.is_some() {
485 return class;
486 }
487 }
488 current = err.source();
489 }
490 None
491}
492
/// Classifies a genai web-client error, preferring the structured HTTP
/// status when present and falling back to message-text heuristics.
fn classify_webc_error(error: &genai::webc::Error) -> LlmErrorClass {
    match error {
        genai::webc::Error::ResponseFailedStatus { status, .. } => {
            classify_status_code(status.as_u16())
        }
        // For reqwest errors, inspect the typed source chain first; only
        // scrape the rendered message if that is inconclusive.
        genai::webc::Error::Reqwest(err) => classify_error_chain(err)
            .unwrap_or_else(|| classify_llm_error_message(&err.to_string())),
        _ => classify_llm_error_message(&error.to_string()),
    }
}
503
504fn classify_chat_response_body(body: &serde_json::Value) -> LlmErrorClass {
515 let top_type = body.get("type").and_then(|v| v.as_str());
521 let nested_type = body
522 .get("error")
523 .and_then(|e| e.get("type"))
524 .and_then(|v| v.as_str());
525
526 let candidates: &[&str] = match (top_type, nested_type) {
529 (_, Some(inner)) => &[inner, top_type.unwrap_or("")],
530 (Some(top), None) => &[top],
531 (None, None) => &[],
532 };
533
534 for t in candidates {
535 let lower = t.to_ascii_lowercase();
536 if lower.contains("rate_limit") {
537 return LlmErrorClass::RateLimit;
538 }
539 if lower.contains("overloaded") || lower.contains("unavailable") {
540 return LlmErrorClass::ServerUnavailable;
541 }
542 if lower.contains("timeout") {
543 return LlmErrorClass::Timeout;
544 }
545 if lower.contains("server_error") || lower.contains("api_error") {
546 return LlmErrorClass::ServerError;
547 }
548 if lower.contains("authentication") || lower.contains("permission") {
549 return LlmErrorClass::Auth;
550 }
551 if lower.contains("invalid_request") || lower.contains("not_found") {
552 return LlmErrorClass::ClientRequest;
553 }
554 }
555
556 let status_code = body
559 .get("status")
560 .or_else(|| body.get("code"))
561 .and_then(|v| v.as_u64())
562 .and_then(|v| u16::try_from(v).ok());
563
564 if let Some(code) = status_code {
565 let class = classify_status_code(code);
566 if class != LlmErrorClass::Unknown {
567 return class;
568 }
569 }
570
571 LlmErrorClass::Unknown
572}
573
/// Classifies a genai error by dispatching on its variant: structured data
/// (status codes, typed source chains, JSON bodies) is preferred; message
/// text heuristics are the last resort.
pub(super) fn classify_llm_error(error: &genai::Error) -> LlmErrorClass {
    match error {
        genai::Error::HttpError { status, .. } => classify_status_code(status.as_u16()),
        genai::Error::WebStream { cause, error, .. } => classify_error_chain(error.as_ref())
            .unwrap_or_else(|| classify_llm_error_message(cause)),
        genai::Error::WebAdapterCall { webc_error, .. }
        | genai::Error::WebModelCall { webc_error, .. } => classify_webc_error(webc_error),
        genai::Error::Internal(message) => classify_llm_error_message(message),

        // A stream that stops parsing mid-flight is treated as a broken
        // connection (retryable).
        genai::Error::StreamParse { .. } => LlmErrorClass::Connection,

        // No response at all is assumed to be a server-side failure.
        genai::Error::NoChatResponse { .. } => LlmErrorClass::ServerError,

        genai::Error::ChatResponse { body, .. } => classify_chat_response_body(body),

        genai::Error::RequiresApiKey { .. }
        | genai::Error::NoAuthResolver { .. }
        | genai::Error::NoAuthData { .. } => LlmErrorClass::Auth,

        other => classify_llm_error_message(&other.to_string()),
    }
}
601
/// Test-only convenience: classify an error and report its retryability.
#[cfg(test)]
pub(super) fn is_retryable_llm_error(error: &genai::Error) -> bool {
    classify_llm_error(error).is_retryable()
}
606
// Global entropy counter for backoff jitter, seeded with the splitmix64
// golden-ratio constant; advanced atomically so concurrent retries draw
// distinct values.
static RETRY_BACKOFF_ENTROPY: AtomicU64 = AtomicU64::new(0x9e37_79b9_7f4a_7c15);

/// Tracks how long a retry sequence has been running so the overall retry
/// budget (`max_retry_window_ms`) can be enforced across attempts.
#[derive(Debug, Clone, Default)]
pub(super) struct RetryBackoffWindow {
    // Set lazily on the first `elapsed_ms` call; `None` means not started.
    started_at: Option<Instant>,
}
613
614impl RetryBackoffWindow {
615 pub(super) fn elapsed_ms(&mut self) -> u64 {
616 self.started_at
617 .get_or_insert_with(Instant::now)
618 .elapsed()
619 .as_millis()
620 .try_into()
621 .unwrap_or(u64::MAX)
622 }
623
624 pub(super) fn reset(&mut self) {
625 self.started_at = None;
626 }
627}
628
/// Result of waiting out a retry backoff.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum RetryBackoffOutcome {
    /// The sleep completed; the caller may retry.
    Completed,
    /// The run was cancelled while waiting.
    Cancelled,
    /// Sleeping would exceed the retry window budget; stop retrying.
    BudgetExhausted,
}
635
/// Diffuses `entropy` with the splitmix64 finalizer: add the golden-ratio
/// gamma, then two xor-shift/multiply rounds and a final xor-shift.
fn mix_retry_entropy(entropy: u64) -> u64 {
    let z = entropy.wrapping_add(0x9e37_79b9_7f4a_7c15);
    let z = (z ^ (z >> 30)).wrapping_mul(0xbf58_476d_1ce4_e5b9);
    let z = (z ^ (z >> 27)).wrapping_mul(0x94d0_49bb_1331_11eb);
    z ^ (z >> 31)
}
642
/// Draws a fresh jitter value by mixing an atomic counter with the current
/// wall-clock nanoseconds; falls back to the counter alone if the clock
/// reads before the epoch.
fn next_retry_entropy() -> u64 {
    let counter = RETRY_BACKOFF_ENTROPY.fetch_add(0x9e37_79b9_7f4a_7c15, Ordering::Relaxed);
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|duration| duration.as_nanos() as u64)
        .unwrap_or(counter);
    mix_retry_entropy(counter ^ now)
}
651
652pub(super) fn retry_base_backoff_ms(policy: &LlmRetryPolicy, retry_attempt: usize) -> u64 {
653 let initial = policy.initial_backoff_ms;
654 let cap = policy.max_backoff_ms.max(policy.initial_backoff_ms);
655 let attempt = retry_attempt.max(1);
656 if attempt == 1 {
657 return initial.min(cap);
658 }
659 let shift = (attempt - 1).min(20) as u32;
660 let factor = 1u64.checked_shl(shift).unwrap_or(u64::MAX);
661 initial.saturating_mul(factor).min(cap)
662}
663
/// Applies symmetric jitter to the base backoff: the result is drawn
/// uniformly from `[base - window, min(base + window, cap)]`, where
/// `window = base * jitter_percent / 100`. With zero jitter (or zero base)
/// the base value is returned unchanged.
fn jittered_backoff_ms(policy: &LlmRetryPolicy, retry_attempt: usize, entropy: u64) -> u64 {
    let cap = policy.max_backoff_ms.max(policy.initial_backoff_ms);
    let base_ms = retry_base_backoff_ms(policy, retry_attempt).min(cap);
    // Jitter percent is clamped to 100 so the lower bound cannot underflow.
    let jitter_percent = policy.backoff_jitter_percent.min(100) as u64;
    if jitter_percent == 0 || base_ms == 0 {
        return base_ms;
    }

    let jitter_window = base_ms.saturating_mul(jitter_percent) / 100;
    let lower = base_ms.saturating_sub(jitter_window);
    let upper = base_ms.saturating_add(jitter_window).min(cap);
    if upper <= lower {
        return lower;
    }

    // Uniform draw over the inclusive [lower, upper] range.
    let span = upper - lower;
    lower + (mix_retry_entropy(entropy) % (span.saturating_add(1)))
}
682
683pub(super) fn retry_backoff_plan_ms(
684 policy: &LlmRetryPolicy,
685 retry_attempt: usize,
686 elapsed_ms: u64,
687 entropy: u64,
688) -> Option<u64> {
689 let wait_ms = jittered_backoff_ms(policy, retry_attempt, entropy);
690 match policy.max_retry_window_ms {
691 Some(budget_ms) if elapsed_ms.saturating_add(wait_ms) > budget_ms => None,
692 _ => Some(wait_ms),
693 }
694}
695
/// Sleeps for the planned backoff before the next LLM retry, respecting
/// both the retry-window budget and run cancellation.
///
/// Returns `BudgetExhausted` without sleeping when the planned wait would
/// exceed the budget, `Cancelled` when the run is cancelled mid-sleep, and
/// `Completed` when the full backoff elapsed.
pub(super) async fn wait_retry_backoff(
    agent: &dyn Agent,
    retry_attempt: usize,
    retry_window: &mut RetryBackoffWindow,
    run_cancellation_token: Option<&RunCancellationToken>,
) -> RetryBackoffOutcome {
    // Starts the window on first use; subsequent calls measure total elapsed.
    let elapsed_ms = retry_window.elapsed_ms();
    let Some(wait_ms) = retry_backoff_plan_ms(
        agent.llm_retry_policy(),
        retry_attempt,
        elapsed_ms,
        next_retry_entropy(),
    ) else {
        return RetryBackoffOutcome::BudgetExhausted;
    };
    tracing::debug!(
        attempt = retry_attempt,
        backoff_ms = wait_ms,
        elapsed_ms = elapsed_ms,
        "waiting before LLM retry"
    );
    // Race the sleep against cancellation so a cancelled run stops promptly.
    match await_or_cancel(
        run_cancellation_token,
        tokio::time::sleep(std::time::Duration::from_millis(wait_ms)),
    )
    .await
    {
        CancelAware::Cancelled => RetryBackoffOutcome::Cancelled,
        CancelAware::Value(_) => RetryBackoffOutcome::Completed,
    }
}
727
/// Outcome of [`run_llm_with_retry_and_fallback`].
pub(super) enum LlmAttemptOutcome<T> {
    /// A call succeeded.
    Success {
        /// The successful response value.
        value: T,
        /// The model that produced it.
        model: String,
        /// Total attempts made across all models.
        attempts: usize,
    },
    /// The run was cancelled during a call or backoff.
    Cancelled,
    /// Every model/attempt combination failed.
    Exhausted {
        /// Description of the final failure (model, attempt, message).
        last_error: String,
        /// Classification label of the final failure, if one was made.
        last_error_class: Option<&'static str>,
        /// Total attempts made across all models.
        attempts: usize,
    },
}
741
/// Thin alias over [`is_cancelled`] to keep call sites in this module
/// reading in run-loop vocabulary.
fn is_run_cancelled(token: Option<&RunCancellationToken>) -> bool {
    is_cancelled(token)
}
745
746pub(super) fn step_tool_provider_for_run(
747 agent: &dyn Agent,
748 tools: HashMap<String, Arc<dyn Tool>>,
749) -> Arc<dyn StepToolProvider> {
750 agent.step_tool_provider().unwrap_or_else(|| {
751 Arc::new(StaticStepToolProvider::new(tools)) as Arc<dyn StepToolProvider>
752 })
753}
754
755pub(super) fn llm_executor_for_run(agent: &dyn Agent) -> Arc<dyn LlmExecutor> {
756 agent
757 .llm_executor()
758 .unwrap_or_else(|| Arc::new(GenaiLlmExecutor::new(Client::default())))
759}
760
/// Asks the step tool provider for this step's tool snapshot, giving it
/// read access to the current run state.
///
/// # Errors
/// Propagates any `AgentLoopError` the provider returns.
pub(super) async fn resolve_step_tool_snapshot(
    step_tool_provider: &Arc<dyn StepToolProvider>,
    run_ctx: &RunContext,
) -> Result<StepToolSnapshot, AgentLoopError> {
    step_tool_provider
        .provide(StepToolInput { state: run_ctx })
        .await
}
769
770fn mark_step_completed(run_state: &mut LoopRunState) {
771 run_state.completed_steps += 1;
772}
773
/// Concatenates a response prefix with a newly streamed segment, skipping
/// the allocation-and-copy when either side is empty.
fn stitch_response_text(prefix: &str, segment: &str) -> String {
    match (prefix.is_empty(), segment.is_empty()) {
        (true, _) => segment.to_string(),
        (false, true) => prefix.to_string(),
        (false, false) => {
            let mut stitched = String::with_capacity(prefix.len() + segment.len());
            stitched.push_str(prefix);
            stitched.push_str(segment);
            stitched
        }
    }
}
786
/// Appends a streamed segment onto the accumulated response prefix.
/// `push_str` of an empty segment is already a no-op, so no guard is needed.
fn extend_response_prefix(prefix: &mut String, segment: &str) {
    prefix.push_str(segment);
}
792
/// Assembles the final `LoopOutcome`, normalizing an empty response text to
/// `None` and capturing usage/stats from the run state.
fn build_loop_outcome(
    run_ctx: RunContext,
    termination: TerminationReason,
    response: Option<String>,
    run_state: &LoopRunState,
    failure: Option<outcome::LoopFailure>,
) -> LoopOutcome {
    LoopOutcome {
        run_ctx,
        termination,
        // An empty string is treated the same as no response at all.
        response: response.filter(|text| !text.is_empty()),
        usage: run_state.usage(),
        stats: run_state.stats(),
        failure,
    }
}
809
/// Invokes an LLM call with per-model retries and cross-model fallback.
///
/// Models are tried in the order given by [`effective_llm_models_from`]
/// (optionally starting at `start_model`). For each model, up to
/// `llm_retry_attempts` calls are made; a retryable failure waits out the
/// jittered backoff before the next attempt, while a non-retryable failure
/// (or `retry_current_model == false`) moves straight to the next model.
/// Exhausting the retry-window budget aborts the whole sequence.
///
/// Returns `Success` on the first Ok response, `Cancelled` if the run is
/// cancelled during a call or backoff, and `Exhausted` (carrying the last
/// error text/class and total attempt count) otherwise. `unknown_error`
/// seeds the error text in case no attempt ever ran.
pub(super) async fn run_llm_with_retry_and_fallback<T, Invoke, Fut>(
    agent: &dyn Agent,
    run_cancellation_token: Option<&RunCancellationToken>,
    retry_current_model: bool,
    start_model: Option<&str>,
    unknown_error: &str,
    mut invoke: Invoke,
) -> LlmAttemptOutcome<T>
where
    Invoke: FnMut(String) -> Fut,
    Fut: std::future::Future<Output = genai::Result<T>>,
{
    let mut last_llm_error = unknown_error.to_string();
    let mut last_error_class: Option<&'static str> = None;
    let model_candidates = effective_llm_models_from(agent, start_model);
    let max_attempts = llm_retry_attempts(agent);
    let mut total_attempts = 0usize;
    // One shared window: the retry budget spans all models and attempts.
    let mut retry_window = RetryBackoffWindow::default();

    'models: for model in model_candidates {
        for attempt in 1..=max_attempts {
            total_attempts = total_attempts.saturating_add(1);
            // Race the call against cancellation.
            let response_res =
                match await_or_cancel(run_cancellation_token, invoke(model.clone())).await {
                    CancelAware::Cancelled => return LlmAttemptOutcome::Cancelled,
                    CancelAware::Value(resp) => resp,
                };

            match response_res {
                Ok(value) => {
                    if total_attempts > 1 {
                        tracing::info!(
                            model = %model,
                            attempts = total_attempts,
                            "LLM call succeeded after retries"
                        );
                    }
                    return LlmAttemptOutcome::Success {
                        value,
                        model,
                        attempts: total_attempts,
                    };
                }
                Err(e) => {
                    let error_class = classify_llm_error(&e);
                    last_error_class = Some(error_class.as_str());
                    let message = e.to_string();
                    last_llm_error =
                        format!("model='{model}' attempt={attempt}/{max_attempts}: {message}");
                    // Retry on the same model only when allowed, attempts
                    // remain, and the error class is transient.
                    let can_retry_same_model =
                        retry_current_model && attempt < max_attempts && error_class.is_retryable();
                    tracing::warn!(
                        model = %model,
                        attempt = attempt,
                        max_attempts = max_attempts,
                        error_class = error_class.as_str(),
                        retryable = can_retry_same_model,
                        error = %message,
                        "LLM call failed"
                    );
                    if can_retry_same_model {
                        match wait_retry_backoff(
                            agent,
                            attempt,
                            &mut retry_window,
                            run_cancellation_token,
                        )
                        .await
                        {
                            RetryBackoffOutcome::Completed => continue,
                            RetryBackoffOutcome::Cancelled => {
                                return LlmAttemptOutcome::Cancelled;
                            }
                            // Budget exhaustion stops all models, not just
                            // this one — hence the labeled break.
                            RetryBackoffOutcome::BudgetExhausted => {
                                tracing::warn!(
                                    model = %model,
                                    attempt = attempt,
                                    "LLM retry budget exhausted"
                                );
                                last_llm_error =
                                    format!("{last_llm_error} (retry budget exhausted)");
                                break 'models;
                            }
                        }
                    }
                    // Non-retryable: fall through to the next model.
                    break;
                }
            }
        }
    }

    LlmAttemptOutcome::Exhausted {
        last_error: last_llm_error,
        last_error_class,
        attempts: total_attempts,
    }
}
907
/// Runs the `StepStart` and `BeforeInference` plugin phases and collects the
/// inference inputs they produce.
///
/// Returns, in order: the messages to send, the filtered tool names, the
/// run action decided by plugins, the request transforms to apply, the
/// pending state patches, and the serialized state actions. The caller is
/// responsible for applying the patches/actions to the run context.
///
/// # Errors
/// Propagates any `AgentLoopError` raised while running the phase block.
pub(super) async fn run_step_prepare_phases(
    run_ctx: &RunContext,
    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
    agent: &dyn Agent,
) -> Result<
    (
        Vec<Message>,
        Vec<String>,
        RunAction,
        Vec<std::sync::Arc<dyn tirea_contract::runtime::inference::InferenceRequestTransform>>,
        Vec<TrackedPatch>,
        Vec<tirea_contract::SerializedStateAction>,
    ),
    AgentLoopError,
> {
    let system_prompt = agent.system_prompt().to_string();
    let ((messages, filtered_tools, run_action, transforms), pending, actions) =
        plugin_runtime::run_phase_block(
            run_ctx,
            tool_descriptors,
            agent,
            &[Phase::StepStart, Phase::BeforeInference],
            |_| {},
            |step| inference_inputs_from_step(step, &system_prompt),
        )
        .await?;
    Ok((
        messages,
        filtered_tools,
        run_action,
        transforms,
        pending,
        actions,
    ))
}
943
/// Named bundle of everything [`run_step_prepare_phases`] produces for one
/// step, so callers don't juggle a six-element tuple.
pub(super) struct PreparedStep {
    /// Messages to send to the LLM.
    pub(super) messages: Vec<Message>,
    /// Names of tools that survived plugin filtering.
    pub(super) filtered_tools: Vec<String>,
    /// Run action the prepare phases decided on.
    pub(super) run_action: RunAction,
    /// State patches produced by the phases, not yet applied.
    pub(super) pending_patches: Vec<TrackedPatch>,
    /// Serialized state actions produced by the phases, not yet applied.
    pub(super) serialized_state_actions: Vec<tirea_contract::SerializedStateAction>,
    /// Transforms to apply to the inference request before sending.
    pub(super) request_transforms:
        Vec<std::sync::Arc<dyn tirea_contract::runtime::inference::InferenceRequestTransform>>,
}
953
/// Runs the step-preparation phases and repackages the resulting tuple into
/// a [`PreparedStep`].
///
/// # Errors
/// Propagates any `AgentLoopError` from [`run_step_prepare_phases`].
pub(super) async fn prepare_step_execution(
    run_ctx: &RunContext,
    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
    agent: &dyn Agent,
) -> Result<PreparedStep, AgentLoopError> {
    let (messages, filtered_tools, run_action, transforms, pending, actions) =
        run_step_prepare_phases(run_ctx, tool_descriptors, agent).await?;
    Ok(PreparedStep {
        messages,
        filtered_tools,
        run_action,
        pending_patches: pending,
        serialized_state_actions: actions,
        request_transforms: transforms,
    })
}
970
/// Emits the cleanup plugin phases after an LLM failure so plugins can
/// observe the error (type, message, and optional classification label).
///
/// # Errors
/// Propagates any `AgentLoopError` raised by the cleanup phases.
pub(super) async fn apply_llm_error_cleanup(
    run_ctx: &mut RunContext,
    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
    agent: &dyn Agent,
    error_type: &'static str,
    message: String,
    error_class: Option<&str>,
) -> Result<(), AgentLoopError> {
    plugin_runtime::emit_cleanup_phases(
        run_ctx,
        tool_descriptors,
        agent,
        error_type,
        message,
        error_class,
    )
    .await
}
989
/// Finishes a step after a successful inference: runs `AfterInference` with
/// the LLM response attached, appends the assistant message, then runs
/// `StepEnd`. Patches and serialized actions from both phase blocks are
/// queued on the run context in that order.
///
/// Returns the `RunAction` the `AfterInference` phase decided on.
///
/// # Errors
/// Propagates any `AgentLoopError` from either phase block.
pub(super) async fn complete_step_after_inference(
    run_ctx: &mut RunContext,
    result: &StreamResult,
    step_meta: MessageMetadata,
    assistant_message_id: Option<String>,
    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
    agent: &dyn Agent,
) -> Result<RunAction, AgentLoopError> {
    let (run_action, pending, actions) = plugin_runtime::run_phase_block(
        run_ctx,
        tool_descriptors,
        agent,
        &[Phase::AfterInference],
        |step| {
            use crate::contracts::runtime::inference::LLMResponse;
            // Expose the inference result to AfterInference plugins.
            step.llm_response = Some(LLMResponse::success(result.clone()));
        },
        |step| step.run_action(),
    )
    .await?;
    run_ctx.add_thread_patches(pending);
    run_ctx.add_serialized_state_actions(actions);

    // Append the assistant turn before StepEnd so plugins see it in history.
    let assistant = assistant_turn_message(result, step_meta, assistant_message_id);
    run_ctx.add_message(Arc::new(assistant));

    let (pending, actions) =
        plugin_runtime::emit_phase_block(Phase::StepEnd, run_ctx, tool_descriptors, agent, |_| {})
            .await?;
    run_ctx.add_thread_patches(pending);
    run_ctx.add_serialized_state_actions(actions);
    Ok(run_action)
}
1023
1024pub(super) fn pending_tool_events(call: &SuspendedCall) -> Vec<AgentEvent> {
1026 vec![
1027 AgentEvent::ToolCallStart {
1028 id: call.ticket.pending.id.clone(),
1029 name: call.ticket.pending.name.clone(),
1030 },
1031 AgentEvent::ToolCallReady {
1032 id: call.ticket.pending.id.clone(),
1033 name: call.ticket.pending.name.clone(),
1034 arguments: call.ticket.pending.arguments.clone(),
1035 },
1036 ]
1037}
1038
/// True when the run context currently holds any suspended tool calls.
pub(super) fn has_suspended_calls(run_ctx: &RunContext) -> bool {
    !suspended_calls_from_ctx(run_ctx).is_empty()
}
1042
/// Ids of all currently suspended tool calls in the run context.
pub(super) fn suspended_call_ids(run_ctx: &RunContext) -> HashSet<String> {
    suspended_calls_from_ctx(run_ctx).into_keys().collect()
}
1046
1047pub(super) fn newly_suspended_call_ids(
1048 run_ctx: &RunContext,
1049 baseline_ids: &HashSet<String>,
1050) -> HashSet<String> {
1051 suspended_calls_from_ctx(run_ctx)
1052 .into_keys()
1053 .filter(|id| !baseline_ids.contains(id))
1054 .collect()
1055}
1056
1057pub(super) fn suspended_call_pending_events(run_ctx: &RunContext) -> Vec<AgentEvent> {
1058 let mut calls: Vec<SuspendedCall> = suspended_calls_from_ctx(run_ctx).into_values().collect();
1059 calls.sort_by(|left, right| left.call_id.cmp(&right.call_id));
1060 calls
1061 .into_iter()
1062 .flat_map(|call| pending_tool_events(&call))
1063 .collect()
1064}
1065
/// Like [`suspended_call_pending_events`], but restricted to the calls in
/// `call_ids`; returns early with no events when the id set is empty.
/// Results are ordered by call id for determinism.
pub(super) fn suspended_call_pending_events_for_ids(
    run_ctx: &RunContext,
    call_ids: &HashSet<String>,
) -> Vec<AgentEvent> {
    if call_ids.is_empty() {
        return Vec::new();
    }
    let mut calls: Vec<SuspendedCall> = suspended_calls_from_ctx(run_ctx)
        .into_iter()
        .filter_map(|(call_id, call)| call_ids.contains(&call_id).then_some(call))
        .collect();
    calls.sort_by(|left, right| left.call_id.cmp(&right.call_id));
    calls
        .into_iter()
        .flat_map(|call| pending_tool_events(&call))
        .collect()
}
1083
/// Inputs captured once before tool execution: a state snapshot plus a run
/// config scoped with the caller's context.
pub(super) struct ToolExecutionContext {
    /// Snapshot of thread state at the moment tools start executing.
    pub(super) state: serde_json::Value,
    /// Run config augmented with the tool-caller scope keys.
    pub(super) run_config: tirea_contract::RunConfig,
}
1088
/// Snapshots the current thread state and derives the caller-scoped run
/// config that tools execute under.
///
/// # Errors
/// Returns `AgentLoopError::StateError` when the snapshot fails, and
/// propagates errors from building the caller-scoped config.
pub(super) fn prepare_tool_execution_context(
    run_ctx: &RunContext,
) -> Result<ToolExecutionContext, AgentLoopError> {
    let state = run_ctx
        .snapshot()
        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
    let run_config = scope_with_tool_caller_context(run_ctx, &state)?;
    Ok(ToolExecutionContext { state, run_config })
}
1098
/// Emits the run-end plugin phase; infallible from the caller's perspective
/// (the phase emitter handles its own errors).
pub(super) async fn finalize_run_end(
    run_ctx: &mut RunContext,
    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
    agent: &dyn Agent,
) {
    plugin_runtime::emit_run_end_phase(run_ctx, tool_descriptors, agent).await
}
1106
1107fn normalize_termination_for_suspended_calls(
1108 run_ctx: &RunContext,
1109 termination: TerminationReason,
1110 response: Option<String>,
1111) -> (TerminationReason, Option<String>) {
1112 let final_termination = if !matches!(
1113 termination,
1114 TerminationReason::Error(_) | TerminationReason::Cancelled
1115 ) && has_suspended_calls(run_ctx)
1116 {
1117 TerminationReason::Suspended
1118 } else {
1119 termination
1120 };
1121 let final_response = if final_termination == TerminationReason::Suspended {
1122 None
1123 } else {
1124 response
1125 };
1126 (final_termination, final_response)
1127}
1128
/// Persists a run's final state in order: sync the lifecycle status, emit
/// the run-end plugin phase, then commit the pending deltas for the
/// finished run.
///
/// # Errors
/// Propagates failures from the lifecycle sync or the delta commit.
async fn persist_run_termination(
    run_ctx: &mut RunContext,
    termination: &TerminationReason,
    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
    agent: &dyn Agent,
    execution_ctx: &RunExecutionContext,
    pending_delta_commit: &PendingDeltaCommitContext<'_>,
) -> Result<(), AgentLoopError> {
    sync_run_lifecycle_for_termination_with_context(run_ctx, execution_ctx, termination)?;
    finalize_run_end(run_ctx, tool_descriptors, agent).await;
    pending_delta_commit
        .commit_run_finished(run_ctx, termination)
        .await?;
    Ok(())
}
1144
1145fn stream_result_from_chat_response(response: &genai::chat::ChatResponse) -> StreamResult {
1146 let text = response
1147 .first_text()
1148 .map(|s| s.to_string())
1149 .unwrap_or_default();
1150 let tool_calls: Vec<crate::contracts::thread::ToolCall> = response
1151 .tool_calls()
1152 .into_iter()
1153 .map(|tc| {
1154 crate::contracts::thread::ToolCall::new(
1155 &tc.call_id,
1156 &tc.fn_name,
1157 tc.fn_arguments.clone(),
1158 )
1159 })
1160 .collect();
1161
1162 let usage = Some(crate::runtime::streaming::token_usage_from_genai(
1163 &response.usage,
1164 ));
1165 let stop_reason = response
1166 .stop_reason
1167 .as_ref()
1168 .and_then(crate::runtime::streaming::map_genai_stop_reason)
1169 .or({
1170 if !tool_calls.is_empty() {
1171 Some(tirea_contract::runtime::inference::StopReason::ToolUse)
1172 } else {
1173 Some(tirea_contract::runtime::inference::StopReason::EndTurn)
1174 }
1175 });
1176 StreamResult {
1177 text,
1178 tool_calls,
1179 usage,
1180 stop_reason,
1181 }
1182}
1183
1184fn assistant_turn_message(
1185 result: &StreamResult,
1186 step_meta: MessageMetadata,
1187 message_id: Option<String>,
1188) -> Message {
1189 let mut msg = if result.tool_calls.is_empty() {
1190 assistant_message(&result.text)
1191 } else {
1192 assistant_tool_calls(&result.text, result.tool_calls.clone())
1193 }
1194 .with_metadata(step_meta);
1195 if let Some(message_id) = message_id {
1196 msg = msg.with_id(message_id);
1197 }
1198 msg
1199}
1200
/// Outcome of draining resumable tool calls at run start.
struct RunStartDrainOutcome {
    // Events to surface to the caller (tool results, state snapshots, …).
    events: Vec<AgentEvent>,
    // True when at least one suspended call was actually replayed.
    replayed: bool,
}
1205
1206fn decision_result_value(action: &ResumeDecisionAction, result: &Value) -> serde_json::Value {
1207 if result.is_null() {
1208 serde_json::Value::Bool(matches!(action, ResumeDecisionAction::Resume))
1209 } else {
1210 result.clone()
1211 }
1212}
1213
1214fn runtime_resume_inputs(run_ctx: &RunContext) -> HashMap<String, ToolCallResume> {
1215 let mut decisions = HashMap::new();
1216 for (call_id, state) in tool_call_states_from_ctx(run_ctx) {
1217 if !matches!(state.status, ToolCallStatus::Resuming) {
1218 continue;
1219 }
1220 let Some(mut resume) = state.resume else {
1221 continue;
1222 };
1223 if resume.decision_id.trim().is_empty() {
1224 resume.decision_id = call_id.clone();
1225 }
1226 decisions.insert(call_id, resume);
1227 }
1228 decisions
1229}
1230
/// Settles resume decisions whose suspended-call record no longer exists
/// ("orphans"), so their tool-call states do not stay stuck in `Resuming`.
///
/// Cancel decisions settle the orphan as `Cancelled`; resume decisions
/// settle it as `Failed` (the suspended-call entry needed for replay is
/// gone). Returns `Ok(true)` when at least one state patch was queued.
fn settle_orphan_resuming_tool_states(
    run_ctx: &mut RunContext,
    suspended: &HashMap<String, SuspendedCall>,
    resumes: &HashMap<String, ToolCallResume>,
) -> Result<bool, AgentLoopError> {
    let states = tool_call_states_from_ctx(run_ctx);
    let mut changed = false;

    for (call_id, resume) in resumes {
        // Calls that are still suspended are handled by the replay path.
        if suspended.contains_key(call_id) {
            continue;
        }
        let Some(state) = states.get(call_id).cloned() else {
            continue;
        };
        let target_status = match &resume.action {
            ResumeDecisionAction::Cancel => ToolCallStatus::Cancelled,
            ResumeDecisionAction::Resume => ToolCallStatus::Failed,
        };
        // Already settled with this exact decision — nothing to write.
        if state.status == target_status && state.resume.as_ref() == Some(resume) {
            continue;
        }

        // Transition may be rejected (returns None); such calls are skipped.
        let Some(next_state) = transition_tool_call_state(
            Some(state.clone()),
            ToolCallStateSeed {
                call_id: call_id.as_str(),
                tool_name: state.tool_name.as_str(),
                arguments: &state.arguments,
                status: state.status,
                resume_token: state.resume_token.clone(),
            },
            ToolCallStateTransition {
                status: target_status,
                resume_token: state.resume_token.clone(),
                resume: Some(resume.clone()),
                updated_at: current_unix_millis(),
            },
        ) else {
            continue;
        };

        // Snapshot per iteration: earlier patches in this loop must be
        // visible when computing the next upsert.
        let base_state = run_ctx
            .snapshot()
            .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
        let patch = upsert_tool_call_state(&base_state, call_id, next_state)?;
        if patch.patch().is_empty() {
            continue;
        }
        run_ctx.add_thread_patch(patch);
        changed = true;
    }

    Ok(changed)
}
1286
1287fn all_suspended_calls_have_resume(
1288 suspended: &HashMap<String, SuspendedCall>,
1289 resumes: &HashMap<String, ToolCallResume>,
1290) -> bool {
1291 suspended
1292 .keys()
1293 .all(|call_id| resumes.contains_key(call_id))
1294}
1295
/// Drains every tool call marked `Resuming`: cancelled calls get a denied
/// tool-result message and scope cleanup, approved calls are re-executed,
/// and all resulting events and state patches are collected.
///
/// Returns without replaying when there are no resume decisions, when no
/// calls are actually suspended (orphans are settled first), or when the
/// executor's `BatchAllSuspended` policy requires a decision for every
/// suspended call and some are still missing. A trailing `StateSnapshot`
/// event is appended whenever any state patch was queued.
async fn drain_resuming_tool_calls_and_replay(
    run_ctx: &mut RunContext,
    tools: &HashMap<String, Arc<dyn Tool>>,
    agent: &dyn Agent,
    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
) -> Result<RunStartDrainOutcome, AgentLoopError> {
    let decisions = runtime_resume_inputs(run_ctx);
    if decisions.is_empty() {
        return Ok(RunStartDrainOutcome {
            events: Vec::new(),
            replayed: false,
        });
    }

    // Settle decisions whose suspended-call record has already been
    // cleared so they do not linger in `Resuming`.
    let suspended = suspended_calls_from_ctx(run_ctx);
    let mut state_changed = false;
    if settle_orphan_resuming_tool_states(run_ctx, &suspended, &decisions)? {
        state_changed = true;
    }
    if suspended.is_empty() {
        // Nothing to replay; surface a snapshot only if settling changed state.
        if state_changed {
            let snapshot = run_ctx
                .snapshot()
                .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
            return Ok(RunStartDrainOutcome {
                events: vec![AgentEvent::StateSnapshot { snapshot }],
                replayed: false,
            });
        }
        return Ok(RunStartDrainOutcome {
            events: Vec::new(),
            replayed: false,
        });
    }

    // Under BatchAllSuspended, wait until every suspended call has a decision.
    if matches!(
        agent.tool_executor().decision_replay_policy(),
        DecisionReplayPolicy::BatchAllSuspended
    ) && !all_suspended_calls_have_resume(&suspended, &decisions)
    {
        if state_changed {
            let snapshot = run_ctx
                .snapshot()
                .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
            return Ok(RunStartDrainOutcome {
                events: vec![AgentEvent::StateSnapshot { snapshot }],
                replayed: false,
            });
        }
        return Ok(RunStartDrainOutcome {
            events: Vec::new(),
            replayed: false,
        });
    }

    let mut events = Vec::new();
    // Sorted for a deterministic replay order (HashMap iteration is not).
    let mut decision_ids: Vec<String> = decisions.keys().cloned().collect();
    decision_ids.sort();

    let mut replayed = false;

    for call_id in decision_ids {
        let Some(suspended_call) = suspended.get(&call_id).cloned() else {
            continue;
        };
        let Some(decision) = decisions.get(&call_id).cloned() else {
            continue;
        };
        replayed = true;
        let decision_result = decision_result_value(&decision.action, &decision.result);
        let resume_payload = ToolCallResume {
            result: decision_result.clone(),
            ..decision.clone()
        };
        events.push(AgentEvent::ToolCallResumed {
            target_id: suspended_call.call_id.clone(),
            result: decision_result.clone(),
        });

        match decision.action {
            ResumeDecisionAction::Cancel => {
                // Record the cancellation, append a denied tool-result
                // message, and delete the suspended-call scope entry.
                let cancel_reason = resume_payload.reason.clone();
                if upsert_tool_call_lifecycle_state(
                    run_ctx,
                    &suspended_call,
                    ToolCallStatus::Cancelled,
                    Some(resume_payload),
                )? {
                    state_changed = true;
                }
                events.push(append_denied_tool_result_message(
                    run_ctx,
                    &suspended_call.call_id,
                    Some(&suspended_call.tool_name),
                    cancel_reason.as_deref(),
                ));
                let cleanup_path = format!(
                    "__tool_call_scope.{}.suspended_call",
                    suspended_call.call_id
                );
                let cleanup_patch = tirea_state::Patch::with_ops(vec![tirea_state::Op::delete(
                    tirea_state::parse_path(&cleanup_path),
                )]);
                let tracked = tirea_state::TrackedPatch::new(cleanup_patch)
                    .with_source("framework:scope_cleanup");
                if !tracked.patch().is_empty() {
                    state_changed = true;
                    run_ctx.add_thread_patch(tracked);
                }
            }
            ResumeDecisionAction::Resume => {
                // Mark the call as resuming, rebuild the tool call per its
                // resume mode, and re-execute it.
                if upsert_tool_call_lifecycle_state(
                    run_ctx,
                    &suspended_call,
                    ToolCallStatus::Resuming,
                    Some(resume_payload.clone()),
                )? {
                    state_changed = true;
                }
                let Some(tool_call) = replay_tool_call_for_resolution(
                    run_ctx,
                    &suspended_call,
                    &ToolCallDecision {
                        target_id: suspended_call.call_id.clone(),
                        resume: resume_payload.clone(),
                    },
                ) else {
                    continue;
                };
                let state = run_ctx
                    .snapshot()
                    .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
                let tool = tools.get(&tool_call.name).cloned();
                let rt_for_replay = scope_with_tool_caller_context(run_ctx, &state)?;
                let replay_phase_ctx = ToolPhaseContext {
                    tool_descriptors,
                    agent_behavior: Some(agent.behavior()),
                    activity_manager: tirea_contract::runtime::activity::NoOpActivityManager::arc(),
                    run_config: &rt_for_replay,
                    thread_id: run_ctx.thread_id(),
                    thread_messages: run_ctx.messages(),
                    cancellation_token: None,
                };
                let replay_result = execute_single_tool_with_phases_deferred(
                    tool.as_deref(),
                    &tool_call,
                    &state,
                    &replay_phase_ctx,
                )
                .await?;

                // Record the tool response in the thread.
                let replay_msg_id = gen_message_id();
                let replay_msg = tool_response(&tool_call.id, &replay_result.execution.result)
                    .with_id(replay_msg_id.clone());
                run_ctx.add_message(Arc::new(replay_msg));

                // Reminders become internal system messages.
                if !replay_result.reminders.is_empty() {
                    let msgs: Vec<Arc<Message>> = replay_result
                        .reminders
                        .iter()
                        .map(|reminder| {
                            Arc::new(Message::internal_system(format!(
                                "<system-reminder>{}</system-reminder>",
                                reminder
                            )))
                        })
                        .collect();
                    run_ctx.add_messages(msgs);
                }

                if let Some(patch) = replay_result.execution.patch.clone() {
                    state_changed = true;
                    run_ctx.add_thread_patch(patch);
                }
                if !replay_result.pending_patches.is_empty() {
                    state_changed = true;
                    run_ctx.add_thread_patches(replay_result.pending_patches.clone());
                }
                events.push(AgentEvent::ToolCallDone {
                    id: tool_call.id.clone(),
                    result: replay_result.execution.result,
                    patch: replay_result.execution.patch,
                    message_id: replay_msg_id,
                });

                // The replayed tool may itself suspend again: persist the
                // new suspended call and surface its pending events.
                if let Some(next_suspended_call) = replay_result.suspended_call.clone() {
                    let state = run_ctx
                        .snapshot()
                        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
                    let action = next_suspended_call.clone().into_state_action();
                    let patches = reduce_state_actions(
                        vec![action],
                        &state,
                        "agent_loop",
                        &ScopeContext::run(),
                    )
                    .map_err(|e| {
                        AgentLoopError::StateError(format!(
                            "failed to reduce suspended call action: {e}"
                        ))
                    })?;
                    for patch in patches {
                        if !patch.patch().is_empty() {
                            state_changed = true;
                            run_ctx.add_thread_patch(patch);
                        }
                    }
                    for event in pending_tool_events(&next_suspended_call) {
                        events.push(event);
                    }
                }
            }
        }
    }

    // One final snapshot event reflecting every patch queued above.
    if state_changed {
        let snapshot = run_ctx
            .snapshot()
            .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
        events.push(AgentEvent::StateSnapshot { snapshot });
    }

    Ok(RunStartDrainOutcome { events, replayed })
}
1527
/// Run-start entry point for resume replay; pure delegation to
/// [`drain_resuming_tool_calls_and_replay`] (the separate name marks the
/// run-start call site).
async fn drain_run_start_resume_replay(
    run_ctx: &mut RunContext,
    tools: &HashMap<String, Arc<dyn Tool>>,
    agent: &dyn Agent,
    tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
) -> Result<RunStartDrainOutcome, AgentLoopError> {
    drain_resuming_tool_calls_and_replay(run_ctx, tools, agent, tool_descriptors).await
}
1536
/// Commits the run-start checkpoint, then drains and replays any tool
/// calls waiting in `Resuming` state.
///
/// A second checkpoint (`ToolResultsCommitted`) is taken only when the
/// drain actually replayed something, so replayed results are durable
/// before the main loop starts.
async fn commit_run_start_and_drain_replay(
    run_ctx: &mut RunContext,
    tools: &HashMap<String, Arc<dyn Tool>>,
    agent: &dyn Agent,
    active_tool_descriptors: &[crate::contracts::runtime::tool_call::ToolDescriptor],
    pending_delta_commit: &PendingDeltaCommitContext<'_>,
) -> Result<RunStartDrainOutcome, AgentLoopError> {
    pending_delta_commit
        .commit(run_ctx, CheckpointReason::UserMessage, false)
        .await?;

    let run_start_drain =
        drain_run_start_resume_replay(run_ctx, tools, agent, active_tool_descriptors).await?;

    if run_start_drain.replayed {
        pending_delta_commit
            .commit(run_ctx, CheckpointReason::ToolResultsCommitted, false)
            .await?;
    }

    Ok(run_start_drain)
}
1559
1560fn normalize_decision_tool_result(
1561 response: &serde_json::Value,
1562 fallback_arguments: &serde_json::Value,
1563) -> serde_json::Value {
1564 match response {
1565 serde_json::Value::Bool(_) => fallback_arguments.clone(),
1566 value => value.clone(),
1567 }
1568}
1569
1570fn denied_tool_result_for_call(
1571 run_ctx: &RunContext,
1572 call_id: &str,
1573 fallback_tool_name: Option<&str>,
1574 decision_reason: Option<&str>,
1575) -> ToolResult {
1576 let tool_name = fallback_tool_name
1577 .filter(|name| !name.is_empty())
1578 .map(str::to_string)
1579 .or_else(|| find_tool_call_in_messages(run_ctx, call_id).map(|call| call.name))
1580 .unwrap_or_else(|| "tool".to_string());
1581 let reason = decision_reason
1582 .map(str::to_string)
1583 .filter(|reason| !reason.trim().is_empty())
1584 .unwrap_or_else(|| "User denied the action".to_string());
1585 ToolResult::error(tool_name, reason)
1586}
1587
1588fn append_denied_tool_result_message(
1589 run_ctx: &mut RunContext,
1590 call_id: &str,
1591 fallback_tool_name: Option<&str>,
1592 decision_reason: Option<&str>,
1593) -> AgentEvent {
1594 let denied_result =
1595 denied_tool_result_for_call(run_ctx, call_id, fallback_tool_name, decision_reason);
1596 let message_id = gen_message_id();
1597 let denied_message = tool_response(call_id, &denied_result).with_id(message_id.clone());
1598 run_ctx.add_message(Arc::new(denied_message));
1599 AgentEvent::ToolCallDone {
1600 id: call_id.to_string(),
1601 result: denied_result,
1602 patch: None,
1603 message_id,
1604 }
1605}
1606
1607fn find_tool_call_in_messages(run_ctx: &RunContext, call_id: &str) -> Option<ToolCall> {
1608 run_ctx.messages().iter().rev().find_map(|message| {
1609 message
1610 .tool_calls
1611 .as_ref()
1612 .and_then(|calls| calls.iter().find(|call| call.id == call_id).cloned())
1613 })
1614}
1615
1616fn replay_tool_call_for_resolution(
1617 _run_ctx: &RunContext,
1618 suspended_call: &SuspendedCall,
1619 decision: &ToolCallDecision,
1620) -> Option<ToolCall> {
1621 if matches!(decision.resume.action, ResumeDecisionAction::Cancel) {
1622 return None;
1623 }
1624
1625 match suspended_call.ticket.resume_mode {
1626 ToolCallResumeMode::ReplayToolCall => Some(ToolCall::new(
1627 suspended_call.call_id.clone(),
1628 suspended_call.tool_name.clone(),
1629 suspended_call.arguments.clone(),
1630 )),
1631 ToolCallResumeMode::UseDecisionAsToolResult | ToolCallResumeMode::PassDecisionToTool => {
1632 Some(ToolCall::new(
1633 suspended_call.call_id.clone(),
1634 suspended_call.tool_name.clone(),
1635 normalize_decision_tool_result(&decision.resume.result, &suspended_call.arguments),
1636 ))
1637 }
1638 }
1639}
1640
/// Transitions the lifecycle state of a suspended tool call to `status`
/// and queues the resulting state patch on `run_ctx`.
///
/// Returns `Ok(true)` only when a non-empty patch was queued; `Ok(false)`
/// when the transition was rejected or produced no state change.
fn upsert_tool_call_lifecycle_state(
    run_ctx: &mut RunContext,
    suspended_call: &SuspendedCall,
    status: ToolCallStatus,
    resume: Option<ToolCallResume>,
) -> Result<bool, AgentLoopError> {
    // Any previously recorded state seeds the transition; the seed values
    // (status `Suspended`, pending-ticket id as resume token) describe the
    // call when no prior state exists.
    let current_state = tool_call_states_from_ctx(run_ctx).remove(&suspended_call.call_id);
    let Some(tool_state) = transition_tool_call_state(
        current_state,
        ToolCallStateSeed {
            call_id: &suspended_call.call_id,
            tool_name: &suspended_call.tool_name,
            arguments: &suspended_call.arguments,
            status: ToolCallStatus::Suspended,
            resume_token: Some(suspended_call.ticket.pending.id.clone()),
        },
        ToolCallStateTransition {
            status,
            resume_token: Some(suspended_call.ticket.pending.id.clone()),
            resume,
            updated_at: current_unix_millis(),
        },
    ) else {
        // Transition rejected — nothing written.
        return Ok(false);
    };

    let base_state = run_ctx
        .snapshot()
        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
    let patch = upsert_tool_call_state(&base_state, &suspended_call.call_id, tool_state)?;
    if patch.patch().is_empty() {
        return Ok(false);
    }
    run_ctx.add_thread_patch(patch);
    Ok(true)
}
1677
1678pub(super) fn resolve_suspended_call(
1679 run_ctx: &mut RunContext,
1680 response: &ToolCallDecision,
1681) -> Result<Option<DecisionReplayOutcome>, AgentLoopError> {
1682 let suspended_calls = suspended_calls_from_ctx(run_ctx);
1683 if suspended_calls.is_empty() {
1684 return Ok(None);
1685 }
1686
1687 let suspended_call = suspended_calls
1688 .get(&response.target_id)
1689 .cloned()
1690 .or_else(|| {
1691 suspended_calls
1692 .values()
1693 .find(|call| {
1694 call.ticket.suspension.id == response.target_id
1695 || call.ticket.pending.id == response.target_id
1696 || call.call_id == response.target_id
1697 })
1698 .cloned()
1699 });
1700 let Some(suspended_call) = suspended_call else {
1701 return Ok(None);
1702 };
1703
1704 let _ = upsert_tool_call_lifecycle_state(
1705 run_ctx,
1706 &suspended_call,
1707 ToolCallStatus::Resuming,
1708 Some(response.resume.clone()),
1709 )?;
1710
1711 Ok(Some(DecisionReplayOutcome {
1712 events: Vec::new(),
1713 resolved_call_ids: vec![suspended_call.call_id],
1714 }))
1715}
1716
1717pub(super) fn drain_decision_channel(
1718 run_ctx: &mut RunContext,
1719 decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1720 pending_decisions: &mut VecDeque<ToolCallDecision>,
1721) -> Result<DecisionReplayOutcome, AgentLoopError> {
1722 let mut disconnected = false;
1723 if let Some(rx) = decision_rx.as_mut() {
1724 loop {
1725 match rx.try_recv() {
1726 Ok(response) => pending_decisions.push_back(response),
1727 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
1728 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
1729 disconnected = true;
1730 break;
1731 }
1732 }
1733 }
1734 }
1735 if disconnected {
1736 *decision_rx = None;
1737 }
1738
1739 if pending_decisions.is_empty() {
1740 return Ok(DecisionReplayOutcome {
1741 events: Vec::new(),
1742 resolved_call_ids: Vec::new(),
1743 });
1744 }
1745
1746 let mut unresolved = VecDeque::new();
1747 let mut events = Vec::new();
1748 let mut resolved_call_ids = Vec::new();
1749 let mut seen = HashSet::new();
1750
1751 while let Some(response) = pending_decisions.pop_front() {
1752 if let Some(outcome) = resolve_suspended_call(run_ctx, &response)? {
1753 for call_id in outcome.resolved_call_ids {
1754 if seen.insert(call_id.clone()) {
1755 resolved_call_ids.push(call_id);
1756 }
1757 }
1758 events.extend(outcome.events);
1759 } else {
1760 unresolved.push_back(response);
1761 }
1762 }
1763 *pending_decisions = unresolved;
1764
1765 Ok(DecisionReplayOutcome {
1766 events,
1767 resolved_call_ids,
1768 })
1769}
1770
/// After decisions were applied, refreshes the step tool snapshot, replays
/// the now-resumable tool calls, and checkpoints the results.
///
/// No-op (empty event list) when `decisions_applied` is false. Note that
/// `active_tool_descriptors` is overwritten in place with the refreshed
/// snapshot's descriptors.
async fn replay_after_decisions(
    run_ctx: &mut RunContext,
    decisions_applied: bool,
    step_tool_provider: &Arc<dyn StepToolProvider>,
    agent: &dyn Agent,
    active_tool_descriptors: &mut Vec<crate::contracts::runtime::tool_call::ToolDescriptor>,
    pending_delta_commit: &PendingDeltaCommitContext<'_>,
) -> Result<Vec<AgentEvent>, AgentLoopError> {
    if !decisions_applied {
        return Ok(Vec::new());
    }

    let decision_tools = resolve_step_tool_snapshot(step_tool_provider, run_ctx).await?;
    *active_tool_descriptors = decision_tools.descriptors.clone();

    let decision_drain = drain_run_start_resume_replay(
        run_ctx,
        &decision_tools.tools,
        agent,
        active_tool_descriptors,
    )
    .await?;

    // Persist replayed tool results before returning to the loop.
    pending_delta_commit
        .commit(run_ctx, CheckpointReason::ToolResultsCommitted, false)
        .await?;

    Ok(decision_drain.events)
}
1800
1801async fn apply_decisions_and_replay(
1802 run_ctx: &mut RunContext,
1803 decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1804 pending_decisions: &mut VecDeque<ToolCallDecision>,
1805 step_tool_provider: &Arc<dyn StepToolProvider>,
1806 agent: &dyn Agent,
1807 active_tool_descriptors: &mut Vec<crate::contracts::runtime::tool_call::ToolDescriptor>,
1808 pending_delta_commit: &PendingDeltaCommitContext<'_>,
1809) -> Result<Vec<AgentEvent>, AgentLoopError> {
1810 Ok(drain_and_replay_decisions(
1811 run_ctx,
1812 decision_rx,
1813 pending_decisions,
1814 None,
1815 DecisionReplayInputs {
1816 step_tool_provider,
1817 agent,
1818 active_tool_descriptors,
1819 },
1820 pending_delta_commit,
1821 )
1822 .await?
1823 .events)
1824}
1825
/// Result of resolving decisions against suspended calls.
pub(super) struct DecisionReplayOutcome {
    // Events produced while resolving and replaying.
    events: Vec<AgentEvent>,
    // Call ids that were matched to a decision.
    resolved_call_ids: Vec<String>,
}
1830
/// Borrowed inputs needed to replay tool calls after decisions land.
struct DecisionReplayInputs<'a> {
    // Provider used to re-resolve the per-step tool snapshot.
    step_tool_provider: &'a Arc<dyn StepToolProvider>,
    agent: &'a dyn Agent,
    // Updated in place when the tool snapshot is refreshed.
    active_tool_descriptors: &'a mut Vec<crate::contracts::runtime::tool_call::ToolDescriptor>,
}
1836
/// Queues an optional extra decision, drains the decision channel into
/// suspended-call resolutions, then replays the resolved calls.
///
/// Event ordering is preserved: resolution events first, replay events
/// appended after.
async fn drain_and_replay_decisions(
    run_ctx: &mut RunContext,
    decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
    pending_decisions: &mut VecDeque<ToolCallDecision>,
    decision: Option<ToolCallDecision>,
    replay_inputs: DecisionReplayInputs<'_>,
    pending_delta_commit: &PendingDeltaCommitContext<'_>,
) -> Result<DecisionReplayOutcome, AgentLoopError> {
    if let Some(decision) = decision {
        pending_decisions.push_back(decision);
    }
    let decision_drain = drain_decision_channel(run_ctx, decision_rx, pending_decisions)?;
    let mut events = decision_drain.events;
    // Replay only runs when at least one call was resolved above.
    let replay_events = replay_after_decisions(
        run_ctx,
        !decision_drain.resolved_call_ids.is_empty(),
        replay_inputs.step_tool_provider,
        replay_inputs.agent,
        replay_inputs.active_tool_descriptors,
        pending_delta_commit,
    )
    .await?;
    events.extend(replay_events);

    Ok(DecisionReplayOutcome {
        events,
        resolved_call_ids: decision_drain.resolved_call_ids,
    })
}
1866
/// Convenience wrapper around [`drain_and_replay_decisions`] that injects
/// a single incoming decision before draining.
async fn apply_decision_and_replay(
    run_ctx: &mut RunContext,
    response: ToolCallDecision,
    decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
    pending_decisions: &mut VecDeque<ToolCallDecision>,
    replay_inputs: DecisionReplayInputs<'_>,
    pending_delta_commit: &PendingDeltaCommitContext<'_>,
) -> Result<DecisionReplayOutcome, AgentLoopError> {
    drain_and_replay_decisions(
        run_ctx,
        decision_rx,
        pending_decisions,
        Some(response),
        replay_inputs,
        pending_delta_commit,
    )
    .await
}
1885
1886async fn recv_decision(
1887 decision_rx: &mut Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1888) -> Option<ToolCallDecision> {
1889 let rx = decision_rx.as_mut()?;
1890 rx.recv().await
1891}
1892
/// Public entry point for running the agent loop.
///
/// Derives a [`RunExecutionContext`] from the run context's config and
/// delegates to [`run_loop_with_context`].
pub async fn run_loop(
    agent: &dyn Agent,
    tools: HashMap<String, Arc<dyn Tool>>,
    run_ctx: RunContext,
    cancellation_token: Option<RunCancellationToken>,
    state_committer: Option<Arc<dyn StateCommitter>>,
    decision_rx: Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
) -> LoopOutcome {
    let execution_ctx = RunExecutionContext::from_run_config(&run_ctx);
    run_loop_with_context(
        agent,
        tools,
        run_ctx,
        execution_ctx,
        cancellation_token,
        state_committer,
        decision_rx,
    )
    .await
}
1918
1919pub async fn run_loop_with_context(
1925 agent: &dyn Agent,
1926 tools: HashMap<String, Arc<dyn Tool>>,
1927 mut run_ctx: RunContext,
1928 execution_ctx: RunExecutionContext,
1929 cancellation_token: Option<RunCancellationToken>,
1930 state_committer: Option<Arc<dyn StateCommitter>>,
1931 mut decision_rx: Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
1932) -> LoopOutcome {
1933 bind_execution_context_to_run_config(&mut run_ctx, &execution_ctx);
1934
1935 let executor = llm_executor_for_run(agent);
1936 let tool_executor = agent.tool_executor();
1937 let mut run_state = LoopRunState::new();
1938 let mut pending_decisions = VecDeque::new();
1939 let run_cancellation_token = cancellation_token;
1940 let mut last_text = String::new();
1941 let mut continued_response_prefix = String::new();
1942 let step_tool_provider = step_tool_provider_for_run(agent, tools);
1943 let run_id = execution_ctx.run_id.clone();
1944 let baseline_suspended_call_ids = suspended_call_ids(&run_ctx);
1945 let pending_delta_commit =
1946 PendingDeltaCommitContext::new(&execution_ctx, state_committer.as_ref());
1947 let initial_step_tools = match resolve_step_tool_snapshot(&step_tool_provider, &run_ctx).await {
1948 Ok(snapshot) => snapshot,
1949 Err(error) => {
1950 let msg = error.to_string();
1951 return build_loop_outcome(
1952 run_ctx,
1953 TerminationReason::Error(msg.clone()),
1954 None,
1955 &run_state,
1956 Some(outcome::LoopFailure::State(msg)),
1957 );
1958 }
1959 };
1960 let StepToolSnapshot {
1961 tools: initial_tools,
1962 descriptors: initial_descriptors,
1963 } = initial_step_tools;
1964 let mut active_tool_descriptors = initial_descriptors;
1965
1966 macro_rules! terminate_run {
1967 ($termination:expr, $response:expr, $failure:expr) => {{
1968 let reason: TerminationReason = $termination;
1969 let (final_termination, final_response) =
1970 normalize_termination_for_suspended_calls(&run_ctx, reason, $response);
1971 if let Err(error) = persist_run_termination(
1972 &mut run_ctx,
1973 &final_termination,
1974 &active_tool_descriptors,
1975 agent,
1976 &execution_ctx,
1977 &pending_delta_commit,
1978 )
1979 .await
1980 {
1981 let msg = error.to_string();
1982 return build_loop_outcome(
1983 run_ctx,
1984 TerminationReason::Error(msg.clone()),
1985 None,
1986 &run_state,
1987 Some(outcome::LoopFailure::State(msg)),
1988 );
1989 }
1990 return build_loop_outcome(
1991 run_ctx,
1992 final_termination,
1993 final_response,
1994 &run_state,
1995 $failure,
1996 );
1997 }};
1998 }
1999
2000 let (pending, actions) = match plugin_runtime::emit_phase_block(
2002 Phase::RunStart,
2003 &run_ctx,
2004 &active_tool_descriptors,
2005 agent,
2006 |_| {},
2007 )
2008 .await
2009 {
2010 Ok(result) => result,
2011 Err(error) => {
2012 let msg = error.to_string();
2013 terminate_run!(
2014 TerminationReason::Error(msg.clone()),
2015 None,
2016 Some(outcome::LoopFailure::State(msg))
2017 );
2018 }
2019 };
2020 run_ctx.add_thread_patches(pending);
2021 run_ctx.add_serialized_state_actions(actions);
2022 if let Err(error) = commit_run_start_and_drain_replay(
2023 &mut run_ctx,
2024 &initial_tools,
2025 agent,
2026 &active_tool_descriptors,
2027 &pending_delta_commit,
2028 )
2029 .await
2030 {
2031 let msg = error.to_string();
2032 terminate_run!(
2033 TerminationReason::Error(msg.clone()),
2034 None,
2035 Some(outcome::LoopFailure::State(msg))
2036 );
2037 }
2038 let run_start_new_suspended = newly_suspended_call_ids(&run_ctx, &baseline_suspended_call_ids);
2039 if !run_start_new_suspended.is_empty() {
2040 terminate_run!(TerminationReason::Suspended, None, None);
2041 }
2042 loop {
2043 if let Err(error) = apply_decisions_and_replay(
2044 &mut run_ctx,
2045 &mut decision_rx,
2046 &mut pending_decisions,
2047 &step_tool_provider,
2048 agent,
2049 &mut active_tool_descriptors,
2050 &pending_delta_commit,
2051 )
2052 .await
2053 {
2054 let msg = error.to_string();
2055 terminate_run!(
2056 TerminationReason::Error(msg.clone()),
2057 None,
2058 Some(outcome::LoopFailure::State(msg))
2059 );
2060 }
2061
2062 if is_run_cancelled(run_cancellation_token.as_ref()) {
2063 terminate_run!(TerminationReason::Cancelled, None, None);
2064 }
2065
2066 let step_tools = match resolve_step_tool_snapshot(&step_tool_provider, &run_ctx).await {
2067 Ok(snapshot) => snapshot,
2068 Err(e) => {
2069 let msg = e.to_string();
2070 terminate_run!(
2071 TerminationReason::Error(msg.clone()),
2072 None,
2073 Some(outcome::LoopFailure::State(msg))
2074 );
2075 }
2076 };
2077 active_tool_descriptors = step_tools.descriptors.clone();
2078
2079 let prepared = match prepare_step_execution(&run_ctx, &active_tool_descriptors, agent).await
2080 {
2081 Ok(v) => v,
2082 Err(e) => {
2083 let msg = e.to_string();
2084 terminate_run!(
2085 TerminationReason::Error(msg.clone()),
2086 None,
2087 Some(outcome::LoopFailure::State(msg))
2088 );
2089 }
2090 };
2091 run_ctx.add_thread_patches(prepared.pending_patches);
2092
2093 match prepared.run_action {
2094 RunAction::Continue => {}
2095 RunAction::Terminate(reason) => {
2096 let response = if matches!(reason, TerminationReason::BehaviorRequested) {
2097 Some(last_text.clone())
2098 } else {
2099 None
2100 };
2101 terminate_run!(reason, response, None);
2102 }
2103 }
2104
2105 let messages = prepared.messages;
2107 let filtered_tools = prepared.filtered_tools;
2108 let request_transforms = prepared.request_transforms;
2109 let chat_options = agent.chat_options().cloned();
2110 let attempt_outcome = run_llm_with_retry_and_fallback(
2111 agent,
2112 run_cancellation_token.as_ref(),
2113 true,
2114 None,
2115 "unknown llm error",
2116 |model| {
2117 let request = build_request_for_filtered_tools(
2118 &messages,
2119 &step_tools.tools,
2120 &filtered_tools,
2121 &request_transforms,
2122 );
2123 let executor = executor.clone();
2124 let chat_options = chat_options.clone();
2125 async move {
2126 executor
2127 .exec_chat_response(&model, request, chat_options.as_ref())
2128 .await
2129 }
2130 },
2131 )
2132 .await;
2133
2134 let response = match attempt_outcome {
2135 LlmAttemptOutcome::Success {
2136 value, attempts, ..
2137 } => {
2138 run_state.record_llm_attempts(attempts);
2139 value
2140 }
2141 LlmAttemptOutcome::Cancelled => {
2142 append_cancellation_user_message(&mut run_ctx, CancellationStage::Inference);
2143 terminate_run!(TerminationReason::Cancelled, None, None);
2144 }
2145 LlmAttemptOutcome::Exhausted {
2146 last_error,
2147 last_error_class,
2148 attempts,
2149 } => {
2150 run_state.record_llm_attempts(attempts);
2151 if let Err(phase_error) = apply_llm_error_cleanup(
2152 &mut run_ctx,
2153 &active_tool_descriptors,
2154 agent,
2155 "llm_exec_error",
2156 last_error.clone(),
2157 last_error_class,
2158 )
2159 .await
2160 {
2161 let msg = phase_error.to_string();
2162 terminate_run!(
2163 TerminationReason::Error(msg.clone()),
2164 None,
2165 Some(outcome::LoopFailure::State(msg))
2166 );
2167 }
2168 terminate_run!(
2169 TerminationReason::Error(last_error.clone()),
2170 None,
2171 Some(outcome::LoopFailure::Llm(last_error))
2172 );
2173 }
2174 };
2175
2176 let result = stream_result_from_chat_response(&response);
2177 run_state.update_from_response(&result);
2178 last_text = stitch_response_text(&continued_response_prefix, &result.text);
2179
2180 let assistant_msg_id = gen_message_id();
2182 let step_meta = step_metadata(Some(run_id.clone()), run_state.completed_steps as u32);
2183 let post_inference_action = match complete_step_after_inference(
2184 &mut run_ctx,
2185 &result,
2186 step_meta.clone(),
2187 Some(assistant_msg_id.clone()),
2188 &active_tool_descriptors,
2189 agent,
2190 )
2191 .await
2192 {
2193 Ok(action) => action,
2194 Err(e) => {
2195 let msg = e.to_string();
2196 terminate_run!(
2197 TerminationReason::Error(msg.clone()),
2198 None,
2199 Some(outcome::LoopFailure::State(msg))
2200 );
2201 }
2202 };
2203 if let Err(error) = pending_delta_commit
2204 .commit(
2205 &mut run_ctx,
2206 CheckpointReason::AssistantTurnCommitted,
2207 false,
2208 )
2209 .await
2210 {
2211 let msg = error.to_string();
2212 terminate_run!(
2213 TerminationReason::Error(msg.clone()),
2214 None,
2215 Some(outcome::LoopFailure::State(msg))
2216 );
2217 }
2218
2219 mark_step_completed(&mut run_state);
2220
2221 if truncation_recovery::should_retry(&result, &mut run_state) {
2224 extend_response_prefix(&mut continued_response_prefix, &result.text);
2225 let prompt = truncation_recovery::continuation_message();
2226 run_ctx.add_message(Arc::new(prompt));
2227 continue;
2228 }
2229 continued_response_prefix.clear();
2230
2231 let post_inference_termination = match &post_inference_action {
2232 RunAction::Terminate(reason) => Some(reason.clone()),
2233 _ => None,
2234 };
2235
2236 if let Some(reason) = &post_inference_termination {
2240 if !matches!(reason, TerminationReason::Stopped(_)) {
2241 terminate_run!(reason.clone(), Some(last_text.clone()), None);
2242 }
2243 }
2244
2245 if !result.needs_tools() {
2246 run_state.record_step_without_tools();
2247 let reason = post_inference_termination.unwrap_or(TerminationReason::NaturalEnd);
2248 terminate_run!(reason, Some(last_text.clone()), None);
2249 }
2250
2251 let tool_context = match prepare_tool_execution_context(&run_ctx) {
2253 Ok(ctx) => ctx,
2254 Err(e) => {
2255 let msg = e.to_string();
2256 terminate_run!(
2257 TerminationReason::Error(msg.clone()),
2258 None,
2259 Some(outcome::LoopFailure::State(msg))
2260 );
2261 }
2262 };
2263 let thread_messages_for_tools = run_ctx.messages().to_vec();
2264 let thread_version_for_tools = run_ctx.version();
2265
2266 let tool_exec_future = tool_executor.execute(ToolExecutionRequest {
2267 tools: &step_tools.tools,
2268 calls: &result.tool_calls,
2269 state: &tool_context.state,
2270 tool_descriptors: &active_tool_descriptors,
2271 agent_behavior: Some(agent.behavior()),
2272 activity_manager: tirea_contract::runtime::activity::NoOpActivityManager::arc(),
2273 run_config: &tool_context.run_config,
2274 thread_id: run_ctx.thread_id(),
2275 thread_messages: &thread_messages_for_tools,
2276 state_version: thread_version_for_tools,
2277 cancellation_token: run_cancellation_token.as_ref(),
2278 });
2279 let results = tool_exec_future.await.map_err(AgentLoopError::from);
2280
2281 let results = match results {
2282 Ok(r) => r,
2283 Err(AgentLoopError::Cancelled) => {
2284 append_cancellation_user_message(&mut run_ctx, CancellationStage::ToolExecution);
2285 terminate_run!(TerminationReason::Cancelled, None, None);
2286 }
2287 Err(e) => {
2288 let msg = e.to_string();
2289 terminate_run!(
2290 TerminationReason::Error(msg.clone()),
2291 None,
2292 Some(outcome::LoopFailure::State(msg))
2293 );
2294 }
2295 };
2296
2297 if let Err(_e) = apply_tool_results_to_session(
2298 &mut run_ctx,
2299 &results,
2300 Some(step_meta),
2301 tool_executor.requires_parallel_patch_conflict_check(),
2302 ) {
2303 let msg = _e.to_string();
2305 terminate_run!(
2306 TerminationReason::Error(msg.clone()),
2307 None,
2308 Some(outcome::LoopFailure::State(msg))
2309 );
2310 }
2311 if let Err(error) = pending_delta_commit
2312 .commit(&mut run_ctx, CheckpointReason::ToolResultsCommitted, false)
2313 .await
2314 {
2315 let msg = error.to_string();
2316 terminate_run!(
2317 TerminationReason::Error(msg.clone()),
2318 None,
2319 Some(outcome::LoopFailure::State(msg))
2320 );
2321 }
2322
2323 if let Err(error) = apply_decisions_and_replay(
2324 &mut run_ctx,
2325 &mut decision_rx,
2326 &mut pending_decisions,
2327 &step_tool_provider,
2328 agent,
2329 &mut active_tool_descriptors,
2330 &pending_delta_commit,
2331 )
2332 .await
2333 {
2334 let msg = error.to_string();
2335 terminate_run!(
2336 TerminationReason::Error(msg.clone()),
2337 None,
2338 Some(outcome::LoopFailure::State(msg))
2339 );
2340 }
2341
2342 if has_suspended_calls(&run_ctx) {
2344 let has_completed = results
2345 .iter()
2346 .any(|r| !matches!(r.outcome, crate::contracts::ToolCallOutcome::Suspended));
2347 if !has_completed {
2348 terminate_run!(TerminationReason::Suspended, None, None);
2349 }
2350 }
2351
2352 if let Some(reason) = post_inference_termination {
2355 terminate_run!(reason, Some(last_text.clone()), None);
2356 }
2357
2358 let error_count = results
2360 .iter()
2361 .filter(|r| r.execution.result.is_error())
2362 .count();
2363 run_state.record_tool_step(&result.tool_calls, error_count);
2364 }
2365}
2366
2367pub fn run_loop_stream(
2373 agent: Arc<dyn Agent>,
2374 tools: HashMap<String, Arc<dyn Tool>>,
2375 run_ctx: RunContext,
2376 cancellation_token: Option<RunCancellationToken>,
2377 state_committer: Option<Arc<dyn StateCommitter>>,
2378 decision_rx: Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
2379) -> Pin<Box<dyn Stream<Item = AgentEvent> + Send>> {
2380 let execution_ctx = RunExecutionContext::from_run_config(&run_ctx);
2381 run_loop_stream_with_context(
2382 agent,
2383 tools,
2384 run_ctx,
2385 execution_ctx,
2386 cancellation_token,
2387 state_committer,
2388 decision_rx,
2389 )
2390}
2391
2392pub fn run_loop_stream_with_context(
2393 agent: Arc<dyn Agent>,
2394 tools: HashMap<String, Arc<dyn Tool>>,
2395 mut run_ctx: RunContext,
2396 execution_ctx: RunExecutionContext,
2397 cancellation_token: Option<RunCancellationToken>,
2398 state_committer: Option<Arc<dyn StateCommitter>>,
2399 decision_rx: Option<tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>>,
2400) -> Pin<Box<dyn Stream<Item = AgentEvent> + Send>> {
2401 bind_execution_context_to_run_config(&mut run_ctx, &execution_ctx);
2402
2403 stream_runner::run_stream(
2404 agent,
2405 tools,
2406 run_ctx,
2407 execution_ctx,
2408 cancellation_token,
2409 state_committer,
2410 decision_rx,
2411 )
2412}
2413
2414#[cfg(test)]
2415mod tests;