Skip to main content

pawan/agent/
execute.rs

1//! Agent execution loop — tool calling, streaming, and coordinator dispatch.
2
3use super::{
4    prepare_recalled_context, AgentResponse, LLMResponse, Message, PawanAgent, PermissionCallback,
5    PermissionRequest, Role, TokenCallback, TokenUsage, ToolCallRecord, ToolCallRequest,
6    ToolCallback, ToolResultMessage, ToolStartCallback,
7};
8use crate::coordinator::{CoordinatorResult, ToolCallingConfig, ToolCoordinator};
9use crate::tools::ToolRegistry;
10use crate::{PawanError, Result};
11use serde_json::{json, Value};
12use std::sync::Arc;
13use std::time::Instant;
14
15/// Truncate a tool result JSON value to fit within max_chars.
16/// Unlike naive string truncation (which breaks JSON), this truncates string
17/// *values* within the JSON structure, preserving valid JSON output.
18pub(crate) fn truncate_tool_result(value: Value, max_chars: usize) -> Value {
19    let serialized = serde_json::to_string(&value).unwrap_or_default();
20    if serialized.len() <= max_chars {
21        return value;
22    }
23
24    // Strategy: find the largest string values and truncate them
25    match value {
26        Value::Object(map) => {
27            let mut result = serde_json::Map::new();
28            let total = serialized.len();
29            for (k, v) in map {
30                if let Value::String(s) = &v {
31                    if s.len() > 500 {
32                        // Proportional truncation: shrink large strings
33                        let target = s.len() * max_chars / total;
34                        let target = target.max(200); // Keep at least 200 chars
35                        let truncated: String = s.chars().take(target).collect();
36                        result.insert(
37                            k,
38                            json!(format!(
39                                "{}...[truncated from {} chars]",
40                                truncated,
41                                s.len()
42                            )),
43                        );
44                        continue;
45                    }
46                }
47                // Recursively truncate nested structures
48                result.insert(k, truncate_tool_result(v, max_chars));
49            }
50            Value::Object(result)
51        }
52        Value::String(s) if s.len() > max_chars => {
53            let truncated: String = s.chars().take(max_chars).collect();
54            json!(format!(
55                "{}...[truncated from {} chars]",
56                truncated,
57                s.len()
58            ))
59        }
60        Value::Array(arr) if serialized.len() > max_chars => {
61            // Truncate array: keep first N items that fit
62            let mut result = Vec::new();
63            let mut running_len = 2; // "[]"
64            for item in arr {
65                let item_str = serde_json::to_string(&item).unwrap_or_default();
66                running_len += item_str.len() + 1; // +1 for comma
67                if running_len > max_chars {
68                    result.push(json!(format!("...[{} more items truncated]", 0)));
69                    break;
70                }
71                result.push(item);
72            }
73            Value::Array(result)
74        }
75        other => other,
76    }
77}
78
79/// Summarize tool arguments for permission requests
80pub(crate) fn summarize_args(args: &serde_json::Value) -> String {
81    match args {
82        serde_json::Value::Object(map) => {
83            let mut parts = Vec::new();
84            for (key, value) in map {
85                let value_str = match value {
86                    serde_json::Value::String(s) if s.len() > 50 => {
87                        format!("\"{}...\"", &s[..47])
88                    }
89                    serde_json::Value::String(s) => format!("\"{}\"", s),
90                    serde_json::Value::Array(arr) if arr.len() > 3 => {
91                        format!("[... {} items]", arr.len())
92                    }
93                    serde_json::Value::Array(arr) => {
94                        let items: Vec<String> = arr
95                            .iter()
96                            .take(3)
97                            .map(|v| match v {
98                                serde_json::Value::String(s) => {
99                                    if s.len() > 20 {
100                                        format!("\"{}...\"", &s[..17])
101                                    } else {
102                                        format!("\"{}\"", s)
103                                    }
104                                }
105                                _ => v.to_string(),
106                            })
107                            .collect();
108                        format!("[{}]", items.join(", "))
109                    }
110                    _ => value.to_string(),
111                };
112                parts.push(format!("{}: {}", key, value_str));
113            }
114            parts.join(", ")
115        }
116        serde_json::Value::String(s) => {
117            if s.len() > 100 {
118                format!("\"{}...\"", &s[..97])
119            } else {
120                format!("\"{}\"", s)
121            }
122        }
123        serde_json::Value::Array(arr) => {
124            format!("[{} items]", arr.len())
125        }
126        _ => args.to_string(),
127    }
128}
129
130impl PawanAgent {
131    /// Execute a single prompt with tool calling support
132    pub async fn execute(&mut self, user_prompt: &str) -> Result<AgentResponse> {
133        self.execute_with_callbacks(user_prompt, None, None, None)
134            .await
135    }
136
137    /// Execute with optional callbacks for streaming
138    pub async fn execute_with_callbacks(
139        &mut self,
140        user_prompt: &str,
141        on_token: Option<TokenCallback>,
142        on_tool: Option<ToolCallback>,
143        on_tool_start: Option<ToolStartCallback>,
144    ) -> Result<AgentResponse> {
145        self.execute_with_all_callbacks(user_prompt, on_token, on_tool, on_tool_start, None)
146            .await
147    }
148
149    /// Execute with all callbacks, including permission prompt.
150    pub async fn execute_with_all_callbacks(
151        &mut self,
152        user_prompt: &str,
153        on_token: Option<TokenCallback>,
154        on_tool: Option<ToolCallback>,
155        on_tool_start: Option<ToolStartCallback>,
156        on_permission: Option<PermissionCallback>,
157    ) -> Result<AgentResponse> {
158        // Check if coordinator mode is enabled
159        if self.config.use_coordinator {
160            // Coordinator mode does not support callbacks or permission prompts
161            if on_token.is_some()
162                || on_tool.is_some()
163                || on_tool_start.is_some()
164                || on_permission.is_some()
165            {
166                tracing::warn!(
167                    "Callbacks and permission prompts are not supported in coordinator mode; ignoring them"
168                );
169            }
170            return self.execute_with_coordinator(user_prompt).await;
171        }
172
173        // Reset idle timeout for the new turn
174        self.last_tool_call_time = None;
175
176        // Inject Eruka core memory and prefetch context
177        self.inject_eruka_context(user_prompt).await;
178
179        // Build effective prompt with architecture context and push to history
180        let effective_prompt = self.build_user_prompt(user_prompt)?;
181        self.history.push(Message {
182            role: Role::User,
183            content: effective_prompt,
184            tool_calls: vec![],
185            tool_result: None,
186        });
187
188        let mut all_tool_calls = Vec::new();
189        let mut total_usage = TokenUsage::default();
190        let mut iterations = 0;
191        let max_iterations = self.config.max_tool_iterations;
192
193        loop {
194            // Check idle timeout
195            if let Some(last_time) = self.last_tool_call_time {
196                let elapsed = last_time.elapsed().as_secs();
197                if elapsed > self.config.tool_call_idle_timeout_secs {
198                    return Err(PawanError::Agent(format!(
199                        "Tool idle timeout exceeded ({}s > {}s)",
200                        elapsed, self.config.tool_call_idle_timeout_secs
201                    )));
202                }
203            }
204
205            iterations += 1;
206            if iterations > max_iterations {
207                return Err(PawanError::Agent(format!(
208                    "Max tool iterations ({}) exceeded",
209                    max_iterations
210                )));
211            }
212
213            // Budget nudge + context estimation + pruning
214            self.apply_iteration_budgets(iterations, max_iterations)
215                .await;
216
217            // Dynamic tool selection: pick the most relevant tools for this query
218            let latest_query = self
219                .history
220                .iter()
221                .rev()
222                .find(|m| m.role == Role::User)
223                .map(|m| m.content.as_str())
224                .unwrap_or("");
225            let tool_defs = self.tools.select_for_query(latest_query, 12);
226            if iterations == 1 {
227                let tool_names: Vec<&str> = tool_defs.iter().map(|t| t.name.as_str()).collect();
228                tracing::info!(tools = ?tool_names, count = tool_defs.len(), "Selected tools for query");
229            }
230
231            // Update idle timeout tracker before LLM call to track time spent in generation
232            self.last_tool_call_time = Some(Instant::now());
233
234            // Resilient LLM call with retry on transient failures
235            let response = self
236                .call_llm_with_retry(&tool_defs, on_token.as_ref())
237                .await;
238
239            // Accumulate token usage with thinking/action split
240            if let Some(ref usage) = response.usage {
241                Self::accumulate_token_usage(
242                    usage,
243                    &mut total_usage,
244                    iterations,
245                    self.config.thinking_budget,
246                );
247            }
248
249            // Strip thinking blocks from content
250            let clean_content = Self::strip_thinking_blocks(&response.content);
251
252            if response.tool_calls.is_empty() {
253                // Guardrails for no-tool responses; returns true if conversation is complete
254                if self
255                    .handle_no_tool_response(
256                        &clean_content,
257                        user_prompt,
258                        &tool_defs,
259                        iterations,
260                        max_iterations,
261                        &response.finish_reason,
262                    )
263                    .await
264                {
265                    self.history.push(Message {
266                        role: Role::Assistant,
267                        content: clean_content.clone(),
268                        tool_calls: vec![],
269                        tool_result: None,
270                    });
271                    // Persist this completed turn to Eruka
272                    if let Some(eruka) = &self.eruka {
273                        if let Err(e) = eruka
274                            .sync_turn(user_prompt, &clean_content, &self.session_id)
275                            .await
276                        {
277                            tracing::warn!("Eruka sync_turn failed (non-fatal): {}", e);
278                        }
279                    }
280                    return Ok(AgentResponse {
281                        content: clean_content,
282                        tool_calls: all_tool_calls,
283                        iterations,
284                        usage: total_usage,
285                    });
286                }
287                continue;
288            }
289
290            // Push assistant response to history
291            self.history.push(Message {
292                role: Role::Assistant,
293                content: response.content.clone(),
294                tool_calls: response.tool_calls.clone(),
295                tool_result: None,
296            });
297
298            // Validate permissions, emit start events, partition into pending vs denied
299            let (pending, mut ordered_records, mut ordered_tool_messages, mut ordered_compile_gate) =
300                self.check_tool_permissions(
301                    &response.tool_calls,
302                    on_permission.as_ref(),
303                    on_tool.as_ref(),
304                    on_tool_start.as_ref(),
305                )
306                .await;
307
308            // Execute pending tools in parallel
309            if !pending.is_empty() {
310                let results = Self::execute_pending_tools(
311                    &self.tools,
312                    self.config.bash_timeout_secs,
313                    self.config.max_result_chars,
314                    pending,
315                    on_tool.as_ref(),
316                )
317                .await;
318                for (idx, record, tool_msg, wrote_rs) in results {
319                    ordered_records[idx] = Some(record);
320                    ordered_tool_messages[idx] = Some(tool_msg);
321                    ordered_compile_gate[idx] = wrote_rs;
322                }
323            }
324
325            // Collect ordered results and run compile gates
326            self.collect_tool_results(
327                &mut all_tool_calls,
328                &mut ordered_records,
329                &mut ordered_tool_messages,
330                &ordered_compile_gate,
331                response.tool_calls.len(),
332            )
333            .await;
334        }
335    }
336    // ─── Helper functions for execute_with_all_callbacks ───────────────
337
338    /// Inject Eruka core memory and prefetch task-relevant context into history.
339    async fn inject_eruka_context(&mut self, user_prompt: &str) {
340        if let Some(eruka) = &self.eruka {
341            let before_inject = self.history.len();
342            if let Err(e) = eruka.inject_core_memory(&mut self.history).await {
343                tracing::warn!("Eruka memory injection failed (non-fatal): {}", e);
344            }
345
346            for msg in self
347                .history
348                .iter_mut()
349                .skip(before_inject)
350                .filter(|m| m.role == Role::System)
351            {
352                let fenced = prepare_recalled_context("eruka_core_memory", &msg.content);
353                if !fenced.is_empty() {
354                    msg.content = fenced;
355                }
356            }
357
358            // Prefetch task-relevant context: semantic search + compressed
359            // general context. Inject as a system message so the LLM can
360            // draw on prior-session context for the same query. Non-fatal.
361            match eruka.prefetch(user_prompt, 2000).await {
362                Ok(Some(ctx)) => {
363                    let fenced = prepare_recalled_context("eruka_prefetch", &ctx);
364                    if !fenced.is_empty() {
365                        self.history.push(Message {
366                            role: Role::System,
367                            content: fenced,
368                            tool_calls: vec![],
369                            tool_result: None,
370                        });
371                    }
372                }
373                Ok(None) => {}
374                Err(e) => tracing::warn!("Eruka prefetch failed (non-fatal): {}", e),
375            }
376        }
377    }
378
379    /// Prepend workspace architecture context to the user prompt, if available.
380    fn build_user_prompt(&self, user_prompt: &str) -> Result<String> {
381        if let Some(err) = &self.arch_context_error {
382            return Err(PawanError::Config(err.clone()));
383        }
384        Ok(match &self.arch_context {
385            Some(ctx) => format!(
386                "[Workspace Architecture]\n{ctx}\n[/Workspace Architecture]\n\n{user_prompt}"
387            ),
388            None => user_prompt.to_string(),
389        })
390    }
391
392    /// Nudge the model when running low on tool iterations, estimate context
393    /// size, and prune history if it exceeds the configured token budget.
394    async fn apply_iteration_budgets(&mut self, iterations: usize, max_iterations: usize) {
395        // Budget awareness: when running low on iterations, nudge the model
396        let remaining = max_iterations.saturating_sub(iterations);
397        if remaining == 3 && iterations > 1 {
398            self.history.push(Message {
399                role: Role::User,
400                content: format!(
401                    "[SYSTEM] You have {} tool iterations remaining. \
402                     Stop exploring and write the most important output now. \
403                     If you have code to write, write it immediately.",
404                    remaining
405                ),
406                tool_calls: vec![],
407                tool_result: None,
408            });
409        }
410        // Estimate context tokens
411        self.context_tokens_estimate =
412            self.history.iter().map(|m| m.content.len()).sum::<usize>() / 4;
413        if self.context_tokens_estimate > self.config.max_context_tokens {
414            // Snapshot pre-compression content to Eruka so the facts
415            // being discarded survive the prune. Non-fatal.
416            if let Some(eruka) = &self.eruka {
417                let snapshot = Self::history_snapshot_for_eruka(&self.history);
418                if let Err(e) = eruka.on_pre_compress(&snapshot, &self.session_id).await {
419                    tracing::warn!("Eruka on_pre_compress failed (non-fatal): {}", e);
420                }
421            }
422            self.prune_history();
423        }
424    }
425
426    /// Call the LLM backend with retry logic for transient failures.
427    /// Returns a synthetic error response after exhausting retries.
428    async fn call_llm_with_retry(
429        &mut self,
430        tool_defs: &[thulp_core::ToolDefinition],
431        on_token: Option<&TokenCallback>,
432    ) -> LLMResponse {
433        let max_llm_retries = 3;
434        let mut attempt = 0;
435        loop {
436            attempt += 1;
437            match self
438                .backend
439                .generate(&self.history, tool_defs, on_token)
440                .await
441            {
442                Ok(resp) => return resp,
443                Err(e) => {
444                    let err_str = e.to_string();
445                    let is_transient = err_str.contains("timeout")
446                        || err_str.contains("connection")
447                        || err_str.contains("429")
448                        || err_str.contains("500")
449                        || err_str.contains("502")
450                        || err_str.contains("503")
451                        || err_str.contains("504")
452                        || err_str.contains("reset")
453                        || err_str.contains("broken pipe");
454
455                    if is_transient && attempt <= max_llm_retries {
456                        let delay = std::time::Duration::from_secs(2u64.pow(attempt as u32));
457                        tracing::warn!(
458                            attempt = attempt,
459                            delay_secs = delay.as_secs(),
460                            error = err_str.as_str(),
461                            "LLM call failed (transient) — retrying"
462                        );
463                        tokio::time::sleep(delay).await;
464
465                        // If context is too large, prune before retry
466                        if err_str.contains("context") || err_str.contains("token") {
467                            tracing::info!(
468                                "Pruning history before retry (possible context overflow)"
469                            );
470                            if let Some(eruka) = &self.eruka {
471                                let snapshot = Self::history_snapshot_for_eruka(&self.history);
472                                if let Err(e) =
473                                    eruka.on_pre_compress(&snapshot, &self.session_id).await
474                                {
475                                    tracing::warn!(
476                                        "Eruka on_pre_compress failed (non-fatal): {}",
477                                        e
478                                    );
479                                }
480                            }
481                            self.prune_history();
482                        }
483                        continue;
484                    }
485
486                    // Non-transient or max retries exhausted — return synthetic error
487                    tracing::error!(
488                        attempt = attempt,
489                        error = err_str.as_str(),
490                        "LLM call failed permanently — returning error as content"
491                    );
492                    return LLMResponse {
493                        content: format!(
494                            "LLM error after {} attempts: {}. The task could not be completed.",
495                            attempt, err_str
496                        ),
497                        reasoning: None,
498                        tool_calls: vec![],
499                        finish_reason: "error".to_string(),
500                        usage: None,
501                    };
502                }
503            }
504        }
505    }
506
507    /// Accumulate token usage from an LLM response and enforce thinking budget.
508    fn accumulate_token_usage(
509        usage: &TokenUsage,
510        total_usage: &mut TokenUsage,
511        iterations: usize,
512        thinking_budget: usize,
513    ) {
514        total_usage.prompt_tokens += usage.prompt_tokens;
515        total_usage.completion_tokens += usage.completion_tokens;
516        total_usage.total_tokens += usage.total_tokens;
517        total_usage.reasoning_tokens += usage.reasoning_tokens;
518        total_usage.action_tokens += usage.action_tokens;
519
520        // Log token budget split per iteration
521        if usage.reasoning_tokens > 0 {
522            tracing::info!(
523                iteration = iterations,
524                think = usage.reasoning_tokens,
525                act = usage.action_tokens,
526                total = usage.completion_tokens,
527                "Token budget: think:{} act:{} (total:{})",
528                usage.reasoning_tokens,
529                usage.action_tokens,
530                usage.completion_tokens
531            );
532        }
533
534        // Thinking budget enforcement
535        if thinking_budget > 0 && usage.reasoning_tokens > thinking_budget as u64 {
536            tracing::warn!(
537                budget = thinking_budget,
538                actual = usage.reasoning_tokens,
539                "Thinking budget exceeded ({}/{} tokens)",
540                usage.reasoning_tokens,
541                thinking_budget
542            );
543        }
544    }
545
546    /// Strip `<think>` blocks from LLM response content.
547    fn strip_thinking_blocks(content: &str) -> String {
548        let mut s = content.to_string();
549        loop {
550            let lower = s.to_lowercase();
551            let open = lower.find("<think>");
552            let close = lower.find("</think>");
553            match (open, close) {
554                (Some(i), Some(j)) if j > i => {
555                    let before = s[..i].trim_end().to_string();
556                    let after = if s.len() > j + 8 {
557                        s[j + 8..].trim_start().to_string()
558                    } else {
559                        String::new()
560                    };
561                    s = if before.is_empty() {
562                        after
563                    } else if after.is_empty() {
564                        before
565                    } else {
566                        format!("{}\n{}", before, after)
567                    };
568                }
569                _ => break,
570            }
571        }
572        s
573    }
574
575    /// Handle guardrails for no-tool-call responses.
576    ///
577    /// Returns `true` if the conversation is complete (caller should return the
578    /// response), `false` if the loop should continue with a correction message.
579    async fn handle_no_tool_response(
580        &mut self,
581        clean_content: &str,
582        _user_prompt: &str,
583        tool_defs: &[thulp_core::ToolDefinition],
584        iterations: usize,
585        max_iterations: usize,
586        finish_reason: &str,
587    ) -> bool {
588        // Guardrail: detect chatty no-op (content but no tools on early iterations)
589        // Only nudge if tools are available AND response looks like planning text
590        let has_tools = !tool_defs.is_empty();
591        let lower = clean_content.to_lowercase();
592        let planning_prefix = lower.starts_with("let me")
593            || lower.starts_with("i'll help")
594            || lower.starts_with("i will help")
595            || lower.starts_with("sure, i")
596            || lower.starts_with("okay, i");
597        let looks_like_planning =
598            clean_content.len() > 200 || (planning_prefix && clean_content.len() > 50);
599        if has_tools
600            && looks_like_planning
601            && iterations == 1
602            && iterations < max_iterations
603            && finish_reason != "error"
604        {
605            tracing::warn!(
606                "No tool calls at iteration {} (content: {}B) — nudging model to use tools",
607                iterations,
608                clean_content.len()
609            );
610            self.history.push(Message {
611                role: Role::Assistant,
612                content: clean_content.to_string(),
613                tool_calls: vec![],
614                tool_result: None,
615            });
616            self.history.push(Message {
617                role: Role::User,
618                content: "You must use tools to complete this task. Do NOT just describe what you would do — actually call the tools. Start with bash or read_file.".to_string(),
619                tool_calls: vec![],
620                tool_result: None,
621            });
622            return false;
623        }
624
625        // Guardrail: detect repeated responses
626        if iterations > 1 {
627            let prev_assistant = self
628                .history
629                .iter()
630                .rev()
631                .find(|m| m.role == Role::Assistant && !m.content.is_empty());
632            if let Some(prev) = prev_assistant {
633                if prev.content.trim() == clean_content.trim() && iterations < max_iterations {
634                    tracing::warn!(
635                        "Repeated response detected at iteration {} — injecting correction",
636                        iterations
637                    );
638                    self.history.push(Message {
639                        role: Role::Assistant,
640                        content: clean_content.to_string(),
641                        tool_calls: vec![],
642                        tool_result: None,
643                    });
644                    self.history.push(Message {
645                        role: Role::User,
646                        content: "You gave the same response as before. Try a different approach. Use anchor_text in edit_file_lines, or use insert_after, or use bash with sed.".to_string(),
647                        tool_calls: vec![],
648                        tool_result: None,
649                    });
650                    return false;
651                }
652            }
653        }
654
655        true
656    }
657
658    /// Validate permissions, emit start events, and partition tool calls into
659    /// pending (ready to execute) vs denied.
660    async fn check_tool_permissions(
661        &mut self,
662        tool_calls: &[ToolCallRequest],
663        on_permission: Option<&PermissionCallback>,
664        on_tool: Option<&ToolCallback>,
665        on_tool_start: Option<&ToolStartCallback>,
666    ) -> (
667        Vec<(usize, ToolCallRequest)>,
668        Vec<Option<ToolCallRecord>>,
669        Vec<Option<Message>>,
670        Vec<bool>,
671    ) {
672        let mut ordered_records: Vec<Option<ToolCallRecord>> = vec![None; tool_calls.len()];
673        let mut ordered_tool_messages: Vec<Option<Message>> = vec![None; tool_calls.len()];
674        let ordered_compile_gate: Vec<bool> = vec![false; tool_calls.len()];
675        let mut pending: Vec<(usize, ToolCallRequest)> = Vec::new();
676
677        for (idx, tool_call) in tool_calls.iter().cloned().enumerate() {
678            self.tools.activate(&tool_call.name);
679
680            let perm =
681                crate::config::ToolPermission::resolve(&tool_call.name, &self.config.permissions);
682            let denied = match perm {
683                crate::config::ToolPermission::Deny => Some("Tool denied by permission policy"),
684                crate::config::ToolPermission::Prompt => {
685                    if tool_call.name == "bash" {
686                        if let Some(cmd) =
687                            tool_call.arguments.get("command").and_then(|v| v.as_str())
688                        {
689                            if crate::tools::bash::is_read_only(cmd) {
690                                tracing::debug!(
691                                    command = cmd,
692                                    "Auto-allowing read-only bash command under Prompt permission"
693                                );
694                                None
695                            } else if let Some(ref perm_cb) = on_permission {
696                                let args_summary = cmd.chars().take(120).collect::<String>();
697                                let rx = perm_cb(PermissionRequest {
698                                    tool_name: tool_call.name.clone(),
699                                    args_summary,
700                                });
701                                match rx.await {
702                                    Ok(true) => None,
703                                    _ => Some("User denied tool execution"),
704                                }
705                            } else {
706                                Some("Bash command requires user approval (read-only commands auto-allowed)")
707                            }
708                        } else {
709                            Some("Tool requires user approval")
710                        }
711                    } else if let Some(ref perm_cb) = on_permission {
712                        let args_summary = tool_call
713                            .arguments
714                            .to_string()
715                            .chars()
716                            .take(120)
717                            .collect::<String>();
718                        let rx = perm_cb(PermissionRequest {
719                            tool_name: tool_call.name.clone(),
720                            args_summary,
721                        });
722                        match rx.await {
723                            Ok(true) => None,
724                            _ => Some("User denied tool execution"),
725                        }
726                    } else {
727                        Some(
728                            "Tool requires user approval (set permission to allow or use TUI mode)",
729                        )
730                    }
731                }
732                crate::config::ToolPermission::Allow => None,
733            };
734
735            if let Some(reason) = denied {
736                let record = ToolCallRecord {
737                    id: tool_call.id.clone(),
738                    name: tool_call.name.clone(),
739                    arguments: tool_call.arguments.clone(),
740                    result: json!({"error": reason}),
741                    success: false,
742                    duration_ms: 0,
743                };
744                if let Some(ref callback) = on_tool {
745                    callback(&record);
746                }
747                ordered_records[idx] = Some(record);
748                ordered_tool_messages[idx] = Some(Message {
749                    role: Role::Tool,
750                    content: serde_json::to_string(&json!({"error": reason})).unwrap_or_default(),
751                    tool_calls: vec![],
752                    tool_result: Some(ToolResultMessage {
753                        tool_call_id: tool_call.id.clone(),
754                        content: json!({"error": reason}),
755                        success: false,
756                    }),
757                });
758                continue;
759            }
760
761            if let Some(ref callback) = on_tool_start {
762                callback(&tool_call.name);
763            }
764
765            if let Some(tool) = self.tools.get(&tool_call.name) {
766                let schema = tool.parameters_schema();
767                if let Ok(params) = thulp_core::ToolDefinition::parse_mcp_input_schema(&schema) {
768                    let thulp_def = thulp_core::ToolDefinition {
769                        name: tool_call.name.clone(),
770                        description: String::new(),
771                        parameters: params,
772                    };
773                    if let Err(e) = thulp_def.validate_args(&tool_call.arguments) {
774                        tracing::warn!(tool = tool_call.name.as_str(), error = %e, "Tool argument validation failed (continuing anyway)");
775                    }
776                }
777            }
778
779            let tool = self.tools.get(&tool_call.name);
780            let is_mutating = tool.map(|t| t.mutating()).unwrap_or(false);
781            if is_mutating {
782                if let Some(ref callback) = on_permission {
783                    let args_summary = summarize_args(&tool_call.arguments);
784                    let request = PermissionRequest {
785                        tool_name: tool_call.name.clone(),
786                        args_summary,
787                    };
788                    let permission_rx = (callback)(request);
789                    match permission_rx.await {
790                        Ok(true) => {}
791                        Ok(false) => {
792                            let record = ToolCallRecord {
793                                id: tool_call.id.clone(),
794                                name: tool_call.name.clone(),
795                                arguments: tool_call.arguments.clone(),
796                                result: json!({"error": "Tool execution denied by user", "tool": tool_call.name}),
797                                success: false,
798                                duration_ms: 0,
799                            };
800                            if let Some(ref callback) = on_tool {
801                                callback(&record);
802                            }
803                            ordered_records[idx] = Some(record);
804                            ordered_tool_messages[idx] = Some(Message {
805                                role: Role::Tool,
806                                content: serde_json::to_string(&json!({"error": "Tool execution denied by user", "tool": tool_call.name})).unwrap_or_default(),
807                                tool_calls: vec![],
808                                tool_result: Some(ToolResultMessage {
809                                    tool_call_id: tool_call.id.clone(),
810                                    content: json!({"error": "Tool execution denied by user", "tool": tool_call.name}),
811                                    success: false,
812                                }),
813                            });
814                            continue;
815                        }
816                        Err(_) => {
817                            let record = ToolCallRecord {
818                                id: tool_call.id.clone(),
819                                name: tool_call.name.clone(),
820                                arguments: tool_call.arguments.clone(),
821                                result: json!({"error": "Permission channel closed", "tool": tool_call.name}),
822                                success: false,
823                                duration_ms: 0,
824                            };
825                            if let Some(ref callback) = on_tool {
826                                callback(&record);
827                            }
828                            ordered_records[idx] = Some(record);
829                            ordered_tool_messages[idx] = Some(Message {
830                                role: Role::Tool,
831                                content: serde_json::to_string(&json!({"error": "Permission channel closed", "tool": tool_call.name})).unwrap_or_default(),
832                                tool_calls: vec![],
833                                tool_result: Some(ToolResultMessage {
834                                    tool_call_id: tool_call.id.clone(),
835                                    content: json!({"error": "Permission channel closed", "tool": tool_call.name}),
836                                    success: false,
837                                }),
838                            });
839                            continue;
840                        }
841                    }
842                } else {
843                    tracing::warn!(
844                        tool = tool_call.name.as_str(),
845                        "No permission callback, auto-approving mutating tool"
846                    );
847                }
848            }
849
850            pending.push((idx, tool_call));
851        }
852
853        (
854            pending,
855            ordered_records,
856            ordered_tool_messages,
857            ordered_compile_gate,
858        )
859    }
860
861    /// Execute pending tool calls in parallel with per-tool timeout handling.
862    async fn execute_pending_tools(
863        tools: &ToolRegistry,
864        bash_timeout_secs: u64,
865        max_result_chars: usize,
866        pending: Vec<(usize, ToolCallRequest)>,
867        on_tool: Option<&ToolCallback>,
868    ) -> Vec<(usize, ToolCallRecord, Message, bool)> {
869        use futures::{stream, StreamExt};
870
871        let on_tool_cb = on_tool;
872        let max_parallel = std::cmp::max(1, 10);
873        stream::iter(pending)
874            .map(|(idx, tool_call)| async move {
875                let start = std::time::Instant::now();
876
877                let result = {
878                    let tool_future = tools.execute(&tool_call.name, tool_call.arguments.clone());
879                    let timeout_dur = if tool_call.name == "bash" {
880                        std::time::Duration::from_secs(bash_timeout_secs)
881                    } else {
882                        std::time::Duration::from_secs(30)
883                    };
884                    match tokio::time::timeout(timeout_dur, tool_future).await {
885                        Ok(inner) => inner,
886                        Err(_) => Err(PawanError::Tool(format!(
887                            "Tool {} timed out after {}s",
888                            tool_call.name,
889                            timeout_dur.as_secs()
890                        ))),
891                    }
892                };
893
894                let duration_ms = start.elapsed().as_millis() as u64;
895                let (mut result_value, success) = match result {
896                    Ok(v) => (v, true),
897                    Err(e) => {
898                        tracing::warn!(tool = tool_call.name.as_str(), error = %e, "Tool execution failed");
899                        (json!({"error": e.to_string(), "tool": tool_call.name, "hint": "Try a different approach or tool"}), false)
900                    }
901                };
902
903                result_value = truncate_tool_result(result_value, max_result_chars);
904
905                let record = ToolCallRecord {
906                    id: tool_call.id.clone(),
907                    name: tool_call.name.clone(),
908                    arguments: tool_call.arguments.clone(),
909                    result: result_value.clone(),
910                    success,
911                    duration_ms,
912                };
913
914                if let Some(ref cb) = on_tool_cb {
915                    cb(&record);
916                }
917
918                let tool_msg = Message {
919                    role: Role::Tool,
920                    content: serde_json::to_string(&result_value).unwrap_or_default(),
921                    tool_calls: vec![],
922                    tool_result: Some(ToolResultMessage {
923                        tool_call_id: tool_call.id.clone(),
924                        content: result_value,
925                        success,
926                    }),
927                };
928
929                let wrote_rs = success
930                    && tool_call.name == "write_file"
931                    && tool_call
932                        .arguments
933                        .get("path")
934                        .and_then(|p| p.as_str())
935                        .map(|p| p.ends_with(".rs"))
936                        .unwrap_or(false);
937
938                (idx, record, tool_msg, wrote_rs)
939            })
940            .buffer_unordered(max_parallel)
941            .collect::<Vec<_>>()
942            .await
943    }
944
945    /// Collect ordered tool results into the history and all_tool_calls list,
946    /// and apply compile gates (cargo check after .rs file writes).
947    async fn collect_tool_results(
948        &mut self,
949        all_tool_calls: &mut Vec<ToolCallRecord>,
950        ordered_records: &mut [Option<ToolCallRecord>],
951        ordered_tool_messages: &mut [Option<Message>],
952        ordered_compile_gate: &[bool],
953        tool_calls_count: usize,
954    ) {
955        for i in 0..tool_calls_count {
956            if let Some(record) = ordered_records[i].take() {
957                all_tool_calls.push(record);
958            }
959            if let Some(msg) = ordered_tool_messages[i].take() {
960                self.history.push(msg);
961            }
962
963            if ordered_compile_gate[i] {
964                let ws = self.workspace_root.clone();
965                let check_result = tokio::process::Command::new("cargo")
966                    .arg("check")
967                    .arg("--message-format=short")
968                    .current_dir(&ws)
969                    .output()
970                    .await;
971                match check_result {
972                    Ok(output) if !output.status.success() => {
973                        let stderr = String::from_utf8_lossy(&output.stderr);
974                        let err_msg: String = stderr.chars().take(1500).collect();
975                        tracing::info!(
976                            "Compile-gate: cargo check failed after write_file, injecting errors"
977                        );
978                        self.history.push(Message {
979                            role: Role::User,
980                            content: format!(
981                                "[SYSTEM] cargo check failed after your write_file. Fix the errors:\n{}",
982                                err_msg
983                            ),
984                            tool_calls: vec![],
985                            tool_result: None,
986                        });
987                    }
988                    Ok(_) => {
989                        tracing::debug!("Compile-gate: cargo check passed");
990                    }
991                    Err(e) => {
992                        tracing::warn!("Compile-gate: cargo check failed to run: {}", e);
993                    }
994                }
995            }
996        }
997    }
998
999    /// Execute using the ToolCoordinator instead of the built-in loop.
1000    ///
1001    /// This method provides an alternative implementation that uses the
1002    /// ToolCoordinator for tool-calling loops, which offers:
1003    /// - Parallel tool execution
1004    /// - Per-tool timeout handling
1005    /// - Consistent error handling
1006    /// - Max iteration limits
1007    ///
1008    /// Note: This method does not support streaming callbacks or permission
1009    /// prompts - those are only available in the built-in loop.
1010    async fn execute_with_coordinator(&mut self, user_prompt: &str) -> Result<AgentResponse> {
1011        // Reset idle timeout for the new turn
1012        self.last_tool_call_time = None;
1013
1014        // Inject Eruka core memory before first LLM call
1015        if let Some(eruka) = &self.eruka {
1016            let before_inject = self.history.len();
1017            if let Err(e) = eruka.inject_core_memory(&mut self.history).await {
1018                tracing::warn!("Eruka memory injection failed (non-fatal): {}", e);
1019            }
1020
1021            for msg in self
1022                .history
1023                .iter_mut()
1024                .skip(before_inject)
1025                .filter(|m| m.role == Role::System)
1026            {
1027                let fenced = prepare_recalled_context("eruka_core_memory", &msg.content);
1028                if !fenced.is_empty() {
1029                    msg.content = fenced;
1030                }
1031            }
1032
1033            // Prefetch task-relevant context
1034            match eruka.prefetch(user_prompt, 2000).await {
1035                Ok(Some(ctx)) => {
1036                    let fenced = prepare_recalled_context("eruka_prefetch", &ctx);
1037                    if !fenced.is_empty() {
1038                        self.history.push(Message {
1039                            role: Role::System,
1040                            content: fenced,
1041                            tool_calls: vec![],
1042                            tool_result: None,
1043                        });
1044                    }
1045                }
1046                Ok(None) => {}
1047                Err(e) => tracing::warn!("Eruka prefetch failed (non-fatal): {}", e),
1048            }
1049        }
1050
1051        // Per-turn architecture context injection
1052
1053        if let Some(err) = &self.arch_context_error {
1054            return Err(PawanError::Config(err.clone()));
1055        }
1056
1057        let effective_prompt = match &self.arch_context {
1058            Some(ctx) => format!(
1059                "[Workspace Architecture]\n{ctx}\n[/Workspace Architecture]\n\n{user_prompt}"
1060            ),
1061            None => user_prompt.to_string(),
1062        };
1063
1064        // Build coordinator config from agent config
1065        let coordinator_config = ToolCallingConfig {
1066            max_iterations: self.config.max_tool_iterations,
1067            parallel_execution: true,
1068            max_parallel_tools: 10,
1069            tool_timeout: std::time::Duration::from_secs(self.config.bash_timeout_secs),
1070            stop_on_error: false,
1071        };
1072
1073        // Create a fresh backend for coordinator execution
1074        let system_prompt = self.config.get_system_prompt_checked()?;
1075        let backend = Self::create_backend(&self.config, &system_prompt);
1076        let backend = Arc::from(backend);
1077
1078        // Create a fresh tool registry for coordinator execution
1079        // Note: This will not include any MCP tools registered at runtime
1080        let registry = Arc::new(ToolRegistry::with_defaults(self.workspace_root.clone()));
1081
1082        // Create coordinator with backend and tool registry
1083        let coordinator = ToolCoordinator::new(backend, registry, coordinator_config);
1084
1085        // Execute with coordinator
1086        let result: CoordinatorResult = coordinator
1087            .execute(Some(&system_prompt), &effective_prompt)
1088            .await
1089            .map_err(|e| PawanError::Agent(format!("Coordinator execution failed: {}", e)))?;
1090
1091        // Convert CoordinatorResult to AgentResponse
1092        let content = result.content.clone();
1093        let agent_response = AgentResponse {
1094            content: result.content,
1095            tool_calls: result.tool_calls,
1096            iterations: result.iterations,
1097            usage: result.total_usage,
1098        };
1099
1100        // Sync turn to Eruka if enabled
1101        if let Some(eruka) = &self.eruka {
1102            if let Err(e) = eruka
1103                .sync_turn(user_prompt, &content, &self.session_id)
1104                .await
1105            {
1106                tracing::warn!("Eruka sync_turn failed (non-fatal): {}", e);
1107            }
1108        }
1109
1110        Ok(agent_response)
1111    }
1112
1113    /// Execute a healing task with real diagnostics
1114    pub async fn heal(&mut self) -> Result<AgentResponse> {
1115        let healer =
1116            crate::healing::Healer::new(self.workspace_root.clone(), self.config.healing.clone());
1117
1118        let diagnostics = healer.get_diagnostics().await?;
1119        let failed_tests = healer.get_failed_tests().await?;
1120
1121        let mut prompt = format!(
1122            "I need you to heal this Rust project at: {}
1123
1124",
1125            self.workspace_root.display()
1126        );
1127
1128        if !diagnostics.is_empty() {
1129            prompt.push_str(&format!(
1130                "## Compilation Issues ({} found)
1131{}
1132",
1133                diagnostics.len(),
1134                healer.format_diagnostics_for_prompt(&diagnostics)
1135            ));
1136        }
1137
1138        if !failed_tests.is_empty() {
1139            prompt.push_str(&format!(
1140                "## Failed Tests ({} found)
1141{}
1142",
1143                failed_tests.len(),
1144                healer.format_tests_for_prompt(&failed_tests)
1145            ));
1146        }
1147
1148        if diagnostics.is_empty() && failed_tests.is_empty() {
1149            prompt.push_str(
1150                "No issues found! Run cargo check and cargo test to verify.
1151",
1152            );
1153        }
1154
1155        prompt.push_str(
1156            "
1157Fix each issue one at a time. Verify with cargo check after each fix.",
1158        );
1159
1160        self.execute(&prompt).await
1161    }
1162    /// Execute healing with retries — calls heal(), checks for remaining errors, retries if needed.
1163    ///
1164    /// Two-stage gate:
1165    ///   Stage 1 — `cargo check`: must produce zero errors before proceeding.
1166    ///   Stage 2 — `healing.verify_cmd` (optional): a user-supplied shell command
1167    ///             (e.g. `cargo test --workspace`).  If it exits non-zero the loop
1168    ///             continues so the LLM can address the reported failures.
1169    ///
1170    /// Anti-thrash guard: each Stage-1 error is fingerprinted (kind + code +
1171    /// message prefix).  If the same fingerprint survives `max_attempts`
1172    /// consecutive rounds unchanged the loop halts rather than spinning
1173    /// indefinitely on an error the LLM cannot fix.
1174    pub async fn heal_with_retries(&mut self, max_attempts: usize) -> Result<AgentResponse> {
1175        use std::collections::{HashMap, HashSet};
1176
1177        let mut last_response = self.heal().await?;
1178        // fingerprint → consecutive rounds this error has survived unchanged
1179        let mut stuck_counts: HashMap<u64, usize> = HashMap::new();
1180
1181        for attempt in 1..max_attempts {
1182            // Stage 1: cargo check must be error-free
1183            let fixer = crate::healing::CompilerFixer::new(self.workspace_root.clone());
1184            let remaining = fixer.check().await?;
1185            let errors: Vec<_> = remaining
1186                .iter()
1187                .filter(|d| d.kind == crate::healing::DiagnosticKind::Error)
1188                .collect();
1189
1190            if !errors.is_empty() {
1191                // Update fingerprint counts.
1192                // Drop entries for errors that were fixed; increment survivors.
1193                let current_fps: HashSet<u64> = errors.iter().map(|d| d.fingerprint()).collect();
1194                stuck_counts.retain(|fp, _| current_fps.contains(fp));
1195                for fp in &current_fps {
1196                    *stuck_counts.entry(*fp).or_insert(0) += 1;
1197                }
1198
1199                // Anti-thrash: halt if any error fingerprint has not budged
1200                // after max_attempts consecutive rounds.
1201                let thrashing: Vec<u64> = stuck_counts
1202                    .iter()
1203                    .filter_map(|(&fp, &count)| {
1204                        if count >= max_attempts {
1205                            Some(fp)
1206                        } else {
1207                            None
1208                        }
1209                    })
1210                    .collect();
1211                if !thrashing.is_empty() {
1212                    tracing::warn!(
1213                        stuck_fingerprints = thrashing.len(),
1214                        attempt,
1215                        "Anti-thrash: {} error(s) unchanged after {} attempts, halting heal loop",
1216                        thrashing.len(),
1217                        max_attempts
1218                    );
1219                    return Ok(last_response);
1220                }
1221
1222                tracing::warn!(
1223                    errors = errors.len(),
1224                    attempt,
1225                    "Stage 1 (cargo check): errors remain, retrying"
1226                );
1227                last_response = self.heal().await?;
1228                continue;
1229            }
1230
1231            // All Stage-1 errors cleared — reset thrash counters.
1232            stuck_counts.clear();
1233
1234            // Stage 2: optional verify_cmd
1235            let verify_cmd = self.config.healing.verify_cmd.clone();
1236            if let Some(ref cmd) = verify_cmd {
1237                match crate::healing::run_verify_cmd(&self.workspace_root, cmd).await {
1238                    Ok(None) => {
1239                        tracing::info!(
1240                            attempts = attempt,
1241                            "Stage 2 (verify_cmd) passed, healing complete"
1242                        );
1243                        return Ok(last_response);
1244                    }
1245                    Ok(Some(diag)) => {
1246                        tracing::warn!(
1247                            attempt,
1248                            cmd,
1249                            output = diag.raw,
1250                            "Stage 2 (verify_cmd) failed, retrying"
1251                        );
1252                        last_response = self.heal().await?;
1253                        continue;
1254                    }
1255                    Err(e) => {
1256                        // Cannot spawn the command — don't block healing on this
1257                        tracing::warn!(cmd, error = %e, "verify_cmd could not be run, skipping stage 2");
1258                        return Ok(last_response);
1259                    }
1260                }
1261            } else {
1262                tracing::info!(
1263                    attempts = attempt,
1264                    "Stage 1 (cargo check) passed, healing complete"
1265                );
1266                return Ok(last_response);
1267            }
1268        }
1269
1270        tracing::info!(
1271            attempts = max_attempts,
1272            "Healing finished (may still have errors)"
1273        );
1274        Ok(last_response)
1275    }
1276    /// Execute a task with a specific prompt
1277    pub async fn task(&mut self, task_description: &str) -> Result<AgentResponse> {
1278        let prompt = format!(
1279            r#"I need you to complete the following coding task:
1280
1281{}
1282
1283The workspace is at: {}
1284
1285Please:
12861. First explore the codebase to understand the relevant code
12872. Make the necessary changes
12883. Verify the changes compile with `cargo check`
12894. Run relevant tests if applicable
1290
1291Explain your changes as you go."#,
1292            task_description,
1293            self.workspace_root.display()
1294        );
1295
1296        self.execute(&prompt).await
1297    }
1298
1299    /// Generate a commit message for current changes
1300    pub async fn generate_commit_message(&mut self) -> Result<String> {
1301        let prompt = r#"Please:
13021. Run `git status` to see what files are changed
13032. Run `git diff --cached` to see staged changes (or `git diff` for unstaged)
13043. Generate a concise, descriptive commit message following conventional commits format
1305
1306Only output the suggested commit message, nothing else."#;
1307
1308        let response = self.execute(prompt).await?;
1309        Ok(response.content)
1310    }
1311}