1use std::collections::HashSet;
15use std::sync::Arc;
16
17use anyhow::Result;
18use chrono::Utc;
19use serde_json::json;
20use uuid::Uuid;
21
22use crate::audit::{AuditCategory, AuditOutcome, try_audit_log};
23use crate::cognition::tool_router::{ToolCallRouter, ToolRouterConfig};
24use crate::event_stream::ChatEvent;
25use crate::provider::{
26 CompletionRequest, ContentPart, Message, ProviderRegistry, Role, parse_model_string,
27};
28use crate::rlm::router::AutoProcessContext;
29use crate::rlm::{RlmChunker, RlmConfig, RlmRouter, RoutingContext};
30use crate::tool::ToolRegistry;
31
32use super::super::{DEFAULT_MAX_STEPS, Session, SessionResult};
33use super::bootstrap::{
34 inject_tool_prompt, list_tools_bootstrap_definition, list_tools_bootstrap_output,
35};
36use super::build::{
37 build_request_requires_tool, is_build_agent, should_force_build_tool_first_retry,
38};
39use super::compression::{compress_history_keep_last, enforce_context_window};
40use super::confirmation::{
41 auto_apply_pending_confirmation, pending_confirmation_tool_result_content,
42 tool_result_requires_confirmation,
43};
44use super::defaults::default_model_for_provider;
45use super::edit::{detect_stub_in_tool_input, normalize_tool_call_for_execution};
46use super::error::{is_prompt_too_long_error, is_retryable_upstream_error};
47use super::loop_constants::{
48 BUILD_MODE_TOOL_FIRST_MAX_RETRIES, BUILD_MODE_TOOL_FIRST_NUDGE, CODESEARCH_THRASH_NUDGE,
49 FORCE_FINAL_ANSWER_NUDGE, MAX_CONSECUTIVE_CODESEARCH_NO_MATCHES, MAX_CONSECUTIVE_SAME_TOOL,
50 NATIVE_TOOL_PROMISE_NUDGE, NATIVE_TOOL_PROMISE_RETRY_MAX_RETRIES,
51 POST_EDIT_VALIDATION_MAX_RETRIES,
52};
53use super::markup::normalize_textual_tool_calls;
54use super::provider::{
55 prefers_temperature_one, resolve_provider_for_session_request,
56 should_retry_missing_native_tool_call, temperature_is_deprecated,
57};
58use super::router::{build_proactive_lsp_context_message, choose_router_target};
59use super::runtime::{
60 enrich_tool_input_with_runtime_context, is_codesearch_no_match_output, is_interactive_tool,
61 is_local_cuda_provider, local_cuda_light_system_prompt,
62};
63use super::text::extract_text_content;
64use super::token::{
65 context_window_for_model, estimate_tokens_for_messages, session_completion_max_tokens,
66};
67use super::validation::{build_validation_report, capture_git_dirty_files, track_touched_files};
68
/// Drives one complete agent turn for `session` in response to `message`.
///
/// High-level flow:
///   1. Resolve a provider and model from Vault credentials and the
///      session's model selector.
///   2. Append the user message (RLM-compressing it if oversized), build
///      the tool registry and system prompt.
///   3. Loop up to `max_steps`: request a completion (with compression
///      retry, backoff and failover on upstream errors), execute tool
///      calls, and apply the loop guards (build-mode tool-first,
///      native-tool-promise, identical-tool thrash, codesearch thrash,
///      post-edit validation).
///   4. Persist the session and archive the event stream.
///
/// Returns the accumulated assistant text as a `SessionResult`.
///
/// # Errors
/// Fails when no provider is configured, a provider error exhausts the
/// retry budget, or a guard (build-mode tool-first, post-edit validation)
/// runs out of retries.
pub(crate) async fn run_prompt(session: &mut Session, message: &str) -> Result<SessionResult> {
    // --- Provider and model resolution ----------------------------------
    let registry = ProviderRegistry::from_vault().await?;
    session.resolve_subcall_provider(&registry);

    let providers = registry.list();
    if providers.is_empty() {
        anyhow::bail!(
            "No providers available. Configure provider credentials in HashiCorp Vault (for ChatGPT subscription Codex use `codetether auth codex`; for Copilot use `codetether auth copilot`)."
        );
    }

    tracing::info!("Available providers: {:?}", providers);

    let (provider_name, model_id) = parse_session_model_selector(session, &providers);

    let selected_provider =
        resolve_provider_for_session_request(providers.as_slice(), provider_name.as_deref())?;

    let provider = registry
        .get(selected_provider)
        .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;

    // Record the user message first so title generation and the oversize
    // compression below both see the latest history.
    session.add_message(Message {
        role: Role::User,
        content: vec![ContentPart::Text {
            text: message.to_string(),
        }],
    });

    if session.title.is_none() {
        session.generate_title().await?;
    }

    // An empty model id means "use the provider's default model".
    let model = if !model_id.is_empty() {
        model_id
    } else {
        default_model_for_provider(selected_provider)
    };

    // Rewrites the just-added user message in place when it is oversized.
    compress_user_message_if_oversized(session, &provider, &model, message).await;

    // --- Tooling and prompt setup ----------------------------------------
    let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
    // Interactive tools (e.g. `question`) are never offered in this loop.
    let tool_definitions: Vec<_> = tool_registry
        .definitions()
        .into_iter()
        .filter(|tool| !is_interactive_tool(&tool.name))
        .collect();

    // Some models reject `temperature` outright; others behave best at 1.0.
    let temperature = if temperature_is_deprecated(&model) {
        None
    } else if prefers_temperature_one(&model) {
        Some(1.0)
    } else {
        Some(0.7)
    };

    tracing::info!("Using model: {} via provider: {}", model, selected_provider);
    tracing::info!("Available tools: {}", tool_definitions.len());

    // Providers without native tool calling advertise only the bootstrap
    // `list_tools` definition and get a textual tool protocol injected
    // into the system prompt instead.
    let model_supports_tools = !matches!(
        selected_provider,
        "gemini-web" | "local-cuda" | "local_cuda" | "localcuda"
    );
    let advertised_tool_definitions = if model_supports_tools {
        tool_definitions.clone()
    } else {
        vec![list_tools_bootstrap_definition()]
    };

    let cwd = session
        .metadata
        .directory
        .clone()
        .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
    let system_prompt = if is_local_cuda_provider(selected_provider) {
        // Small local models get a lighter prompt to conserve context.
        local_cuda_light_system_prompt()
    } else {
        crate::agent::builtin::build_system_prompt(&cwd)
    };
    let system_prompt = if !model_supports_tools && !advertised_tool_definitions.is_empty() {
        inject_tool_prompt(&system_prompt, &advertised_tool_definitions)
    } else {
        system_prompt
    };

    // --- Loop state -------------------------------------------------------
    let max_steps = session.max_steps.unwrap_or(DEFAULT_MAX_STEPS);
    let mut final_output = String::new();
    // Snapshot of files already dirty in git, so post-edit validation only
    // reports diagnostics attributable to this turn's edits.
    let baseline_git_dirty_files = capture_git_dirty_files(&cwd).await;
    let mut touched_files = HashSet::new();
    let mut validation_retry_count: u8 = 0;

    // Guard counters: identical tool-call thrash, codesearch no-match
    // thrash, build-mode "tool first" retries, and "described a tool call
    // but emitted none" retries.
    let mut last_tool_sig: Option<String> = None;
    let mut consecutive_same_tool: u32 = 0;
    let mut consecutive_codesearch_no_matches: u32 = 0;
    let mut build_mode_tool_retry_count: u8 = 0;
    let mut native_tool_promise_retry_count: u8 = 0;
    // Correlation id shared by every bus message emitted during this turn.
    let turn_id = Uuid::new_v4().to_string();

    // Optional FunctionGemma reformatter for malformed tool calls;
    // initialization failure simply disables it.
    let tool_router: Option<ToolCallRouter> = {
        let cfg = ToolRouterConfig::from_env();
        match ToolCallRouter::from_config(&cfg) {
            Ok(r) => r,
            Err(e) => {
                tracing::warn!(error = %e, "FunctionGemma tool router init failed; disabled");
                None
            }
        }
    };

    // --- Main agent loop --------------------------------------------------
    for step in 1..=max_steps {
        tracing::info!(step = step, "Agent step starting");

        // Compress/trim history if it no longer fits the model's window.
        enforce_context_window(
            session,
            Arc::clone(&provider),
            &model,
            &system_prompt,
            &advertised_tool_definitions,
            None,
        )
        .await?;

        // Optional proactive LSP diagnostics context for this step.
        let proactive_lsp_message = build_proactive_lsp_context_message(
            selected_provider,
            step,
            &tool_registry,
            &session.messages,
            &cwd,
        )
        .await;

        // Completion request loop: one forced-compression retry for
        // prompt-too-long, plus bounded backoff/failover for transient
        // upstream errors.
        let mut attempt = 0;
        let mut upstream_retry_count: u8 = 0;
        const MAX_UPSTREAM_RETRIES: u8 = 3;
        let response = loop {
            attempt += 1;

            // Rebuilt every attempt: system prompt, optional LSP context,
            // then the (possibly re-compressed) session history.
            let mut messages = vec![Message {
                role: Role::System,
                content: vec![ContentPart::Text {
                    text: system_prompt.clone(),
                }],
            }];
            if let Some(msg) = &proactive_lsp_message {
                messages.push(msg.clone());
            }
            messages.extend(session.messages.clone());

            let request = CompletionRequest {
                messages,
                tools: advertised_tool_definitions.clone(),
                model: model.clone(),
                temperature,
                top_p: None,
                max_tokens: Some(session_completion_max_tokens()),
                stop: Vec::new(),
            };

            match provider.complete(request).await {
                Ok(r) => break r,
                Err(e) => {
                    // First failure due to prompt size: force history
                    // compression and retry immediately (once).
                    if attempt == 1 && is_prompt_too_long_error(&e) {
                        tracing::warn!(error = %e, "Provider rejected prompt as too long; forcing extra compression and retrying");
                        let _ = compress_history_keep_last(
                            session,
                            Arc::clone(&provider),
                            &model,
                            6,
                            "prompt_too_long_retry",
                        )
                        .await?;
                        continue;
                    }
                    if upstream_retry_count < MAX_UPSTREAM_RETRIES
                        && is_retryable_upstream_error(&e)
                    {
                        upstream_retry_count += 1;
                        // Exponential backoff capped at 4s: 1, 2, 4, 4...
                        let backoff_secs = 1u64 << (upstream_retry_count - 1).min(2);
                        tracing::warn!(
                            error = %e,
                            retry = upstream_retry_count,
                            max = MAX_UPSTREAM_RETRIES,
                            backoff_secs,
                            "Retryable upstream provider error; sleeping and retrying"
                        );
                        tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
                        if let Some((retry_provider, retry_model)) =
                            choose_router_target(&registry, selected_provider, &model)
                        {
                            tracing::info!(
                                to_provider = %retry_provider,
                                to_model = %retry_model,
                                "Failing over to alternate provider/model"
                            );
                            // NOTE(review): the failover target is recorded in
                            // session metadata, but `provider`/`model` bound
                            // earlier are not rebound — the retry below still
                            // uses the original provider. Presumably the new
                            // selector takes effect on a later turn; confirm.
                            session.metadata.model =
                                Some(format!("{retry_provider}/{retry_model}"));
                            attempt = 0;
                        }
                        continue;
                    }
                    return Err(e);
                }
            }
        };

        // Optionally repair malformed tool calls, then normalize textual
        // (markup-style) tool calls into native ToolCall parts.
        let response = if let Some(ref router) = tool_router {
            router
                .maybe_reformat(response, &tool_definitions, model_supports_tools)
                .await
        } else {
            response
        };
        let response = normalize_textual_tool_calls(response, &tool_definitions);

        crate::telemetry::TOKEN_USAGE.record_model_usage(
            &model,
            response.usage.prompt_tokens as u64,
            response.usage.completion_tokens as u64,
        );

        // Partition tool calls into parseable ones and those whose argument
        // JSON is invalid — typically truncated by the max_tokens limit.
        let mut truncated_tool_ids: Vec<(String, String)> = Vec::new();
        let tool_calls: Vec<(String, String, serde_json::Value)> = response
            .message
            .content
            .iter()
            .filter_map(|part| {
                if let ContentPart::ToolCall {
                    id,
                    name,
                    arguments,
                    ..
                } = part
                {
                    match serde_json::from_str::<serde_json::Value>(arguments) {
                        Ok(args) => Some((id.clone(), name.clone(), args)),
                        Err(e) => {
                            tracing::warn!(
                                tool = %name,
                                tool_call_id = %id,
                                args_len = arguments.len(),
                                error = %e,
                                "Tool call arguments failed to parse (likely truncated by max_tokens)"
                            );
                            truncated_tool_ids.push((id.clone(), name.clone()));
                            None
                        }
                    }
                } else {
                    None
                }
            })
            .collect();

        let assistant_text = extract_text_content(&&response.message.content);
        // Build-mode guard: the model answered in prose although the request
        // clearly requires file edits — nudge it to execute tools and retry.
        if should_force_build_tool_first_retry(
            &session.agent,
            build_mode_tool_retry_count,
            &tool_definitions,
            &session.messages,
            &cwd,
            &assistant_text,
            !tool_calls.is_empty(),
            BUILD_MODE_TOOL_FIRST_MAX_RETRIES,
        ) {
            build_mode_tool_retry_count += 1;
            tracing::warn!(
                step = step,
                agent = %session.agent,
                retry = build_mode_tool_retry_count,
                "Build mode tool-first guard triggered; retrying with execution nudge"
            );
            session.add_message(Message {
                role: Role::User,
                content: vec![ContentPart::Text {
                    text: BUILD_MODE_TOOL_FIRST_NUDGE.to_string(),
                }],
            });
            continue;
        }
        // Native-tool-promise guard: the model described a tool step in text
        // without emitting a native tool call — nudge it and retry.
        if should_retry_missing_native_tool_call(
            selected_provider,
            &model,
            native_tool_promise_retry_count,
            &tool_definitions,
            &assistant_text,
            !tool_calls.is_empty(),
            NATIVE_TOOL_PROMISE_RETRY_MAX_RETRIES,
        ) {
            native_tool_promise_retry_count += 1;
            tracing::warn!(
                step = step,
                provider = selected_provider,
                model = %model,
                retry = native_tool_promise_retry_count,
                "Model described a tool step without emitting a tool call; retrying with corrective nudge"
            );
            session.add_message(response.message.clone());
            session.add_message(Message {
                role: Role::User,
                content: vec![ContentPart::Text {
                    text: NATIVE_TOOL_PROMISE_NUDGE.to_string(),
                }],
            });
            continue;
        }
        if !tool_calls.is_empty() {
            // Progress was made; reset both retry guards.
            build_mode_tool_retry_count = 0;
            native_tool_promise_retry_count = 0;
        } else if is_build_agent(&session.agent)
            && build_request_requires_tool(&session.messages, &cwd)
            && build_mode_tool_retry_count >= BUILD_MODE_TOOL_FIRST_MAX_RETRIES
        {
            return Err(anyhow::anyhow!(
                "Build mode could not obtain tool calls for an explicit file-change request after {} retries. \
                Switch to a tool-capable model and try again.",
                BUILD_MODE_TOOL_FIRST_MAX_RETRIES
            ));
        }

        // Accumulate visible text and stream thinking parts onto the bus.
        let mut step_text = String::new();

        for part in &response.message.content {
            match part {
                ContentPart::Text { text } if !text.is_empty() => {
                    step_text.push_str(text);
                    step_text.push('\n');
                }
                ContentPart::Thinking { text } if !text.is_empty() => {
                    if let Some(ref bus) = session.bus {
                        let handle = bus.handle(&session.agent);
                        handle.send_with_correlation(
                            format!("agent.{}.thinking", session.agent),
                            crate::bus::BusMessage::AgentThinking {
                                agent_id: session.agent.clone(),
                                thinking: text.clone(),
                                step,
                            },
                            Some(turn_id.clone()),
                        );
                    }
                }
                _ => {}
            }
        }

        if !step_text.trim().is_empty() {
            final_output.push_str(&step_text);
        }

        // No tool calls at all: the model is done. Build agents must first
        // pass post-edit validation; unresolved diagnostics restart the
        // loop with a corrective report (bounded retries).
        if tool_calls.is_empty() && truncated_tool_ids.is_empty() {
            session.add_message(response.message.clone());
            if is_build_agent(&session.agent) {
                if let Some(report) =
                    build_validation_report(&cwd, &touched_files, &baseline_git_dirty_files).await?
                {
                    validation_retry_count += 1;
                    tracing::warn!(
                        retries = validation_retry_count,
                        issues = report.issue_count,
                        "Post-edit validation found unresolved diagnostics"
                    );
                    if validation_retry_count >= POST_EDIT_VALIDATION_MAX_RETRIES {
                        return Err(anyhow::anyhow!(
                            "Post-edit validation failed after {} attempts.\n\n{}",
                            POST_EDIT_VALIDATION_MAX_RETRIES,
                            report.prompt
                        ));
                    }
                    session.add_message(Message {
                        role: Role::User,
                        content: vec![ContentPart::Text {
                            text: report.prompt,
                        }],
                    });
                    // Discard text from the failed attempt.
                    final_output.clear();
                    continue;
                }
            }
            break;
        }

        // Answer truncated tool calls with synthetic error results so the
        // model can retry with a smaller payload.
        if !truncated_tool_ids.is_empty() {
            if tool_calls.is_empty() {
                session.add_message(response.message.clone());
            }
            for (tool_id, tool_name) in &truncated_tool_ids {
                let error_content = format!(
                    "Error: Your tool call to `{tool_name}` was truncated — the arguments \
                    JSON was cut off mid-string (likely hit the max_tokens limit). \
                    Please retry with a shorter approach: use the `write` tool to write \
                    content in smaller pieces, or reduce the size of your arguments."
                );
                session.add_message(Message {
                    role: Role::Tool,
                    content: vec![ContentPart::ToolResult {
                        tool_call_id: tool_id.clone(),
                        content: error_content,
                    }],
                });
            }
            if tool_calls.is_empty() {
                continue;
            }
        }

        // Identical-tool-call thrash guard: a sorted name:args signature of
        // this step's calls, compared against the previous step's.
        {
            let mut sigs: Vec<String> = tool_calls
                .iter()
                .map(|(_, name, args)| format!("{name}:{args}"))
                .collect();
            sigs.sort();
            let sig = sigs.join("|");

            if last_tool_sig.as_deref() == Some(&sig) {
                consecutive_same_tool += 1;
            } else {
                consecutive_same_tool = 1;
                last_tool_sig = Some(sig);
            }

            // Non-tool-capable providers also get cut off after 3 steps.
            let force_answer = consecutive_same_tool > MAX_CONSECUTIVE_SAME_TOOL
                || (!model_supports_tools && step >= 3);

            if force_answer {
                tracing::warn!(
                    step = step,
                    consecutive = consecutive_same_tool,
                    "Breaking agent loop: forcing final answer",
                );
                // Keep the text parts but strip the repeated tool calls
                // before nudging the model to answer directly.
                let mut nudge_msg = response.message.clone();
                nudge_msg
                    .content
                    .retain(|p| !matches!(p, ContentPart::ToolCall { .. }));
                if !nudge_msg.content.is_empty() {
                    session.add_message(nudge_msg);
                }
                session.add_message(Message {
                    role: Role::User,
                    content: vec![ContentPart::Text {
                        text: FORCE_FINAL_ANSWER_NUDGE.to_string(),
                    }],
                });
                continue;
            }
        }

        session.add_message(response.message.clone());

        tracing::info!(
            step = step,
            num_tools = tool_calls.len(),
            "Executing tool calls"
        );

        // --- Tool execution ------------------------------------------------
        let mut codesearch_thrash_guard_triggered = false;
        for (tool_id, tool_name, tool_input) in tool_calls {
            let (tool_name, tool_input) =
                normalize_tool_call_for_execution(&tool_name, &tool_input);
            tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");

            // Bootstrap tool for providers without native tool calling.
            if tool_name == "list_tools" {
                let content = list_tools_bootstrap_output(&tool_definitions, &tool_input);
                session.add_message(Message {
                    role: Role::Tool,
                    content: vec![ContentPart::ToolResult {
                        tool_call_id: tool_id,
                        content,
                    }],
                });
                continue;
            }

            if let Some(ref bus) = session.bus {
                let handle = bus.handle(&session.agent);
                handle.send_with_correlation(
                    format!("agent.{}.tool.request", session.agent),
                    crate::bus::BusMessage::ToolRequest {
                        request_id: tool_id.clone(),
                        agent_id: session.agent.clone(),
                        tool_name: tool_name.clone(),
                        arguments: tool_input.clone(),
                        step,
                    },
                    Some(turn_id.clone()),
                );
            }

            // Interactive tools can slip through via textual normalization;
            // reject them with an explanatory tool result.
            if is_interactive_tool(&tool_name) {
                tracing::warn!(tool = %tool_name, "Blocking interactive tool in session loop");
                session.add_message(Message {
                    role: Role::Tool,
                    content: vec![ContentPart::ToolResult {
                        tool_call_id: tool_id,
                        content: "Error: Interactive tool 'question' is disabled in this interface. Ask the user directly in assistant text.".to_string(),
                    }],
                });
                continue;
            }

            // Refactor guard: refuse edits that look like placeholder stubs.
            if let Some(reason) = detect_stub_in_tool_input(&tool_name, &tool_input) {
                tracing::warn!(tool = %tool_name, reason = %reason, "Blocking suspected stubbed edit");
                session.add_message(Message {
                    role: Role::Tool,
                    content: vec![ContentPart::ToolResult {
                        tool_call_id: tool_id,
                        content: format!(
                            "Error: Refactor guard rejected this edit: {reason}. \
                            Provide concrete, behavior-preserving implementation (no placeholders/stubs)."
                        ),
                    }],
                });
                continue;
            }

            let exec_start = std::time::Instant::now();
            let exec_input = enrich_tool_input_with_runtime_context(
                &tool_input,
                &cwd,
                session.metadata.model.as_deref(),
                &session.id,
                &session.agent,
                session.metadata.provenance.as_ref(),
            );
            let (content, success, tool_metadata) = execute_tool(
                &tool_registry,
                &tool_name,
                &exec_input,
                &session.id,
                exec_start,
            )
            .await;

            // Results requiring user confirmation may be auto-applied when
            // the session opted into auto_apply_edits (TUI flow).
            let requires_confirmation = tool_result_requires_confirmation(tool_metadata.as_ref());
            let (content, success, tool_metadata, requires_confirmation) = if requires_confirmation
                && session.metadata.auto_apply_edits
            {
                let preview_content = content.clone();
                match auto_apply_pending_confirmation(
                    &tool_name,
                    &exec_input,
                    tool_metadata.as_ref(),
                )
                .await
                {
                    Ok(Some((content, success, tool_metadata))) => {
                        tracing::info!(
                            tool = %tool_name,
                            "Auto-applied pending confirmation in TUI session"
                        );
                        (content, success, tool_metadata, false)
                    }
                    // Nothing to apply: keep the pending-confirmation state.
                    Ok(None) => (content, success, tool_metadata, true),
                    Err(error) => (
                        format!(
                            "{}\n\nTUI edit auto-apply failed: {}",
                            pending_confirmation_tool_result_content(&tool_name, &preview_content,),
                            error
                        ),
                        false,
                        tool_metadata,
                        true,
                    ),
                }
            } else {
                (content, success, tool_metadata, requires_confirmation)
            };
            let rendered_content = if requires_confirmation {
                pending_confirmation_tool_result_content(&tool_name, &content)
            } else {
                content.clone()
            };

            // Only confirmed (applied) edits count as touched files for
            // post-edit validation.
            if !requires_confirmation {
                track_touched_files(
                    &mut touched_files,
                    &cwd,
                    &tool_name,
                    &tool_input,
                    tool_metadata.as_ref(),
                );
            }

            let duration_ms = exec_start.elapsed().as_millis() as u64;
            let codesearch_no_match =
                is_codesearch_no_match_output(&tool_name, success, &rendered_content);

            if let Some(ref bus) = session.bus {
                let handle = bus.handle(&session.agent);
                handle.send_with_correlation(
                    format!("agent.{}.tool.response", session.agent),
                    crate::bus::BusMessage::ToolResponse {
                        request_id: tool_id.clone(),
                        agent_id: session.agent.clone(),
                        tool_name: tool_name.clone(),
                        result: rendered_content.clone(),
                        success,
                        step,
                    },
                    Some(turn_id.clone()),
                );
                handle.send_with_correlation(
                    format!("agent.{}.tool.output", session.agent),
                    crate::bus::BusMessage::ToolOutputFull {
                        agent_id: session.agent.clone(),
                        tool_name: tool_name.clone(),
                        output: rendered_content.clone(),
                        success,
                        step,
                    },
                    Some(turn_id.clone()),
                );
            }

            // Fire-and-forget event-stream archive record.
            if let Some(base_dir) = super::archive::event_stream_path() {
                write_tool_event_file(
                    &base_dir,
                    &session.id,
                    &tool_name,
                    success,
                    duration_ms,
                    &rendered_content,
                    session.messages.len() as u64,
                );
            }

            // Large outputs may be RLM-compressed before entering history.
            let content = maybe_route_through_rlm(
                &rendered_content,
                &tool_name,
                &tool_input,
                &tool_id,
                &session.id,
                &session.messages,
                &model,
                Arc::clone(&provider),
                &session.metadata.rlm,
            )
            .await;

            session.add_message(Message {
                role: Role::Tool,
                content: vec![ContentPart::ToolResult {
                    tool_call_id: tool_id,
                    content,
                }],
            });

            // Codesearch thrash guard (build agents only): repeated
            // no-match searches trigger a nudge and restart the step.
            if is_build_agent(&session.agent) {
                if codesearch_no_match {
                    consecutive_codesearch_no_matches += 1;
                } else {
                    consecutive_codesearch_no_matches = 0;
                }

                if consecutive_codesearch_no_matches >= MAX_CONSECUTIVE_CODESEARCH_NO_MATCHES {
                    tracing::warn!(
                        step = step,
                        consecutive_codesearch_no_matches = consecutive_codesearch_no_matches,
                        "Detected codesearch no-match thrash; nudging model to stop variant retries",
                    );
                    session.add_message(Message {
                        role: Role::User,
                        content: vec![ContentPart::Text {
                            text: CODESEARCH_THRASH_NUDGE.to_string(),
                        }],
                    });
                    codesearch_thrash_guard_triggered = true;
                    break;
                }
            }
        }

        if codesearch_thrash_guard_triggered {
            continue;
        }
    }

    // --- Persist and archive ----------------------------------------------
    session.save().await?;

    super::archive::archive_event_stream_to_s3(&session.id, super::archive::event_stream_path())
        .await;

    Ok(SessionResult {
        text: final_output.trim().to_string(),
        session_id: session.id.clone(),
    })
}
758
759fn parse_session_model_selector(session: &Session, providers: &[&str]) -> (Option<String>, String) {
761 let Some(ref model_str) = session.metadata.model else {
762 return (None, String::new());
763 };
764 let (prov, model) = parse_model_string(model_str);
765 let prov = prov.map(|p| match p {
766 "zhipuai" | "z-ai" => "zai",
767 other => other,
768 });
769 if prov.is_some() {
770 (prov.map(|s| s.to_string()), model.to_string())
771 } else if providers.contains(&model) {
772 (Some(model.to_string()), String::new())
773 } else {
774 (None, model.to_string())
775 }
776}
777
778async fn compress_user_message_if_oversized(
781 session: &mut Session,
782 provider: &Arc<dyn crate::provider::Provider>,
783 model: &str,
784 message: &str,
785) {
786 let ctx_window = context_window_for_model(model);
787 let msg_tokens = RlmChunker::estimate_tokens(message);
788 let threshold = (ctx_window as f64 * 0.35) as usize;
789 if msg_tokens <= threshold {
790 return;
791 }
792
793 tracing::info!(
794 msg_tokens,
795 threshold,
796 ctx_window,
797 "RLM: User message exceeds context threshold, compressing"
798 );
799
800 let auto_ctx = AutoProcessContext {
801 tool_id: "session_context",
802 tool_args: serde_json::json!({}),
803 session_id: &session.id,
804 abort: None,
805 on_progress: None,
806 provider: Arc::clone(provider),
807 model: model.to_string(),
808 bus: None,
809 trace_id: None,
810 subcall_provider: session.metadata.subcall_provider.clone(),
811 subcall_model: session.metadata.subcall_model_name.clone(),
812 };
813 let rlm_config = session.metadata.rlm.clone();
814 match RlmRouter::auto_process(message, auto_ctx, &rlm_config).await {
815 Ok(result) => {
816 tracing::info!(
817 input_tokens = result.stats.input_tokens,
818 output_tokens = result.stats.output_tokens,
819 "RLM: User message compressed"
820 );
821 if let Some(last) = session.messages.last_mut() {
822 last.content = vec![ContentPart::Text {
823 text: format!(
824 "[Original message: {} tokens, compressed via RLM]\n\n{}\n\n---\nOriginal request prefix:\n{}",
825 msg_tokens,
826 result.processed,
827 message.chars().take(500).collect::<String>()
828 ),
829 }];
830 }
831 }
832 Err(e) => {
833 tracing::warn!(error = %e, "RLM: Failed to compress user message, using truncation");
834 let max_chars = threshold * 4;
835 let truncated = RlmChunker::compress(message, max_chars / 4, None);
836 if let Some(last) = session.messages.last_mut() {
837 last.content = vec![ContentPart::Text { text: truncated }];
838 }
839 }
840 }
841}
842
843async fn execute_tool(
845 tool_registry: &ToolRegistry,
846 tool_name: &str,
847 exec_input: &serde_json::Value,
848 session_id: &str,
849 exec_start: std::time::Instant,
850) -> (
851 String,
852 bool,
853 Option<std::collections::HashMap<String, serde_json::Value>>,
854) {
855 if let Some(tool) = tool_registry.get(tool_name) {
856 match tool.execute(exec_input.clone()).await {
857 Ok(result) => {
858 let duration_ms = exec_start.elapsed().as_millis() as u64;
859 tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
860 if let Some(audit) = try_audit_log() {
861 audit
862 .log_with_correlation(
863 AuditCategory::ToolExecution,
864 format!("tool:{}", tool_name),
865 if result.success {
866 AuditOutcome::Success
867 } else {
868 AuditOutcome::Failure
869 },
870 None,
871 Some(json!({
872 "duration_ms": duration_ms,
873 "output_len": result.output.len()
874 })),
875 None,
876 None,
877 None,
878 Some(session_id.to_string()),
879 )
880 .await;
881 }
882 (result.output, result.success, Some(result.metadata))
883 }
884 Err(e) => {
885 let duration_ms = exec_start.elapsed().as_millis() as u64;
886 tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
887 if let Some(audit) = try_audit_log() {
888 audit
889 .log_with_correlation(
890 AuditCategory::ToolExecution,
891 format!("tool:{}", tool_name),
892 AuditOutcome::Failure,
893 None,
894 Some(json!({ "duration_ms": duration_ms, "error": e.to_string() })),
895 None,
896 None,
897 None,
898 Some(session_id.to_string()),
899 )
900 .await;
901 }
902 (format!("Error: {}", e), false, None)
903 }
904 }
905 } else {
906 tracing::warn!(tool = %tool_name, "Tool not found");
907 if let Some(audit) = try_audit_log() {
908 audit
909 .log_with_correlation(
910 AuditCategory::ToolExecution,
911 format!("tool:{}", tool_name),
912 AuditOutcome::Failure,
913 None,
914 Some(json!({ "error": "unknown_tool" })),
915 None,
916 None,
917 None,
918 Some(session_id.to_string()),
919 )
920 .await;
921 }
922 (format!("Error: Unknown tool '{}'", tool_name), false, None)
923 }
924}
925
926fn write_tool_event_file(
928 base_dir: &std::path::Path,
929 session_id: &str,
930 tool_name: &str,
931 success: bool,
932 duration_ms: u64,
933 rendered_content: &str,
934 seq: u64,
935) {
936 let workspace = std::env::var("PWD")
937 .map(std::path::PathBuf::from)
938 .unwrap_or_else(|_| std::env::current_dir().unwrap_or_default());
939 let event = ChatEvent::tool_result(
940 workspace,
941 session_id.to_string(),
942 tool_name,
943 success,
944 duration_ms,
945 rendered_content,
946 seq,
947 );
948 let event_json = event.to_json();
949 let timestamp = Utc::now().format("%Y%m%dT%H%M%SZ");
950 let filename = format!(
951 "{}-chat-events-{:020}-{:020}.jsonl",
952 timestamp,
953 seq * 10000,
954 (seq + 1) * 10000
955 );
956 let event_path = base_dir.join(session_id).join(filename);
957
958 tokio::spawn(async move {
959 if let Some(parent) = event_path.parent() {
960 let _ = tokio::fs::create_dir_all(parent).await;
961 }
962 if let Ok(mut file) = tokio::fs::OpenOptions::new()
963 .create(true)
964 .append(true)
965 .open(&event_path)
966 .await
967 {
968 use tokio::io::AsyncWriteExt;
969 let _ = file.write_all(event_json.as_bytes()).await;
970 let _ = file.write_all(b"\n").await;
971 }
972 });
973}
974
975async fn maybe_route_through_rlm(
978 rendered_content: &str,
979 tool_name: &str,
980 tool_input: &serde_json::Value,
981 tool_id: &str,
982 session_id: &str,
983 messages: &[Message],
984 model: &str,
985 provider: Arc<dyn crate::provider::Provider>,
986 rlm_config: &RlmConfig,
987) -> String {
988 let ctx_window = context_window_for_model(model);
989 let current_tokens = estimate_tokens_for_messages(messages);
990 let routing_ctx = RoutingContext {
991 tool_id: tool_name.to_string(),
992 session_id: session_id.to_string(),
993 call_id: Some(tool_id.to_string()),
994 model_context_limit: ctx_window,
995 current_context_tokens: Some(current_tokens),
996 };
997 let routing = RlmRouter::should_route(rendered_content, &routing_ctx, rlm_config);
998 if !routing.should_route {
999 return rendered_content.to_string();
1000 }
1001
1002 tracing::info!(
1003 tool = %tool_name,
1004 reason = %routing.reason,
1005 estimated_tokens = routing.estimated_tokens,
1006 "RLM: Routing large tool output"
1007 );
1008 let auto_ctx = AutoProcessContext {
1009 tool_id: tool_name,
1010 tool_args: tool_input.clone(),
1011 session_id,
1012 abort: None,
1013 on_progress: None,
1014 provider: Arc::clone(&provider),
1015 model: model.to_string(),
1016 bus: None,
1017 trace_id: None,
1018 subcall_provider: None,
1019 subcall_model: None,
1020 };
1021 match RlmRouter::auto_process(rendered_content, auto_ctx, &rlm_config).await {
1022 Ok(result) => {
1023 tracing::info!(
1024 input_tokens = result.stats.input_tokens,
1025 output_tokens = result.stats.output_tokens,
1026 iterations = result.stats.iterations,
1027 "RLM: Processing complete"
1028 );
1029 result.processed
1030 }
1031 Err(e) => {
1032 tracing::warn!(error = %e, "RLM: auto_process failed, using smart_truncate");
1033 let (truncated, _, _) =
1034 RlmRouter::smart_truncate(rendered_content, tool_name, tool_input, ctx_window / 4);
1035 truncated
1036 }
1037 }
1038}