// codetether_agent/session/helper/prompt.rs

//! Core agentic prompt loop used by [`Session::prompt`](super::super::Session::prompt).
//!
//! This is the non-streaming, non-event-emitting variant. It loads the
//! provider registry from Vault, runs the completion/tool-call loop up to
//! `max_steps` times, and returns the final plain-text answer.
//!
//! The code here was extracted from `src/session/mod.rs` verbatim to keep
//! the Session facade small and keep the loop's behavior stable. Refinements
//! to the loop should be made here (and mirrored in [`prompt_events`](super::prompt_events)
//! where appropriate).
// TODO: Keep this loop in sync with `prompt_events.rs` until both prompt
// loops are consolidated into one shared implementation.
13
14use std::collections::HashSet;
15use std::sync::Arc;
16
17use anyhow::Result;
18use chrono::Utc;
19use serde_json::json;
20use uuid::Uuid;
21
22use crate::audit::{AuditCategory, AuditOutcome, try_audit_log};
23use crate::cognition::tool_router::{ToolCallRouter, ToolRouterConfig};
24use crate::event_stream::ChatEvent;
25use crate::provider::{
26    CompletionRequest, ContentPart, Message, ProviderRegistry, Role, parse_model_string,
27};
28use crate::rlm::router::AutoProcessContext;
29use crate::rlm::{RlmConfig, RlmRouter, RoutingContext};
30use crate::tool::ToolRegistry;
31
32use super::super::{DEFAULT_MAX_STEPS, Session, SessionResult};
33use super::bootstrap::list_tools_bootstrap_output;
34use super::build::{
35    build_request_requires_tool, is_build_agent, should_force_build_tool_first_retry,
36};
37use super::confirmation::{
38    auto_apply_pending_confirmation, pending_confirmation_tool_result_content,
39    tool_result_requires_confirmation,
40};
41use super::defaults::default_model_for_provider;
42use super::edit::{detect_stub_in_tool_input, normalize_tool_call_for_execution};
43use super::error::{is_prompt_too_long_error, is_retryable_upstream_error};
44use super::loop_constants::{
45    BUILD_MODE_TOOL_FIRST_MAX_RETRIES, BUILD_MODE_TOOL_FIRST_NUDGE, CODESEARCH_THRASH_NUDGE,
46    FORCE_FINAL_ANSWER_NUDGE, MAX_CONSECUTIVE_CODESEARCH_NO_MATCHES, MAX_CONSECUTIVE_SAME_TOOL,
47    NATIVE_TOOL_PROMISE_NUDGE, NATIVE_TOOL_PROMISE_RETRY_MAX_RETRIES,
48    POST_EDIT_VALIDATION_MAX_RETRIES,
49};
50use super::markup::normalize_textual_tool_calls;
51use super::provider::{
52    resolve_provider_for_session_request, should_retry_missing_native_tool_call,
53};
54use super::request_state::build_provider_step_state;
55use super::router::{build_proactive_lsp_context_message, choose_router_target_bandit};
56use super::runtime::{
57    enrich_tool_input_with_runtime_context, is_codesearch_no_match_output, is_interactive_tool,
58};
59use super::text::extract_text_content;
60use super::token::{
61    context_window_for_model, estimate_tokens_for_messages, session_completion_max_tokens,
62};
63use super::validation::{build_validation_report, capture_git_dirty_files, track_touched_files};
64use crate::session::{
65    bucket_for_messages, delegation_skills, derive_with_policy, effective_policy,
66};
67
68/// Execute a prompt against the session and return a [`SessionResult`].
69///
70/// See [`Session::prompt`](super::super::Session::prompt) for the public-facing contract.
71pub(crate) async fn run_prompt(session: &mut Session, message: &str) -> Result<SessionResult> {
72    let registry = ProviderRegistry::from_vault().await?;
73    session.resolve_subcall_provider(&registry);
74
75    let providers = registry.list();
76    if providers.is_empty() {
77        anyhow::bail!(
78            "No providers available. Configure provider credentials in HashiCorp Vault (for ChatGPT subscription Codex use `codetether auth codex`; for Copilot use `codetether auth copilot`)."
79        );
80    }
81
82    tracing::info!("Available providers: {:?}", providers);
83
84    let (provider_name, model_id) = parse_session_model_selector(session, &providers);
85
86    let mut selected_provider =
87        resolve_provider_for_session_request(providers.as_slice(), provider_name.as_deref())?
88            .to_string();
89
90    let mut provider = registry
91        .get(&selected_provider)
92        .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider.clone()))?;
93
94    session.add_message(Message {
95        role: Role::User,
96        content: vec![ContentPart::Text {
97            text: message.to_string(),
98        }],
99    });
100
101    if session.title.is_none() {
102        session.generate_title().await?;
103    }
104
105    let mut model = if !model_id.is_empty() {
106        model_id
107    } else {
108        default_model_for_provider(&selected_provider)
109    };
110
111    let cwd = session
112        .metadata
113        .directory
114        .clone()
115        .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
116    // Phase A: oversized-user-message compression now happens inside
117    // `derive_context` on a clone. Keeping the original text in
118    // `session.messages` means `session_recall` and the MinIO history
119    // sink can still see what the user actually typed.
120    let mut provider_state =
121        build_provider_step_state(Arc::clone(&provider), &selected_provider, &model, &cwd);
122    let mut tool_registry = provider_state.tool_registry.clone();
123    let mut tool_definitions = provider_state.tool_definitions.clone();
124    let mut temperature = provider_state.temperature;
125    let mut model_supports_tools = provider_state.model_supports_tools;
126    let mut advertised_tool_definitions = provider_state.advertised_tool_definitions.clone();
127    let mut system_prompt = provider_state.system_prompt.clone();
128
129    tracing::info!("Using model: {} via provider: {}", model, selected_provider);
130    tracing::info!("Available tools: {}", tool_definitions.len());
131
132    let max_steps = session.max_steps.unwrap_or(DEFAULT_MAX_STEPS);
133    let mut final_output = String::new();
134    let baseline_git_dirty_files = capture_git_dirty_files(&cwd).await;
135    let mut touched_files = HashSet::new();
136    let mut validation_retry_count: u8 = 0;
137
138    let mut last_tool_sig: Option<String> = None;
139    let mut consecutive_same_tool: u32 = 0;
140    let mut consecutive_codesearch_no_matches: u32 = 0;
141    let mut build_mode_tool_retry_count: u8 = 0;
142    let mut native_tool_promise_retry_count: u8 = 0;
143    let turn_id = Uuid::new_v4().to_string();
144
145    let tool_router: Option<ToolCallRouter> = {
146        let cfg = ToolRouterConfig::from_env();
147        match ToolCallRouter::from_config(&cfg) {
148            Ok(r) => r,
149            Err(e) => {
150                tracing::warn!(error = %e, "FunctionGemma tool router init failed; disabled");
151                None
152            }
153        }
154    };
155
156    for step in 1..=max_steps {
157        tracing::info!(step = step, "Agent step starting");
158
159        super::cost_guard::enforce_cost_budget()?;
160
161        // Phase A: derive the per-step LLM context from a clone of
162        // `session.messages` rather than mutating history in place. The
163        // experimental strategies, RLM-powered context-window
164        // enforcement, and orphan-pair repair all run against the
165        // clone; the canonical transcript stays append-only.
166        let policy = effective_policy(session);
167        let mut derived = derive_with_policy(
168            session,
169            Arc::clone(&provider),
170            &model,
171            &system_prompt,
172            &advertised_tool_definitions,
173            None,
174            policy,
175            None,
176        )
177        .await?;
178
179        let mut proactive_lsp_message = build_proactive_lsp_context_message(
180            selected_provider.as_str(),
181            step,
182            &tool_registry,
183            &session.messages,
184            &cwd,
185        )
186        .await;
187        let bucket = bucket_for_messages(session.history());
188
189        let mut attempt = 0;
190        let mut upstream_retry_count: u8 = 0;
191        const MAX_UPSTREAM_RETRIES: u8 = 3;
192        let response = loop {
193            attempt += 1;
194
195            let mut messages = vec![Message {
196                role: Role::System,
197                content: vec![ContentPart::Text {
198                    text: system_prompt.clone(),
199                }],
200            }];
201            if let Some(msg) = &proactive_lsp_message {
202                messages.push(msg.clone());
203            }
204            messages.extend(derived.messages.clone());
205
206            let request = CompletionRequest {
207                messages,
208                tools: advertised_tool_definitions.clone(),
209                model: model.clone(),
210                temperature,
211                top_p: None,
212                max_tokens: Some(session_completion_max_tokens()),
213                stop: Vec::new(),
214            };
215
216            match provider.complete(request).await {
217                Ok(r) => {
218                    session.metadata.delegation.update(
219                        &selected_provider,
220                        delegation_skills::MODEL_CALL,
221                        bucket,
222                        true,
223                    );
224                    break r;
225                }
226                Err(e) => {
227                    if attempt == 1 && is_prompt_too_long_error(&e) {
228                        tracing::warn!(error = %e, "Provider rejected prompt as too long; re-deriving with force_keep_last=6 and retrying");
229                        derived = derive_with_policy(
230                            session,
231                            Arc::clone(&provider),
232                            &model,
233                            &system_prompt,
234                            &advertised_tool_definitions,
235                            None,
236                            policy,
237                            Some(6),
238                        )
239                        .await?;
240                        continue;
241                    }
242                    if upstream_retry_count < MAX_UPSTREAM_RETRIES
243                        && is_retryable_upstream_error(&e)
244                    {
245                        session.metadata.delegation.update(
246                            &selected_provider,
247                            delegation_skills::MODEL_CALL,
248                            bucket,
249                            false,
250                        );
251                        upstream_retry_count += 1;
252                        let backoff_secs = 1u64 << (upstream_retry_count - 1).min(2);
253                        tracing::warn!(
254                            error = %e,
255                            retry = upstream_retry_count,
256                            max = MAX_UPSTREAM_RETRIES,
257                            backoff_secs,
258                            "Retryable upstream provider error; sleeping and retrying"
259                        );
260                        tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
261                        if let Some((retry_provider, retry_model)) = choose_router_target_bandit(
262                            &registry,
263                            &session.metadata.delegation,
264                            bucket,
265                            &selected_provider,
266                            &model,
267                        ) {
268                            tracing::info!(
269                                to_provider = %retry_provider,
270                                to_model = %retry_model,
271                                "Failing over to alternate provider/model"
272                            );
273                            selected_provider = retry_provider;
274                            provider = registry.get(&selected_provider).ok_or_else(|| {
275                                anyhow::anyhow!("Provider {} not found", selected_provider.clone())
276                            })?;
277                            model = retry_model;
278                            provider_state = build_provider_step_state(
279                                Arc::clone(&provider),
280                                &selected_provider,
281                                &model,
282                                &cwd,
283                            );
284                            tool_registry = provider_state.tool_registry.clone();
285                            tool_definitions = provider_state.tool_definitions.clone();
286                            temperature = provider_state.temperature;
287                            model_supports_tools = provider_state.model_supports_tools;
288                            advertised_tool_definitions =
289                                provider_state.advertised_tool_definitions.clone();
290                            system_prompt = provider_state.system_prompt.clone();
291                            derived = derive_with_policy(
292                                session,
293                                Arc::clone(&provider),
294                                &model,
295                                &system_prompt,
296                                &advertised_tool_definitions,
297                                None,
298                                policy,
299                                None,
300                            )
301                            .await?;
302                            proactive_lsp_message = build_proactive_lsp_context_message(
303                                selected_provider.as_str(),
304                                step,
305                                &tool_registry,
306                                &session.messages,
307                                &cwd,
308                            )
309                            .await;
310                            session.metadata.model = Some(format!("{selected_provider}/{model}"));
311                            attempt = 0;
312                        }
313                        continue;
314                    }
315                    return Err(e);
316                }
317            }
318        };
319
320        let response = if let Some(ref router) = tool_router {
321            router
322                .maybe_reformat(response, &tool_definitions, model_supports_tools)
323                .await
324        } else {
325            response
326        };
327        let response = normalize_textual_tool_calls(response, &tool_definitions);
328
329        crate::telemetry::TOKEN_USAGE.record_model_usage_with_cache(
330            &model,
331            response.usage.prompt_tokens as u64,
332            response.usage.completion_tokens as u64,
333            response.usage.cache_read_tokens.unwrap_or(0) as u64,
334            response.usage.cache_write_tokens.unwrap_or(0) as u64,
335        );
336
337        let mut truncated_tool_ids: Vec<(String, String)> = Vec::new();
338        let tool_calls: Vec<(String, String, serde_json::Value)> = response
339            .message
340            .content
341            .iter()
342            .filter_map(|part| {
343                if let ContentPart::ToolCall {
344                    id,
345                    name,
346                    arguments,
347                    ..
348                } = part
349                {
350                    match serde_json::from_str::<serde_json::Value>(arguments) {
351                        Ok(args) => Some((id.clone(), name.clone(), args)),
352                        Err(e) => {
353                            tracing::warn!(
354                                tool = %name,
355                                tool_call_id = %id,
356                                args_len = arguments.len(),
357                                error = %e,
358                                "Tool call arguments failed to parse (likely truncated by max_tokens)"
359                            );
360                            truncated_tool_ids.push((id.clone(), name.clone()));
361                            None
362                        }
363                    }
364                } else {
365                    None
366                }
367            })
368            .collect();
369
370        let assistant_text = extract_text_content(&&response.message.content);
371        if should_force_build_tool_first_retry(
372            &session.agent,
373            build_mode_tool_retry_count,
374            &tool_definitions,
375            &session.messages,
376            &cwd,
377            &assistant_text,
378            !tool_calls.is_empty(),
379            BUILD_MODE_TOOL_FIRST_MAX_RETRIES,
380        ) {
381            build_mode_tool_retry_count += 1;
382            tracing::warn!(
383                step = step,
384                agent = %session.agent,
385                retry = build_mode_tool_retry_count,
386                "Build mode tool-first guard triggered; retrying with execution nudge"
387            );
388            session.add_message(Message {
389                role: Role::User,
390                content: vec![ContentPart::Text {
391                    text: BUILD_MODE_TOOL_FIRST_NUDGE.to_string(),
392                }],
393            });
394            continue;
395        }
396        if should_retry_missing_native_tool_call(
397            selected_provider.as_str(),
398            &model,
399            native_tool_promise_retry_count,
400            &tool_definitions,
401            &assistant_text,
402            !tool_calls.is_empty(),
403            NATIVE_TOOL_PROMISE_RETRY_MAX_RETRIES,
404        ) {
405            native_tool_promise_retry_count += 1;
406            tracing::warn!(
407                step = step,
408                provider = selected_provider,
409                model = %model,
410                retry = native_tool_promise_retry_count,
411                "Model described a tool step without emitting a tool call; retrying with corrective nudge"
412            );
413            session.add_message(response.message.clone());
414            session.add_message(Message {
415                role: Role::User,
416                content: vec![ContentPart::Text {
417                    text: NATIVE_TOOL_PROMISE_NUDGE.to_string(),
418                }],
419            });
420            continue;
421        }
422        if !tool_calls.is_empty() {
423            build_mode_tool_retry_count = 0;
424            native_tool_promise_retry_count = 0;
425        } else if is_build_agent(&session.agent)
426            && build_request_requires_tool(&session.messages, &cwd)
427            && build_mode_tool_retry_count >= BUILD_MODE_TOOL_FIRST_MAX_RETRIES
428        {
429            return Err(anyhow::anyhow!(
430                "Build mode could not obtain tool calls for an explicit file-change request after {} retries. \
431                 Switch to a tool-capable model and try again.",
432                BUILD_MODE_TOOL_FIRST_MAX_RETRIES
433            ));
434        }
435
436        let mut step_text = String::new();
437
438        for part in &response.message.content {
439            match part {
440                ContentPart::Text { text } if !text.is_empty() => {
441                    step_text.push_str(text);
442                    step_text.push('\n');
443                }
444                ContentPart::Thinking { text } if !text.is_empty() => {
445                    if let Some(ref bus) = session.bus {
446                        let handle = bus.handle(&session.agent);
447                        handle.send_with_correlation(
448                            format!("agent.{}.thinking", session.agent),
449                            crate::bus::BusMessage::AgentThinking {
450                                agent_id: session.agent.clone(),
451                                thinking: text.clone(),
452                                step,
453                            },
454                            Some(turn_id.clone()),
455                        );
456                    }
457                }
458                _ => {}
459            }
460        }
461
462        if !step_text.trim().is_empty() {
463            final_output.push_str(&step_text);
464        }
465
466        if tool_calls.is_empty() && truncated_tool_ids.is_empty() {
467            session.add_message(response.message.clone());
468            if is_build_agent(&session.agent) {
469                if let Some(report) =
470                    build_validation_report(&cwd, &touched_files, &baseline_git_dirty_files).await?
471                {
472                    validation_retry_count += 1;
473                    tracing::warn!(
474                        retries = validation_retry_count,
475                        issues = report.issue_count,
476                        "Post-edit validation found unresolved diagnostics"
477                    );
478                    if validation_retry_count >= POST_EDIT_VALIDATION_MAX_RETRIES {
479                        return Err(anyhow::anyhow!(
480                            "Post-edit validation failed after {} attempts.\n\n{}",
481                            POST_EDIT_VALIDATION_MAX_RETRIES,
482                            report.prompt
483                        ));
484                    }
485                    session.add_message(Message {
486                        role: Role::User,
487                        content: vec![ContentPart::Text {
488                            text: report.prompt,
489                        }],
490                    });
491                    final_output.clear();
492                    continue;
493                }
494            }
495            break;
496        }
497
498        if !truncated_tool_ids.is_empty() {
499            if tool_calls.is_empty() {
500                session.add_message(response.message.clone());
501            }
502            for (tool_id, tool_name) in &truncated_tool_ids {
503                let error_content = format!(
504                    "Error: Your tool call to `{tool_name}` was truncated — the arguments \
505                     JSON was cut off mid-string (likely hit the max_tokens limit). \
506                     Please retry with a shorter approach: use the `write` tool to write \
507                     content in smaller pieces, or reduce the size of your arguments."
508                );
509                session.add_message(Message {
510                    role: Role::Tool,
511                    content: vec![ContentPart::ToolResult {
512                        tool_call_id: tool_id.clone(),
513                        content: error_content,
514                    }],
515                });
516            }
517            if tool_calls.is_empty() {
518                continue;
519            }
520        }
521
522        {
523            let mut sigs: Vec<String> = tool_calls
524                .iter()
525                .map(|(_, name, args)| format!("{name}:{args}"))
526                .collect();
527            sigs.sort();
528            let sig = sigs.join("|");
529
530            if last_tool_sig.as_deref() == Some(&sig) {
531                consecutive_same_tool += 1;
532            } else {
533                consecutive_same_tool = 1;
534                last_tool_sig = Some(sig);
535            }
536
537            let force_answer = consecutive_same_tool > MAX_CONSECUTIVE_SAME_TOOL
538                || (!model_supports_tools && step >= 3);
539
540            if force_answer {
541                tracing::warn!(
542                    step = step,
543                    consecutive = consecutive_same_tool,
544                    "Breaking agent loop: forcing final answer",
545                );
546                let mut nudge_msg = response.message.clone();
547                nudge_msg
548                    .content
549                    .retain(|p| !matches!(p, ContentPart::ToolCall { .. }));
550                if !nudge_msg.content.is_empty() {
551                    session.add_message(nudge_msg);
552                }
553                session.add_message(Message {
554                    role: Role::User,
555                    content: vec![ContentPart::Text {
556                        text: FORCE_FINAL_ANSWER_NUDGE.to_string(),
557                    }],
558                });
559                continue;
560            }
561        }
562
563        session.add_message(response.message.clone());
564
565        tracing::info!(
566            step = step,
567            num_tools = tool_calls.len(),
568            "Executing tool calls"
569        );
570
571        let mut codesearch_thrash_guard_triggered = false;
572        for (tool_id, tool_name, tool_input) in tool_calls {
573            let (tool_name, tool_input) =
574                normalize_tool_call_for_execution(&tool_name, &tool_input);
575            tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");
576
577            if tool_name == "list_tools" {
578                let content = list_tools_bootstrap_output(&tool_definitions, &tool_input);
579                session.add_message(Message {
580                    role: Role::Tool,
581                    content: vec![ContentPart::ToolResult {
582                        tool_call_id: tool_id,
583                        content,
584                    }],
585                });
586                continue;
587            }
588
589            if let Some(ref bus) = session.bus {
590                let handle = bus.handle(&session.agent);
591                handle.send_with_correlation(
592                    format!("agent.{}.tool.request", session.agent),
593                    crate::bus::BusMessage::ToolRequest {
594                        request_id: tool_id.clone(),
595                        agent_id: session.agent.clone(),
596                        tool_name: tool_name.clone(),
597                        arguments: tool_input.clone(),
598                        step,
599                    },
600                    Some(turn_id.clone()),
601                );
602            }
603
604            if is_interactive_tool(&tool_name) {
605                tracing::warn!(tool = %tool_name, "Blocking interactive tool in session loop");
606                session.add_message(Message {
607                    role: Role::Tool,
608                    content: vec![ContentPart::ToolResult {
609                        tool_call_id: tool_id,
610                        content: "Error: Interactive tool 'question' is disabled in this interface. Ask the user directly in assistant text.".to_string(),
611                    }],
612                });
613                continue;
614            }
615
616            if let Some(reason) = detect_stub_in_tool_input(&tool_name, &tool_input) {
617                tracing::warn!(tool = %tool_name, reason = %reason, "Blocking suspected stubbed edit");
618                session.add_message(Message {
619                    role: Role::Tool,
620                    content: vec![ContentPart::ToolResult {
621                        tool_call_id: tool_id,
622                        content: format!(
623                            "Error: Refactor guard rejected this edit: {reason}. \
624                             Provide concrete, behavior-preserving implementation (no placeholders/stubs)."
625                        ),
626                    }],
627                });
628                continue;
629            }
630
631            let exec_start = std::time::Instant::now();
632            let exec_input = enrich_tool_input_with_runtime_context(
633                &tool_input,
634                &cwd,
635                session.metadata.model.as_deref(),
636                &session.id,
637                &session.agent,
638                session.metadata.provenance.as_ref(),
639            );
640            let (content, success, tool_metadata) = execute_tool(
641                &tool_registry,
642                &tool_name,
643                &exec_input,
644                &session.id,
645                exec_start,
646            )
647            .await;
648
649            let requires_confirmation = tool_result_requires_confirmation(tool_metadata.as_ref());
650            let (content, success, tool_metadata, requires_confirmation) = if requires_confirmation
651                && session.metadata.auto_apply_edits
652            {
653                let preview_content = content.clone();
654                match auto_apply_pending_confirmation(
655                    &tool_name,
656                    &exec_input,
657                    tool_metadata.as_ref(),
658                )
659                .await
660                {
661                    Ok(Some((content, success, tool_metadata))) => {
662                        tracing::info!(
663                            tool = %tool_name,
664                            "Auto-applied pending confirmation in TUI session"
665                        );
666                        (content, success, tool_metadata, false)
667                    }
668                    Ok(None) => (content, success, tool_metadata, true),
669                    Err(error) => (
670                        format!(
671                            "{}\n\nTUI edit auto-apply failed: {}",
672                            pending_confirmation_tool_result_content(&tool_name, &preview_content,),
673                            error
674                        ),
675                        false,
676                        tool_metadata,
677                        true,
678                    ),
679                }
680            } else {
681                (content, success, tool_metadata, requires_confirmation)
682            };
683            let rendered_content = if requires_confirmation {
684                pending_confirmation_tool_result_content(&tool_name, &content)
685            } else {
686                content.clone()
687            };
688
689            if !requires_confirmation {
690                track_touched_files(
691                    &mut touched_files,
692                    &cwd,
693                    &tool_name,
694                    &tool_input,
695                    tool_metadata.as_ref(),
696                );
697            }
698
699            let duration_ms = exec_start.elapsed().as_millis() as u64;
700            let codesearch_no_match =
701                is_codesearch_no_match_output(&tool_name, success, &rendered_content);
702
703            if let Some(ref bus) = session.bus {
704                let handle = bus.handle(&session.agent);
705                handle.send_with_correlation(
706                    format!("agent.{}.tool.response", session.agent),
707                    crate::bus::BusMessage::ToolResponse {
708                        request_id: tool_id.clone(),
709                        agent_id: session.agent.clone(),
710                        tool_name: tool_name.clone(),
711                        result: rendered_content.clone(),
712                        success,
713                        step,
714                    },
715                    Some(turn_id.clone()),
716                );
717                handle.send_with_correlation(
718                    format!("agent.{}.tool.output", session.agent),
719                    crate::bus::BusMessage::ToolOutputFull {
720                        agent_id: session.agent.clone(),
721                        tool_name: tool_name.clone(),
722                        output: rendered_content.clone(),
723                        success,
724                        step,
725                    },
726                    Some(turn_id.clone()),
727                );
728            }
729
730            if let Some(base_dir) = super::archive::event_stream_path() {
731                write_tool_event_file(
732                    &base_dir,
733                    &session.id,
734                    &tool_name,
735                    success,
736                    duration_ms,
737                    &rendered_content,
738                    session.messages.len() as u64,
739                );
740            }
741
742            let content = maybe_route_through_rlm(
743                &rendered_content,
744                &tool_name,
745                &tool_input,
746                &tool_id,
747                &session.id,
748                &session.messages,
749                &model,
750                Arc::clone(&provider),
751                &session.metadata.rlm,
752            )
753            .await;
754
755            session.add_message(Message {
756                role: Role::Tool,
757                content: vec![ContentPart::ToolResult {
758                    tool_call_id: tool_id,
759                    content,
760                }],
761            });
762
763            if is_build_agent(&session.agent) {
764                if codesearch_no_match {
765                    consecutive_codesearch_no_matches += 1;
766                } else {
767                    consecutive_codesearch_no_matches = 0;
768                }
769
770                if consecutive_codesearch_no_matches >= MAX_CONSECUTIVE_CODESEARCH_NO_MATCHES {
771                    tracing::warn!(
772                        step = step,
773                        consecutive_codesearch_no_matches = consecutive_codesearch_no_matches,
774                        "Detected codesearch no-match thrash; nudging model to stop variant retries",
775                    );
776                    session.add_message(Message {
777                        role: Role::User,
778                        content: vec![ContentPart::Text {
779                            text: CODESEARCH_THRASH_NUDGE.to_string(),
780                        }],
781                    });
782                    codesearch_thrash_guard_triggered = true;
783                    break;
784                }
785            }
786        }
787
788        if codesearch_thrash_guard_triggered {
789            continue;
790        }
791    }
792
793    session.save().await?;
794
795    super::archive::archive_event_stream_to_s3(&session.id, super::archive::event_stream_path())
796        .await;
797
798    Ok(SessionResult {
799        text: final_output.trim().to_string(),
800        session_id: session.id.clone(),
801    })
802}
803
804/// Split the session's configured model string into `(provider, model_id)`.
805fn parse_session_model_selector(session: &Session, providers: &[&str]) -> (Option<String>, String) {
806    let Some(ref model_str) = session.metadata.model else {
807        return (None, String::new());
808    };
809    let (prov, model) = parse_model_string(model_str);
810    let prov = prov.map(|p| match p {
811        "zhipuai" | "z-ai" => "zai",
812        other => other,
813    });
814    if prov.is_some() {
815        (prov.map(|s| s.to_string()), model.to_string())
816    } else if providers.contains(&model) {
817        (Some(model.to_string()), String::new())
818    } else {
819        (None, model.to_string())
820    }
821}
822
823/// Execute a tool call and emit the corresponding audit entry.
824async fn execute_tool(
825    tool_registry: &ToolRegistry,
826    tool_name: &str,
827    exec_input: &serde_json::Value,
828    session_id: &str,
829    exec_start: std::time::Instant,
830) -> (
831    String,
832    bool,
833    Option<std::collections::HashMap<String, serde_json::Value>>,
834) {
835    if let Some(tool) = tool_registry.get(tool_name) {
836        match tool.execute(exec_input.clone()).await {
837            Ok(result) => {
838                let duration_ms = exec_start.elapsed().as_millis() as u64;
839                tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
840                if let Some(audit) = try_audit_log() {
841                    audit
842                        .log_with_correlation(
843                            AuditCategory::ToolExecution,
844                            format!("tool:{}", tool_name),
845                            if result.success {
846                                AuditOutcome::Success
847                            } else {
848                                AuditOutcome::Failure
849                            },
850                            None,
851                            Some(json!({
852                                "duration_ms": duration_ms,
853                                "output_len": result.output.len()
854                            })),
855                            None,
856                            None,
857                            None,
858                            Some(session_id.to_string()),
859                        )
860                        .await;
861                }
862                (result.output, result.success, Some(result.metadata))
863            }
864            Err(e) => {
865                let duration_ms = exec_start.elapsed().as_millis() as u64;
866                tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
867                if let Some(audit) = try_audit_log() {
868                    audit
869                        .log_with_correlation(
870                            AuditCategory::ToolExecution,
871                            format!("tool:{}", tool_name),
872                            AuditOutcome::Failure,
873                            None,
874                            Some(json!({ "duration_ms": duration_ms, "error": e.to_string() })),
875                            None,
876                            None,
877                            None,
878                            Some(session_id.to_string()),
879                        )
880                        .await;
881                }
882                (format!("Error: {}", e), false, None)
883            }
884        }
885    } else {
886        tracing::warn!(tool = %tool_name, "Tool not found");
887        if let Some(audit) = try_audit_log() {
888            audit
889                .log_with_correlation(
890                    AuditCategory::ToolExecution,
891                    format!("tool:{}", tool_name),
892                    AuditOutcome::Failure,
893                    None,
894                    Some(json!({ "error": "unknown_tool" })),
895                    None,
896                    None,
897                    None,
898                    Some(session_id.to_string()),
899                )
900                .await;
901        }
902        (format!("Error: Unknown tool '{}'", tool_name), false, None)
903    }
904}
905
906/// Write a [`ChatEvent::tool_result`] JSONL record to disk (fire-and-forget).
907fn write_tool_event_file(
908    base_dir: &std::path::Path,
909    session_id: &str,
910    tool_name: &str,
911    success: bool,
912    duration_ms: u64,
913    rendered_content: &str,
914    seq: u64,
915) {
916    let workspace = std::env::var("PWD")
917        .map(std::path::PathBuf::from)
918        .unwrap_or_else(|_| std::env::current_dir().unwrap_or_default());
919    let event = ChatEvent::tool_result(
920        workspace,
921        session_id.to_string(),
922        tool_name,
923        success,
924        duration_ms,
925        rendered_content,
926        seq,
927    );
928    let event_json = event.to_json();
929    let timestamp = Utc::now().format("%Y%m%dT%H%M%SZ");
930    let filename = format!(
931        "{}-chat-events-{:020}-{:020}.jsonl",
932        timestamp,
933        seq * 10000,
934        (seq + 1) * 10000
935    );
936    let event_path = base_dir.join(session_id).join(filename);
937
938    tokio::spawn(async move {
939        if let Some(parent) = event_path.parent() {
940            let _ = tokio::fs::create_dir_all(parent).await;
941        }
942        if let Ok(mut file) = tokio::fs::OpenOptions::new()
943            .create(true)
944            .append(true)
945            .open(&event_path)
946            .await
947        {
948            use tokio::io::AsyncWriteExt;
949            let _ = file.write_all(event_json.as_bytes()).await;
950            let _ = file.write_all(b"\n").await;
951        }
952    });
953}
954
955/// Route a large tool output through the Recursive Language Model when it
956/// exceeds the routing heuristics.
957async fn maybe_route_through_rlm(
958    rendered_content: &str,
959    tool_name: &str,
960    tool_input: &serde_json::Value,
961    tool_id: &str,
962    session_id: &str,
963    messages: &[Message],
964    model: &str,
965    provider: Arc<dyn crate::provider::Provider>,
966    rlm_config: &RlmConfig,
967) -> String {
968    let ctx_window = context_window_for_model(model);
969    let current_tokens = estimate_tokens_for_messages(messages);
970    let routing_ctx = RoutingContext {
971        tool_id: tool_name.to_string(),
972        session_id: session_id.to_string(),
973        call_id: Some(tool_id.to_string()),
974        model_context_limit: ctx_window,
975        current_context_tokens: Some(current_tokens),
976    };
977    let routing = RlmRouter::should_route(rendered_content, &routing_ctx, rlm_config);
978    if !routing.should_route {
979        return rendered_content.to_string();
980    }
981
982    tracing::info!(
983        tool = %tool_name,
984        reason = %routing.reason,
985        estimated_tokens = routing.estimated_tokens,
986        "RLM: Routing large tool output"
987    );
988    let auto_ctx = AutoProcessContext {
989        tool_id: tool_name,
990        tool_args: tool_input.clone(),
991        session_id,
992        abort: None,
993        on_progress: None,
994        provider: Arc::clone(&provider),
995        model: model.to_string(),
996        bus: None,
997        trace_id: None,
998        subcall_provider: None,
999        subcall_model: None,
1000    };
1001    let original_bytes = rendered_content.len();
1002    match RlmRouter::auto_process(rendered_content, auto_ctx, &rlm_config).await {
1003        Ok(result) => {
1004            tracing::info!(
1005                input_tokens = result.stats.input_tokens,
1006                output_tokens = result.stats.output_tokens,
1007                iterations = result.stats.iterations,
1008                "RLM: Processing complete"
1009            );
1010            format!(
1011                "[RLM-SUMMARY tool={tool_name} original_bytes={original_bytes} reason={reason}]\n{body}\n[END RLM-SUMMARY — the raw tool output was replaced by this model-generated summary; re-running the same call will produce a similar summary, not the original bytes]",
1012                reason = routing.reason,
1013                body = result.processed,
1014            )
1015        }
1016        Err(e) => {
1017            tracing::warn!(error = %e, "RLM: auto_process failed, using smart_truncate");
1018            let (truncated, _, _) =
1019                RlmRouter::smart_truncate(rendered_content, tool_name, tool_input, ctx_window / 4);
1020            truncated
1021        }
1022    }
1023}