1use super::AgentRunner;
2use super::continuation::{
3 CompletionAssessment, ContinuationController, VerificationResult, is_review_like_task,
4};
5use super::helpers::detect_textual_exec_tool_call;
6use super::orchestration::EvaluatorGateOutcome;
7use super::prompt_alignment;
8use crate::config::build_openai_prompt_cache_key;
9use crate::config::constants::tools;
10use crate::config::models::{ModelId, Provider as ModelProvider};
11use crate::config::tool_loop_limit_reached;
12use crate::config::types::{ReasoningEffortLevel, SystemPromptMode, VerbosityLevel};
13use crate::core::agent::blocked_handoff::write_blocked_handoff;
14use crate::core::agent::completion::{check_completion_candidate, check_for_response_loop};
15use crate::core::agent::conversation::{
16 build_conversation, build_messages_from_conversation, conversation_from_messages,
17};
18use crate::core::agent::events::ExecEventRecorder;
19use crate::core::agent::harness_artifacts::existing_harness_artifact_paths;
20use crate::core::agent::harness_kernel::{
21 HarnessRequestPlanInput, SessionToolCatalogSnapshot, build_harness_request_plan,
22};
23use crate::core::agent::runtime::{AgentRuntime, RuntimeControl};
24use crate::core::agent::session::AgentSessionState;
25use crate::core::agent::task::{ContextItem, Task, TaskOutcome, TaskResults};
26use crate::exec::events::HarnessEventKind;
27use crate::llm::model_resolver::ModelResolver;
28use crate::llm::provider::{
29 FinishReason, Message, ResponsesContinuationState, ToolCall, ToolDefinition,
30 prepare_responses_continuation_request, responses_continuation_key,
31 supports_responses_chaining,
32};
33use crate::llm::providers::gemini::wire::Part;
34use crate::prompts::{
35 PromptContext, RuntimePromptContract, append_runtime_mode_sections,
36 append_runtime_tool_prompt_sections, upsert_harness_limits_section,
37};
38use crate::utils::colors::style;
39use anyhow::Result;
40use serde_json::json;
41use std::sync::Arc;
42use tracing::{debug, warn};
43
44fn record_terminal_turn_event(
45 event_recorder: &mut ExecEventRecorder,
46 outcome: &TaskOutcome,
47 usage: vtcode_exec_events::Usage,
48) {
49 if outcome.is_success() {
50 event_recorder.record_thread_event(vtcode_exec_events::ThreadEvent::TurnCompleted(
51 vtcode_exec_events::TurnCompletedEvent { usage },
52 ));
53 } else {
54 event_recorder.record_thread_event(vtcode_exec_events::ThreadEvent::TurnFailed(
55 vtcode_exec_events::TurnFailedEvent {
56 message: outcome.description(),
57 usage: Some(usage),
58 },
59 ));
60 }
61}
62
63fn emit_blocked_handoff_events(
64 event_recorder: &mut ExecEventRecorder,
65 current_path: &std::path::Path,
66 archive_path: &std::path::Path,
67) {
68 for path in [current_path, archive_path] {
69 event_recorder.harness_event(
70 HarnessEventKind::BlockedHandoffWritten,
71 Some("Blocked handoff written".to_string()),
72 None,
73 Some(path.display().to_string()),
74 None,
75 );
76 }
77}
78
79fn summarize_verification_output(result: &serde_json::Value) -> String {
80 result
81 .get("output")
82 .and_then(serde_json::Value::as_str)
83 .or_else(|| result.get("stderr").and_then(serde_json::Value::as_str))
84 .or_else(|| result.get("stdout").and_then(serde_json::Value::as_str))
85 .map(str::trim)
86 .filter(|text| !text.is_empty())
87 .map(|text| {
88 let truncated = text
89 .lines()
90 .take(20)
91 .collect::<Vec<_>>()
92 .join("\n")
93 .trim()
94 .to_string();
95 if truncated.len() < text.len() {
96 format!("{truncated}\n...")
97 } else {
98 truncated
99 }
100 })
101 .unwrap_or_default()
102}
103
104fn prepare_responses_request_messages<'a>(
105 previous_chains: &mut hashbrown::HashMap<(String, String), ResponsesContinuationState>,
106 provider_name: &str,
107 provider_supports_responses_compaction: bool,
108 model: &str,
109 messages: &'a [Message],
110) -> (std::borrow::Cow<'a, [Message]>, Option<String>) {
111 let key = responses_continuation_key(provider_name, model);
112 let continuation = key.as_ref().and_then(|k| previous_chains.get(k));
113 let prepared = prepare_responses_continuation_request(
114 provider_name,
115 provider_supports_responses_compaction,
116 messages,
117 continuation,
118 );
119 if prepared.clear_stale_chain
120 && let Some(key) = key
121 {
122 previous_chains.remove(&key);
123 }
124
125 (prepared.messages, prepared.previous_response_id)
126}
127
128fn stop_reason_from_finish_reason(finish_reason: &FinishReason) -> String {
129 match finish_reason {
130 FinishReason::Stop => "end_turn".to_string(),
131 FinishReason::Length => "max_tokens".to_string(),
132 FinishReason::ToolCalls => "tool_calls".to_string(),
133 FinishReason::ContentFilter => "content_filter".to_string(),
134 FinishReason::Pause => "pause_turn".to_string(),
135 FinishReason::Refusal => "refusal".to_string(),
136 FinishReason::Error(message) => message.clone(),
137 }
138}
139
140fn estimate_session_cost_usd(
141 provider: &str,
142 model: &str,
143 usage: &vtcode_exec_events::Usage,
144) -> Option<f64> {
145 let usage = crate::llm::provider::Usage {
146 prompt_tokens: u32::try_from(usage.input_tokens).unwrap_or(u32::MAX),
147 completion_tokens: u32::try_from(usage.output_tokens).unwrap_or(u32::MAX),
148 total_tokens: u32::try_from(usage.input_tokens.saturating_add(usage.output_tokens))
149 .unwrap_or(u32::MAX),
150 cached_prompt_tokens: Some(u32::try_from(usage.cached_input_tokens).unwrap_or(u32::MAX)),
151 cache_creation_tokens: Some(u32::try_from(usage.cache_creation_tokens).unwrap_or(u32::MAX)),
152 cache_read_tokens: Some(u32::try_from(usage.cached_input_tokens).unwrap_or(u32::MAX)),
153 iterations: None,
154 };
155 let resolved = ModelResolver::resolve(Some(provider), model, &[], None)?;
156 let pricing = resolved.pricing()?;
157 ModelResolver::estimate_cost(pricing, &usage)
158}
159
160struct RuntimePromptBundle {
161 system_instruction: Arc<String>,
162 tool_snapshot: SessionToolCatalogSnapshot,
163 request_tools: Option<Arc<Vec<ToolDefinition>>>,
164}
165
166impl AgentRunner {
167 async fn compose_task_system_prompt(
168 &self,
169 prompt_tools: Arc<Vec<ToolDefinition>>,
170 is_simple_task: bool,
171 ) -> Result<String> {
172 if !is_simple_task {
173 return Ok(self.system_prompt.clone());
174 }
175
176 let mut config = self.config().clone();
177 config.agent.system_prompt_mode = SystemPromptMode::Minimal;
178 let mut prompt_context = PromptContext::from_workspace_tools(
179 self._workspace.as_path(),
180 prompt_tools
181 .iter()
182 .map(|tool| tool.function_name().to_string()),
183 );
184 prompt_context.load_available_skills();
185
186 let prompt = super::helpers::compose_system_prompt_with_appendix(
187 self._workspace.as_path(),
188 &config,
189 &prompt_context,
190 )
191 .await?;
192
193 Ok(prompt)
194 }
195
196 async fn build_runtime_prompt_bundle(
197 &self,
198 is_simple_task: bool,
199 ) -> Result<RuntimePromptBundle> {
200 let tool_snapshot = self.build_universal_tool_snapshot().await?;
201 let request_tools = tool_snapshot.snapshot.clone();
202 let prompt_tools = request_tools
203 .clone()
204 .unwrap_or_else(|| Arc::new(Vec::new()));
205 let mut system_prompt = self
206 .compose_task_system_prompt(prompt_tools, is_simple_task)
207 .await?;
208
209 let plan_mode = self.tool_registry.is_plan_mode();
210 let request_user_input_enabled =
211 self.features().request_user_input_enabled(plan_mode, false);
212 let full_auto_active = self
213 .tool_registry
214 .current_full_auto_allowlist()
215 .await
216 .is_some();
217
218 append_runtime_mode_sections(
219 &mut system_prompt,
220 RuntimePromptContract {
221 full_auto: full_auto_active,
222 plan_mode,
223 request_user_input_enabled,
224 },
225 );
226 upsert_harness_limits_section(
227 &mut system_prompt,
228 self.config().agent.harness.max_tool_calls_per_turn,
229 self.config().agent.harness.max_tool_wall_clock_secs,
230 self.config().agent.harness.max_tool_retries,
231 );
232 append_runtime_tool_prompt_sections(&mut system_prompt, &tool_snapshot, true);
233
234 Ok(RuntimePromptBundle {
235 system_instruction: Arc::new(system_prompt),
236 tool_snapshot,
237 request_tools,
238 })
239 }
240
241 async fn build_validated_runtime_prompt_bundle(
242 &self,
243 is_simple_task: bool,
244 ) -> Result<RuntimePromptBundle> {
245 let mut runner = self;
246 let bundle = runner.build_runtime_prompt_bundle(is_simple_task).await?;
247 prompt_alignment::rebuild_once_on_alignment_mismatch(
248 &mut runner,
249 bundle,
250 |runner| Box::pin((*runner).build_runtime_prompt_bundle(is_simple_task)),
251 |runner, bundle| {
252 let plan_mode = runner.tool_registry.is_plan_mode();
253 let request_user_input_enabled = runner
254 .features()
255 .request_user_input_enabled(plan_mode, false);
256 prompt_alignment::validate_prompt_catalog_alignment(
257 &bundle.system_instruction,
258 &bundle.tool_snapshot,
259 plan_mode,
260 request_user_input_enabled,
261 )
262 },
263 "prompt/catalog alignment mismatch; rebuilding runtime prompt bundle",
264 "prompt/catalog alignment mismatch persisted after rebuild",
265 )
266 .await
267 }
268
269 async fn refresh_runtime_prompt_bundle_if_catalog_changed(
270 &self,
271 bundle: &mut RuntimePromptBundle,
272 is_simple_task: bool,
273 ) -> Result<bool> {
274 let current_version = self.tool_registry.tool_catalog_state().current_version();
275 if current_version == bundle.tool_snapshot.version {
276 return Ok(false);
277 }
278
279 debug!(
280 old_version = bundle.tool_snapshot.version,
281 new_version = current_version,
282 "Tool catalog changed mid-task; refreshing runtime prompt bundle"
283 );
284 *bundle = self
285 .build_validated_runtime_prompt_bundle(is_simple_task)
286 .await?;
287 Ok(true)
288 }
289
290 async fn resolve_completion_acceptance(
291 &mut self,
292 effective_task: &Task,
293 session_state: &mut AgentSessionState,
294 event_recorder: &mut ExecEventRecorder,
295 orchestration_enabled: bool,
296 verification_results: &[VerificationResult],
297 revision_rounds_used: &mut usize,
298 max_revision_rounds: usize,
299 should_write_blocked_handoff: &mut bool,
300 ) -> Result<bool> {
301 if !orchestration_enabled {
302 session_state.is_completed = true;
303 session_state.outcome = TaskOutcome::Success;
304 return Ok(true);
305 }
306
307 match self
308 .apply_evaluator_gate(
309 effective_task,
310 session_state,
311 event_recorder,
312 verification_results,
313 revision_rounds_used,
314 max_revision_rounds,
315 )
316 .await?
317 {
318 EvaluatorGateOutcome::Accept => {
319 session_state.is_completed = true;
320 session_state.outcome = TaskOutcome::Success;
321 Ok(true)
322 }
323 EvaluatorGateOutcome::Continue { prompt } => {
324 session_state.add_user_message(prompt);
325 Ok(false)
326 }
327 EvaluatorGateOutcome::Exhausted { reason } => {
328 session_state.outcome = TaskOutcome::Failed { reason };
329 *should_write_blocked_handoff = true;
330 Ok(true)
331 }
332 }
333 }
334
335 async fn run_verification_commands(
336 &self,
337 commands: &[String],
338 event_recorder: &mut ExecEventRecorder,
339 ) -> Result<Vec<VerificationResult>> {
340 let mut results = Vec::with_capacity(commands.len());
341 for command in commands {
342 let command_event = event_recorder.command_started(command);
343 let payload = json!({
344 "action": "run",
345 "command": command,
346 "workdir": self._workspace.display().to_string(),
347 "yield_time_ms": 1000,
348 });
349 let result = self
350 .tool_registry
351 .execute_harness_unified_exec(payload)
352 .await?;
353 let exit_code = result
354 .get("exit_code")
355 .and_then(serde_json::Value::as_i64)
356 .map(|value| value as i32);
357 let success = exit_code.unwrap_or(0) == 0;
358 let output = summarize_verification_output(&result);
359 event_recorder.command_finished(
360 &command_event,
361 if success {
362 crate::exec::events::CommandExecutionStatus::Completed
363 } else {
364 crate::exec::events::CommandExecutionStatus::Failed
365 },
366 exit_code,
367 &output,
368 );
369 results.push(VerificationResult {
370 command: command.clone(),
371 success,
372 exit_code,
373 output,
374 });
375 if !success {
376 break;
377 }
378 }
379 Ok(results)
380 }
381
382 pub async fn execute_task(
384 &mut self,
385 task: &Task,
386 contexts: &[ContextItem],
387 ) -> Result<TaskResults> {
388 self.tool_registry
390 .set_harness_session(self.session_id.clone());
391 self.tool_registry.set_harness_task(Some(task.id.clone()));
392
393 if let Err(err) = self.tool_registry.initialize_async().await {
395 warn!(
396 error = %err,
397 "Tool registry initialization failed at task start"
398 );
399 }
400
401 let steering_receiver = self.steering_receiver.lock().take();
402 let runtime_setup = {
403 let agent_prefix = format!("[{}]", self.agent_type);
405 let mut event_recorder = ExecEventRecorder::new(
407 self.session_id.clone(),
408 self.event_sink.clone(),
409 Some(self.thread_handle.clone()),
410 );
411 event_recorder.turn_started();
412 self.runner_println(format_args!(
413 "{agent_prefix} Analyzing request and planning approach..."
414 ));
415
416 self.runner_println(format_args!(
417 "{} Executing {} task: {}",
418 style("[AGENT]").magenta().bold().on_black(),
419 self.agent_type,
420 task.title
421 ));
422
423 let run_started_at = std::time::Instant::now();
424 let is_simple_task = Self::is_simple_task(task, contexts);
425 let prompt_bundle = self
426 .build_validated_runtime_prompt_bundle(is_simple_task)
427 .await?;
428
429 let review_like = is_review_like_task(task);
430 let full_auto_active = self
431 .tool_registry
432 .current_full_auto_allowlist()
433 .await
434 .is_some();
435
436 let mut conversation = conversation_from_messages(&self.bootstrap_messages);
437 conversation.extend(build_conversation(task, contexts));
438
439 let conversation_messages = build_messages_from_conversation(&conversation);
442
443 let max_tool_loops = self.config().tools.max_tool_loops;
446 let preserve_recent_turns = self.config().context.preserve_recent_turns;
447 let max_context_tokens = self.config().context.max_context_tokens;
448
449 let mut session_state = AgentSessionState::new(
450 self.session_id.clone(),
451 self.max_turns,
452 max_tool_loops,
453 max_context_tokens,
454 );
455 session_state.conversation = conversation;
456 session_state.messages = conversation_messages;
457 session_state.last_processed_message_idx = session_state.conversation.len();
458
459 let mut runtime = AgentRuntime::new(session_state, None, steering_receiver);
460
461 if let Err(err) = self.tool_registry.initialize_async().await {
462 warn!(
463 error = %err,
464 "Tool registry initialization failed at task start"
465 );
466 runtime
467 .state
468 .warnings
469 .push(format!("Tool registry init failed: {err}"));
470 }
471 let orchestration_enabled =
472 self.harness_plan_build_evaluate_enabled(full_auto_active, review_like);
473 let planner_artifacts = if orchestration_enabled {
474 Some(self.run_planner_phase(task, &mut event_recorder).await?)
475 } else {
476 None
477 };
478 let effective_task = planner_artifacts
479 .as_ref()
480 .map(|artifacts| self.augment_generator_task(task, artifacts))
481 .unwrap_or_else(|| task.clone());
482
483 let mut continuation_controller = ContinuationController::new(
484 self._workspace.clone(),
485 self.tool_registry.plan_mode_state(),
486 self.config().agent.harness.continuation_policy.clone(),
487 full_auto_active,
488 self.tool_registry.is_plan_mode(),
489 review_like,
490 );
491 continuation_controller.prepare(&effective_task).await?;
492
493 (
494 agent_prefix,
495 event_recorder,
496 run_started_at,
497 is_simple_task,
498 prompt_bundle,
499 preserve_recent_turns,
500 max_tool_loops,
501 max_context_tokens,
502 runtime,
503 continuation_controller,
504 effective_task,
505 orchestration_enabled,
506 )
507 };
508
509 let (
510 agent_prefix,
511 mut event_recorder,
512 run_started_at,
513 is_simple_task,
514 mut prompt_bundle,
515 preserve_recent_turns,
516 max_tool_loops,
517 max_context_tokens,
518 mut runtime,
519 mut continuation_controller,
520 effective_task,
521 orchestration_enabled,
522 ) = runtime_setup;
523 let mut cost_warning_emitted = false;
524 let max_budget_usd = self.config().agent.harness.max_budget_usd;
525 let max_revision_rounds = self.config().agent.harness.max_revision_rounds.max(1);
526 let mut revision_rounds_used = 0usize;
527 let mut should_write_blocked_handoff = false;
528
529 let result = {
530 for turn in 0..self.max_turns {
531 if matches!(
532 runtime.poll_turn_control().await,
533 RuntimeControl::StopRequested
534 ) {
535 self.runner_println(format_args!(
536 "{} {}",
537 agent_prefix,
538 style("Stopped by steering signal.").red().bold()
539 ));
540 runtime.state.outcome = TaskOutcome::Cancelled;
541 break;
542 }
543
544 if let Some(input) = runtime.run_until_idle() {
545 self.runner_println(format_args!(
546 "{} {}: {}",
547 agent_prefix,
548 style("Follow-up Input").cyan().bold(),
549 input
550 ));
551 }
552
553 let utilization = runtime.state.utilization();
554 if utilization > 0.90 {
555 warn!("Context at {:.1}% - approaching limit", utilization * 100.0);
556 runtime.state.warnings.push(format!(
557 "Token budget at {}% - approaching context limit",
558 (utilization * 100.0) as u32
559 ));
560 }
561
562 if runtime.state.is_completed {
563 runtime.state.outcome = TaskOutcome::Success;
564 break;
565 }
566
567 self.runner_println(format_args!(
568 "{} {} is processing turn {}...",
569 agent_prefix,
570 style("(PROC)").cyan().bold(),
571 turn + 1
572 ));
573
574 let turn_model = self.get_selected_model();
575 let provider_name = self.provider_client.name().to_string();
576 if std::env::var_os("VTCODE_DEBUG_PROVIDER").is_some() {
577 tracing::debug!(
578 provider_client = self.provider_client.name(),
579 turn_model,
580 "Provider debug turn selection"
581 );
582 }
583 let turn_reasoning = if is_simple_task {
584 Some(ReasoningEffortLevel::Minimal)
585 } else {
586 self.reasoning_effort
587 };
588 let turn_verbosity = if is_simple_task {
589 Some(VerbosityLevel::Low)
590 } else {
591 self.verbosity
592 };
593 let max_tokens = if is_simple_task {
594 Some(800)
595 } else {
596 Some(2000)
597 };
598
599 self.summarize_conversation_if_needed(
600 &mut runtime.state,
601 preserve_recent_turns,
602 utilization,
603 );
604
605 let parallel_tool_config = if self.model.len() < 20 {
606 None
607 } else if self
608 .provider_client
609 .supports_parallel_tool_config(&turn_model)
610 {
611 Some(Box::new(
612 crate::llm::provider::ParallelToolConfig::anthropic_optimized(),
613 ))
614 } else {
615 None
616 };
617
618 let provider_kind = turn_model
619 .parse::<ModelId>()
620 .map(|model| model.provider())
621 .unwrap_or(ModelProvider::Gemini);
622
623 if matches!(provider_kind, ModelProvider::Gemini)
624 && runtime.state.conversation.len() > runtime.state.last_processed_message_idx
625 {
626 for content in
627 &runtime.state.conversation[runtime.state.last_processed_message_idx..]
628 {
629 let mut text = String::new();
630 for part in &content.parts {
631 if let Part::Text {
632 text: part_text, ..
633 } = part
634 {
635 if !text.is_empty() {
636 text.push('\n');
637 }
638 text.push_str(part_text);
639 }
640 }
641 let message = match content.role.as_str() {
642 "model" => Message::assistant(text),
643 _ => Message::user(text),
644 };
645 runtime.state.messages.push(message);
646 }
647 runtime.state.last_processed_message_idx = runtime.state.conversation.len();
648 }
649
650 let reasoning_effort =
651 if self.provider_client.supports_reasoning_effort(&turn_model) {
652 turn_reasoning
653 } else {
654 None
655 };
656 let temperature = if reasoning_effort.is_some()
657 && matches!(
658 provider_kind,
659 ModelProvider::Anthropic | ModelProvider::Minimax
660 ) {
661 None
662 } else {
663 Some(0.7)
664 };
665
666 let (request_messages, previous_response_id) = prepare_responses_request_messages(
667 &mut runtime.state.previous_response_chains,
668 &provider_name,
669 self.provider_client
670 .supports_responses_compaction(&turn_model),
671 &turn_model,
672 &runtime.state.messages,
673 );
674
675 let request = build_harness_request_plan(HarnessRequestPlanInput {
676 messages: request_messages.into_owned(),
677 system_prompt: prompt_bundle.system_instruction.as_ref().clone(),
678 tools: prompt_bundle.request_tools.clone(),
679 model: turn_model.clone(),
680 max_tokens,
681 temperature,
682 stream: self.provider_client.supports_streaming(),
683 tool_choice: None,
684 parallel_tool_config,
685 reasoning_effort,
686 verbosity: turn_verbosity,
687 metadata: None,
688 context_management: None,
689 previous_response_id,
690 prompt_cache_key: build_openai_prompt_cache_key(
691 provider_name.eq_ignore_ascii_case("openai")
692 && self.config().prompt_cache.enabled
693 && self.config().prompt_cache.providers.openai.enabled,
694 &self
695 .config()
696 .prompt_cache
697 .providers
698 .openai
699 .prompt_cache_key_mode,
700 Some(&self.session_id),
701 ),
702 prompt_cache_profile: None,
703 tool_catalog_hash: prompt_bundle.tool_snapshot.tool_catalog_hash,
704 })
705 .request;
706 self.validate_llm_request(&request)?;
710 let previous_response_chain_present = request.previous_response_id.is_some();
711 let sent_messages = request.messages.clone();
715 let streaming_timeout = self
718 .config()
719 .timeouts
720 .ceiling_duration(self.config().timeouts.streaming_ceiling_seconds);
721
722 let turn_output = runtime
723 .run_turn_once(&mut self.provider_client, request, streaming_timeout)
724 .await?;
725 event_recorder.record_thread_events(runtime.take_emitted_events());
726 let response = turn_output.response;
727 runtime.state.stop_reason =
728 Some(stop_reason_from_finish_reason(&response.finish_reason));
729 if supports_responses_chaining(
730 &provider_name,
731 self.provider_client
732 .supports_responses_compaction(&turn_model),
733 ) {
734 runtime.state.set_previous_response_chain(
735 &provider_name,
736 &turn_model,
737 response.request_id.as_deref(),
738 sent_messages,
739 );
740 }
741 match estimate_session_cost_usd(
742 self.config().agent.provider.as_str(),
743 &turn_model,
744 &runtime.state.stats.total_usage,
745 ) {
746 Some(total_cost_usd) => {
747 runtime.state.total_cost_usd = Some(total_cost_usd);
748 if let Some(max_budget_usd) = max_budget_usd
749 && total_cost_usd > max_budget_usd
750 {
751 runtime.state.outcome =
752 TaskOutcome::budget_limit_reached(max_budget_usd, total_cost_usd);
753 break;
754 }
755 }
756 None => {
757 runtime.state.total_cost_usd = None;
758 if max_budget_usd.is_some() && !cost_warning_emitted {
759 cost_warning_emitted = true;
760 let warning_message = format!(
761 "Budget enforcement disabled for model `{turn_model}` because pricing metadata is unavailable"
762 );
763 warn!(
764 provider = %self.config().agent.provider,
765 model = %turn_model,
766 "Budget enforcement disabled because pricing metadata is unavailable"
767 );
768 runtime.state.warnings.push(warning_message);
769 }
770 }
771 }
772 self.runner_println(format_args!(
773 "{} {} {} received response, processing...",
774 agent_prefix,
775 self.agent_type,
776 style("(RECV)").green().bold()
777 ));
778
779 self.warn_on_empty_response(
780 &agent_prefix,
781 response.content.as_deref().unwrap_or(""),
782 response
783 .tool_calls
784 .as_ref()
785 .is_some_and(|tool_calls| !tool_calls.is_empty()),
786 );
787
788 let response_text = response.content_string();
789 if !response_text.trim().is_empty() {
790 self.emit_final_assistant_message(&self.agent_type, &response_text);
791 }
792
793 let mut effective_tool_calls = response.tool_calls.clone();
794 let mut forced_continuation = false;
795
796 if effective_tool_calls.is_none()
797 && response.content_text().len() < 150
798 && let Some(args_value) = detect_textual_exec_tool_call(response.content_text())
799 {
800 effective_tool_calls = Some(vec![ToolCall::function(
801 format!("call_text_{}", turn),
802 tools::UNIFIED_EXEC.to_string(),
803 args_value.to_string(),
804 )]);
805 }
806
807 let is_gemini = matches!(provider_kind, ModelProvider::Gemini);
808
809 if !runtime.state.is_completed
810 && effective_tool_calls
811 .as_ref()
812 .is_none_or(|tool_calls| tool_calls.is_empty())
813 && !response.content_text().is_empty()
814 {
815 if check_for_response_loop(response.content_text(), &mut runtime.state) {
816 self.runner_println(format_args!(
817 "[{}] {}",
818 self.agent_type,
819 style(
820 "Repetitive assistant response detected. Breaking potential loop."
821 )
822 .red()
823 .bold()
824 ));
825 runtime.state.outcome = TaskOutcome::LoopDetected;
826 break;
827 }
828
829 if check_completion_candidate(response.content_text()) {
830 self.runner_println(format_args!(
831 "[{}] {}",
832 self.agent_type,
833 style("Completion candidate detected; checking tracker and verification state.")
834 .green()
835 .bold()
836 ));
837 match continuation_controller
838 .assess_completion(&effective_task, &runtime.state)
839 .await?
840 {
841 CompletionAssessment::Accept => {
842 if self
843 .resolve_completion_acceptance(
844 &effective_task,
845 &mut runtime.state,
846 &mut event_recorder,
847 orchestration_enabled,
848 &[],
849 &mut revision_rounds_used,
850 max_revision_rounds,
851 &mut should_write_blocked_handoff,
852 )
853 .await?
854 {
855 break;
856 }
857 forced_continuation = true;
858 }
859 CompletionAssessment::SkipAccept { reason } => {
860 event_recorder.harness_event(
861 HarnessEventKind::ContinuationSkipped,
862 Some(reason),
863 None,
864 None,
865 None,
866 );
867 runtime.state.is_completed = true;
868 runtime.state.outcome = TaskOutcome::Success;
869 break;
870 }
871 CompletionAssessment::Continue { reason, prompt } => {
872 event_recorder.harness_event(
873 HarnessEventKind::ContinuationStarted,
874 Some(reason),
875 None,
876 None,
877 None,
878 );
879 runtime.state.add_user_message(prompt);
880 forced_continuation = true;
881 }
882 CompletionAssessment::Verify { commands } => {
883 event_recorder.harness_event(
884 HarnessEventKind::VerificationStarted,
885 Some(format!("Running verification: {}", commands.join(", "))),
886 commands.first().cloned(),
887 None,
888 None,
889 );
890 let verification_results = self
891 .run_verification_commands(&commands, &mut event_recorder)
892 .await?;
893 if let Some(failure) =
894 verification_results.iter().find(|result| !result.success)
895 {
896 event_recorder.harness_event(
897 HarnessEventKind::VerificationFailed,
898 Some(format!(
899 "{}{}",
900 match failure.exit_code {
901 Some(code) => format!(
902 "Verification failed: {} (exit code {}).",
903 failure.command, code
904 ),
905 None => format!(
906 "Verification failed: {}.",
907 failure.command
908 ),
909 },
910 if failure.output.trim().is_empty() {
911 String::new()
912 } else {
913 format!("\n{}", failure.output.trim())
914 }
915 )),
916 Some(failure.command.clone()),
917 None,
918 failure.exit_code,
919 );
920 } else {
921 event_recorder.harness_event(
922 HarnessEventKind::VerificationPassed,
923 Some(format!(
924 "Verification passed: {}",
925 commands.join(", ")
926 )),
927 commands.last().cloned(),
928 None,
929 Some(0),
930 );
931 }
932
933 match continuation_controller
934 .after_verification(&verification_results)
935 .await?
936 {
937 CompletionAssessment::Accept
938 | CompletionAssessment::SkipAccept { .. } => {
939 if self
940 .resolve_completion_acceptance(
941 &effective_task,
942 &mut runtime.state,
943 &mut event_recorder,
944 orchestration_enabled,
945 &verification_results,
946 &mut revision_rounds_used,
947 max_revision_rounds,
948 &mut should_write_blocked_handoff,
949 )
950 .await?
951 {
952 break;
953 }
954 forced_continuation = true;
955 }
956 CompletionAssessment::Continue { reason, prompt } => {
957 event_recorder.harness_event(
958 HarnessEventKind::ContinuationStarted,
959 Some(reason),
960 None,
961 None,
962 None,
963 );
964 runtime.state.add_user_message(prompt);
965 forced_continuation = true;
966 }
967 CompletionAssessment::Verify { .. } => {}
968 }
969 }
970 }
971 }
972 }
973
974 if let Some(tool_calls) = effective_tool_calls
975 .as_ref()
976 .filter(|tool_calls| !tool_calls.is_empty())
977 .cloned()
978 {
979 self.execute_tool_call_batches(
980 tool_calls,
981 &mut runtime,
982 &mut event_recorder,
983 &agent_prefix,
984 is_gemini,
985 previous_response_chain_present,
986 )
987 .await?;
988 event_recorder.record_thread_events(runtime.take_emitted_events());
989 }
990
991 let _ = self
994 .refresh_runtime_prompt_bundle_if_catalog_changed(
995 &mut prompt_bundle,
996 is_simple_task,
997 )
998 .await?;
999
1000 let had_effective_shell_tool_call =
1001 effective_tool_calls.as_ref().is_some_and(|calls| {
1002 calls.iter().any(|call| {
1003 call.function
1004 .as_ref()
1005 .map(|function| function.name.as_str())
1006 == Some(tools::UNIFIED_EXEC)
1007 })
1008 });
1009 let had_tool_call = response
1010 .tool_calls
1011 .as_ref()
1012 .is_some_and(|tool_calls| !tool_calls.is_empty())
1013 || had_effective_shell_tool_call;
1014
1015 if had_tool_call {
1016 let loops = runtime.state.register_tool_loop();
1017 if tool_loop_limit_reached(loops, runtime.state.constraints.max_tool_loops) {
1018 let warning_message = format!(
1019 "Reached tool-call limit of {} iterations; pausing autonomous loop",
1020 runtime.state.constraints.max_tool_loops
1021 );
1022 self.record_warning(
1023 &agent_prefix,
1024 &mut runtime.state,
1025 &mut event_recorder,
1026 warning_message,
1027 );
1028 runtime.state.mark_tool_loop_limit_hit();
1029 break;
1030 }
1031 runtime.state.consecutive_idle_turns = 0;
1032 } else {
1033 runtime.state.reset_tool_loop_guard();
1034 if forced_continuation {
1035 runtime.state.consecutive_idle_turns = 0;
1036 } else if !runtime.state.is_completed {
1037 runtime.state.consecutive_idle_turns =
1038 runtime.state.consecutive_idle_turns.saturating_add(1);
1039 let idle_turn_limit = self.config().agent.idle_turn_limit;
1040 if runtime.state.consecutive_idle_turns >= idle_turn_limit {
1041 let warning_message = format!(
1042 "No tool calls or completion for {} consecutive turns; halting to avoid idle loop",
1043 runtime.state.consecutive_idle_turns
1044 );
1045 self.record_warning(
1046 &agent_prefix,
1047 &mut runtime.state,
1048 &mut event_recorder,
1049 warning_message,
1050 );
1051 runtime.state.outcome = TaskOutcome::StoppedNoAction;
1052 break;
1053 }
1054 }
1055 }
1056
1057 let should_continue = forced_continuation
1058 || had_tool_call
1059 || runtime.has_pending_follow_up_inputs()
1060 || (!runtime.state.is_completed && (turn + 1) < self.max_turns);
1061
1062 if !should_continue {
1063 if runtime.state.is_completed {
1064 runtime.state.outcome = TaskOutcome::Success;
1065 } else if (turn + 1) >= self.max_turns {
1066 runtime.state.outcome =
1067 TaskOutcome::turn_limit_reached(self.max_turns, turn + 1);
1068 } else {
1069 runtime.state.outcome = TaskOutcome::StoppedNoAction;
1070 }
1071 break;
1072 }
1073 }
1074
1075 runtime.state.finalize_outcome(self.max_turns);
1076
1077 let total_duration_ms = run_started_at.elapsed().as_millis();
1078
1079 self.runner_println(format_args!("{} Done", agent_prefix));
1081
1082 let average_turn_duration_ms = if runtime.state.turn_count > 0 {
1084 Some(runtime.state.turn_total_ms as f64 / runtime.state.turn_count as f64)
1085 } else {
1086 None
1087 };
1088
1089 let max_turn_duration_ms = if runtime.state.turn_count > 0 {
1090 Some(runtime.state.turn_max_ms)
1091 } else {
1092 None
1093 };
1094
1095 let outcome = runtime.state.outcome.clone();
1096 self.thread_handle
1097 .replace_messages(runtime.state.messages.clone());
1098 let summary = self.generate_task_summary(
1099 &effective_task,
1100 &runtime.state.modified_files,
1101 &runtime.state.executed_commands,
1102 &runtime.state.warnings,
1103 &runtime.state.messages,
1104 runtime.state.stats.turns_executed,
1105 runtime.state.max_tool_loop_streak,
1106 max_tool_loops,
1107 outcome,
1108 total_duration_ms,
1109 average_turn_duration_ms,
1110 max_turn_duration_ms,
1111 );
1112
1113 if !summary.trim().is_empty() {
1114 event_recorder.agent_message(&summary);
1116 self.runner_println(format_args!(
1118 "\n{} Agent Task Summary\n{}",
1119 style("[Task]").cyan().bold(),
1120 summary
1121 ));
1122 }
1123
1124 let runtime_agent_config = self.core_agent_config();
1125 if let Err(err) = crate::persistent_memory::finalize_persistent_memory(
1126 &runtime_agent_config,
1127 Some(self.config()),
1128 &runtime.state.messages,
1129 )
1130 .await
1131 {
1132 warn!(
1133 error = %err,
1134 session_id = %self.session_id,
1135 "Failed to update persistent memory"
1136 );
1137 }
1138
1139 if runtime.state.outcome.is_hard_block() || should_write_blocked_handoff {
1140 let relevant_paths = existing_harness_artifact_paths(&self._workspace);
1141 match write_blocked_handoff(
1142 &self._workspace,
1143 &self.session_id,
1144 runtime.state.outcome.code(),
1145 &runtime.state.outcome.description(),
1146 &relevant_paths,
1147 ) {
1148 Ok(artifacts) => emit_blocked_handoff_events(
1149 &mut event_recorder,
1150 &artifacts.current_path,
1151 &artifacts.archive_path,
1152 ),
1153 Err(err) => warn!(
1154 error = %err,
1155 session_id = %self.session_id,
1156 "Failed to persist blocked handoff"
1157 ),
1158 }
1159 }
1160
1161 let total_usage = runtime.state.stats.total_usage.clone();
1162 record_terminal_turn_event(
1163 &mut event_recorder,
1164 &runtime.state.outcome,
1165 total_usage.clone(),
1166 );
1167 event_recorder.thread_completed(
1168 &self.session_id,
1169 runtime.state.outcome.thread_completion_subtype(),
1170 runtime.state.outcome.code(),
1171 runtime
1172 .state
1173 .outcome
1174 .is_success()
1175 .then_some(summary.as_str()),
1176 runtime.state.stop_reason.as_deref(),
1177 total_usage,
1178 runtime
1179 .state
1180 .total_cost_usd
1181 .and_then(serde_json::Number::from_f64),
1182 runtime.state.stats.turns_executed,
1183 );
1184 let thread_events = event_recorder.into_events();
1185 let steering_receiver = runtime.take_steering_receiver();
1186 let state = std::mem::replace(
1187 &mut runtime.state,
1188 AgentSessionState::new(
1189 self.session_id.clone(),
1190 self.max_turns,
1191 max_tool_loops,
1192 max_context_tokens,
1193 ),
1194 );
1195
1196 Ok((
1197 state.into_results(summary, thread_events, total_duration_ms),
1198 steering_receiver,
1199 ))
1200 };
1201
1202 let result = match result {
1203 Ok((task_results, steering_receiver)) => {
1204 *self.steering_receiver.lock() = steering_receiver;
1205 Ok(task_results)
1206 }
1207 Err(err) => {
1208 *self.steering_receiver.lock() = runtime.take_steering_receiver();
1209 Err(err)
1210 }
1211 };
1212
1213 self.tool_registry.set_harness_task(None);
1214 result
1215 }
1216}
1217
#[cfg(test)]
mod tests {
    // Unit tests for terminal turn-event recording, the tool-loop limit guard, and
    // Responses-API request preparation against a previously recorded response chain.
    use super::{
        prepare_responses_request_messages, record_terminal_turn_event, tool_loop_limit_reached,
    };
    use crate::core::agent::events::ExecEventRecorder;
    use crate::core::agent::session::AgentSessionState;
    use crate::core::agent::task::TaskOutcome;
    use crate::exec::events::ThreadEvent;
    use crate::llm::provider::Message;

    /// Starts a turn on a fresh recorder, records the terminal event for `outcome`, and
    /// returns how many `(TurnCompleted, TurnFailed)` events were emitted.
    fn terminal_turn_event_counts(outcome: &TaskOutcome) -> (usize, usize) {
        let mut recorder = ExecEventRecorder::new("thread", None, None);
        recorder.turn_started();
        record_terminal_turn_event(&mut recorder, outcome, Default::default());

        let events = recorder.into_events();
        let completed = events
            .iter()
            .filter(|event| matches!(event, ThreadEvent::TurnCompleted(_)))
            .count();
        let failed = events
            .iter()
            .filter(|event| matches!(event, ThreadEvent::TurnFailed(_)))
            .count();
        (completed, failed)
    }

    /// Builds a history consisting of one user message per entry in `texts`, in order.
    fn user_messages(texts: &[&str]) -> Vec<Message> {
        texts
            .iter()
            .copied()
            .map(|text| Message::user(text.to_string()))
            .collect()
    }

    /// Seeds a `resp_123` response chain for `provider`/`model` that already covers the
    /// user message `"hello"`. It then prepares a request for the history
    /// `["hello", "continue"]`.
    ///
    /// Asserts that the chain's response id is reused and that exactly the user messages in
    /// `expected_texts` are sent. `compatible` is forwarded unchanged as the third argument of
    /// `prepare_responses_request_messages`. Presumably it marks an OpenAI-compatible custom
    /// provider; confirm against its definition.
    fn assert_prepared_request(
        provider: &'static str,
        compatible: bool,
        model: &'static str,
        expected_texts: &[&str],
    ) {
        let mut state = AgentSessionState::new("session".to_string(), 4, 4, 16_000);
        state.set_previous_response_chain(
            provider,
            model,
            Some("resp_123"),
            user_messages(&["hello"]),
        );
        let current_messages = user_messages(&["hello", "continue"]);

        let (request_messages, previous_response_id) = prepare_responses_request_messages(
            &mut state.previous_response_chains,
            provider,
            compatible,
            model,
            &current_messages,
        );

        assert_eq!(previous_response_id.as_deref(), Some("resp_123"));
        assert_eq!(
            request_messages.as_ref(),
            user_messages(expected_texts).as_slice()
        );
    }

    #[test]
    fn failed_outcome_emits_only_turn_failed() {
        let (completed, failed) = terminal_turn_event_counts(&TaskOutcome::Failed {
            reason: "boom".to_string(),
        });

        assert_eq!(failed, 1);
        assert_eq!(completed, 0);
    }

    #[test]
    fn successful_outcome_emits_only_turn_completed() {
        let (completed, failed) = terminal_turn_event_counts(&TaskOutcome::Success);

        assert_eq!(completed, 1);
        assert_eq!(failed, 0);
    }

    #[test]
    fn disabled_tool_loop_limit_never_trips() {
        // A limit of 0 disables the guard, so no streak length may trip it.
        assert!(!tool_loop_limit_reached(1, 0));
        assert!(!tool_loop_limit_reached(32, 0));
    }

    #[test]
    fn openai_prepare_responses_request_messages_uses_incremental_suffix() {
        // Only the message after the chained prefix should be sent.
        assert_prepared_request("openai", false, "gpt-5.4", &["continue"]);
    }

    #[test]
    fn gemini_prepare_responses_request_messages_keeps_full_history() {
        // The chain id is still reported, but the whole history is resent.
        assert_prepared_request("gemini", false, "gemini-2.5-pro", &["hello", "continue"]);
    }

    #[test]
    fn compatible_prepare_responses_request_messages_uses_incremental_suffix() {
        // A custom provider flagged as compatible behaves like OpenAI.
        assert_prepared_request("mycorp", true, "gpt-5.4", &["continue"]);
    }
}