Skip to main content

codetether_agent/session/helper/
prompt.rs

1//! Core agentic prompt loop used by [`Session::prompt`](super::super::Session::prompt).
2//!
3//! This is the non-streaming, non-event-emitting variant. It loads the
4//! provider registry from Vault, runs the completion/tool-call loop up to
5//! `max_steps` times, and returns the final plain-text answer.
6//!
7//! The code here was extracted from `src/session/mod.rs` verbatim to keep
8//! the Session facade small and keep the loop's behavior stable. Refinements
9//! to the loop should be made here (and mirrored in [`prompt_events`](super::prompt_events)
10//! where appropriate).
11// TODO: Keep this loop in sync with `prompt_events.rs` until both prompt
12// loops are consolidated into one shared implementation.
13
14use std::collections::HashSet;
15use std::sync::Arc;
16
17use anyhow::Result;
18use chrono::Utc;
19use serde_json::json;
20use uuid::Uuid;
21
22use crate::audit::{AuditCategory, AuditOutcome, try_audit_log};
23use crate::cognition::tool_router::{ToolCallRouter, ToolRouterConfig};
24use crate::event_stream::ChatEvent;
25use crate::provider::{
26    CompletionRequest, ContentPart, Message, ProviderRegistry, Role, parse_model_string,
27};
28use crate::rlm::router::AutoProcessContext;
29use crate::rlm::{RlmChunker, RlmConfig, RlmRouter, RoutingContext};
30use crate::tool::ToolRegistry;
31
32use super::super::{DEFAULT_MAX_STEPS, Session, SessionResult};
33use super::bootstrap::{
34    inject_tool_prompt, list_tools_bootstrap_definition, list_tools_bootstrap_output,
35};
36use super::build::{
37    build_request_requires_tool, is_build_agent, should_force_build_tool_first_retry,
38};
39use super::compression::{compress_history_keep_last, enforce_context_window};
40use super::confirmation::{
41    auto_apply_pending_confirmation, pending_confirmation_tool_result_content,
42    tool_result_requires_confirmation,
43};
44use super::defaults::default_model_for_provider;
45use super::edit::{detect_stub_in_tool_input, normalize_tool_call_for_execution};
46use super::error::{is_prompt_too_long_error, is_retryable_upstream_error};
47use super::loop_constants::{
48    BUILD_MODE_TOOL_FIRST_MAX_RETRIES, BUILD_MODE_TOOL_FIRST_NUDGE, CODESEARCH_THRASH_NUDGE,
49    FORCE_FINAL_ANSWER_NUDGE, MAX_CONSECUTIVE_CODESEARCH_NO_MATCHES, MAX_CONSECUTIVE_SAME_TOOL,
50    NATIVE_TOOL_PROMISE_NUDGE, NATIVE_TOOL_PROMISE_RETRY_MAX_RETRIES,
51    POST_EDIT_VALIDATION_MAX_RETRIES,
52};
53use super::markup::normalize_textual_tool_calls;
54use super::provider::{
55    prefers_temperature_one, resolve_provider_for_session_request,
56    should_retry_missing_native_tool_call, temperature_is_deprecated,
57};
58use super::router::{build_proactive_lsp_context_message, choose_router_target};
59use super::runtime::{
60    enrich_tool_input_with_runtime_context, is_codesearch_no_match_output, is_interactive_tool,
61    is_local_cuda_provider, local_cuda_light_system_prompt,
62};
63use super::text::extract_text_content;
64use super::token::{
65    context_window_for_model, estimate_tokens_for_messages, session_completion_max_tokens,
66};
67use super::validation::{build_validation_report, capture_git_dirty_files, track_touched_files};
68
/// Execute a prompt against the session and return a [`SessionResult`].
///
/// See [`Session::prompt`](super::super::Session::prompt) for the public-facing contract.
///
/// High-level flow:
/// 1. Load the provider registry from Vault and resolve a provider/model pair.
/// 2. Append the user message (RLM-compressing it first if oversized).
/// 3. Run the completion/tool-call loop up to `max_steps`, applying the
///    anti-thrash, truncation, and retry guards along the way.
/// 4. Persist the session and archive the event stream, then return the
///    accumulated assistant text.
pub(crate) async fn run_prompt(session: &mut Session, message: &str) -> Result<SessionResult> {
    let registry = ProviderRegistry::from_vault().await?;
    session.resolve_subcall_provider(&registry);

    let providers = registry.list();
    if providers.is_empty() {
        anyhow::bail!(
            "No providers available. Configure provider credentials in HashiCorp Vault (for ChatGPT subscription Codex use `codetether auth codex`; for Copilot use `codetether auth copilot`)."
        );
    }

    tracing::info!("Available providers: {:?}", providers);

    // Split the session's configured "provider/model" selector (either half may be absent).
    let (provider_name, model_id) = parse_session_model_selector(session, &providers);

    let selected_provider =
        resolve_provider_for_session_request(providers.as_slice(), provider_name.as_deref())?;

    let provider = registry
        .get(selected_provider)
        .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;

    // Record the incoming user turn before anything can fail mid-loop.
    session.add_message(Message {
        role: Role::User,
        content: vec![ContentPart::Text {
            text: message.to_string(),
        }],
    });

    // First turn in a fresh session: derive a human-readable title.
    if session.title.is_none() {
        session.generate_title().await?;
    }

    // An explicit model id wins; otherwise fall back to the provider's default.
    let model = if !model_id.is_empty() {
        model_id
    } else {
        default_model_for_provider(selected_provider)
    };

    // May rewrite the just-added user message in place if it exceeds the
    // context-window heuristic (see `compress_user_message_if_oversized`).
    compress_user_message_if_oversized(session, &provider, &model, message).await;

    let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
    // Interactive tools (e.g. `question`) cannot work in this non-TUI loop, so
    // they are never advertised to the model.
    let tool_definitions: Vec<_> = tool_registry
        .definitions()
        .into_iter()
        .filter(|tool| !is_interactive_tool(&tool.name))
        .collect();

    // Temperature policy varies per model family: some reject the parameter,
    // some behave best pinned at 1.0, the rest get a conventional 0.7.
    let temperature = if temperature_is_deprecated(&model) {
        None
    } else if prefers_temperature_one(&model) {
        Some(1.0)
    } else {
        Some(0.7)
    };

    tracing::info!("Using model: {} via provider: {}", model, selected_provider);
    tracing::info!("Available tools: {}", tool_definitions.len());

    // Providers without native tool-calling get a single bootstrap tool
    // (`list_tools`) plus a prompt-injected tool protocol instead.
    let model_supports_tools = !matches!(
        selected_provider,
        "gemini-web" | "local-cuda" | "local_cuda" | "localcuda"
    );
    let advertised_tool_definitions = if model_supports_tools {
        tool_definitions.clone()
    } else {
        vec![list_tools_bootstrap_definition()]
    };

    let cwd = session
        .metadata
        .directory
        .clone()
        .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
    // Local CUDA models get a lighter system prompt to conserve context.
    let system_prompt = if is_local_cuda_provider(selected_provider) {
        local_cuda_light_system_prompt()
    } else {
        crate::agent::builtin::build_system_prompt(&cwd)
    };
    let system_prompt = if !model_supports_tools && !advertised_tool_definitions.is_empty() {
        inject_tool_prompt(&system_prompt, &advertised_tool_definitions)
    } else {
        system_prompt
    };

    let max_steps = session.max_steps.unwrap_or(DEFAULT_MAX_STEPS);
    let mut final_output = String::new();
    // Snapshot of pre-existing dirty files so post-edit validation only
    // reports diagnostics for files this turn actually touched.
    let baseline_git_dirty_files = capture_git_dirty_files(&cwd).await;
    let mut touched_files = HashSet::new();
    let mut validation_retry_count: u8 = 0;

    // Loop-guard state: detect repeated identical tool calls, repeated
    // codesearch misses, and build/tool-promise retries.
    let mut last_tool_sig: Option<String> = None;
    let mut consecutive_same_tool: u32 = 0;
    let mut consecutive_codesearch_no_matches: u32 = 0;
    let mut build_mode_tool_retry_count: u8 = 0;
    let mut native_tool_promise_retry_count: u8 = 0;
    // Correlates all bus events emitted during this prompt turn.
    let turn_id = Uuid::new_v4().to_string();

    // Optional FunctionGemma-based reformatter for malformed tool calls;
    // init failure degrades gracefully to "disabled".
    let tool_router: Option<ToolCallRouter> = {
        let cfg = ToolRouterConfig::from_env();
        match ToolCallRouter::from_config(&cfg) {
            Ok(r) => r,
            Err(e) => {
                tracing::warn!(error = %e, "FunctionGemma tool router init failed; disabled");
                None
            }
        }
    };

    for step in 1..=max_steps {
        tracing::info!(step = step, "Agent step starting");

        // Compress/trim history if the next request would overflow the model's
        // context window.
        enforce_context_window(
            session,
            Arc::clone(&provider),
            &model,
            &system_prompt,
            &advertised_tool_definitions,
            None,
        )
        .await?;

        // Optionally prepend LSP diagnostics context so the model sees fresh
        // compiler feedback without spending a tool call on it.
        let proactive_lsp_message = build_proactive_lsp_context_message(
            selected_provider,
            step,
            &tool_registry,
            &session.messages,
            &cwd,
        )
        .await;

        // Inner completion loop: retries on prompt-too-long (once, after extra
        // compression) and on transient upstream errors (with backoff).
        let mut attempt = 0;
        let mut upstream_retry_count: u8 = 0;
        const MAX_UPSTREAM_RETRIES: u8 = 3;
        let response = loop {
            attempt += 1;

            // Rebuild the full message list each attempt since history may
            // have been compressed between attempts.
            let mut messages = vec![Message {
                role: Role::System,
                content: vec![ContentPart::Text {
                    text: system_prompt.clone(),
                }],
            }];
            if let Some(msg) = &proactive_lsp_message {
                messages.push(msg.clone());
            }
            messages.extend(session.messages.clone());

            let request = CompletionRequest {
                messages,
                tools: advertised_tool_definitions.clone(),
                model: model.clone(),
                temperature,
                top_p: None,
                max_tokens: Some(session_completion_max_tokens()),
                stop: Vec::new(),
            };

            match provider.complete(request).await {
                Ok(r) => break r,
                Err(e) => {
                    // One-shot recovery: force extra history compression and retry.
                    if attempt == 1 && is_prompt_too_long_error(&e) {
                        tracing::warn!(error = %e, "Provider rejected prompt as too long; forcing extra compression and retrying");
                        let _ = compress_history_keep_last(
                            session,
                            Arc::clone(&provider),
                            &model,
                            6,
                            "prompt_too_long_retry",
                        )
                        .await?;
                        continue;
                    }
                    if upstream_retry_count < MAX_UPSTREAM_RETRIES
                        && is_retryable_upstream_error(&e)
                    {
                        upstream_retry_count += 1;
                        // Exponential backoff capped at 4s: 1s, 2s, 4s.
                        let backoff_secs = 1u64 << (upstream_retry_count - 1).min(2);
                        tracing::warn!(
                            error = %e,
                            retry = upstream_retry_count,
                            max = MAX_UPSTREAM_RETRIES,
                            backoff_secs,
                            "Retryable upstream provider error; sleeping and retrying"
                        );
                        tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
                        if let Some((retry_provider, retry_model)) =
                            choose_router_target(&registry, selected_provider, &model)
                        {
                            tracing::info!(
                                to_provider = %retry_provider,
                                to_model = %retry_model,
                                "Failing over to alternate provider/model"
                            );
                            // NOTE(review): this only updates session metadata; the
                            // local `provider`/`model` bindings are unchanged, so the
                            // retried request still goes to the original target.
                            // Confirm whether failover is meant to take effect only
                            // on a subsequent run_prompt call.
                            session.metadata.model =
                                Some(format!("{retry_provider}/{retry_model}"));
                            // Re-arm the prompt-too-long one-shot for the new target.
                            attempt = 0;
                        }
                        continue;
                    }
                    return Err(e);
                }
            }
        };

        // Post-process the raw response: optionally reformat tool calls via the
        // router, then normalize tool calls the model emitted as plain text.
        let response = if let Some(ref router) = tool_router {
            router
                .maybe_reformat(response, &tool_definitions, model_supports_tools)
                .await
        } else {
            response
        };
        let response = normalize_textual_tool_calls(response, &tool_definitions);

        crate::telemetry::TOKEN_USAGE.record_model_usage(
            &model,
            response.usage.prompt_tokens as u64,
            response.usage.completion_tokens as u64,
        );

        // Extract well-formed tool calls; calls whose argument JSON fails to
        // parse (usually truncated by max_tokens) are collected separately so
        // we can answer them with an explanatory error result.
        let mut truncated_tool_ids: Vec<(String, String)> = Vec::new();
        let tool_calls: Vec<(String, String, serde_json::Value)> = response
            .message
            .content
            .iter()
            .filter_map(|part| {
                if let ContentPart::ToolCall {
                    id,
                    name,
                    arguments,
                    ..
                } = part
                {
                    match serde_json::from_str::<serde_json::Value>(arguments) {
                        Ok(args) => Some((id.clone(), name.clone(), args)),
                        Err(e) => {
                            tracing::warn!(
                                tool = %name,
                                tool_call_id = %id,
                                args_len = arguments.len(),
                                error = %e,
                                "Tool call arguments failed to parse (likely truncated by max_tokens)"
                            );
                            truncated_tool_ids.push((id.clone(), name.clone()));
                            None
                        }
                    }
                } else {
                    None
                }
            })
            .collect();

        // NOTE(review): `&&` here is a redundant double borrow (deref coercion
        // makes it compile); a single `&` would be idiomatic.
        let assistant_text = extract_text_content(&&response.message.content);
        // Build-mode guard: if the agent answered in prose when the request
        // clearly requires file changes, nudge it to use tools and retry.
        if should_force_build_tool_first_retry(
            &session.agent,
            build_mode_tool_retry_count,
            &tool_definitions,
            &session.messages,
            &cwd,
            &assistant_text,
            !tool_calls.is_empty(),
            BUILD_MODE_TOOL_FIRST_MAX_RETRIES,
        ) {
            build_mode_tool_retry_count += 1;
            tracing::warn!(
                step = step,
                agent = %session.agent,
                retry = build_mode_tool_retry_count,
                "Build mode tool-first guard triggered; retrying with execution nudge"
            );
            session.add_message(Message {
                role: Role::User,
                content: vec![ContentPart::Text {
                    text: BUILD_MODE_TOOL_FIRST_NUDGE.to_string(),
                }],
            });
            continue;
        }
        // Promise guard: model narrated "I'll now call X" without emitting a
        // native tool call — keep its message but nudge a corrective retry.
        if should_retry_missing_native_tool_call(
            selected_provider,
            &model,
            native_tool_promise_retry_count,
            &tool_definitions,
            &assistant_text,
            !tool_calls.is_empty(),
            NATIVE_TOOL_PROMISE_RETRY_MAX_RETRIES,
        ) {
            native_tool_promise_retry_count += 1;
            tracing::warn!(
                step = step,
                provider = selected_provider,
                model = %model,
                retry = native_tool_promise_retry_count,
                "Model described a tool step without emitting a tool call; retrying with corrective nudge"
            );
            session.add_message(response.message.clone());
            session.add_message(Message {
                role: Role::User,
                content: vec![ContentPart::Text {
                    text: NATIVE_TOOL_PROMISE_NUDGE.to_string(),
                }],
            });
            continue;
        }
        if !tool_calls.is_empty() {
            // Any successful tool call resets both nudge counters.
            build_mode_tool_retry_count = 0;
            native_tool_promise_retry_count = 0;
        } else if is_build_agent(&session.agent)
            && build_request_requires_tool(&session.messages, &cwd)
            && build_mode_tool_retry_count >= BUILD_MODE_TOOL_FIRST_MAX_RETRIES
        {
            // Hard failure: the model refuses to call tools for a request that
            // demonstrably needs them.
            return Err(anyhow::anyhow!(
                "Build mode could not obtain tool calls for an explicit file-change request after {} retries. \
                 Switch to a tool-capable model and try again.",
                BUILD_MODE_TOOL_FIRST_MAX_RETRIES
            ));
        }

        // Accumulate visible text; stream thinking fragments over the bus.
        let mut step_text = String::new();

        for part in &response.message.content {
            match part {
                ContentPart::Text { text } if !text.is_empty() => {
                    step_text.push_str(text);
                    step_text.push('\n');
                }
                ContentPart::Thinking { text } if !text.is_empty() => {
                    if let Some(ref bus) = session.bus {
                        let handle = bus.handle(&session.agent);
                        handle.send_with_correlation(
                            format!("agent.{}.thinking", session.agent),
                            crate::bus::BusMessage::AgentThinking {
                                agent_id: session.agent.clone(),
                                thinking: text.clone(),
                                step,
                            },
                            Some(turn_id.clone()),
                        );
                    }
                }
                _ => {}
            }
        }

        if !step_text.trim().is_empty() {
            final_output.push_str(&step_text);
        }

        // No tool activity at all: the model considers itself done. Build
        // agents must first pass post-edit validation before we accept that.
        if tool_calls.is_empty() && truncated_tool_ids.is_empty() {
            session.add_message(response.message.clone());
            if is_build_agent(&session.agent) {
                if let Some(report) =
                    build_validation_report(&cwd, &touched_files, &baseline_git_dirty_files).await?
                {
                    validation_retry_count += 1;
                    tracing::warn!(
                        retries = validation_retry_count,
                        issues = report.issue_count,
                        "Post-edit validation found unresolved diagnostics"
                    );
                    if validation_retry_count >= POST_EDIT_VALIDATION_MAX_RETRIES {
                        return Err(anyhow::anyhow!(
                            "Post-edit validation failed after {} attempts.\n\n{}",
                            POST_EDIT_VALIDATION_MAX_RETRIES,
                            report.prompt
                        ));
                    }
                    session.add_message(Message {
                        role: Role::User,
                        content: vec![ContentPart::Text {
                            text: report.prompt,
                        }],
                    });
                    // Discard text produced before validation failed; the next
                    // iteration's answer supersedes it.
                    final_output.clear();
                    continue;
                }
            }
            break;
        }

        // Answer each truncated tool call with a synthetic error result so the
        // conversation stays well-formed (every call id gets a result).
        if !truncated_tool_ids.is_empty() {
            if tool_calls.is_empty() {
                session.add_message(response.message.clone());
            }
            for (tool_id, tool_name) in &truncated_tool_ids {
                let error_content = format!(
                    "Error: Your tool call to `{tool_name}` was truncated — the arguments \
                     JSON was cut off mid-string (likely hit the max_tokens limit). \
                     Please retry with a shorter approach: use the `write` tool to write \
                     content in smaller pieces, or reduce the size of your arguments."
                );
                session.add_message(Message {
                    role: Role::Tool,
                    content: vec![ContentPart::ToolResult {
                        tool_call_id: tool_id.clone(),
                        content: error_content,
                    }],
                });
            }
            if tool_calls.is_empty() {
                continue;
            }
        }

        // Same-tool thrash guard: a repeated identical batch of tool calls
        // (order-insensitive signature) forces a final-answer nudge. Models
        // without native tools also get capped at 3 tool steps.
        {
            let mut sigs: Vec<String> = tool_calls
                .iter()
                .map(|(_, name, args)| format!("{name}:{args}"))
                .collect();
            sigs.sort();
            let sig = sigs.join("|");

            if last_tool_sig.as_deref() == Some(&sig) {
                consecutive_same_tool += 1;
            } else {
                consecutive_same_tool = 1;
                last_tool_sig = Some(sig);
            }

            let force_answer = consecutive_same_tool > MAX_CONSECUTIVE_SAME_TOOL
                || (!model_supports_tools && step >= 3);

            if force_answer {
                tracing::warn!(
                    step = step,
                    consecutive = consecutive_same_tool,
                    "Breaking agent loop: forcing final answer",
                );
                // Keep any prose from the message but strip the tool calls we
                // are refusing to execute.
                let mut nudge_msg = response.message.clone();
                nudge_msg
                    .content
                    .retain(|p| !matches!(p, ContentPart::ToolCall { .. }));
                if !nudge_msg.content.is_empty() {
                    session.add_message(nudge_msg);
                }
                session.add_message(Message {
                    role: Role::User,
                    content: vec![ContentPart::Text {
                        text: FORCE_FINAL_ANSWER_NUDGE.to_string(),
                    }],
                });
                continue;
            }
        }

        session.add_message(response.message.clone());

        tracing::info!(
            step = step,
            num_tools = tool_calls.len(),
            "Executing tool calls"
        );

        // Execute each tool call in order, appending a Role::Tool result
        // message for every call id.
        let mut codesearch_thrash_guard_triggered = false;
        for (tool_id, tool_name, tool_input) in tool_calls {
            let (tool_name, tool_input) =
                normalize_tool_call_for_execution(&tool_name, &tool_input);
            tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");

            // Bootstrap tool for non-native-tool providers: answered inline,
            // never dispatched to the registry.
            if tool_name == "list_tools" {
                let content = list_tools_bootstrap_output(&tool_definitions, &tool_input);
                session.add_message(Message {
                    role: Role::Tool,
                    content: vec![ContentPart::ToolResult {
                        tool_call_id: tool_id,
                        content,
                    }],
                });
                continue;
            }

            if let Some(ref bus) = session.bus {
                let handle = bus.handle(&session.agent);
                handle.send_with_correlation(
                    format!("agent.{}.tool.request", session.agent),
                    crate::bus::BusMessage::ToolRequest {
                        request_id: tool_id.clone(),
                        agent_id: session.agent.clone(),
                        tool_name: tool_name.clone(),
                        arguments: tool_input.clone(),
                        step,
                    },
                    Some(turn_id.clone()),
                );
            }

            // Defense in depth: interactive tools were filtered from the
            // advertised set, but the model may still hallucinate one.
            if is_interactive_tool(&tool_name) {
                tracing::warn!(tool = %tool_name, "Blocking interactive tool in session loop");
                session.add_message(Message {
                    role: Role::Tool,
                    content: vec![ContentPart::ToolResult {
                        tool_call_id: tool_id,
                        content: "Error: Interactive tool 'question' is disabled in this interface. Ask the user directly in assistant text.".to_string(),
                    }],
                });
                continue;
            }

            // Refactor guard: reject edits whose payload looks like a
            // placeholder/stub rather than a real implementation.
            if let Some(reason) = detect_stub_in_tool_input(&tool_name, &tool_input) {
                tracing::warn!(tool = %tool_name, reason = %reason, "Blocking suspected stubbed edit");
                session.add_message(Message {
                    role: Role::Tool,
                    content: vec![ContentPart::ToolResult {
                        tool_call_id: tool_id,
                        content: format!(
                            "Error: Refactor guard rejected this edit: {reason}. \
                             Provide concrete, behavior-preserving implementation (no placeholders/stubs)."
                        ),
                    }],
                });
                continue;
            }

            let exec_start = std::time::Instant::now();
            // Augment the model-supplied arguments with session/runtime context
            // (cwd, model, session id, provenance) before dispatch.
            let exec_input = enrich_tool_input_with_runtime_context(
                &tool_input,
                &cwd,
                session.metadata.model.as_deref(),
                &session.id,
                &session.agent,
                session.metadata.provenance.as_ref(),
            );
            let (content, success, tool_metadata) = execute_tool(
                &tool_registry,
                &tool_name,
                &exec_input,
                &session.id,
                exec_start,
            )
            .await;

            // Edits that normally require user confirmation can be auto-applied
            // when the session opted in (TUI auto-apply). On auto-apply failure
            // we fall back to showing the pending-confirmation preview plus the
            // error, still marked as requiring confirmation.
            let requires_confirmation = tool_result_requires_confirmation(tool_metadata.as_ref());
            let (content, success, tool_metadata, requires_confirmation) = if requires_confirmation
                && session.metadata.auto_apply_edits
            {
                let preview_content = content.clone();
                match auto_apply_pending_confirmation(
                    &tool_name,
                    &exec_input,
                    tool_metadata.as_ref(),
                )
                .await
                {
                    Ok(Some((content, success, tool_metadata))) => {
                        tracing::info!(
                            tool = %tool_name,
                            "Auto-applied pending confirmation in TUI session"
                        );
                        (content, success, tool_metadata, false)
                    }
                    Ok(None) => (content, success, tool_metadata, true),
                    Err(error) => (
                        format!(
                            "{}\n\nTUI edit auto-apply failed: {}",
                            pending_confirmation_tool_result_content(&tool_name, &preview_content,),
                            error
                        ),
                        false,
                        tool_metadata,
                        true,
                    ),
                }
            } else {
                (content, success, tool_metadata, requires_confirmation)
            };
            let rendered_content = if requires_confirmation {
                pending_confirmation_tool_result_content(&tool_name, &content)
            } else {
                content.clone()
            };

            // Only confirmed (applied) edits count toward post-edit validation.
            if !requires_confirmation {
                track_touched_files(
                    &mut touched_files,
                    &cwd,
                    &tool_name,
                    &tool_input,
                    tool_metadata.as_ref(),
                );
            }

            let duration_ms = exec_start.elapsed().as_millis() as u64;
            let codesearch_no_match =
                is_codesearch_no_match_output(&tool_name, success, &rendered_content);

            // Publish both the summary response and the full output on the bus.
            if let Some(ref bus) = session.bus {
                let handle = bus.handle(&session.agent);
                handle.send_with_correlation(
                    format!("agent.{}.tool.response", session.agent),
                    crate::bus::BusMessage::ToolResponse {
                        request_id: tool_id.clone(),
                        agent_id: session.agent.clone(),
                        tool_name: tool_name.clone(),
                        result: rendered_content.clone(),
                        success,
                        step,
                    },
                    Some(turn_id.clone()),
                );
                handle.send_with_correlation(
                    format!("agent.{}.tool.output", session.agent),
                    crate::bus::BusMessage::ToolOutputFull {
                        agent_id: session.agent.clone(),
                        tool_name: tool_name.clone(),
                        output: rendered_content.clone(),
                        success,
                        step,
                    },
                    Some(turn_id.clone()),
                );
            }

            // Best-effort on-disk event record for later archival.
            if let Some(base_dir) = super::archive::event_stream_path() {
                write_tool_event_file(
                    &base_dir,
                    &session.id,
                    &tool_name,
                    success,
                    duration_ms,
                    &rendered_content,
                    session.messages.len() as u64,
                );
            }

            // Large tool outputs may be summarized/compressed by the RLM
            // router before being fed back into the conversation.
            let content = maybe_route_through_rlm(
                &rendered_content,
                &tool_name,
                &tool_input,
                &tool_id,
                &session.id,
                &session.messages,
                &model,
                Arc::clone(&provider),
                &session.metadata.rlm,
            )
            .await;

            session.add_message(Message {
                role: Role::Tool,
                content: vec![ContentPart::ToolResult {
                    tool_call_id: tool_id,
                    content,
                }],
            });

            // Codesearch thrash guard (build agents only): too many no-match
            // results in a row means the model is brute-forcing query variants;
            // nudge it and abandon the rest of this step's tool calls.
            if is_build_agent(&session.agent) {
                if codesearch_no_match {
                    consecutive_codesearch_no_matches += 1;
                } else {
                    consecutive_codesearch_no_matches = 0;
                }

                if consecutive_codesearch_no_matches >= MAX_CONSECUTIVE_CODESEARCH_NO_MATCHES {
                    tracing::warn!(
                        step = step,
                        consecutive_codesearch_no_matches = consecutive_codesearch_no_matches,
                        "Detected codesearch no-match thrash; nudging model to stop variant retries",
                    );
                    session.add_message(Message {
                        role: Role::User,
                        content: vec![ContentPart::Text {
                            text: CODESEARCH_THRASH_NUDGE.to_string(),
                        }],
                    });
                    codesearch_thrash_guard_triggered = true;
                    break;
                }
            }
        }

        if codesearch_thrash_guard_triggered {
            continue;
        }
    }

    session.save().await?;

    super::archive::archive_event_stream_to_s3(&session.id, super::archive::event_stream_path())
        .await;

    Ok(SessionResult {
        text: final_output.trim().to_string(),
        session_id: session.id.clone(),
    })
}
758
759/// Split the session's configured model string into `(provider, model_id)`.
760fn parse_session_model_selector(session: &Session, providers: &[&str]) -> (Option<String>, String) {
761    let Some(ref model_str) = session.metadata.model else {
762        return (None, String::new());
763    };
764    let (prov, model) = parse_model_string(model_str);
765    let prov = prov.map(|p| match p {
766        "zhipuai" | "z-ai" => "zai",
767        other => other,
768    });
769    if prov.is_some() {
770        (prov.map(|s| s.to_string()), model.to_string())
771    } else if providers.contains(&model) {
772        (Some(model.to_string()), String::new())
773    } else {
774        (None, model.to_string())
775    }
776}
777
778/// Apply RLM compression to the most recent user message when it blows past
779/// the context-window heuristic threshold.
780async fn compress_user_message_if_oversized(
781    session: &mut Session,
782    provider: &Arc<dyn crate::provider::Provider>,
783    model: &str,
784    message: &str,
785) {
786    let ctx_window = context_window_for_model(model);
787    let msg_tokens = RlmChunker::estimate_tokens(message);
788    let threshold = (ctx_window as f64 * 0.35) as usize;
789    if msg_tokens <= threshold {
790        return;
791    }
792
793    tracing::info!(
794        msg_tokens,
795        threshold,
796        ctx_window,
797        "RLM: User message exceeds context threshold, compressing"
798    );
799
800    let auto_ctx = AutoProcessContext {
801        tool_id: "session_context",
802        tool_args: serde_json::json!({}),
803        session_id: &session.id,
804        abort: None,
805        on_progress: None,
806        provider: Arc::clone(provider),
807        model: model.to_string(),
808        bus: None,
809        trace_id: None,
810        subcall_provider: session.metadata.subcall_provider.clone(),
811        subcall_model: session.metadata.subcall_model_name.clone(),
812    };
813    let rlm_config = session.metadata.rlm.clone();
814    match RlmRouter::auto_process(message, auto_ctx, &rlm_config).await {
815        Ok(result) => {
816            tracing::info!(
817                input_tokens = result.stats.input_tokens,
818                output_tokens = result.stats.output_tokens,
819                "RLM: User message compressed"
820            );
821            if let Some(last) = session.messages.last_mut() {
822                last.content = vec![ContentPart::Text {
823                    text: format!(
824                        "[Original message: {} tokens, compressed via RLM]\n\n{}\n\n---\nOriginal request prefix:\n{}",
825                        msg_tokens,
826                        result.processed,
827                        message.chars().take(500).collect::<String>()
828                    ),
829                }];
830            }
831        }
832        Err(e) => {
833            tracing::warn!(error = %e, "RLM: Failed to compress user message, using truncation");
834            let max_chars = threshold * 4;
835            let truncated = RlmChunker::compress(message, max_chars / 4, None);
836            if let Some(last) = session.messages.last_mut() {
837                last.content = vec![ContentPart::Text { text: truncated }];
838            }
839        }
840    }
841}
842
843/// Execute a tool call and emit the corresponding audit entry.
844async fn execute_tool(
845    tool_registry: &ToolRegistry,
846    tool_name: &str,
847    exec_input: &serde_json::Value,
848    session_id: &str,
849    exec_start: std::time::Instant,
850) -> (
851    String,
852    bool,
853    Option<std::collections::HashMap<String, serde_json::Value>>,
854) {
855    if let Some(tool) = tool_registry.get(tool_name) {
856        match tool.execute(exec_input.clone()).await {
857            Ok(result) => {
858                let duration_ms = exec_start.elapsed().as_millis() as u64;
859                tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
860                if let Some(audit) = try_audit_log() {
861                    audit
862                        .log_with_correlation(
863                            AuditCategory::ToolExecution,
864                            format!("tool:{}", tool_name),
865                            if result.success {
866                                AuditOutcome::Success
867                            } else {
868                                AuditOutcome::Failure
869                            },
870                            None,
871                            Some(json!({
872                                "duration_ms": duration_ms,
873                                "output_len": result.output.len()
874                            })),
875                            None,
876                            None,
877                            None,
878                            Some(session_id.to_string()),
879                        )
880                        .await;
881                }
882                (result.output, result.success, Some(result.metadata))
883            }
884            Err(e) => {
885                let duration_ms = exec_start.elapsed().as_millis() as u64;
886                tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
887                if let Some(audit) = try_audit_log() {
888                    audit
889                        .log_with_correlation(
890                            AuditCategory::ToolExecution,
891                            format!("tool:{}", tool_name),
892                            AuditOutcome::Failure,
893                            None,
894                            Some(json!({ "duration_ms": duration_ms, "error": e.to_string() })),
895                            None,
896                            None,
897                            None,
898                            Some(session_id.to_string()),
899                        )
900                        .await;
901                }
902                (format!("Error: {}", e), false, None)
903            }
904        }
905    } else {
906        tracing::warn!(tool = %tool_name, "Tool not found");
907        if let Some(audit) = try_audit_log() {
908            audit
909                .log_with_correlation(
910                    AuditCategory::ToolExecution,
911                    format!("tool:{}", tool_name),
912                    AuditOutcome::Failure,
913                    None,
914                    Some(json!({ "error": "unknown_tool" })),
915                    None,
916                    None,
917                    None,
918                    Some(session_id.to_string()),
919                )
920                .await;
921        }
922        (format!("Error: Unknown tool '{}'", tool_name), false, None)
923    }
924}
925
926/// Write a [`ChatEvent::tool_result`] JSONL record to disk (fire-and-forget).
927fn write_tool_event_file(
928    base_dir: &std::path::Path,
929    session_id: &str,
930    tool_name: &str,
931    success: bool,
932    duration_ms: u64,
933    rendered_content: &str,
934    seq: u64,
935) {
936    let workspace = std::env::var("PWD")
937        .map(std::path::PathBuf::from)
938        .unwrap_or_else(|_| std::env::current_dir().unwrap_or_default());
939    let event = ChatEvent::tool_result(
940        workspace,
941        session_id.to_string(),
942        tool_name,
943        success,
944        duration_ms,
945        rendered_content,
946        seq,
947    );
948    let event_json = event.to_json();
949    let timestamp = Utc::now().format("%Y%m%dT%H%M%SZ");
950    let filename = format!(
951        "{}-chat-events-{:020}-{:020}.jsonl",
952        timestamp,
953        seq * 10000,
954        (seq + 1) * 10000
955    );
956    let event_path = base_dir.join(session_id).join(filename);
957
958    tokio::spawn(async move {
959        if let Some(parent) = event_path.parent() {
960            let _ = tokio::fs::create_dir_all(parent).await;
961        }
962        if let Ok(mut file) = tokio::fs::OpenOptions::new()
963            .create(true)
964            .append(true)
965            .open(&event_path)
966            .await
967        {
968            use tokio::io::AsyncWriteExt;
969            let _ = file.write_all(event_json.as_bytes()).await;
970            let _ = file.write_all(b"\n").await;
971        }
972    });
973}
974
975/// Route a large tool output through the Recursive Language Model when it
976/// exceeds the routing heuristics.
977async fn maybe_route_through_rlm(
978    rendered_content: &str,
979    tool_name: &str,
980    tool_input: &serde_json::Value,
981    tool_id: &str,
982    session_id: &str,
983    messages: &[Message],
984    model: &str,
985    provider: Arc<dyn crate::provider::Provider>,
986    rlm_config: &RlmConfig,
987) -> String {
988    let ctx_window = context_window_for_model(model);
989    let current_tokens = estimate_tokens_for_messages(messages);
990    let routing_ctx = RoutingContext {
991        tool_id: tool_name.to_string(),
992        session_id: session_id.to_string(),
993        call_id: Some(tool_id.to_string()),
994        model_context_limit: ctx_window,
995        current_context_tokens: Some(current_tokens),
996    };
997    let routing = RlmRouter::should_route(rendered_content, &routing_ctx, rlm_config);
998    if !routing.should_route {
999        return rendered_content.to_string();
1000    }
1001
1002    tracing::info!(
1003        tool = %tool_name,
1004        reason = %routing.reason,
1005        estimated_tokens = routing.estimated_tokens,
1006        "RLM: Routing large tool output"
1007    );
1008    let auto_ctx = AutoProcessContext {
1009        tool_id: tool_name,
1010        tool_args: tool_input.clone(),
1011        session_id,
1012        abort: None,
1013        on_progress: None,
1014        provider: Arc::clone(&provider),
1015        model: model.to_string(),
1016        bus: None,
1017        trace_id: None,
1018        subcall_provider: None,
1019        subcall_model: None,
1020    };
1021    match RlmRouter::auto_process(rendered_content, auto_ctx, &rlm_config).await {
1022        Ok(result) => {
1023            tracing::info!(
1024                input_tokens = result.stats.input_tokens,
1025                output_tokens = result.stats.output_tokens,
1026                iterations = result.stats.iterations,
1027                "RLM: Processing complete"
1028            );
1029            result.processed
1030        }
1031        Err(e) => {
1032            tracing::warn!(error = %e, "RLM: auto_process failed, using smart_truncate");
1033            let (truncated, _, _) =
1034                RlmRouter::smart_truncate(rendered_content, tool_name, tool_input, ctx_window / 4);
1035            truncated
1036        }
1037    }
1038}