1use std::collections::HashSet;
15use std::sync::Arc;
16
17use anyhow::Result;
18use chrono::Utc;
19use serde_json::json;
20use uuid::Uuid;
21
22use crate::audit::{AuditCategory, AuditOutcome, try_audit_log};
23use crate::cognition::tool_router::{ToolCallRouter, ToolRouterConfig};
24use crate::event_stream::ChatEvent;
25use crate::provider::{
26 CompletionRequest, ContentPart, Message, ProviderRegistry, Role, parse_model_string,
27};
28use crate::rlm::router::AutoProcessContext;
29use crate::rlm::{RlmConfig, RlmRouter, RoutingContext};
30use crate::tool::ToolRegistry;
31
32use super::super::{DEFAULT_MAX_STEPS, Session, SessionResult};
33use super::bootstrap::list_tools_bootstrap_output;
34use super::build::{
35 build_request_requires_tool, is_build_agent, should_force_build_tool_first_retry,
36};
37use super::confirmation::{
38 auto_apply_pending_confirmation, pending_confirmation_tool_result_content,
39 tool_result_requires_confirmation,
40};
41use super::defaults::default_model_for_provider;
42use super::edit::{detect_stub_in_tool_input, normalize_tool_call_for_execution};
43use super::error::{is_prompt_too_long_error, is_retryable_upstream_error};
44use super::loop_constants::{
45 BUILD_MODE_TOOL_FIRST_MAX_RETRIES, BUILD_MODE_TOOL_FIRST_NUDGE, CODESEARCH_THRASH_NUDGE,
46 FORCE_FINAL_ANSWER_NUDGE, MAX_CONSECUTIVE_CODESEARCH_NO_MATCHES, MAX_CONSECUTIVE_SAME_TOOL,
47 NATIVE_TOOL_PROMISE_NUDGE, NATIVE_TOOL_PROMISE_RETRY_MAX_RETRIES,
48 POST_EDIT_VALIDATION_MAX_RETRIES,
49};
50use super::markup::normalize_textual_tool_calls;
51use super::provider::{
52 resolve_provider_for_session_request, should_retry_missing_native_tool_call,
53};
54use super::request_state::build_provider_step_state;
55use super::router::{build_proactive_lsp_context_message, choose_router_target_bandit};
56use super::runtime::{
57 enrich_tool_input_with_runtime_context, is_codesearch_no_match_output, is_interactive_tool,
58};
59use super::text::extract_text_content;
60use super::token::{
61 context_window_for_model, estimate_tokens_for_messages, session_completion_max_tokens,
62};
63use super::validation::{build_validation_report, capture_git_dirty_files, track_touched_files};
64use crate::session::{
65 bucket_for_messages, delegation_skills, derive_with_policy, effective_policy,
66};
67
68pub(crate) async fn run_prompt(session: &mut Session, message: &str) -> Result<SessionResult> {
72 let registry = ProviderRegistry::from_vault().await?;
73 session.resolve_subcall_provider(®istry);
74
75 let providers = registry.list();
76 if providers.is_empty() {
77 anyhow::bail!(
78 "No providers available. Configure provider credentials in HashiCorp Vault (for ChatGPT subscription Codex use `codetether auth codex`; for Copilot use `codetether auth copilot`)."
79 );
80 }
81
82 tracing::info!("Available providers: {:?}", providers);
83
84 let (provider_name, model_id) = parse_session_model_selector(session, &providers);
85
86 let mut selected_provider =
87 resolve_provider_for_session_request(providers.as_slice(), provider_name.as_deref())?
88 .to_string();
89
90 let mut provider = registry
91 .get(&selected_provider)
92 .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider.clone()))?;
93
94 session.add_message(Message {
95 role: Role::User,
96 content: vec![ContentPart::Text {
97 text: message.to_string(),
98 }],
99 });
100
101 if session.title.is_none() {
102 session.generate_title().await?;
103 }
104
105 let mut model = if !model_id.is_empty() {
106 model_id
107 } else {
108 default_model_for_provider(&selected_provider)
109 };
110
111 let cwd = session
112 .metadata
113 .directory
114 .clone()
115 .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
116 let mut provider_state =
121 build_provider_step_state(Arc::clone(&provider), &selected_provider, &model, &cwd);
122 let mut tool_registry = provider_state.tool_registry.clone();
123 let mut tool_definitions = provider_state.tool_definitions.clone();
124 let mut temperature = provider_state.temperature;
125 let mut model_supports_tools = provider_state.model_supports_tools;
126 let mut advertised_tool_definitions = provider_state.advertised_tool_definitions.clone();
127 let mut system_prompt = provider_state.system_prompt.clone();
128
129 tracing::info!("Using model: {} via provider: {}", model, selected_provider);
130 tracing::info!("Available tools: {}", tool_definitions.len());
131
132 let max_steps = session.max_steps.unwrap_or(DEFAULT_MAX_STEPS);
133 let mut final_output = String::new();
134 let baseline_git_dirty_files = capture_git_dirty_files(&cwd).await;
135 let mut touched_files = HashSet::new();
136 let mut validation_retry_count: u8 = 0;
137
138 let mut last_tool_sig: Option<String> = None;
139 let mut consecutive_same_tool: u32 = 0;
140 let mut consecutive_codesearch_no_matches: u32 = 0;
141 let mut build_mode_tool_retry_count: u8 = 0;
142 let mut native_tool_promise_retry_count: u8 = 0;
143 let turn_id = Uuid::new_v4().to_string();
144
145 let tool_router: Option<ToolCallRouter> = {
146 let cfg = ToolRouterConfig::from_env();
147 match ToolCallRouter::from_config(&cfg) {
148 Ok(r) => r,
149 Err(e) => {
150 tracing::warn!(error = %e, "FunctionGemma tool router init failed; disabled");
151 None
152 }
153 }
154 };
155
156 for step in 1..=max_steps {
157 tracing::info!(step = step, "Agent step starting");
158
159 super::cost_guard::enforce_cost_budget()?;
160
161 let policy = effective_policy(session);
167 let mut derived = derive_with_policy(
168 session,
169 Arc::clone(&provider),
170 &model,
171 &system_prompt,
172 &advertised_tool_definitions,
173 None,
174 policy,
175 None,
176 )
177 .await?;
178
179 let mut proactive_lsp_message = build_proactive_lsp_context_message(
180 selected_provider.as_str(),
181 step,
182 &tool_registry,
183 &session.messages,
184 &cwd,
185 )
186 .await;
187 let bucket = bucket_for_messages(session.history());
188
189 let mut attempt = 0;
190 let mut upstream_retry_count: u8 = 0;
191 const MAX_UPSTREAM_RETRIES: u8 = 3;
192 let response = loop {
193 attempt += 1;
194
195 let mut messages = vec![Message {
196 role: Role::System,
197 content: vec![ContentPart::Text {
198 text: system_prompt.clone(),
199 }],
200 }];
201 if let Some(msg) = &proactive_lsp_message {
202 messages.push(msg.clone());
203 }
204 messages.extend(derived.messages.clone());
205
206 let request = CompletionRequest {
207 messages,
208 tools: advertised_tool_definitions.clone(),
209 model: model.clone(),
210 temperature,
211 top_p: None,
212 max_tokens: Some(session_completion_max_tokens()),
213 stop: Vec::new(),
214 };
215
216 match provider.complete(request).await {
217 Ok(r) => {
218 session.metadata.delegation.update(
219 &selected_provider,
220 delegation_skills::MODEL_CALL,
221 bucket,
222 true,
223 );
224 break r;
225 }
226 Err(e) => {
227 if attempt == 1 && is_prompt_too_long_error(&e) {
228 tracing::warn!(error = %e, "Provider rejected prompt as too long; re-deriving with force_keep_last=6 and retrying");
229 derived = derive_with_policy(
230 session,
231 Arc::clone(&provider),
232 &model,
233 &system_prompt,
234 &advertised_tool_definitions,
235 None,
236 policy,
237 Some(6),
238 )
239 .await?;
240 continue;
241 }
242 if upstream_retry_count < MAX_UPSTREAM_RETRIES
243 && is_retryable_upstream_error(&e)
244 {
245 session.metadata.delegation.update(
246 &selected_provider,
247 delegation_skills::MODEL_CALL,
248 bucket,
249 false,
250 );
251 upstream_retry_count += 1;
252 let backoff_secs = 1u64 << (upstream_retry_count - 1).min(2);
253 tracing::warn!(
254 error = %e,
255 retry = upstream_retry_count,
256 max = MAX_UPSTREAM_RETRIES,
257 backoff_secs,
258 "Retryable upstream provider error; sleeping and retrying"
259 );
260 tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
261 if let Some((retry_provider, retry_model)) = choose_router_target_bandit(
262 ®istry,
263 &session.metadata.delegation,
264 bucket,
265 &selected_provider,
266 &model,
267 ) {
268 tracing::info!(
269 to_provider = %retry_provider,
270 to_model = %retry_model,
271 "Failing over to alternate provider/model"
272 );
273 selected_provider = retry_provider;
274 provider = registry.get(&selected_provider).ok_or_else(|| {
275 anyhow::anyhow!("Provider {} not found", selected_provider.clone())
276 })?;
277 model = retry_model;
278 provider_state = build_provider_step_state(
279 Arc::clone(&provider),
280 &selected_provider,
281 &model,
282 &cwd,
283 );
284 tool_registry = provider_state.tool_registry.clone();
285 tool_definitions = provider_state.tool_definitions.clone();
286 temperature = provider_state.temperature;
287 model_supports_tools = provider_state.model_supports_tools;
288 advertised_tool_definitions =
289 provider_state.advertised_tool_definitions.clone();
290 system_prompt = provider_state.system_prompt.clone();
291 derived = derive_with_policy(
292 session,
293 Arc::clone(&provider),
294 &model,
295 &system_prompt,
296 &advertised_tool_definitions,
297 None,
298 policy,
299 None,
300 )
301 .await?;
302 proactive_lsp_message = build_proactive_lsp_context_message(
303 selected_provider.as_str(),
304 step,
305 &tool_registry,
306 &session.messages,
307 &cwd,
308 )
309 .await;
310 session.metadata.model = Some(format!("{selected_provider}/{model}"));
311 attempt = 0;
312 }
313 continue;
314 }
315 return Err(e);
316 }
317 }
318 };
319
320 let response = if let Some(ref router) = tool_router {
321 router
322 .maybe_reformat(response, &tool_definitions, model_supports_tools)
323 .await
324 } else {
325 response
326 };
327 let response = normalize_textual_tool_calls(response, &tool_definitions);
328
329 crate::telemetry::TOKEN_USAGE.record_model_usage_with_cache(
330 &model,
331 response.usage.prompt_tokens as u64,
332 response.usage.completion_tokens as u64,
333 response.usage.cache_read_tokens.unwrap_or(0) as u64,
334 response.usage.cache_write_tokens.unwrap_or(0) as u64,
335 );
336
337 let mut truncated_tool_ids: Vec<(String, String)> = Vec::new();
338 let tool_calls: Vec<(String, String, serde_json::Value)> = response
339 .message
340 .content
341 .iter()
342 .filter_map(|part| {
343 if let ContentPart::ToolCall {
344 id,
345 name,
346 arguments,
347 ..
348 } = part
349 {
350 match serde_json::from_str::<serde_json::Value>(arguments) {
351 Ok(args) => Some((id.clone(), name.clone(), args)),
352 Err(e) => {
353 tracing::warn!(
354 tool = %name,
355 tool_call_id = %id,
356 args_len = arguments.len(),
357 error = %e,
358 "Tool call arguments failed to parse (likely truncated by max_tokens)"
359 );
360 truncated_tool_ids.push((id.clone(), name.clone()));
361 None
362 }
363 }
364 } else {
365 None
366 }
367 })
368 .collect();
369
370 let assistant_text = extract_text_content(&&response.message.content);
371 if should_force_build_tool_first_retry(
372 &session.agent,
373 build_mode_tool_retry_count,
374 &tool_definitions,
375 &session.messages,
376 &cwd,
377 &assistant_text,
378 !tool_calls.is_empty(),
379 BUILD_MODE_TOOL_FIRST_MAX_RETRIES,
380 ) {
381 build_mode_tool_retry_count += 1;
382 tracing::warn!(
383 step = step,
384 agent = %session.agent,
385 retry = build_mode_tool_retry_count,
386 "Build mode tool-first guard triggered; retrying with execution nudge"
387 );
388 session.add_message(Message {
389 role: Role::User,
390 content: vec![ContentPart::Text {
391 text: BUILD_MODE_TOOL_FIRST_NUDGE.to_string(),
392 }],
393 });
394 continue;
395 }
396 if should_retry_missing_native_tool_call(
397 selected_provider.as_str(),
398 &model,
399 native_tool_promise_retry_count,
400 &tool_definitions,
401 &assistant_text,
402 !tool_calls.is_empty(),
403 NATIVE_TOOL_PROMISE_RETRY_MAX_RETRIES,
404 ) {
405 native_tool_promise_retry_count += 1;
406 tracing::warn!(
407 step = step,
408 provider = selected_provider,
409 model = %model,
410 retry = native_tool_promise_retry_count,
411 "Model described a tool step without emitting a tool call; retrying with corrective nudge"
412 );
413 session.add_message(response.message.clone());
414 session.add_message(Message {
415 role: Role::User,
416 content: vec![ContentPart::Text {
417 text: NATIVE_TOOL_PROMISE_NUDGE.to_string(),
418 }],
419 });
420 continue;
421 }
422 if !tool_calls.is_empty() {
423 build_mode_tool_retry_count = 0;
424 native_tool_promise_retry_count = 0;
425 } else if is_build_agent(&session.agent)
426 && build_request_requires_tool(&session.messages, &cwd)
427 && build_mode_tool_retry_count >= BUILD_MODE_TOOL_FIRST_MAX_RETRIES
428 {
429 return Err(anyhow::anyhow!(
430 "Build mode could not obtain tool calls for an explicit file-change request after {} retries. \
431 Switch to a tool-capable model and try again.",
432 BUILD_MODE_TOOL_FIRST_MAX_RETRIES
433 ));
434 }
435
436 let mut step_text = String::new();
437
438 for part in &response.message.content {
439 match part {
440 ContentPart::Text { text } if !text.is_empty() => {
441 step_text.push_str(text);
442 step_text.push('\n');
443 }
444 ContentPart::Thinking { text } if !text.is_empty() => {
445 if let Some(ref bus) = session.bus {
446 let handle = bus.handle(&session.agent);
447 handle.send_with_correlation(
448 format!("agent.{}.thinking", session.agent),
449 crate::bus::BusMessage::AgentThinking {
450 agent_id: session.agent.clone(),
451 thinking: text.clone(),
452 step,
453 },
454 Some(turn_id.clone()),
455 );
456 }
457 }
458 _ => {}
459 }
460 }
461
462 if !step_text.trim().is_empty() {
463 final_output.push_str(&step_text);
464 }
465
466 if tool_calls.is_empty() && truncated_tool_ids.is_empty() {
467 session.add_message(response.message.clone());
468 if is_build_agent(&session.agent) {
469 if let Some(report) =
470 build_validation_report(&cwd, &touched_files, &baseline_git_dirty_files).await?
471 {
472 validation_retry_count += 1;
473 tracing::warn!(
474 retries = validation_retry_count,
475 issues = report.issue_count,
476 "Post-edit validation found unresolved diagnostics"
477 );
478 if validation_retry_count >= POST_EDIT_VALIDATION_MAX_RETRIES {
479 return Err(anyhow::anyhow!(
480 "Post-edit validation failed after {} attempts.\n\n{}",
481 POST_EDIT_VALIDATION_MAX_RETRIES,
482 report.prompt
483 ));
484 }
485 session.add_message(Message {
486 role: Role::User,
487 content: vec![ContentPart::Text {
488 text: report.prompt,
489 }],
490 });
491 final_output.clear();
492 continue;
493 }
494 }
495 break;
496 }
497
498 if !truncated_tool_ids.is_empty() {
499 if tool_calls.is_empty() {
500 session.add_message(response.message.clone());
501 }
502 for (tool_id, tool_name) in &truncated_tool_ids {
503 let error_content = format!(
504 "Error: Your tool call to `{tool_name}` was truncated — the arguments \
505 JSON was cut off mid-string (likely hit the max_tokens limit). \
506 Please retry with a shorter approach: use the `write` tool to write \
507 content in smaller pieces, or reduce the size of your arguments."
508 );
509 session.add_message(Message {
510 role: Role::Tool,
511 content: vec![ContentPart::ToolResult {
512 tool_call_id: tool_id.clone(),
513 content: error_content,
514 }],
515 });
516 }
517 if tool_calls.is_empty() {
518 continue;
519 }
520 }
521
522 {
523 let mut sigs: Vec<String> = tool_calls
524 .iter()
525 .map(|(_, name, args)| format!("{name}:{args}"))
526 .collect();
527 sigs.sort();
528 let sig = sigs.join("|");
529
530 if last_tool_sig.as_deref() == Some(&sig) {
531 consecutive_same_tool += 1;
532 } else {
533 consecutive_same_tool = 1;
534 last_tool_sig = Some(sig);
535 }
536
537 let force_answer = consecutive_same_tool > MAX_CONSECUTIVE_SAME_TOOL
538 || (!model_supports_tools && step >= 3);
539
540 if force_answer {
541 tracing::warn!(
542 step = step,
543 consecutive = consecutive_same_tool,
544 "Breaking agent loop: forcing final answer",
545 );
546 let mut nudge_msg = response.message.clone();
547 nudge_msg
548 .content
549 .retain(|p| !matches!(p, ContentPart::ToolCall { .. }));
550 if !nudge_msg.content.is_empty() {
551 session.add_message(nudge_msg);
552 }
553 session.add_message(Message {
554 role: Role::User,
555 content: vec![ContentPart::Text {
556 text: FORCE_FINAL_ANSWER_NUDGE.to_string(),
557 }],
558 });
559 continue;
560 }
561 }
562
563 session.add_message(response.message.clone());
564
565 tracing::info!(
566 step = step,
567 num_tools = tool_calls.len(),
568 "Executing tool calls"
569 );
570
571 let mut codesearch_thrash_guard_triggered = false;
572 for (tool_id, tool_name, tool_input) in tool_calls {
573 let (tool_name, tool_input) =
574 normalize_tool_call_for_execution(&tool_name, &tool_input);
575 tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");
576
577 if tool_name == "list_tools" {
578 let content = list_tools_bootstrap_output(&tool_definitions, &tool_input);
579 session.add_message(Message {
580 role: Role::Tool,
581 content: vec![ContentPart::ToolResult {
582 tool_call_id: tool_id,
583 content,
584 }],
585 });
586 continue;
587 }
588
589 if let Some(ref bus) = session.bus {
590 let handle = bus.handle(&session.agent);
591 handle.send_with_correlation(
592 format!("agent.{}.tool.request", session.agent),
593 crate::bus::BusMessage::ToolRequest {
594 request_id: tool_id.clone(),
595 agent_id: session.agent.clone(),
596 tool_name: tool_name.clone(),
597 arguments: tool_input.clone(),
598 step,
599 },
600 Some(turn_id.clone()),
601 );
602 }
603
604 if is_interactive_tool(&tool_name) {
605 tracing::warn!(tool = %tool_name, "Blocking interactive tool in session loop");
606 session.add_message(Message {
607 role: Role::Tool,
608 content: vec![ContentPart::ToolResult {
609 tool_call_id: tool_id,
610 content: "Error: Interactive tool 'question' is disabled in this interface. Ask the user directly in assistant text.".to_string(),
611 }],
612 });
613 continue;
614 }
615
616 if let Some(reason) = detect_stub_in_tool_input(&tool_name, &tool_input) {
617 tracing::warn!(tool = %tool_name, reason = %reason, "Blocking suspected stubbed edit");
618 session.add_message(Message {
619 role: Role::Tool,
620 content: vec![ContentPart::ToolResult {
621 tool_call_id: tool_id,
622 content: format!(
623 "Error: Refactor guard rejected this edit: {reason}. \
624 Provide concrete, behavior-preserving implementation (no placeholders/stubs)."
625 ),
626 }],
627 });
628 continue;
629 }
630
631 let exec_start = std::time::Instant::now();
632 let exec_input = enrich_tool_input_with_runtime_context(
633 &tool_input,
634 &cwd,
635 session.metadata.model.as_deref(),
636 &session.id,
637 &session.agent,
638 session.metadata.provenance.as_ref(),
639 );
640 let (content, success, tool_metadata) = execute_tool(
641 &tool_registry,
642 &tool_name,
643 &exec_input,
644 &session.id,
645 exec_start,
646 )
647 .await;
648
649 let requires_confirmation = tool_result_requires_confirmation(tool_metadata.as_ref());
650 let (content, success, tool_metadata, requires_confirmation) = if requires_confirmation
651 && session.metadata.auto_apply_edits
652 {
653 let preview_content = content.clone();
654 match auto_apply_pending_confirmation(
655 &tool_name,
656 &exec_input,
657 tool_metadata.as_ref(),
658 )
659 .await
660 {
661 Ok(Some((content, success, tool_metadata))) => {
662 tracing::info!(
663 tool = %tool_name,
664 "Auto-applied pending confirmation in TUI session"
665 );
666 (content, success, tool_metadata, false)
667 }
668 Ok(None) => (content, success, tool_metadata, true),
669 Err(error) => (
670 format!(
671 "{}\n\nTUI edit auto-apply failed: {}",
672 pending_confirmation_tool_result_content(&tool_name, &preview_content,),
673 error
674 ),
675 false,
676 tool_metadata,
677 true,
678 ),
679 }
680 } else {
681 (content, success, tool_metadata, requires_confirmation)
682 };
683 let rendered_content = if requires_confirmation {
684 pending_confirmation_tool_result_content(&tool_name, &content)
685 } else {
686 content.clone()
687 };
688
689 if !requires_confirmation {
690 track_touched_files(
691 &mut touched_files,
692 &cwd,
693 &tool_name,
694 &tool_input,
695 tool_metadata.as_ref(),
696 );
697 }
698
699 let duration_ms = exec_start.elapsed().as_millis() as u64;
700 let codesearch_no_match =
701 is_codesearch_no_match_output(&tool_name, success, &rendered_content);
702
703 if let Some(ref bus) = session.bus {
704 let handle = bus.handle(&session.agent);
705 handle.send_with_correlation(
706 format!("agent.{}.tool.response", session.agent),
707 crate::bus::BusMessage::ToolResponse {
708 request_id: tool_id.clone(),
709 agent_id: session.agent.clone(),
710 tool_name: tool_name.clone(),
711 result: rendered_content.clone(),
712 success,
713 step,
714 },
715 Some(turn_id.clone()),
716 );
717 handle.send_with_correlation(
718 format!("agent.{}.tool.output", session.agent),
719 crate::bus::BusMessage::ToolOutputFull {
720 agent_id: session.agent.clone(),
721 tool_name: tool_name.clone(),
722 output: rendered_content.clone(),
723 success,
724 step,
725 },
726 Some(turn_id.clone()),
727 );
728 }
729
730 if let Some(base_dir) = super::archive::event_stream_path() {
731 write_tool_event_file(
732 &base_dir,
733 &session.id,
734 &tool_name,
735 success,
736 duration_ms,
737 &rendered_content,
738 session.messages.len() as u64,
739 );
740 }
741
742 let content = maybe_route_through_rlm(
743 &rendered_content,
744 &tool_name,
745 &tool_input,
746 &tool_id,
747 &session.id,
748 &session.messages,
749 &model,
750 Arc::clone(&provider),
751 &session.metadata.rlm,
752 )
753 .await;
754
755 session.add_message(Message {
756 role: Role::Tool,
757 content: vec![ContentPart::ToolResult {
758 tool_call_id: tool_id,
759 content,
760 }],
761 });
762
763 if is_build_agent(&session.agent) {
764 if codesearch_no_match {
765 consecutive_codesearch_no_matches += 1;
766 } else {
767 consecutive_codesearch_no_matches = 0;
768 }
769
770 if consecutive_codesearch_no_matches >= MAX_CONSECUTIVE_CODESEARCH_NO_MATCHES {
771 tracing::warn!(
772 step = step,
773 consecutive_codesearch_no_matches = consecutive_codesearch_no_matches,
774 "Detected codesearch no-match thrash; nudging model to stop variant retries",
775 );
776 session.add_message(Message {
777 role: Role::User,
778 content: vec![ContentPart::Text {
779 text: CODESEARCH_THRASH_NUDGE.to_string(),
780 }],
781 });
782 codesearch_thrash_guard_triggered = true;
783 break;
784 }
785 }
786 }
787
788 if codesearch_thrash_guard_triggered {
789 continue;
790 }
791 }
792
793 session.save().await?;
794
795 super::archive::archive_event_stream_to_s3(&session.id, super::archive::event_stream_path())
796 .await;
797
798 Ok(SessionResult {
799 text: final_output.trim().to_string(),
800 session_id: session.id.clone(),
801 })
802}
803
804fn parse_session_model_selector(session: &Session, providers: &[&str]) -> (Option<String>, String) {
806 let Some(ref model_str) = session.metadata.model else {
807 return (None, String::new());
808 };
809 let (prov, model) = parse_model_string(model_str);
810 let prov = prov.map(|p| match p {
811 "zhipuai" | "z-ai" => "zai",
812 other => other,
813 });
814 if prov.is_some() {
815 (prov.map(|s| s.to_string()), model.to_string())
816 } else if providers.contains(&model) {
817 (Some(model.to_string()), String::new())
818 } else {
819 (None, model.to_string())
820 }
821}
822
/// Look up `tool_name` in `tool_registry` and execute it with `exec_input`.
///
/// Returns `(output, success, metadata)`:
/// - `output`: the tool's raw output, or an `"Error: ..."` string when
///   execution fails or the tool is not registered;
/// - `success`: the tool's own success flag, `false` on error/unknown tool;
/// - `metadata`: the tool's metadata map on a completed execution, `None`
///   otherwise.
///
/// Every outcome (success, execution error, unknown tool) emits a
/// best-effort audit record correlated with `session_id`. `exec_start` is
/// supplied by the caller so the audited duration covers the full call,
/// including any caller-side setup.
async fn execute_tool(
    tool_registry: &ToolRegistry,
    tool_name: &str,
    exec_input: &serde_json::Value,
    session_id: &str,
    exec_start: std::time::Instant,
) -> (
    String,
    bool,
    Option<std::collections::HashMap<String, serde_json::Value>>,
) {
    if let Some(tool) = tool_registry.get(tool_name) {
        match tool.execute(exec_input.clone()).await {
            Ok(result) => {
                let duration_ms = exec_start.elapsed().as_millis() as u64;
                tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
                // Audit is best-effort: skipped when no audit sink is configured.
                if let Some(audit) = try_audit_log() {
                    audit
                        .log_with_correlation(
                            AuditCategory::ToolExecution,
                            format!("tool:{}", tool_name),
                            // A completed run can still report a tool-level failure.
                            if result.success {
                                AuditOutcome::Success
                            } else {
                                AuditOutcome::Failure
                            },
                            None,
                            Some(json!({
                                "duration_ms": duration_ms,
                                "output_len": result.output.len()
                            })),
                            None,
                            None,
                            None,
                            Some(session_id.to_string()),
                        )
                        .await;
                }
                (result.output, result.success, Some(result.metadata))
            }
            Err(e) => {
                let duration_ms = exec_start.elapsed().as_millis() as u64;
                tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
                if let Some(audit) = try_audit_log() {
                    audit
                        .log_with_correlation(
                            AuditCategory::ToolExecution,
                            format!("tool:{}", tool_name),
                            AuditOutcome::Failure,
                            None,
                            Some(json!({ "duration_ms": duration_ms, "error": e.to_string() })),
                            None,
                            None,
                            None,
                            Some(session_id.to_string()),
                        )
                        .await;
                }
                // The error is surfaced to the model as tool output, not
                // propagated — the agent loop decides what to do next.
                (format!("Error: {}", e), false, None)
            }
        }
    } else {
        tracing::warn!(tool = %tool_name, "Tool not found");
        if let Some(audit) = try_audit_log() {
            audit
                .log_with_correlation(
                    AuditCategory::ToolExecution,
                    format!("tool:{}", tool_name),
                    AuditOutcome::Failure,
                    None,
                    Some(json!({ "error": "unknown_tool" })),
                    None,
                    None,
                    None,
                    Some(session_id.to_string()),
                )
                .await;
        }
        (format!("Error: Unknown tool '{}'", tool_name), false, None)
    }
}
905
906fn write_tool_event_file(
908 base_dir: &std::path::Path,
909 session_id: &str,
910 tool_name: &str,
911 success: bool,
912 duration_ms: u64,
913 rendered_content: &str,
914 seq: u64,
915) {
916 let workspace = std::env::var("PWD")
917 .map(std::path::PathBuf::from)
918 .unwrap_or_else(|_| std::env::current_dir().unwrap_or_default());
919 let event = ChatEvent::tool_result(
920 workspace,
921 session_id.to_string(),
922 tool_name,
923 success,
924 duration_ms,
925 rendered_content,
926 seq,
927 );
928 let event_json = event.to_json();
929 let timestamp = Utc::now().format("%Y%m%dT%H%M%SZ");
930 let filename = format!(
931 "{}-chat-events-{:020}-{:020}.jsonl",
932 timestamp,
933 seq * 10000,
934 (seq + 1) * 10000
935 );
936 let event_path = base_dir.join(session_id).join(filename);
937
938 tokio::spawn(async move {
939 if let Some(parent) = event_path.parent() {
940 let _ = tokio::fs::create_dir_all(parent).await;
941 }
942 if let Ok(mut file) = tokio::fs::OpenOptions::new()
943 .create(true)
944 .append(true)
945 .open(&event_path)
946 .await
947 {
948 use tokio::io::AsyncWriteExt;
949 let _ = file.write_all(event_json.as_bytes()).await;
950 let _ = file.write_all(b"\n").await;
951 }
952 });
953}
954
955async fn maybe_route_through_rlm(
958 rendered_content: &str,
959 tool_name: &str,
960 tool_input: &serde_json::Value,
961 tool_id: &str,
962 session_id: &str,
963 messages: &[Message],
964 model: &str,
965 provider: Arc<dyn crate::provider::Provider>,
966 rlm_config: &RlmConfig,
967) -> String {
968 let ctx_window = context_window_for_model(model);
969 let current_tokens = estimate_tokens_for_messages(messages);
970 let routing_ctx = RoutingContext {
971 tool_id: tool_name.to_string(),
972 session_id: session_id.to_string(),
973 call_id: Some(tool_id.to_string()),
974 model_context_limit: ctx_window,
975 current_context_tokens: Some(current_tokens),
976 };
977 let routing = RlmRouter::should_route(rendered_content, &routing_ctx, rlm_config);
978 if !routing.should_route {
979 return rendered_content.to_string();
980 }
981
982 tracing::info!(
983 tool = %tool_name,
984 reason = %routing.reason,
985 estimated_tokens = routing.estimated_tokens,
986 "RLM: Routing large tool output"
987 );
988 let auto_ctx = AutoProcessContext {
989 tool_id: tool_name,
990 tool_args: tool_input.clone(),
991 session_id,
992 abort: None,
993 on_progress: None,
994 provider: Arc::clone(&provider),
995 model: model.to_string(),
996 bus: None,
997 trace_id: None,
998 subcall_provider: None,
999 subcall_model: None,
1000 };
1001 let original_bytes = rendered_content.len();
1002 match RlmRouter::auto_process(rendered_content, auto_ctx, &rlm_config).await {
1003 Ok(result) => {
1004 tracing::info!(
1005 input_tokens = result.stats.input_tokens,
1006 output_tokens = result.stats.output_tokens,
1007 iterations = result.stats.iterations,
1008 "RLM: Processing complete"
1009 );
1010 format!(
1011 "[RLM-SUMMARY tool={tool_name} original_bytes={original_bytes} reason={reason}]\n{body}\n[END RLM-SUMMARY — the raw tool output was replaced by this model-generated summary; re-running the same call will produce a similar summary, not the original bytes]",
1012 reason = routing.reason,
1013 body = result.processed,
1014 )
1015 }
1016 Err(e) => {
1017 tracing::warn!(error = %e, "RLM: auto_process failed, using smart_truncate");
1018 let (truncated, _, _) =
1019 RlmRouter::smart_truncate(rendered_content, tool_name, tool_input, ctx_window / 4);
1020 truncated
1021 }
1022 }
1023}