1mod config;
42mod core;
43mod outcome;
44mod plugin_runtime;
45mod run_state;
46mod state_commit;
47mod stream_core;
48mod stream_runner;
49mod tool_exec;
50
51use crate::contracts::plugin::phase::Phase;
52use crate::contracts::runtime::ActivityManager;
53use crate::contracts::runtime::{
54 StopPolicy, StreamResult, ToolExecutionRequest, ToolExecutionResult,
55};
56use crate::contracts::thread::CheckpointReason;
57use crate::contracts::thread::{gen_message_id, Message, MessageMetadata};
58use crate::contracts::tool::Tool;
59use crate::contracts::RunContext;
60use crate::contracts::StopReason;
61use crate::contracts::{AgentEvent, FrontendToolInvocation, Interaction, TerminationReason};
62use crate::engine::convert::{assistant_message, assistant_tool_calls, tool_response};
63use crate::engine::stop_conditions::check_stop_policies;
64use crate::runtime::activity::ActivityHub;
65
66use crate::runtime::streaming::StreamCollector;
67use async_stream::stream;
68use futures::{Stream, StreamExt};
69use genai::Client;
70use std::collections::HashMap;
71use std::future::Future;
72use std::pin::Pin;
73use std::sync::Arc;
74use uuid::Uuid;
75
76#[cfg(test)]
77use crate::contracts::plugin::AgentPlugin;
78pub use crate::contracts::runtime::{LlmExecutor, ToolExecutor};
79pub use crate::runtime::run_context::{
80 await_or_cancel, is_cancelled, CancelAware, RunCancellationToken, StateCommitError,
81 StateCommitter, TOOL_SCOPE_CALLER_AGENT_ID_KEY, TOOL_SCOPE_CALLER_MESSAGES_KEY,
82 TOOL_SCOPE_CALLER_STATE_KEY, TOOL_SCOPE_CALLER_THREAD_ID_KEY,
83};
84use config::StaticStepToolProvider;
85pub use config::{AgentConfig, GenaiLlmExecutor, LlmRetryPolicy};
86pub use config::{StepToolInput, StepToolProvider, StepToolSnapshot};
87#[cfg(test)]
88use core::build_messages;
89use core::{
90 build_request_for_filtered_tools, clear_agent_pending_interaction, drain_agent_outbox,
91 inference_inputs_from_step, pending_frontend_invocation_from_ctx, pending_interaction_from_ctx,
92 set_agent_pending_interaction,
93};
94pub use outcome::{tool_map, tool_map_from_arc, AgentLoopError};
95pub use outcome::{LoopOutcome, LoopStats, LoopUsage};
96#[cfg(test)]
97use plugin_runtime::emit_phase_checked;
98use plugin_runtime::{
99 emit_cleanup_phases_and_apply, emit_phase_block, emit_run_end_phase, run_phase_block,
100};
101use run_state::{effective_stop_conditions, RunState};
102pub use state_commit::ChannelStateCommitter;
103use state_commit::PendingDeltaCommitContext;
104use std::time::{SystemTime, UNIX_EPOCH};
105use tirea_state::TrackedPatch;
106#[cfg(test)]
107use tokio_util::sync::CancellationToken;
108#[cfg(test)]
109use tool_exec::execute_tools_parallel_with_phases;
110use tool_exec::{
111 apply_tool_results_impl, apply_tool_results_to_session, execute_single_tool_with_phases,
112 scope_with_tool_caller_context, step_metadata, ToolPhaseContext,
113};
114pub use tool_exec::{
115 execute_tools, execute_tools_with_config, execute_tools_with_plugins,
116 execute_tools_with_plugins_and_executor, ParallelToolExecutor, SequentialToolExecutor,
117};
118
/// A run whose agent configuration, tool set and run configuration have been resolved.
///
/// The helpers on this type (`with_tool`, `with_plugin`, `overlay_tool_registry`) adjust
/// `tools` and `config.plugins` in place or builder-style.
pub struct ResolvedRun {
    /// Agent configuration for the run (model, fallbacks, plugins, executors, ...).
    pub config: AgentConfig,
    /// Tools available to the run, keyed by tool id.
    pub tools: HashMap<String, Arc<dyn Tool>>,
    /// Run-level configuration carried alongside the agent configuration.
    pub run_config: crate::contracts::RunConfig,
}
132
133impl ResolvedRun {
134 #[must_use]
136 pub fn with_tool(mut self, id: String, tool: Arc<dyn Tool>) -> Self {
137 self.tools.insert(id, tool);
138 self
139 }
140
141 #[must_use]
143 pub fn with_plugin(mut self, plugin: Arc<dyn crate::contracts::plugin::AgentPlugin>) -> Self {
144 self.config.plugins.push(plugin);
145 self
146 }
147
148 pub fn overlay_tool_registry(&mut self, registry: &dyn crate::contracts::ToolRegistry) {
150 for (id, tool) in registry.snapshot() {
151 self.tools.entry(id).or_insert(tool);
152 }
153 }
154}
155
156fn uuid_v7() -> String {
157 Uuid::now_v7().simple().to_string()
158}
159
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch, and saturates at
/// `u64::MAX` if the millisecond count does not fit in a `u64`.
pub(crate) fn current_unix_millis() -> u64 {
    let elapsed = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        // Clock set before 1970: report 0 instead of failing.
        Err(_) => return 0,
    };
    let millis = elapsed.as_millis();
    if millis > u128::from(u64::MAX) {
        u64::MAX
    } else {
        millis as u64
    }
}
165
/// The phase of a run that was active when cancellation was observed.
///
/// Used to pick which notice `append_cancellation_user_message` adds to the thread.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum CancellationStage {
    /// Cancelled while waiting on the LLM (including retry backoff).
    Inference,
    /// Cancelled while tool calls were executing.
    ToolExecution,
}
171
/// User message appended to the thread when a run is cancelled during inference.
pub(super) const CANCELLATION_INFERENCE_USER_MESSAGE: &str =
    "The previous run was interrupted during inference. Please continue from the current context.";
/// User message appended to the thread when a run is cancelled during tool execution.
pub(super) const CANCELLATION_TOOL_USER_MESSAGE: &str =
    "The previous run was interrupted while using tools. Please continue from the current context.";
176
177pub(super) fn append_cancellation_user_message(run_ctx: &mut RunContext, stage: CancellationStage) {
178 let content = match stage {
179 CancellationStage::Inference => CANCELLATION_INFERENCE_USER_MESSAGE,
180 CancellationStage::ToolExecution => CANCELLATION_TOOL_USER_MESSAGE,
181 };
182 run_ctx.add_message(Arc::new(Message::user(content)));
183}
184
185pub(super) fn effective_llm_models(config: &AgentConfig) -> Vec<String> {
186 let mut models = Vec::with_capacity(1 + config.fallback_models.len());
187 models.push(config.model.clone());
188 for model in &config.fallback_models {
189 if model.trim().is_empty() {
190 continue;
191 }
192 if !models.iter().any(|m| m == model) {
193 models.push(model.clone());
194 }
195 }
196 models
197}
198
199pub(super) fn llm_retry_attempts(config: &AgentConfig) -> usize {
200 config.llm_retry_policy.max_attempts_per_model.max(1)
201}
202
203pub(super) fn is_retryable_llm_error(message: &str) -> bool {
204 let lower = message.to_ascii_lowercase();
205 let non_retryable = [
206 "401",
207 "403",
208 "404",
209 "400",
210 "422",
211 "unauthorized",
212 "forbidden",
213 "invalid api key",
214 "invalid_request",
215 "bad request",
216 ];
217 if non_retryable.iter().any(|p| lower.contains(p)) {
218 return false;
219 }
220 let retryable = [
221 "429",
222 "too many requests",
223 "rate limit",
224 "timeout",
225 "timed out",
226 "temporar",
227 "connection",
228 "network",
229 "unavailable",
230 "server error",
231 "502",
232 "503",
233 "504",
234 "reset by peer",
235 "eof",
236 ];
237 retryable.iter().any(|p| lower.contains(p))
238}
239
240pub(super) fn retry_backoff_ms(config: &AgentConfig, retry_index: usize) -> u64 {
241 let initial = config.llm_retry_policy.initial_backoff_ms;
242 let cap = config
243 .llm_retry_policy
244 .max_backoff_ms
245 .max(config.llm_retry_policy.initial_backoff_ms);
246 if retry_index == 0 {
247 return initial.min(cap);
248 }
249 let shift = (retry_index - 1).min(20) as u32;
250 let factor = 1u64.checked_shl(shift).unwrap_or(u64::MAX);
251 initial.saturating_mul(factor).min(cap)
252}
253
254pub(super) async fn wait_retry_backoff(
255 config: &AgentConfig,
256 retry_index: usize,
257 run_cancellation_token: Option<&RunCancellationToken>,
258) -> bool {
259 let wait_ms = retry_backoff_ms(config, retry_index);
260 match await_or_cancel(
261 run_cancellation_token,
262 tokio::time::sleep(std::time::Duration::from_millis(wait_ms)),
263 )
264 .await
265 {
266 CancelAware::Cancelled => true,
267 CancelAware::Value(_) => false,
268 }
269}
270
/// Result of `run_llm_with_retry_and_fallback`.
pub(super) enum LlmAttemptOutcome<T> {
    /// An invocation succeeded.
    Success {
        /// Value returned by the successful invocation.
        value: T,
        /// Model that produced `value`.
        model: String,
        /// Total attempts made across all models, including the successful one.
        attempts: usize,
    },
    /// Cancellation was observed during an invocation or a retry backoff.
    Cancelled,
    /// Every candidate model was tried without success.
    Exhausted {
        /// Description of the most recent failure.
        last_error: String,
        /// Total attempts made across all models.
        attempts: usize,
    },
}
283
/// Reports whether the run's cancellation token (if any) has been triggered.
///
/// This is a local alias for the re-exported `is_cancelled`.
fn is_run_cancelled(token: Option<&RunCancellationToken>) -> bool {
    is_cancelled(token)
}
287
288pub(super) fn step_tool_provider_for_run(
289 config: &AgentConfig,
290 tools: HashMap<String, Arc<dyn Tool>>,
291) -> Arc<dyn StepToolProvider> {
292 config.step_tool_provider.clone().unwrap_or_else(|| {
293 Arc::new(StaticStepToolProvider::new(tools)) as Arc<dyn StepToolProvider>
294 })
295}
296
297pub(super) fn llm_executor_for_run(config: &AgentConfig) -> Arc<dyn LlmExecutor> {
298 config
299 .llm_executor
300 .clone()
301 .unwrap_or_else(|| Arc::new(GenaiLlmExecutor::new(Client::default())))
302}
303
304pub(super) async fn resolve_step_tool_snapshot(
305 step_tool_provider: &Arc<dyn StepToolProvider>,
306 run_ctx: &RunContext,
307) -> Result<StepToolSnapshot, AgentLoopError> {
308 step_tool_provider
309 .provide(StepToolInput { state: run_ctx })
310 .await
311}
312
/// Increments the run's completed-step counter.
///
/// In `run_loop` this is called once the assistant turn has been committed, before any
/// tool execution for that step.
fn mark_step_completed(run_state: &mut RunState) {
    run_state.completed_steps += 1;
}
316
317fn build_loop_outcome(
318 run_ctx: RunContext,
319 termination: TerminationReason,
320 response: Option<String>,
321 run_state: &RunState,
322 failure: Option<outcome::LoopFailure>,
323) -> LoopOutcome {
324 LoopOutcome {
325 run_ctx,
326 termination,
327 response: response.filter(|text| !text.is_empty()),
328 usage: run_state.usage(),
329 stats: run_state.stats(),
330 failure,
331 }
332}
333
334fn stop_reason_for_step(
335 run_state: &RunState,
336 result: &StreamResult,
337 run_ctx: &RunContext,
338 stop_conditions: &[Arc<dyn StopPolicy>],
339) -> Option<StopReason> {
340 let stop_input = run_state.to_policy_input(result, run_ctx);
341 check_stop_policies(stop_conditions, &stop_input)
342}
343
/// Invokes the LLM through `invoke`, retrying on the same model and falling back across
/// models.
///
/// Models are tried in the order returned by `effective_llm_models(config)`: primary
/// first, then fallbacks. Each model gets up to `llm_retry_attempts(config)` attempts.
///
/// A failed attempt is retried on the same model only when all of these hold:
/// - `retry_current_model` is true;
/// - attempts remain for that model;
/// - `is_retryable_llm_error` accepts the error text.
///
/// Otherwise the loop moves on to the next model. Before a same-model retry it waits
/// `wait_retry_backoff(config, attempt, ..)`.
///
/// Each invocation is wrapped in `await_or_cancel` with `run_cancellation_token`.
///
/// Returns:
/// - `Success`, carrying the value, the model that produced it and the total attempt
///   count across all models;
/// - `Cancelled`, when cancellation is reported during an invocation or a backoff;
/// - `Exhausted`, once every model has been tried. It carries the last error, formatted
///   as `model='...' attempt=N/M: ...` (or `unknown_error` if no attempt ran), and the
///   total attempt count.
pub(super) async fn run_llm_with_retry_and_fallback<T, Invoke, Fut>(
    config: &AgentConfig,
    run_cancellation_token: Option<&RunCancellationToken>,
    retry_current_model: bool,
    unknown_error: &str,
    mut invoke: Invoke,
) -> LlmAttemptOutcome<T>
where
    Invoke: FnMut(String) -> Fut,
    Fut: std::future::Future<Output = genai::Result<T>>,
{
    let mut last_llm_error = unknown_error.to_string();
    let model_candidates = effective_llm_models(config);
    let max_attempts = llm_retry_attempts(config);
    // Counts attempts across all models, not per model.
    let mut total_attempts = 0usize;

    for model in model_candidates {
        for attempt in 1..=max_attempts {
            total_attempts = total_attempts.saturating_add(1);
            let response_res =
                match await_or_cancel(run_cancellation_token, invoke(model.clone())).await {
                    CancelAware::Cancelled => return LlmAttemptOutcome::Cancelled,
                    CancelAware::Value(resp) => resp,
                };

            match response_res {
                Ok(value) => {
                    return LlmAttemptOutcome::Success {
                        value,
                        model,
                        attempts: total_attempts,
                    };
                }
                Err(e) => {
                    let message = e.to_string();
                    last_llm_error =
                        format!("model='{model}' attempt={attempt}/{max_attempts}: {message}");
                    let can_retry_same_model = retry_current_model
                        && attempt < max_attempts
                        && is_retryable_llm_error(&message);
                    if can_retry_same_model {
                        // `attempt` (1-based) is used as the backoff retry index.
                        let cancelled =
                            wait_retry_backoff(config, attempt, run_cancellation_token).await;
                        if cancelled {
                            return LlmAttemptOutcome::Cancelled;
                        }
                        continue;
                    }
                    // Not retryable here: move on to the next candidate model.
                    break;
                }
            }
        }
    }

    LlmAttemptOutcome::Exhausted {
        last_error: last_llm_error,
        attempts: total_attempts,
    }
}
403
404pub(super) async fn run_step_prepare_phases(
405 run_ctx: &RunContext,
406 tool_descriptors: &[crate::contracts::tool::ToolDescriptor],
407 config: &AgentConfig,
408) -> Result<(Vec<Message>, Vec<String>, bool, Vec<TrackedPatch>), AgentLoopError> {
409 let ((messages, filtered_tools, skip_inference), pending) = run_phase_block(
410 run_ctx,
411 tool_descriptors,
412 &config.plugins,
413 &[Phase::StepStart, Phase::BeforeInference],
414 |_| {},
415 |step| inference_inputs_from_step(step, &config.system_prompt),
416 )
417 .await?;
418 Ok((messages, filtered_tools, skip_inference, pending))
419}
420
/// Output of the step-preparation phases (`StepStart` + `BeforeInference`) for one step.
pub(super) struct PreparedStep {
    /// Messages used to build the LLM request for this step.
    pub(super) messages: Vec<Message>,
    /// Tool names passed to `build_request_for_filtered_tools` to restrict the tools
    /// exposed to the LLM (presumably tool ids; confirm in `core`).
    pub(super) filtered_tools: Vec<String>,
    /// When true, `run_loop` ends the run without calling the LLM.
    pub(super) skip_inference: bool,
    /// State patches emitted by the phases; the caller applies them to the run context.
    pub(super) pending_patches: Vec<TrackedPatch>,
}
427
428pub(super) async fn prepare_step_execution(
429 run_ctx: &RunContext,
430 tool_descriptors: &[crate::contracts::tool::ToolDescriptor],
431 config: &AgentConfig,
432) -> Result<PreparedStep, AgentLoopError> {
433 let (messages, filtered_tools, skip_inference, pending) =
434 run_step_prepare_phases(run_ctx, tool_descriptors, config).await?;
435 Ok(PreparedStep {
436 messages,
437 filtered_tools,
438 skip_inference,
439 pending_patches: pending,
440 })
441}
442
/// Runs the plugin cleanup phases after an LLM failure and applies their results to
/// `run_ctx`.
///
/// This delegates directly to `emit_cleanup_phases_and_apply`. `error_type` is a short
/// static tag (e.g. `"llm_exec_error"` in `run_loop`) and `message` is the error detail.
///
/// # Errors
/// Propagates any `AgentLoopError` raised while emitting or applying the cleanup phases.
pub(super) async fn apply_llm_error_cleanup(
    run_ctx: &mut RunContext,
    tool_descriptors: &[crate::contracts::tool::ToolDescriptor],
    plugins: &[Arc<dyn crate::contracts::plugin::AgentPlugin>],
    error_type: &'static str,
    message: String,
) -> Result<(), AgentLoopError> {
    emit_cleanup_phases_and_apply(run_ctx, tool_descriptors, plugins, error_type, message).await
}
452
453pub(super) async fn complete_step_after_inference(
454 run_ctx: &mut RunContext,
455 result: &StreamResult,
456 step_meta: MessageMetadata,
457 assistant_message_id: Option<String>,
458 tool_descriptors: &[crate::contracts::tool::ToolDescriptor],
459 plugins: &[Arc<dyn crate::contracts::plugin::AgentPlugin>],
460) -> Result<(), AgentLoopError> {
461 let pending = emit_phase_block(
462 Phase::AfterInference,
463 run_ctx,
464 tool_descriptors,
465 plugins,
466 |step| {
467 step.response = Some(result.clone());
468 },
469 )
470 .await?;
471 run_ctx.add_thread_patches(pending);
472
473 let assistant = assistant_turn_message(result, step_meta, assistant_message_id);
474 run_ctx.add_message(Arc::new(assistant));
475
476 let pending =
477 emit_phase_block(Phase::StepEnd, run_ctx, tool_descriptors, plugins, |_| {}).await?;
478 run_ctx.add_thread_patches(pending);
479 Ok(())
480}
481
482pub(super) fn interaction_requested_pending_events(interaction: &Interaction) -> [AgentEvent; 2] {
483 [
484 AgentEvent::InteractionRequested {
485 interaction: interaction.clone(),
486 },
487 AgentEvent::Pending {
488 interaction: interaction.clone(),
489 },
490 ]
491}
492
493pub(super) fn pending_tool_events(
502 interaction: &Interaction,
503 frontend_invocation: Option<&FrontendToolInvocation>,
504) -> Vec<AgentEvent> {
505 if let Some(inv) = frontend_invocation {
506 vec![
507 AgentEvent::ToolCallStart {
508 id: inv.call_id.clone(),
509 name: inv.tool_name.clone(),
510 },
511 AgentEvent::ToolCallReady {
512 id: inv.call_id.clone(),
513 name: inv.tool_name.clone(),
514 arguments: inv.arguments.clone(),
515 },
516 ]
517 } else {
518 interaction_requested_pending_events(interaction).to_vec()
519 }
520}
521
/// Inputs shared by all tool calls of one step, built by `prepare_tool_execution_context`.
pub(super) struct ToolExecutionContext {
    /// Snapshot of the run state taken before tools execute.
    pub(super) state: serde_json::Value,
    /// Run configuration scoped with the tool-caller context
    /// (via `scope_with_tool_caller_context`).
    pub(super) run_config: tirea_contract::RunConfig,
}
526
527pub(super) fn prepare_tool_execution_context(
528 run_ctx: &RunContext,
529 config: Option<&AgentConfig>,
530) -> Result<ToolExecutionContext, AgentLoopError> {
531 let state = run_ctx
532 .snapshot()
533 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
534 let run_config = scope_with_tool_caller_context(run_ctx, &state, config)?;
535 Ok(ToolExecutionContext { state, run_config })
536}
537
/// Emits the run-end plugin phase for `run_ctx`.
///
/// This delegates to `emit_run_end_phase` and returns no error, so any phase failure must
/// be handled (or swallowed) inside `emit_run_end_phase`.
pub(super) async fn finalize_run_end(
    run_ctx: &mut RunContext,
    tool_descriptors: &[crate::contracts::tool::ToolDescriptor],
    plugins: &[Arc<dyn crate::contracts::plugin::AgentPlugin>],
) {
    emit_run_end_phase(run_ctx, tool_descriptors, plugins).await
}
545
546fn stream_result_from_chat_response(response: &genai::chat::ChatResponse) -> StreamResult {
547 let text = response
548 .first_text()
549 .map(|s| s.to_string())
550 .unwrap_or_default();
551 let tool_calls: Vec<crate::contracts::thread::ToolCall> = response
552 .tool_calls()
553 .into_iter()
554 .map(|tc| {
555 crate::contracts::thread::ToolCall::new(
556 &tc.call_id,
557 &tc.fn_name,
558 tc.fn_arguments.clone(),
559 )
560 })
561 .collect();
562
563 StreamResult {
564 text,
565 tool_calls,
566 usage: Some(response.usage.clone()),
567 }
568}
569
570fn assistant_turn_message(
571 result: &StreamResult,
572 step_meta: MessageMetadata,
573 message_id: Option<String>,
574) -> Message {
575 let mut msg = if result.tool_calls.is_empty() {
576 assistant_message(&result.text)
577 } else {
578 assistant_tool_calls(&result.text, result.tool_calls.clone())
579 }
580 .with_metadata(step_meta);
581 if let Some(message_id) = message_id {
582 msg = msg.with_id(message_id);
583 }
584 msg
585}
586
/// Drains the agent outbox at run start and replays any queued tool calls (non-streaming
/// path).
///
/// Each replayed call runs through `execute_single_tool_with_phases` against a fresh
/// state snapshot, with no activity manager and no cancellation token. The following are
/// then added to `run_ctx`:
/// - the tool response message;
/// - any reminders, wrapped in `<system-reminder>` tags as internal system messages;
/// - the tool's state patch and any pending patches.
///
/// Returns:
/// - `Ok(false)` if the outbox held no tool calls to replay;
/// - `Ok(true)` immediately when a replay produces a new pending interaction, which is
///   recorded on the state via `set_agent_pending_interaction`. Later calls in
///   `replay_tool_calls` are not replayed in that case;
/// - `Ok(true)` after all calls are replayed, with the pending interaction cleared.
///
/// # Errors
/// Returns `AgentLoopError` if any of these steps fails: draining the outbox,
/// snapshotting state, scoping the tool context, executing a tool, or building the
/// pending-interaction patches.
///
/// NOTE(review): if `drain_agent_outbox` removes the calls from the outbox (as the name
/// suggests), calls after the one that raised the interaction are dropped on the early
/// return. Confirm this is intended.
async fn drain_run_start_outbox_and_replay_nonstream(
    run_ctx: &mut RunContext,
    tools: &HashMap<String, Arc<dyn Tool>>,
    config: &AgentConfig,
    tool_descriptors: &[crate::contracts::tool::ToolDescriptor],
) -> Result<bool, AgentLoopError> {
    let outbox = drain_agent_outbox(run_ctx, "agent_outbox_run_start_nonstream")?;
    if outbox.replay_tool_calls.is_empty() {
        return Ok(false);
    }

    for tool_call in &outbox.replay_tool_calls {
        // Re-snapshot per call so each replay sees patches applied by earlier replays.
        let state = run_ctx
            .snapshot()
            .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
        // A missing tool is passed through as `None`; execute_single_tool_with_phases
        // decides how to report it.
        let tool = tools.get(&tool_call.name).cloned();
        let rt_for_replay = scope_with_tool_caller_context(run_ctx, &state, Some(config))?;
        let replay_phase_ctx = ToolPhaseContext {
            tool_descriptors,
            plugins: &config.plugins,
            activity_manager: None,
            run_config: &rt_for_replay,
            thread_id: run_ctx.thread_id(),
            thread_messages: run_ctx.messages(),
            cancellation_token: None,
        };
        let replay_result =
            execute_single_tool_with_phases(tool.as_deref(), tool_call, &state, &replay_phase_ctx)
                .await?;

        let replay_msg = tool_response(&tool_call.id, &replay_result.execution.result);
        run_ctx.add_message(Arc::new(replay_msg));

        if !replay_result.reminders.is_empty() {
            let msgs: Vec<Arc<Message>> = replay_result
                .reminders
                .iter()
                .map(|reminder| {
                    Arc::new(Message::internal_system(format!(
                        "<system-reminder>{}</system-reminder>",
                        reminder
                    )))
                })
                .collect();
            run_ctx.add_messages(msgs);
        }

        if let Some(patch) = replay_result.execution.patch {
            run_ctx.add_thread_patch(patch);
        }
        if !replay_result.pending_patches.is_empty() {
            run_ctx.add_thread_patches(replay_result.pending_patches);
        }

        // The replayed tool asked for another interaction: record it and stop replaying.
        if let Some(new_interaction) = replay_result.pending_interaction {
            let new_frontend_invocation = replay_result.pending_frontend_invocation;
            let state = run_ctx
                .snapshot()
                .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
            let patch =
                set_agent_pending_interaction(&state, new_interaction, new_frontend_invocation)?;
            if !patch.patch().is_empty() {
                run_ctx.add_thread_patch(patch);
            }
            return Ok(true);
        }
    }

    // All replays finished without a new interaction: clear the pending one.
    let state = run_ctx
        .snapshot()
        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
    let clear_patch = clear_agent_pending_interaction(&state)?;
    if !clear_patch.patch().is_empty() {
        run_ctx.add_thread_patch(clear_patch);
    }

    Ok(true)
}
665
/// Runs the agent loop to completion without streaming events and returns the final
/// [`LoopOutcome`].
///
/// Flow, as implemented below:
/// 1. Resolve the LLM executor, stop policies, step tool provider and run identity, then
///    resolve the initial tool snapshot. A failure here returns an error outcome
///    directly, without running any plugin phase.
/// 2. Emit `Phase::RunStart`, then commit with `CheckpointReason::UserMessage`.
/// 3. Replay tool calls queued in the agent outbox. If any were replayed, commit with
///    `CheckpointReason::ToolResultsCommitted`. End with `PendingInteraction` if an
///    interaction is pending afterwards.
/// 4. Loop over steps. Each step:
///    - checks cancellation;
///    - re-resolves the step tools and runs the step-preparation phases;
///    - calls the LLM with retry/fallback;
///    - completes the step and commits the assistant turn.
///
///    If the model requested no tools, the run finishes. Otherwise the step executes the
///    tools, applies and commits their results, and evaluates the stop policies.
///
/// Every exit after step 1 goes through `terminate_run!`. It emits the run-end phase and
/// performs a final `CheckpointReason::RunFinished` commit; if that commit fails, the
/// outcome becomes `TerminationReason::Error`.
///
/// `cancellation_token`, when provided, is checked at the top of every iteration and is
/// also passed to the LLM retry loop and to the tool executor. `state_committer` is
/// handed to the `PendingDeltaCommitContext` used for every checkpoint commit.
pub async fn run_loop(
    config: &AgentConfig,
    tools: HashMap<String, Arc<dyn Tool>>,
    mut run_ctx: RunContext,
    cancellation_token: Option<RunCancellationToken>,
    state_committer: Option<Arc<dyn StateCommitter>>,
) -> LoopOutcome {
    let executor = llm_executor_for_run(config);
    let mut run_state = RunState::new();
    let stop_conditions = effective_stop_conditions(config);
    let run_cancellation_token = cancellation_token;
    // Text of the latest assistant response; reported as the final response on natural
    // end and on plugin-requested termination.
    let mut last_text = String::new();
    let step_tool_provider = step_tool_provider_for_run(config, tools);
    let run_identity = stream_core::resolve_stream_run_identity(&mut run_ctx);
    let run_id = run_identity.run_id;
    let parent_run_id = run_identity.parent_run_id;
    let pending_delta_commit =
        PendingDeltaCommitContext::new(&run_id, parent_run_id.as_deref(), state_committer.as_ref());
    // Resolved before `terminate_run!` exists, so a failure here skips the run-end phase
    // and the final commit.
    let initial_step_tools = match resolve_step_tool_snapshot(&step_tool_provider, &run_ctx).await {
        Ok(snapshot) => snapshot,
        Err(error) => {
            return build_loop_outcome(
                run_ctx,
                TerminationReason::Error,
                None,
                &run_state,
                Some(outcome::LoopFailure::State(error.to_string())),
            );
        }
    };
    let StepToolSnapshot {
        tools: initial_tools,
        descriptors: initial_descriptors,
    } = initial_step_tools;
    let mut active_tool_descriptors = initial_descriptors;

    // Single exit path for the rest of the run: emit the run-end phase, flush pending
    // state with a final `RunFinished` commit, then return the requested outcome. A failed
    // final commit replaces the requested termination with `TerminationReason::Error`.
    macro_rules! terminate_run {
        ($termination:expr, $response:expr, $failure:expr) => {{
            finalize_run_end(&mut run_ctx, &active_tool_descriptors, &config.plugins).await;
            if let Err(error) = pending_delta_commit
                .commit(&mut run_ctx, CheckpointReason::RunFinished, true)
                .await
            {
                return build_loop_outcome(
                    run_ctx,
                    TerminationReason::Error,
                    None,
                    &run_state,
                    Some(outcome::LoopFailure::State(error.to_string())),
                );
            }
            return build_loop_outcome(run_ctx, $termination, $response, &run_state, $failure);
        }};
    }

    // RunStart phase, followed by the first checkpoint.
    let pending = match emit_phase_block(
        Phase::RunStart,
        &run_ctx,
        &active_tool_descriptors,
        &config.plugins,
        |_| {},
    )
    .await
    {
        Ok(pending) => pending,
        Err(error) => {
            terminate_run!(
                TerminationReason::Error,
                None,
                Some(outcome::LoopFailure::State(error.to_string()))
            );
        }
    };
    run_ctx.add_thread_patches(pending);
    if let Err(error) = pending_delta_commit
        .commit(&mut run_ctx, CheckpointReason::UserMessage, false)
        .await
    {
        terminate_run!(
            TerminationReason::Error,
            None,
            Some(outcome::LoopFailure::State(error.to_string()))
        );
    }

    // Replay tool calls queued in the outbox by a previous run.
    let replayed_at_run_start = match drain_run_start_outbox_and_replay_nonstream(
        &mut run_ctx,
        &initial_tools,
        config,
        &active_tool_descriptors,
    )
    .await
    {
        Ok(replayed) => replayed,
        Err(error) => {
            terminate_run!(
                TerminationReason::Error,
                None,
                Some(outcome::LoopFailure::State(error.to_string()))
            );
        }
    };
    if replayed_at_run_start {
        if let Err(error) = pending_delta_commit
            .commit(&mut run_ctx, CheckpointReason::ToolResultsCommitted, false)
            .await
        {
            terminate_run!(
                TerminationReason::Error,
                None,
                Some(outcome::LoopFailure::State(error.to_string()))
            );
        }
    }
    if pending_interaction_from_ctx(&run_ctx).is_some() {
        terminate_run!(TerminationReason::PendingInteraction, None, None);
    }

    loop {
        if is_run_cancelled(run_cancellation_token.as_ref()) {
            terminate_run!(TerminationReason::Cancelled, None, None);
        }

        // Tools may change between steps; re-resolve them each iteration.
        let step_tools = match resolve_step_tool_snapshot(&step_tool_provider, &run_ctx).await {
            Ok(snapshot) => snapshot,
            Err(e) => {
                terminate_run!(
                    TerminationReason::Error,
                    None,
                    Some(outcome::LoopFailure::State(e.to_string()))
                );
            }
        };
        active_tool_descriptors = step_tools.descriptors.clone();

        let prepared =
            match prepare_step_execution(&run_ctx, &active_tool_descriptors, config).await {
                Ok(v) => v,
                Err(e) => {
                    terminate_run!(
                        TerminationReason::Error,
                        None,
                        Some(outcome::LoopFailure::State(e.to_string()))
                    );
                }
            };
        run_ctx.add_thread_patches(prepared.pending_patches);

        // A plugin asked to skip inference: end the run without calling the LLM.
        if prepared.skip_inference {
            if pending_interaction_from_ctx(&run_ctx).is_some() {
                terminate_run!(TerminationReason::PendingInteraction, None, None);
            }
            terminate_run!(
                TerminationReason::PluginRequested,
                Some(last_text.clone()),
                None
            );
        }

        let messages = prepared.messages;
        let filtered_tools = prepared.filtered_tools;
        // The request is rebuilt for every attempt because each attempt may use a
        // different model.
        let attempt_outcome = run_llm_with_retry_and_fallback(
            config,
            run_cancellation_token.as_ref(),
            true,
            "unknown llm error",
            |model| {
                let request =
                    build_request_for_filtered_tools(&messages, &step_tools.tools, &filtered_tools);
                let executor = executor.clone();
                async move {
                    executor
                        .exec_chat_response(&model, request, config.chat_options.as_ref())
                        .await
                }
            },
        )
        .await;

        let response = match attempt_outcome {
            LlmAttemptOutcome::Success {
                value, attempts, ..
            } => {
                run_state.record_llm_attempts(attempts);
                value
            }
            LlmAttemptOutcome::Cancelled => {
                append_cancellation_user_message(&mut run_ctx, CancellationStage::Inference);
                terminate_run!(TerminationReason::Cancelled, None, None);
            }
            LlmAttemptOutcome::Exhausted {
                last_error,
                attempts,
            } => {
                run_state.record_llm_attempts(attempts);
                // A failing cleanup phase is reported instead of the LLM error.
                if let Err(phase_error) = apply_llm_error_cleanup(
                    &mut run_ctx,
                    &active_tool_descriptors,
                    &config.plugins,
                    "llm_exec_error",
                    last_error.clone(),
                )
                .await
                {
                    terminate_run!(
                        TerminationReason::Error,
                        None,
                        Some(outcome::LoopFailure::State(phase_error.to_string()))
                    );
                }
                terminate_run!(
                    TerminationReason::Error,
                    None,
                    Some(outcome::LoopFailure::Llm(last_error))
                );
            }
        };

        let result = stream_result_from_chat_response(&response);
        run_state.update_from_response(&result);
        last_text = result.text.clone();

        let assistant_msg_id = gen_message_id();
        // Step metadata is built before `mark_step_completed`, so it carries the
        // zero-based index of the current step.
        let step_meta = step_metadata(Some(run_id.clone()), run_state.completed_steps as u32);
        if let Err(e) = complete_step_after_inference(
            &mut run_ctx,
            &result,
            step_meta.clone(),
            Some(assistant_msg_id.clone()),
            &active_tool_descriptors,
            &config.plugins,
        )
        .await
        {
            terminate_run!(
                TerminationReason::Error,
                None,
                Some(outcome::LoopFailure::State(e.to_string()))
            );
        }
        if let Err(error) = pending_delta_commit
            .commit(
                &mut run_ctx,
                CheckpointReason::AssistantTurnCommitted,
                false,
            )
            .await
        {
            terminate_run!(
                TerminationReason::Error,
                None,
                Some(outcome::LoopFailure::State(error.to_string()))
            );
        }

        mark_step_completed(&mut run_state);

        // No tool calls requested: the run ends here, via a stop policy or naturally.
        if !result.needs_tools() {
            run_state.record_step_without_tools();
            if let Some(reason) =
                stop_reason_for_step(&run_state, &result, &run_ctx, &stop_conditions)
            {
                terminate_run!(TerminationReason::Stopped(reason), None, None);
            }
            terminate_run!(TerminationReason::NaturalEnd, Some(last_text.clone()), None);
        }

        let tool_context = match prepare_tool_execution_context(&run_ctx, Some(config)) {
            Ok(ctx) => ctx,
            Err(e) => {
                terminate_run!(
                    TerminationReason::Error,
                    None,
                    Some(outcome::LoopFailure::State(e.to_string()))
                );
            }
        };
        // Owned copy of the messages so the executor does not borrow `run_ctx`.
        let thread_messages_for_tools = run_ctx.messages().to_vec();
        let thread_version_for_tools = run_ctx.version();

        let tool_exec_future = config.tool_executor.execute(ToolExecutionRequest {
            tools: &step_tools.tools,
            calls: &result.tool_calls,
            state: &tool_context.state,
            tool_descriptors: &active_tool_descriptors,
            plugins: &config.plugins,
            activity_manager: None,
            run_config: &tool_context.run_config,
            thread_id: run_ctx.thread_id(),
            thread_messages: &thread_messages_for_tools,
            state_version: thread_version_for_tools,
            cancellation_token: run_cancellation_token.as_ref(),
        });
        let results = tool_exec_future.await.map_err(AgentLoopError::from);

        let results = match results {
            Ok(r) => r,
            Err(AgentLoopError::Cancelled { .. }) => {
                append_cancellation_user_message(&mut run_ctx, CancellationStage::ToolExecution);
                terminate_run!(TerminationReason::Cancelled, None, None);
            }
            Err(e) => {
                terminate_run!(
                    TerminationReason::Error,
                    None,
                    Some(outcome::LoopFailure::State(e.to_string()))
                );
            }
        };

        let applied = match apply_tool_results_to_session(
            &mut run_ctx,
            &results,
            Some(step_meta),
            config
                .tool_executor
                .requires_parallel_patch_conflict_check(),
        ) {
            Ok(a) => a,
            Err(_e) => {
                terminate_run!(
                    TerminationReason::Error,
                    None,
                    Some(outcome::LoopFailure::State(_e.to_string()))
                );
            }
        };
        if let Err(error) = pending_delta_commit
            .commit(&mut run_ctx, CheckpointReason::ToolResultsCommitted, false)
            .await
        {
            terminate_run!(
                TerminationReason::Error,
                None,
                Some(outcome::LoopFailure::State(error.to_string()))
            );
        }

        // A tool requested user/frontend interaction: suspend the run.
        if let Some(_interaction) = applied.pending_interaction {
            terminate_run!(TerminationReason::PendingInteraction, None, None);
        }

        let error_count = results
            .iter()
            .filter(|r| r.execution.result.is_error())
            .count();
        run_state.record_tool_step(&result.tool_calls, error_count);

        if let Some(reason) = stop_reason_for_step(&run_state, &result, &run_ctx, &stop_conditions)
        {
            terminate_run!(TerminationReason::Stopped(reason), None, None);
        }
    }
}
1033
/// Streaming entry point for the agent loop.
///
/// Returns a boxed, `Send` stream of [`AgentEvent`]s for the run. All work is delegated
/// to `stream_runner::run_loop_stream_impl`, which receives every argument unchanged.
pub fn run_loop_stream(
    config: AgentConfig,
    tools: HashMap<String, Arc<dyn Tool>>,
    run_ctx: RunContext,
    cancellation_token: Option<RunCancellationToken>,
    state_committer: Option<Arc<dyn StateCommitter>>,
) -> Pin<Box<dyn Stream<Item = AgentEvent> + Send>> {
    stream_runner::run_loop_stream_impl(config, tools, run_ctx, cancellation_token, state_committer)
}
1048
1049#[cfg(test)]
1050mod tests;