1use std::collections::HashSet;
15use std::sync::Arc;
16
17use anyhow::Result;
18use chrono::Utc;
19use serde_json::json;
20use uuid::Uuid;
21
22use crate::audit::{AuditCategory, AuditOutcome, try_audit_log};
23use crate::cognition::tool_router::{ToolCallRouter, ToolRouterConfig};
24use crate::event_stream::ChatEvent;
25use crate::provider::{
26 CompletionRequest, ContentPart, Message, ProviderRegistry, Role, parse_model_string,
27};
28use crate::rlm::router::AutoProcessContext;
29use crate::rlm::{RlmChunker, RlmConfig, RlmRouter, RoutingContext};
30use crate::tool::ToolRegistry;
31
32use super::super::{DEFAULT_MAX_STEPS, Session, SessionResult};
33use super::bootstrap::{
34 inject_tool_prompt, list_tools_bootstrap_definition, list_tools_bootstrap_output,
35};
36use super::build::{
37 build_request_requires_tool, is_build_agent, should_force_build_tool_first_retry,
38};
39use super::compression::{compress_history_keep_last, enforce_context_window};
40use super::confirmation::{
41 auto_apply_pending_confirmation, pending_confirmation_tool_result_content,
42 tool_result_requires_confirmation,
43};
44use super::defaults::default_model_for_provider;
45use super::edit::{detect_stub_in_tool_input, normalize_tool_call_for_execution};
46use super::error::{is_prompt_too_long_error, is_retryable_upstream_error};
47use super::loop_constants::{
48 BUILD_MODE_TOOL_FIRST_MAX_RETRIES, BUILD_MODE_TOOL_FIRST_NUDGE, CODESEARCH_THRASH_NUDGE,
49 FORCE_FINAL_ANSWER_NUDGE, MAX_CONSECUTIVE_CODESEARCH_NO_MATCHES, MAX_CONSECUTIVE_SAME_TOOL,
50 NATIVE_TOOL_PROMISE_NUDGE, NATIVE_TOOL_PROMISE_RETRY_MAX_RETRIES,
51 POST_EDIT_VALIDATION_MAX_RETRIES,
52};
53use super::markup::normalize_textual_tool_calls;
54use super::provider::{
55 prefers_temperature_one, resolve_provider_for_session_request,
56 should_retry_missing_native_tool_call, temperature_is_deprecated,
57};
58use super::router::{build_proactive_lsp_context_message, choose_router_target};
59use super::runtime::{
60 enrich_tool_input_with_runtime_context, is_codesearch_no_match_output, is_interactive_tool,
61 is_local_cuda_provider, local_cuda_light_system_prompt,
62};
63use super::text::extract_text_content;
64use super::token::{
65 context_window_for_model, estimate_tokens_for_messages, session_completion_max_tokens,
66};
67use super::validation::{build_validation_report, capture_git_dirty_files, track_touched_files};
68
/// Run one user prompt through the full agent loop for this session:
/// provider/model resolution, message append, oversized-message compression,
/// then a bounded step loop of completion requests and tool executions with
/// several guard rails, and finally persistence plus event-stream archival.
///
/// Returns the accumulated assistant text and the session id.
pub(crate) async fn run_prompt(session: &mut Session, message: &str) -> Result<SessionResult> {
    // Provider discovery: credentials live in Vault; bail early with setup
    // hints when nothing is configured.
    let registry = ProviderRegistry::from_vault().await?;
    session.resolve_subcall_provider(&registry);

    let providers = registry.list();
    if providers.is_empty() {
        anyhow::bail!(
            "No providers available. Configure provider credentials in HashiCorp Vault (for ChatGPT subscription Codex use `codetether auth codex`; for Copilot use `codetether auth copilot`)."
        );
    }

    tracing::info!("Available providers: {:?}", providers);

    // Split the session's "provider/model" selector into its parts.
    let (provider_name, model_id) = parse_session_model_selector(session, &providers);

    let selected_provider =
        resolve_provider_for_session_request(providers.as_slice(), provider_name.as_deref())?;

    let provider = registry
        .get(selected_provider)
        .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;

    // Record the user's message before title generation or compression.
    session.add_message(Message {
        role: Role::User,
        content: vec![ContentPart::Text {
            text: message.to_string(),
        }],
    });

    if session.title.is_none() {
        session.generate_title().await?;
    }

    // Fall back to the provider's default model when the selector had none.
    let model = if !model_id.is_empty() {
        model_id
    } else {
        default_model_for_provider(selected_provider)
    };

    compress_user_message_if_oversized(session, &provider, &model, message).await;

    // Interactive tools (e.g. question prompts) are excluded from this loop.
    let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
    let tool_definitions: Vec<_> = tool_registry
        .definitions()
        .into_iter()
        .filter(|tool| !is_interactive_tool(&tool.name))
        .collect();

    // Temperature policy varies per model family.
    let temperature = if temperature_is_deprecated(&model) {
        None
    } else if prefers_temperature_one(&model) {
        Some(1.0)
    } else {
        Some(0.7)
    };

    tracing::info!("Using model: {} via provider: {}", model, selected_provider);
    tracing::info!("Available tools: {}", tool_definitions.len());

    // Providers without native tool calling get a single bootstrap tool and
    // a prompt-injected tool catalog instead of the full definitions.
    let model_supports_tools = !matches!(
        selected_provider,
        "gemini-web" | "local-cuda" | "local_cuda" | "localcuda"
    );
    let advertised_tool_definitions = if model_supports_tools {
        tool_definitions.clone()
    } else {
        vec![list_tools_bootstrap_definition()]
    };

    let cwd = session
        .metadata
        .directory
        .clone()
        .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
    let system_prompt = if is_local_cuda_provider(selected_provider) {
        local_cuda_light_system_prompt()
    } else {
        crate::agent::builtin::build_system_prompt(&cwd)
    };
    let system_prompt = if !model_supports_tools && !advertised_tool_definitions.is_empty() {
        inject_tool_prompt(&system_prompt, &advertised_tool_definitions)
    } else {
        system_prompt
    };

    // Loop state: guard-rail counters plus the baselines used by build-mode
    // post-edit validation.
    let max_steps = session.max_steps.unwrap_or(DEFAULT_MAX_STEPS);
    let mut final_output = String::new();
    let baseline_git_dirty_files = capture_git_dirty_files(&cwd).await;
    let mut touched_files = HashSet::new();
    let mut validation_retry_count: u8 = 0;

    let mut last_tool_sig: Option<String> = None;
    let mut consecutive_same_tool: u32 = 0;
    let mut consecutive_codesearch_no_matches: u32 = 0;
    let mut build_mode_tool_retry_count: u8 = 0;
    let mut native_tool_promise_retry_count: u8 = 0;
    let turn_id = Uuid::new_v4().to_string();

    // Optional FunctionGemma-based tool-call reformatter; init failure just
    // disables it.
    let tool_router: Option<ToolCallRouter> = {
        let cfg = ToolRouterConfig::from_env();
        match ToolCallRouter::from_config(&cfg) {
            Ok(r) => r,
            Err(e) => {
                tracing::warn!(error = %e, "FunctionGemma tool router init failed; disabled");
                None
            }
        }
    };

    for step in 1..=max_steps {
        tracing::info!(step = step, "Agent step starting");

        // Compress/trim history before each request to stay inside the
        // model's context window.
        enforce_context_window(
            session,
            Arc::clone(&provider),
            &model,
            &system_prompt,
            &advertised_tool_definitions,
        )
        .await?;

        let proactive_lsp_message = build_proactive_lsp_context_message(
            selected_provider,
            step,
            &tool_registry,
            &session.messages,
            &cwd,
        )
        .await;

        // Completion request with retry: one extra-compression retry on
        // "prompt too long", plus bounded exponential backoff (with optional
        // provider failover) for retryable upstream errors.
        let mut attempt = 0;
        let mut upstream_retry_count: u8 = 0;
        const MAX_UPSTREAM_RETRIES: u8 = 3;
        let response = loop {
            attempt += 1;

            // Rebuild the request each attempt: system prompt, optional LSP
            // context, then the (possibly re-compressed) history.
            let mut messages = vec![Message {
                role: Role::System,
                content: vec![ContentPart::Text {
                    text: system_prompt.clone(),
                }],
            }];
            if let Some(msg) = &proactive_lsp_message {
                messages.push(msg.clone());
            }
            messages.extend(session.messages.clone());

            let request = CompletionRequest {
                messages,
                tools: advertised_tool_definitions.clone(),
                model: model.clone(),
                temperature,
                top_p: None,
                max_tokens: Some(session_completion_max_tokens()),
                stop: Vec::new(),
            };

            match provider.complete(request).await {
                Ok(r) => break r,
                Err(e) => {
                    if attempt == 1 && is_prompt_too_long_error(&e) {
                        tracing::warn!(error = %e, "Provider rejected prompt as too long; forcing extra compression and retrying");
                        let _ = compress_history_keep_last(
                            session,
                            Arc::clone(&provider),
                            &model,
                            6,
                            "prompt_too_long_retry",
                        )
                        .await?;
                        continue;
                    }
                    if upstream_retry_count < MAX_UPSTREAM_RETRIES
                        && is_retryable_upstream_error(&e)
                    {
                        upstream_retry_count += 1;
                        // Exponential backoff capped at 4s (1, 2, 4 seconds).
                        let backoff_secs = 1u64 << (upstream_retry_count - 1).min(2);
                        tracing::warn!(
                            error = %e,
                            retry = upstream_retry_count,
                            max = MAX_UPSTREAM_RETRIES,
                            backoff_secs,
                            "Retryable upstream provider error; sleeping and retrying"
                        );
                        tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
                        if let Some((retry_provider, retry_model)) =
                            choose_router_target(&registry, selected_provider, &model)
                        {
                            tracing::info!(
                                to_provider = %retry_provider,
                                to_model = %retry_model,
                                "Failing over to alternate provider/model"
                            );
                            // NOTE(review): only session metadata changes
                            // here; the local `provider`/`model` bindings used
                            // by this loop are untouched — confirm the
                            // failover takes effect where intended.
                            session.metadata.model =
                                Some(format!("{retry_provider}/{retry_model}"));
                            attempt = 0;
                        }
                        continue;
                    }
                    return Err(e);
                }
            }
        };

        // Response post-processing: optional FunctionGemma reformat, then
        // textual tool-call markup normalization.
        let response = if let Some(ref router) = tool_router {
            router
                .maybe_reformat(response, &tool_definitions, model_supports_tools)
                .await
        } else {
            response
        };
        let response = normalize_textual_tool_calls(response, &tool_definitions);

        crate::telemetry::TOKEN_USAGE.record_model_usage(
            &model,
            response.usage.prompt_tokens as u64,
            response.usage.completion_tokens as u64,
        );

        // Parse tool calls; argument JSON that fails to parse (usually cut
        // off by max_tokens) is collected separately for error feedback.
        let mut truncated_tool_ids: Vec<(String, String)> = Vec::new();
        let tool_calls: Vec<(String, String, serde_json::Value)> = response
            .message
            .content
            .iter()
            .filter_map(|part| {
                if let ContentPart::ToolCall {
                    id,
                    name,
                    arguments,
                    ..
                } = part
                {
                    match serde_json::from_str::<serde_json::Value>(arguments) {
                        Ok(args) => Some((id.clone(), name.clone(), args)),
                        Err(e) => {
                            tracing::warn!(
                                tool = %name,
                                tool_call_id = %id,
                                args_len = arguments.len(),
                                error = %e,
                                "Tool call arguments failed to parse (likely truncated by max_tokens)"
                            );
                            truncated_tool_ids.push((id.clone(), name.clone()));
                            None
                        }
                    }
                } else {
                    None
                }
            })
            .collect();

        let assistant_text = extract_text_content(&&response.message.content);
        // Build-mode guard: the model answered in prose when the request
        // demanded actual file changes — nudge and retry.
        if should_force_build_tool_first_retry(
            &session.agent,
            build_mode_tool_retry_count,
            &tool_definitions,
            &session.messages,
            &cwd,
            &assistant_text,
            !tool_calls.is_empty(),
            BUILD_MODE_TOOL_FIRST_MAX_RETRIES,
        ) {
            build_mode_tool_retry_count += 1;
            tracing::warn!(
                step = step,
                agent = %session.agent,
                retry = build_mode_tool_retry_count,
                "Build mode tool-first guard triggered; retrying with execution nudge"
            );
            session.add_message(Message {
                role: Role::User,
                content: vec![ContentPart::Text {
                    text: BUILD_MODE_TOOL_FIRST_NUDGE.to_string(),
                }],
            });
            continue;
        }
        // Native-tool-promise guard: the model described a tool step but
        // emitted no tool call — keep its message, nudge, and retry.
        if should_retry_missing_native_tool_call(
            selected_provider,
            &model,
            native_tool_promise_retry_count,
            &tool_definitions,
            &assistant_text,
            !tool_calls.is_empty(),
            NATIVE_TOOL_PROMISE_RETRY_MAX_RETRIES,
        ) {
            native_tool_promise_retry_count += 1;
            tracing::warn!(
                step = step,
                provider = selected_provider,
                model = %model,
                retry = native_tool_promise_retry_count,
                "Model described a tool step without emitting a tool call; retrying with corrective nudge"
            );
            session.add_message(response.message.clone());
            session.add_message(Message {
                role: Role::User,
                content: vec![ContentPart::Text {
                    text: NATIVE_TOOL_PROMISE_NUDGE.to_string(),
                }],
            });
            continue;
        }
        if !tool_calls.is_empty() {
            // Any successful tool call resets both retry guards.
            build_mode_tool_retry_count = 0;
            native_tool_promise_retry_count = 0;
        } else if is_build_agent(&session.agent)
            && build_request_requires_tool(&session.messages, &cwd)
            && build_mode_tool_retry_count >= BUILD_MODE_TOOL_FIRST_MAX_RETRIES
        {
            return Err(anyhow::anyhow!(
                "Build mode could not obtain tool calls for an explicit file-change request after {} retries. \
                Switch to a tool-capable model and try again.",
                BUILD_MODE_TOOL_FIRST_MAX_RETRIES
            ));
        }

        // Collect assistant text; forward thinking chunks to the bus.
        let mut step_text = String::new();

        for part in &response.message.content {
            match part {
                ContentPart::Text { text } if !text.is_empty() => {
                    step_text.push_str(text);
                    step_text.push('\n');
                }
                ContentPart::Thinking { text } if !text.is_empty() => {
                    if let Some(ref bus) = session.bus {
                        let handle = bus.handle(&session.agent);
                        handle.send_with_correlation(
                            format!("agent.{}.thinking", session.agent),
                            crate::bus::BusMessage::AgentThinking {
                                agent_id: session.agent.clone(),
                                thinking: text.clone(),
                                step,
                            },
                            Some(turn_id.clone()),
                        );
                    }
                }
                _ => {}
            }
        }

        if !step_text.trim().is_empty() {
            final_output.push_str(&step_text);
        }

        // No tool calls at all: terminal step — unless build-mode post-edit
        // validation sends the model back to fix diagnostics.
        if tool_calls.is_empty() && truncated_tool_ids.is_empty() {
            session.add_message(response.message.clone());
            if is_build_agent(&session.agent) {
                if let Some(report) =
                    build_validation_report(&cwd, &touched_files, &baseline_git_dirty_files).await?
                {
                    validation_retry_count += 1;
                    tracing::warn!(
                        retries = validation_retry_count,
                        issues = report.issue_count,
                        "Post-edit validation found unresolved diagnostics"
                    );
                    if validation_retry_count >= POST_EDIT_VALIDATION_MAX_RETRIES {
                        return Err(anyhow::anyhow!(
                            "Post-edit validation failed after {} attempts.\n\n{}",
                            POST_EDIT_VALIDATION_MAX_RETRIES,
                            report.prompt
                        ));
                    }
                    session.add_message(Message {
                        role: Role::User,
                        content: vec![ContentPart::Text {
                            text: report.prompt,
                        }],
                    });
                    // Discard prose from the failed attempt.
                    final_output.clear();
                    continue;
                }
            }
            break;
        }

        // Feed an explicit error result back for each truncated tool call so
        // the model can retry with smaller arguments.
        if !truncated_tool_ids.is_empty() {
            if tool_calls.is_empty() {
                session.add_message(response.message.clone());
            }
            for (tool_id, tool_name) in &truncated_tool_ids {
                let error_content = format!(
                    "Error: Your tool call to `{tool_name}` was truncated — the arguments \
                    JSON was cut off mid-string (likely hit the max_tokens limit). \
                    Please retry with a shorter approach: use the `write` tool to write \
                    content in smaller pieces, or reduce the size of your arguments."
                );
                session.add_message(Message {
                    role: Role::Tool,
                    content: vec![ContentPart::ToolResult {
                        tool_call_id: tool_id.clone(),
                        content: error_content,
                    }],
                });
            }
            if tool_calls.is_empty() {
                continue;
            }
        }

        // Repetition guard: identical tool-call signatures across steps (or
        // a prompt-emulated-tools provider past step 3) force a final answer.
        {
            // Signature is the sorted, joined "name:args" of this step's calls.
            let mut sigs: Vec<String> = tool_calls
                .iter()
                .map(|(_, name, args)| format!("{name}:{args}"))
                .collect();
            sigs.sort();
            let sig = sigs.join("|");

            if last_tool_sig.as_deref() == Some(&sig) {
                consecutive_same_tool += 1;
            } else {
                consecutive_same_tool = 1;
                last_tool_sig = Some(sig);
            }

            let force_answer = consecutive_same_tool > MAX_CONSECUTIVE_SAME_TOOL
                || (!model_supports_tools && step >= 3);

            if force_answer {
                tracing::warn!(
                    step = step,
                    consecutive = consecutive_same_tool,
                    "Breaking agent loop: forcing final answer",
                );
                // Keep any text/thinking content but drop the repeated tool
                // calls before nudging for a final answer.
                let mut nudge_msg = response.message.clone();
                nudge_msg
                    .content
                    .retain(|p| !matches!(p, ContentPart::ToolCall { .. }));
                if !nudge_msg.content.is_empty() {
                    session.add_message(nudge_msg);
                }
                session.add_message(Message {
                    role: Role::User,
                    content: vec![ContentPart::Text {
                        text: FORCE_FINAL_ANSWER_NUDGE.to_string(),
                    }],
                });
                continue;
            }
        }

        session.add_message(response.message.clone());

        tracing::info!(
            step = step,
            num_tools = tool_calls.len(),
            "Executing tool calls"
        );

        // Execute each tool call in order.
        let mut codesearch_thrash_guard_triggered = false;
        for (tool_id, tool_name, tool_input) in tool_calls {
            let (tool_name, tool_input) =
                normalize_tool_call_for_execution(&tool_name, &tool_input);
            tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");

            // Bootstrap pseudo-tool: answered locally, never executed.
            if tool_name == "list_tools" {
                let content = list_tools_bootstrap_output(&tool_definitions, &tool_input);
                session.add_message(Message {
                    role: Role::Tool,
                    content: vec![ContentPart::ToolResult {
                        tool_call_id: tool_id,
                        content,
                    }],
                });
                continue;
            }

            if let Some(ref bus) = session.bus {
                let handle = bus.handle(&session.agent);
                handle.send_with_correlation(
                    format!("agent.{}.tool.request", session.agent),
                    crate::bus::BusMessage::ToolRequest {
                        request_id: tool_id.clone(),
                        agent_id: session.agent.clone(),
                        tool_name: tool_name.clone(),
                        arguments: tool_input.clone(),
                        step,
                    },
                    Some(turn_id.clone()),
                );
            }

            // Interactive tools are filtered out of the definitions but may
            // still be requested — block with an explanatory result.
            if is_interactive_tool(&tool_name) {
                tracing::warn!(tool = %tool_name, "Blocking interactive tool in session loop");
                session.add_message(Message {
                    role: Role::Tool,
                    content: vec![ContentPart::ToolResult {
                        tool_call_id: tool_id,
                        content: "Error: Interactive tool 'question' is disabled in this interface. Ask the user directly in assistant text.".to_string(),
                    }],
                });
                continue;
            }

            // Refactor guard: reject edits that look like placeholder stubs.
            if let Some(reason) = detect_stub_in_tool_input(&tool_name, &tool_input) {
                tracing::warn!(tool = %tool_name, reason = %reason, "Blocking suspected stubbed edit");
                session.add_message(Message {
                    role: Role::Tool,
                    content: vec![ContentPart::ToolResult {
                        tool_call_id: tool_id,
                        content: format!(
                            "Error: Refactor guard rejected this edit: {reason}. \
                            Provide concrete, behavior-preserving implementation (no placeholders/stubs)."
                        ),
                    }],
                });
                continue;
            }

            let exec_start = std::time::Instant::now();
            let exec_input = enrich_tool_input_with_runtime_context(
                &tool_input,
                &cwd,
                session.metadata.model.as_deref(),
                &session.id,
                &session.agent,
                session.metadata.provenance.as_ref(),
            );
            let (content, success, tool_metadata) = execute_tool(
                &tool_registry,
                &tool_name,
                &exec_input,
                &session.id,
                exec_start,
            )
            .await;

            // Results flagged for user confirmation may be auto-applied when
            // the session opted into auto_apply_edits (TUI flow).
            let requires_confirmation = tool_result_requires_confirmation(tool_metadata.as_ref());
            let (content, success, tool_metadata, requires_confirmation) = if requires_confirmation
                && session.metadata.auto_apply_edits
            {
                let preview_content = content.clone();
                match auto_apply_pending_confirmation(
                    &tool_name,
                    &exec_input,
                    tool_metadata.as_ref(),
                )
                .await
                {
                    Ok(Some((content, success, tool_metadata))) => {
                        tracing::info!(
                            tool = %tool_name,
                            "Auto-applied pending confirmation in TUI session"
                        );
                        (content, success, tool_metadata, false)
                    }
                    // None: nothing to auto-apply — keep pending confirmation.
                    Ok(None) => (content, success, tool_metadata, true),
                    Err(error) => (
                        format!(
                            "{}\n\nTUI edit auto-apply failed: {}",
                            pending_confirmation_tool_result_content(&tool_name, &preview_content,),
                            error
                        ),
                        false,
                        tool_metadata,
                        true,
                    ),
                }
            } else {
                (content, success, tool_metadata, requires_confirmation)
            };
            let rendered_content = if requires_confirmation {
                pending_confirmation_tool_result_content(&tool_name, &content)
            } else {
                content.clone()
            };

            // Only applied (unconfirmed-pending excluded) edits count toward
            // touched files for post-edit validation.
            if !requires_confirmation {
                track_touched_files(
                    &mut touched_files,
                    &cwd,
                    &tool_name,
                    &tool_input,
                    tool_metadata.as_ref(),
                );
            }

            let duration_ms = exec_start.elapsed().as_millis() as u64;
            let codesearch_no_match =
                is_codesearch_no_match_output(&tool_name, success, &rendered_content);

            if let Some(ref bus) = session.bus {
                let handle = bus.handle(&session.agent);
                handle.send_with_correlation(
                    format!("agent.{}.tool.response", session.agent),
                    crate::bus::BusMessage::ToolResponse {
                        request_id: tool_id.clone(),
                        agent_id: session.agent.clone(),
                        tool_name: tool_name.clone(),
                        result: rendered_content.clone(),
                        success,
                        step,
                    },
                    Some(turn_id.clone()),
                );
                handle.send_with_correlation(
                    format!("agent.{}.tool.output", session.agent),
                    crate::bus::BusMessage::ToolOutputFull {
                        agent_id: session.agent.clone(),
                        tool_name: tool_name.clone(),
                        output: rendered_content.clone(),
                        success,
                        step,
                    },
                    Some(turn_id.clone()),
                );
            }

            // Best-effort event archive for replay/debugging.
            if let Some(base_dir) = super::archive::event_stream_path() {
                write_tool_event_file(
                    &base_dir,
                    &session.id,
                    &tool_name,
                    success,
                    duration_ms,
                    &rendered_content,
                    session.messages.len() as u64,
                );
            }

            // Oversized outputs may be summarized through the RLM pipeline
            // before entering history.
            let content = maybe_route_through_rlm(
                &rendered_content,
                &tool_name,
                &tool_input,
                &tool_id,
                &session.id,
                &session.messages,
                &model,
                Arc::clone(&provider),
                &session.metadata.rlm,
            )
            .await;

            session.add_message(Message {
                role: Role::Tool,
                content: vec![ContentPart::ToolResult {
                    tool_call_id: tool_id,
                    content,
                }],
            });

            // Codesearch thrash guard (build mode): repeated no-match
            // searches get a corrective nudge and abort the remaining calls.
            if is_build_agent(&session.agent) {
                if codesearch_no_match {
                    consecutive_codesearch_no_matches += 1;
                } else {
                    consecutive_codesearch_no_matches = 0;
                }

                if consecutive_codesearch_no_matches >= MAX_CONSECUTIVE_CODESEARCH_NO_MATCHES {
                    tracing::warn!(
                        step = step,
                        consecutive_codesearch_no_matches = consecutive_codesearch_no_matches,
                        "Detected codesearch no-match thrash; nudging model to stop variant retries",
                    );
                    session.add_message(Message {
                        role: Role::User,
                        content: vec![ContentPart::Text {
                            text: CODESEARCH_THRASH_NUDGE.to_string(),
                        }],
                    });
                    codesearch_thrash_guard_triggered = true;
                    break;
                }
            }
        }

        if codesearch_thrash_guard_triggered {
            continue;
        }
    }

    session.save().await?;

    super::archive::archive_event_stream_to_s3(&session.id, super::archive::event_stream_path())
        .await;

    Ok(SessionResult {
        text: final_output.trim().to_string(),
        session_id: session.id.clone(),
    })
}
757
758fn parse_session_model_selector(session: &Session, providers: &[&str]) -> (Option<String>, String) {
760 let Some(ref model_str) = session.metadata.model else {
761 return (None, String::new());
762 };
763 let (prov, model) = parse_model_string(model_str);
764 let prov = prov.map(|p| match p {
765 "zhipuai" | "z-ai" => "zai",
766 other => other,
767 });
768 if prov.is_some() {
769 (prov.map(|s| s.to_string()), model.to_string())
770 } else if providers.contains(&model) {
771 (Some(model.to_string()), String::new())
772 } else {
773 (None, model.to_string())
774 }
775}
776
/// If the just-appended user message alone exceeds ~35% of the model's
/// context window, rewrite the session's last message in place with an
/// RLM-compressed version; fall back to plain truncation when RLM fails.
///
/// Must be called immediately after the user message is appended — it edits
/// `session.messages.last_mut()`.
async fn compress_user_message_if_oversized(
    session: &mut Session,
    provider: &Arc<dyn crate::provider::Provider>,
    model: &str,
    message: &str,
) {
    let ctx_window = context_window_for_model(model);
    let msg_tokens = RlmChunker::estimate_tokens(message);
    // Compression threshold: 35% of the context window, in tokens.
    let threshold = (ctx_window as f64 * 0.35) as usize;
    if msg_tokens <= threshold {
        return;
    }

    tracing::info!(
        msg_tokens,
        threshold,
        ctx_window,
        "RLM: User message exceeds context threshold, compressing"
    );

    let auto_ctx = AutoProcessContext {
        tool_id: "session_context",
        tool_args: serde_json::json!({}),
        session_id: &session.id,
        abort: None,
        on_progress: None,
        provider: Arc::clone(provider),
        model: model.to_string(),
        bus: None,
        trace_id: None,
        subcall_provider: session.metadata.subcall_provider.clone(),
        subcall_model: session.metadata.subcall_model_name.clone(),
    };
    let rlm_config = session.metadata.rlm.clone();
    match RlmRouter::auto_process(message, auto_ctx, &rlm_config).await {
        Ok(result) => {
            tracing::info!(
                input_tokens = result.stats.input_tokens,
                output_tokens = result.stats.output_tokens,
                "RLM: User message compressed"
            );
            // Replace the last message with the compressed form plus a short
            // (500-char) prefix of the original for grounding.
            if let Some(last) = session.messages.last_mut() {
                last.content = vec![ContentPart::Text {
                    text: format!(
                        "[Original message: {} tokens, compressed via RLM]\n\n{}\n\n---\nOriginal request prefix:\n{}",
                        msg_tokens,
                        result.processed,
                        message.chars().take(500).collect::<String>()
                    ),
                }];
            }
        }
        Err(e) => {
            tracing::warn!(error = %e, "RLM: Failed to compress user message, using truncation");
            // Fallback: crude truncation. `max_chars / 4` equals `threshold`;
            // presumably `compress` takes a token-ish budget (~4 chars/token)
            // — TODO confirm against RlmChunker::compress's contract.
            let max_chars = threshold * 4;
            let truncated = RlmChunker::compress(message, max_chars / 4, None);
            if let Some(last) = session.messages.last_mut() {
                last.content = vec![ContentPart::Text { text: truncated }];
            }
        }
    }
}
841
842async fn execute_tool(
844 tool_registry: &ToolRegistry,
845 tool_name: &str,
846 exec_input: &serde_json::Value,
847 session_id: &str,
848 exec_start: std::time::Instant,
849) -> (
850 String,
851 bool,
852 Option<std::collections::HashMap<String, serde_json::Value>>,
853) {
854 if let Some(tool) = tool_registry.get(tool_name) {
855 match tool.execute(exec_input.clone()).await {
856 Ok(result) => {
857 let duration_ms = exec_start.elapsed().as_millis() as u64;
858 tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
859 if let Some(audit) = try_audit_log() {
860 audit
861 .log_with_correlation(
862 AuditCategory::ToolExecution,
863 format!("tool:{}", tool_name),
864 if result.success {
865 AuditOutcome::Success
866 } else {
867 AuditOutcome::Failure
868 },
869 None,
870 Some(json!({
871 "duration_ms": duration_ms,
872 "output_len": result.output.len()
873 })),
874 None,
875 None,
876 None,
877 Some(session_id.to_string()),
878 )
879 .await;
880 }
881 (result.output, result.success, Some(result.metadata))
882 }
883 Err(e) => {
884 let duration_ms = exec_start.elapsed().as_millis() as u64;
885 tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
886 if let Some(audit) = try_audit_log() {
887 audit
888 .log_with_correlation(
889 AuditCategory::ToolExecution,
890 format!("tool:{}", tool_name),
891 AuditOutcome::Failure,
892 None,
893 Some(json!({ "duration_ms": duration_ms, "error": e.to_string() })),
894 None,
895 None,
896 None,
897 Some(session_id.to_string()),
898 )
899 .await;
900 }
901 (format!("Error: {}", e), false, None)
902 }
903 }
904 } else {
905 tracing::warn!(tool = %tool_name, "Tool not found");
906 if let Some(audit) = try_audit_log() {
907 audit
908 .log_with_correlation(
909 AuditCategory::ToolExecution,
910 format!("tool:{}", tool_name),
911 AuditOutcome::Failure,
912 None,
913 Some(json!({ "error": "unknown_tool" })),
914 None,
915 None,
916 None,
917 Some(session_id.to_string()),
918 )
919 .await;
920 }
921 (format!("Error: Unknown tool '{}'", tool_name), false, None)
922 }
923}
924
925fn write_tool_event_file(
927 base_dir: &std::path::Path,
928 session_id: &str,
929 tool_name: &str,
930 success: bool,
931 duration_ms: u64,
932 rendered_content: &str,
933 seq: u64,
934) {
935 let workspace = std::env::var("PWD")
936 .map(std::path::PathBuf::from)
937 .unwrap_or_else(|_| std::env::current_dir().unwrap_or_default());
938 let event = ChatEvent::tool_result(
939 workspace,
940 session_id.to_string(),
941 tool_name,
942 success,
943 duration_ms,
944 rendered_content,
945 seq,
946 );
947 let event_json = event.to_json();
948 let timestamp = Utc::now().format("%Y%m%dT%H%M%SZ");
949 let filename = format!(
950 "{}-chat-events-{:020}-{:020}.jsonl",
951 timestamp,
952 seq * 10000,
953 (seq + 1) * 10000
954 );
955 let event_path = base_dir.join(session_id).join(filename);
956
957 tokio::spawn(async move {
958 if let Some(parent) = event_path.parent() {
959 let _ = tokio::fs::create_dir_all(parent).await;
960 }
961 if let Ok(mut file) = tokio::fs::OpenOptions::new()
962 .create(true)
963 .append(true)
964 .open(&event_path)
965 .await
966 {
967 use tokio::io::AsyncWriteExt;
968 let _ = file.write_all(event_json.as_bytes()).await;
969 let _ = file.write_all(b"\n").await;
970 }
971 });
972}
973
974async fn maybe_route_through_rlm(
977 rendered_content: &str,
978 tool_name: &str,
979 tool_input: &serde_json::Value,
980 tool_id: &str,
981 session_id: &str,
982 messages: &[Message],
983 model: &str,
984 provider: Arc<dyn crate::provider::Provider>,
985 rlm_config: &RlmConfig,
986) -> String {
987 let ctx_window = context_window_for_model(model);
988 let current_tokens = estimate_tokens_for_messages(messages);
989 let routing_ctx = RoutingContext {
990 tool_id: tool_name.to_string(),
991 session_id: session_id.to_string(),
992 call_id: Some(tool_id.to_string()),
993 model_context_limit: ctx_window,
994 current_context_tokens: Some(current_tokens),
995 };
996 let routing = RlmRouter::should_route(rendered_content, &routing_ctx, rlm_config);
997 if !routing.should_route {
998 return rendered_content.to_string();
999 }
1000
1001 tracing::info!(
1002 tool = %tool_name,
1003 reason = %routing.reason,
1004 estimated_tokens = routing.estimated_tokens,
1005 "RLM: Routing large tool output"
1006 );
1007 let auto_ctx = AutoProcessContext {
1008 tool_id: tool_name,
1009 tool_args: tool_input.clone(),
1010 session_id,
1011 abort: None,
1012 on_progress: None,
1013 provider: Arc::clone(&provider),
1014 model: model.to_string(),
1015 bus: None,
1016 trace_id: None,
1017 subcall_provider: None,
1018 subcall_model: None,
1019 };
1020 match RlmRouter::auto_process(rendered_content, auto_ctx, &rlm_config).await {
1021 Ok(result) => {
1022 tracing::info!(
1023 input_tokens = result.stats.input_tokens,
1024 output_tokens = result.stats.output_tokens,
1025 iterations = result.stats.iterations,
1026 "RLM: Processing complete"
1027 );
1028 result.processed
1029 }
1030 Err(e) => {
1031 tracing::warn!(error = %e, "RLM: auto_process failed, using smart_truncate");
1032 let (truncated, _, _) =
1033 RlmRouter::smart_truncate(rendered_content, tool_name, tool_input, ctx_window / 4);
1034 truncated
1035 }
1036 }
1037}