Skip to main content

pawan/agent/
execute.rs

1//! Agent execution loop — tool calling, streaming, and coordinator dispatch.
2
3use super::{
4    prepare_recalled_context, AgentResponse, LLMResponse, Message, PawanAgent, PermissionCallback,
5    PermissionRequest, Role, TokenCallback, TokenUsage, ToolCallback, ToolCallRecord,
6    ToolCallRequest, ToolResultMessage, ToolStartCallback,
7};
8use crate::coordinator::{CoordinatorResult, ToolCallingConfig, ToolCoordinator};
9use crate::tools::ToolRegistry;
10use crate::{PawanError, Result};
11use serde_json::{json, Value};
12use std::sync::Arc;
13use std::time::Instant;
14
15/// Truncate a tool result JSON value to fit within max_chars.
16/// Unlike naive string truncation (which breaks JSON), this truncates string
17/// *values* within the JSON structure, preserving valid JSON output.
18pub(crate) fn truncate_tool_result(value: Value, max_chars: usize) -> Value {
19    let serialized = serde_json::to_string(&value).unwrap_or_default();
20    if serialized.len() <= max_chars {
21        return value;
22    }
23
24    // Strategy: find the largest string values and truncate them
25    match value {
26        Value::Object(map) => {
27            let mut result = serde_json::Map::new();
28            let total = serialized.len();
29            for (k, v) in map {
30                if let Value::String(s) = &v {
31                    if s.len() > 500 {
32                        // Proportional truncation: shrink large strings
33                        let target = s.len() * max_chars / total;
34                        let target = target.max(200); // Keep at least 200 chars
35                        let truncated: String = s.chars().take(target).collect();
36                        result.insert(
37                            k,
38                            json!(format!(
39                                "{}...[truncated from {} chars]",
40                                truncated,
41                                s.len()
42                            )),
43                        );
44                        continue;
45                    }
46                }
47                // Recursively truncate nested structures
48                result.insert(k, truncate_tool_result(v, max_chars));
49            }
50            Value::Object(result)
51        }
52        Value::String(s) if s.len() > max_chars => {
53            let truncated: String = s.chars().take(max_chars).collect();
54            json!(format!(
55                "{}...[truncated from {} chars]",
56                truncated,
57                s.len()
58            ))
59        }
60        Value::Array(arr) if serialized.len() > max_chars => {
61            // Truncate array: keep first N items that fit
62            let mut result = Vec::new();
63            let mut running_len = 2; // "[]"
64            for item in arr {
65                let item_str = serde_json::to_string(&item).unwrap_or_default();
66                running_len += item_str.len() + 1; // +1 for comma
67                if running_len > max_chars {
68                    result.push(json!(format!("...[{} more items truncated]", 0)));
69                    break;
70                }
71                result.push(item);
72            }
73            Value::Array(result)
74        }
75        other => other,
76    }
77}
78
79/// Summarize tool arguments for permission requests
80pub(crate) fn summarize_args(args: &serde_json::Value) -> String {
81    match args {
82        serde_json::Value::Object(map) => {
83            let mut parts = Vec::new();
84            for (key, value) in map {
85                let value_str = match value {
86                    serde_json::Value::String(s) if s.len() > 50 => {
87                        format!("\"{}...\"", &s[..47])
88                    }
89                    serde_json::Value::String(s) => format!("\"{}\"", s),
90                    serde_json::Value::Array(arr) if arr.len() > 3 => {
91                        format!("[... {} items]", arr.len())
92                    }
93                    serde_json::Value::Array(arr) => {
94                        let items: Vec<String> = arr
95                            .iter()
96                            .take(3)
97                            .map(|v| match v {
98                                serde_json::Value::String(s) => {
99                                    if s.len() > 20 {
100                                        format!("\"{}...\"", &s[..17])
101                                    } else {
102                                        format!("\"{}\"", s)
103                                    }
104                                }
105                                _ => v.to_string(),
106                            })
107                            .collect();
108                        format!("[{}]", items.join(", "))
109                    }
110                    _ => value.to_string(),
111                };
112                parts.push(format!("{}: {}", key, value_str));
113            }
114            parts.join(", ")
115        }
116        serde_json::Value::String(s) => {
117            if s.len() > 100 {
118                format!("\"{}...\"", &s[..97])
119            } else {
120                format!("\"{}\"", s)
121            }
122        }
123        serde_json::Value::Array(arr) => {
124            format!("[{} items]", arr.len())
125        }
126        _ => args.to_string(),
127    }
128}
129
130
131impl PawanAgent {
132    /// Execute a single prompt with tool calling support
133    pub async fn execute(&mut self, user_prompt: &str) -> Result<AgentResponse> {
134        self.execute_with_callbacks(user_prompt, None, None, None)
135            .await
136    }
137
138    /// Execute with optional callbacks for streaming
139    pub async fn execute_with_callbacks(
140        &mut self,
141        user_prompt: &str,
142        on_token: Option<TokenCallback>,
143        on_tool: Option<ToolCallback>,
144        on_tool_start: Option<ToolStartCallback>,
145    ) -> Result<AgentResponse> {
146        self.execute_with_all_callbacks(user_prompt, on_token, on_tool, on_tool_start, None)
147            .await
148    }
149
    /// Execute with all callbacks, including permission prompt.
    ///
    /// This is the main agent loop. Per call it:
    ///
    /// 1. Hands off to `execute_with_coordinator` when
    ///    `config.use_coordinator` is set (callbacks are ignored with a warning).
    /// 2. Injects Eruka context, builds the effective prompt and appends it to
    ///    the history as a user message.
    /// 3. Repeatedly calls the LLM, executes any requested tools and feeds the
    ///    results back, until the model answers without tool calls and
    ///    `handle_no_tool_response` reports the conversation as complete.
    ///
    /// # Errors
    ///
    /// * Whatever `build_user_prompt` returns as an error.
    /// * `PawanError::Agent` when the idle timeout
    ///   (`config.tool_call_idle_timeout_secs`) is exceeded between iterations.
    /// * `PawanError::Agent` when more than `config.max_tool_iterations`
    ///   iterations would be needed.
    pub async fn execute_with_all_callbacks(
        &mut self,
        user_prompt: &str,
        on_token: Option<TokenCallback>,
        on_tool: Option<ToolCallback>,
        on_tool_start: Option<ToolStartCallback>,
        on_permission: Option<PermissionCallback>,
    ) -> Result<AgentResponse> {
        // Check if coordinator mode is enabled
        if self.config.use_coordinator {
            // Coordinator mode does not support callbacks or permission prompts
            if on_token.is_some()
                || on_tool.is_some()
                || on_tool_start.is_some()
                || on_permission.is_some()
            {
                tracing::warn!(
                    "Callbacks and permission prompts are not supported in coordinator mode; ignoring them"
                );
            }
            return self.execute_with_coordinator(user_prompt).await;
        }

        // Reset idle timeout for the new turn
        self.last_tool_call_time = None;

        // Inject Eruka core memory and prefetch context
        self.inject_eruka_context(user_prompt).await;

        // Build effective prompt with architecture context and push to history
        let effective_prompt = self.build_user_prompt(user_prompt)?;
        self.history.push(Message {
            role: Role::User,
            content: effective_prompt,
            tool_calls: vec![],
            tool_result: None,
        });

        // Accumulators returned to the caller in the final `AgentResponse`.
        let mut all_tool_calls = Vec::new();
        let mut total_usage = TokenUsage::default();
        let mut iterations = 0;
        let max_iterations = self.config.max_tool_iterations;

        loop {
            // Check idle timeout. The timestamp is taken just before the
            // previous iteration's LLM call, so the measured span covers that
            // call plus everything that ran after it in that iteration.
            if let Some(last_time) = self.last_tool_call_time {
                let elapsed = last_time.elapsed().as_secs();
                if elapsed > self.config.tool_call_idle_timeout_secs {
                    return Err(PawanError::Agent(format!(
                        "Tool idle timeout exceeded ({}s > {}s)",
                        elapsed, self.config.tool_call_idle_timeout_secs
                    )));
                }
            }

            iterations += 1;
            if iterations > max_iterations {
                return Err(PawanError::Agent(format!(
                    "Max tool iterations ({}) exceeded",
                    max_iterations
                )));
            }

            // Budget nudge + context estimation + pruning
            self.apply_iteration_budgets(iterations, max_iterations).await;

            // Dynamic tool selection: pick the most relevant tools for this query.
            // The query is the most recent user-role message in the history,
            // which may be a nudge/correction message rather than the original
            // prompt. The second argument (12) is passed to `select_for_query`
            // — presumably the maximum number of tools; confirm against
            // `ToolRegistry`.
            let latest_query = self
                .history
                .iter()
                .rev()
                .find(|m| m.role == Role::User)
                .map(|m| m.content.as_str())
                .unwrap_or("");
            let tool_defs = self.tools.select_for_query(latest_query, 12);
            if iterations == 1 {
                let tool_names: Vec<&str> = tool_defs.iter().map(|t| t.name.as_str()).collect();
                tracing::info!(tools = ?tool_names, count = tool_defs.len(), "Selected tools for query");
            }

            // Update idle timeout tracker before LLM call to track time spent in generation
            self.last_tool_call_time = Some(Instant::now());

            // Resilient LLM call with retry on transient failures. This never
            // fails: permanent errors come back as a response whose
            // `finish_reason` is "error" and which carries no tool calls.
            let response = self.call_llm_with_retry(&tool_defs, on_token.as_ref()).await;

            // Accumulate token usage with thinking/action split
            if let Some(ref usage) = response.usage {
                Self::accumulate_token_usage(
                    usage, &mut total_usage, iterations, self.config.thinking_budget,
                );
            }

            // Strip thinking blocks from content
            let clean_content = Self::strip_thinking_blocks(&response.content);

            if response.tool_calls.is_empty() {
                // Guardrails for no-tool responses; returns true if conversation is complete
                if self
                    .handle_no_tool_response(
                        &clean_content,
                        user_prompt,
                        &tool_defs,
                        iterations,
                        max_iterations,
                        &response.finish_reason,
                    )
                    .await
                {
                    // Final answer: record it, sync the turn, and return.
                    self.history.push(Message {
                        role: Role::Assistant,
                        content: clean_content.clone(),
                        tool_calls: vec![],
                        tool_result: None,
                    });
                    // Persist this completed turn to Eruka
                    if let Some(eruka) = &self.eruka {
                        if let Err(e) = eruka
                            .sync_turn(user_prompt, &clean_content, &self.session_id)
                            .await
                        {
                            tracing::warn!("Eruka sync_turn failed (non-fatal): {}", e);
                        }
                    }
                    return Ok(AgentResponse {
                        content: clean_content,
                        tool_calls: all_tool_calls,
                        iterations,
                        usage: total_usage,
                    });
                }
                // The guardrail already pushed the assistant text and a
                // correction message; ask the model again.
                continue;
            }

            // Push assistant response to history.
            // NOTE(review): this stores the raw `response.content`, not
            // `clean_content`, so thinking blocks are kept in history on the
            // tool-call path — confirm this is intentional.
            self.history.push(Message {
                role: Role::Assistant,
                content: response.content.clone(),
                tool_calls: response.tool_calls.clone(),
                tool_result: None,
            });

            // Validate permissions, emit start events, partition into pending vs denied.
            // The three `ordered_*` vectors are indexed by the tool call's
            // position in `response.tool_calls`; denied calls are already
            // filled in, pending ones are filled in below.
            let (pending, mut ordered_records, mut ordered_tool_messages, mut ordered_compile_gate) =
                self.check_tool_permissions(
                    &response.tool_calls,
                    on_permission.as_ref(),
                    on_tool.as_ref(),
                    on_tool_start.as_ref(),
                )
                .await;

            // Execute pending tools in parallel
            if !pending.is_empty() {
                let results = Self::execute_pending_tools(
                    &self.tools,
                    self.config.bash_timeout_secs,
                    self.config.max_result_chars,
                    pending,
                    on_tool.as_ref(),
                )
                .await;
                // Slot each result back at its original position so the
                // collected output follows the model's request order.
                for (idx, record, tool_msg, wrote_rs) in results {
                    ordered_records[idx] = Some(record);
                    ordered_tool_messages[idx] = Some(tool_msg);
                    ordered_compile_gate[idx] = wrote_rs;
                }
            }

            // Collect ordered results and run compile gates
            self.collect_tool_results(
                &mut all_tool_calls,
                &mut ordered_records,
                &mut ordered_tool_messages,
                &ordered_compile_gate,
                response.tool_calls.len(),
            )
            .await;
        }
    }
331    // ─── Helper functions for execute_with_all_callbacks ───────────────
332
333    /// Inject Eruka core memory and prefetch task-relevant context into history.
334    async fn inject_eruka_context(&mut self, user_prompt: &str) {
335        if let Some(eruka) = &self.eruka {
336            let before_inject = self.history.len();
337            if let Err(e) = eruka.inject_core_memory(&mut self.history).await {
338                tracing::warn!("Eruka memory injection failed (non-fatal): {}", e);
339            }
340
341            for msg in self
342                .history
343                .iter_mut()
344                .skip(before_inject)
345                .filter(|m| m.role == Role::System)
346            {
347                let fenced = prepare_recalled_context("eruka_core_memory", &msg.content);
348                if !fenced.is_empty() {
349                    msg.content = fenced;
350                }
351            }
352
353            // Prefetch task-relevant context: semantic search + compressed
354            // general context. Inject as a system message so the LLM can
355            // draw on prior-session context for the same query. Non-fatal.
356            match eruka.prefetch(user_prompt, 2000).await {
357                Ok(Some(ctx)) => {
358                    let fenced = prepare_recalled_context("eruka_prefetch", &ctx);
359                    if !fenced.is_empty() {
360                        self.history.push(Message {
361                            role: Role::System,
362                            content: fenced,
363                            tool_calls: vec![],
364                            tool_result: None,
365                        });
366                    }
367                }
368                Ok(None) => {}
369                Err(e) => tracing::warn!("Eruka prefetch failed (non-fatal): {}", e),
370            }
371        }
372    }
373
374    /// Prepend workspace architecture context to the user prompt, if available.
375    fn build_user_prompt(&self, user_prompt: &str) -> Result<String> {
376        if let Some(err) = &self.arch_context_error {
377            return Err(PawanError::Config(err.clone()));
378        }
379        Ok(match &self.arch_context {
380            Some(ctx) => format!(
381                "[Workspace Architecture]\n{ctx}\n[/Workspace Architecture]\n\n{user_prompt}"
382            ),
383            None => user_prompt.to_string(),
384        })
385    }
386
387    /// Nudge the model when running low on tool iterations, estimate context
388    /// size, and prune history if it exceeds the configured token budget.
389    async fn apply_iteration_budgets(&mut self, iterations: usize, max_iterations: usize) {
390        // Budget awareness: when running low on iterations, nudge the model
391        let remaining = max_iterations.saturating_sub(iterations);
392        if remaining == 3 && iterations > 1 {
393            self.history.push(Message {
394                role: Role::User,
395                content: format!(
396                    "[SYSTEM] You have {} tool iterations remaining. \
397                     Stop exploring and write the most important output now. \
398                     If you have code to write, write it immediately.",
399                    remaining
400                ),
401                tool_calls: vec![],
402                tool_result: None,
403            });
404        }
405        // Estimate context tokens
406        self.context_tokens_estimate =
407            self.history.iter().map(|m| m.content.len()).sum::<usize>() / 4;
408        if self.context_tokens_estimate > self.config.max_context_tokens {
409            // Snapshot pre-compression content to Eruka so the facts
410            // being discarded survive the prune. Non-fatal.
411            if let Some(eruka) = &self.eruka {
412                let snapshot = Self::history_snapshot_for_eruka(&self.history);
413                if let Err(e) = eruka.on_pre_compress(&snapshot, &self.session_id).await {
414                    tracing::warn!("Eruka on_pre_compress failed (non-fatal): {}", e);
415                }
416            }
417            self.prune_history();
418        }
419    }
420
421    /// Call the LLM backend with retry logic for transient failures.
422    /// Returns a synthetic error response after exhausting retries.
423    async fn call_llm_with_retry(
424        &mut self,
425        tool_defs: &[thulp_core::ToolDefinition],
426        on_token: Option<&TokenCallback>,
427    ) -> LLMResponse {
428        let max_llm_retries = 3;
429        let mut attempt = 0;
430        loop {
431            attempt += 1;
432            match self
433                .backend
434                .generate(&self.history, tool_defs, on_token)
435                .await
436            {
437                Ok(resp) => return resp,
438                Err(e) => {
439                    let err_str = e.to_string();
440                    let is_transient = err_str.contains("timeout")
441                        || err_str.contains("connection")
442                        || err_str.contains("429")
443                        || err_str.contains("500")
444                        || err_str.contains("502")
445                        || err_str.contains("503")
446                        || err_str.contains("504")
447                        || err_str.contains("reset")
448                        || err_str.contains("broken pipe");
449
450                    if is_transient && attempt <= max_llm_retries {
451                        let delay =
452                            std::time::Duration::from_secs(2u64.pow(attempt as u32));
453                        tracing::warn!(
454                            attempt = attempt,
455                            delay_secs = delay.as_secs(),
456                            error = err_str.as_str(),
457                            "LLM call failed (transient) — retrying"
458                        );
459                        tokio::time::sleep(delay).await;
460
461                        // If context is too large, prune before retry
462                        if err_str.contains("context") || err_str.contains("token") {
463                            tracing::info!(
464                                "Pruning history before retry (possible context overflow)"
465                            );
466                            if let Some(eruka) = &self.eruka {
467                                let snapshot =
468                                    Self::history_snapshot_for_eruka(&self.history);
469                                if let Err(e) =
470                                    eruka.on_pre_compress(&snapshot, &self.session_id).await
471                                {
472                                    tracing::warn!(
473                                        "Eruka on_pre_compress failed (non-fatal): {}",
474                                        e
475                                    );
476                                }
477                            }
478                            self.prune_history();
479                        }
480                        continue;
481                    }
482
483                    // Non-transient or max retries exhausted — return synthetic error
484                    tracing::error!(
485                        attempt = attempt,
486                        error = err_str.as_str(),
487                        "LLM call failed permanently — returning error as content"
488                    );
489                    return LLMResponse {
490                        content: format!(
491                            "LLM error after {} attempts: {}. The task could not be completed.",
492                            attempt, err_str
493                        ),
494                        reasoning: None,
495                        tool_calls: vec![],
496                        finish_reason: "error".to_string(),
497                        usage: None,
498                    };
499                }
500            }
501        }
502    }
503
504    /// Accumulate token usage from an LLM response and enforce thinking budget.
505    fn accumulate_token_usage(
506        usage: &TokenUsage,
507        total_usage: &mut TokenUsage,
508        iterations: usize,
509        thinking_budget: usize,
510    ) {
511        total_usage.prompt_tokens += usage.prompt_tokens;
512        total_usage.completion_tokens += usage.completion_tokens;
513        total_usage.total_tokens += usage.total_tokens;
514        total_usage.reasoning_tokens += usage.reasoning_tokens;
515        total_usage.action_tokens += usage.action_tokens;
516
517        // Log token budget split per iteration
518        if usage.reasoning_tokens > 0 {
519            tracing::info!(
520                iteration = iterations,
521                think = usage.reasoning_tokens,
522                act = usage.action_tokens,
523                total = usage.completion_tokens,
524                "Token budget: think:{} act:{} (total:{})",
525                usage.reasoning_tokens,
526                usage.action_tokens,
527                usage.completion_tokens
528            );
529        }
530
531        // Thinking budget enforcement
532        if thinking_budget > 0 && usage.reasoning_tokens > thinking_budget as u64 {
533            tracing::warn!(
534                budget = thinking_budget,
535                actual = usage.reasoning_tokens,
536                "Thinking budget exceeded ({}/{} tokens)",
537                usage.reasoning_tokens,
538                thinking_budget
539            );
540        }
541    }
542
543    /// Strip `<think>` blocks from LLM response content.
544    fn strip_thinking_blocks(content: &str) -> String {
545        let mut s = content.to_string();
546        loop {
547            let lower = s.to_lowercase();
548            let open = lower.find("<think>");
549            let close = lower.find("</think>");
550            match (open, close) {
551                (Some(i), Some(j)) if j > i => {
552                    let before = s[..i].trim_end().to_string();
553                    let after = if s.len() > j + 8 {
554                        s[j + 8..].trim_start().to_string()
555                    } else {
556                        String::new()
557                    };
558                    s = if before.is_empty() {
559                        after
560                    } else if after.is_empty() {
561                        before
562                    } else {
563                        format!("{}\n{}", before, after)
564                    };
565                }
566                _ => break,
567            }
568        }
569        s
570    }
571
572    /// Handle guardrails for no-tool-call responses.
573    ///
574    /// Returns `true` if the conversation is complete (caller should return the
575    /// response), `false` if the loop should continue with a correction message.
576    async fn handle_no_tool_response(
577        &mut self,
578        clean_content: &str,
579        _user_prompt: &str,
580        tool_defs: &[thulp_core::ToolDefinition],
581        iterations: usize,
582        max_iterations: usize,
583        finish_reason: &str,
584    ) -> bool {
585        // Guardrail: detect chatty no-op (content but no tools on early iterations)
586        // Only nudge if tools are available AND response looks like planning text
587        let has_tools = !tool_defs.is_empty();
588        let lower = clean_content.to_lowercase();
589        let planning_prefix = lower.starts_with("let me")
590            || lower.starts_with("i'll help")
591            || lower.starts_with("i will help")
592            || lower.starts_with("sure, i")
593            || lower.starts_with("okay, i");
594        let looks_like_planning =
595            clean_content.len() > 200 || (planning_prefix && clean_content.len() > 50);
596        if has_tools
597            && looks_like_planning
598            && iterations == 1
599            && iterations < max_iterations
600            && finish_reason != "error"
601        {
602            tracing::warn!(
603                "No tool calls at iteration {} (content: {}B) — nudging model to use tools",
604                iterations,
605                clean_content.len()
606            );
607            self.history.push(Message {
608                role: Role::Assistant,
609                content: clean_content.to_string(),
610                tool_calls: vec![],
611                tool_result: None,
612            });
613            self.history.push(Message {
614                role: Role::User,
615                content: "You must use tools to complete this task. Do NOT just describe what you would do — actually call the tools. Start with bash or read_file.".to_string(),
616                tool_calls: vec![],
617                tool_result: None,
618            });
619            return false;
620        }
621
622        // Guardrail: detect repeated responses
623        if iterations > 1 {
624            let prev_assistant = self
625                .history
626                .iter()
627                .rev()
628                .find(|m| m.role == Role::Assistant && !m.content.is_empty());
629            if let Some(prev) = prev_assistant {
630                if prev.content.trim() == clean_content.trim()
631                    && iterations < max_iterations
632                {
633                    tracing::warn!(
634                        "Repeated response detected at iteration {} — injecting correction",
635                        iterations
636                    );
637                    self.history.push(Message {
638                        role: Role::Assistant,
639                        content: clean_content.to_string(),
640                        tool_calls: vec![],
641                        tool_result: None,
642                    });
643                    self.history.push(Message {
644                        role: Role::User,
645                        content: "You gave the same response as before. Try a different approach. Use anchor_text in edit_file_lines, or use insert_after, or use bash with sed.".to_string(),
646                        tool_calls: vec![],
647                        tool_result: None,
648                    });
649                    return false;
650                }
651            }
652        }
653
654        true
655    }
656
657    /// Validate permissions, emit start events, and partition tool calls into
658    /// pending (ready to execute) vs denied.
659    async fn check_tool_permissions(
660        &mut self,
661        tool_calls: &[ToolCallRequest],
662        on_permission: Option<&PermissionCallback>,
663        on_tool: Option<&ToolCallback>,
664        on_tool_start: Option<&ToolStartCallback>,
665    ) -> (
666        Vec<(usize, ToolCallRequest)>,
667        Vec<Option<ToolCallRecord>>,
668        Vec<Option<Message>>,
669        Vec<bool>,
670    ) {
671        let mut ordered_records: Vec<Option<ToolCallRecord>> =
672            vec![None; tool_calls.len()];
673        let mut ordered_tool_messages: Vec<Option<Message>> =
674            vec![None; tool_calls.len()];
675        let ordered_compile_gate: Vec<bool> = vec![false; tool_calls.len()];
676        let mut pending: Vec<(usize, ToolCallRequest)> = Vec::new();
677
678        for (idx, tool_call) in tool_calls.iter().cloned().enumerate() {
679            self.tools.activate(&tool_call.name);
680
681            let perm = crate::config::ToolPermission::resolve(
682                &tool_call.name,
683                &self.config.permissions,
684            );
685            let denied = match perm {
686                crate::config::ToolPermission::Deny => Some("Tool denied by permission policy"),
687                crate::config::ToolPermission::Prompt => {
688                    if tool_call.name == "bash" {
689                        if let Some(cmd) =
690                            tool_call.arguments.get("command").and_then(|v| v.as_str())
691                        {
692                            if crate::tools::bash::is_read_only(cmd) {
693                                tracing::debug!(command = cmd, "Auto-allowing read-only bash command under Prompt permission");
694                                None
695                            } else if let Some(ref perm_cb) = on_permission {
696                                let args_summary = cmd.chars().take(120).collect::<String>();
697                                let rx = perm_cb(PermissionRequest {
698                                    tool_name: tool_call.name.clone(),
699                                    args_summary,
700                                });
701                                match rx.await {
702                                    Ok(true) => None,
703                                    _ => Some("User denied tool execution"),
704                                }
705                            } else {
706                                Some("Bash command requires user approval (read-only commands auto-allowed)")
707                            }
708                        } else {
709                            Some("Tool requires user approval")
710                        }
711                    } else if let Some(ref perm_cb) = on_permission {
712                        let args_summary = tool_call
713                            .arguments
714                            .to_string()
715                            .chars()
716                            .take(120)
717                            .collect::<String>();
718                        let rx = perm_cb(PermissionRequest {
719                            tool_name: tool_call.name.clone(),
720                            args_summary,
721                        });
722                        match rx.await {
723                            Ok(true) => None,
724                            _ => Some("User denied tool execution"),
725                        }
726                    } else {
727                        Some("Tool requires user approval (set permission to allow or use TUI mode)")
728                    }
729                }
730                crate::config::ToolPermission::Allow => None,
731            };
732
733            if let Some(reason) = denied {
734                let record = ToolCallRecord {
735                    id: tool_call.id.clone(),
736                    name: tool_call.name.clone(),
737                    arguments: tool_call.arguments.clone(),
738                    result: json!({"error": reason}),
739                    success: false,
740                    duration_ms: 0,
741                };
742                if let Some(ref callback) = on_tool {
743                    callback(&record);
744                }
745                ordered_records[idx] = Some(record);
746                ordered_tool_messages[idx] = Some(Message {
747                    role: Role::Tool,
748                    content: serde_json::to_string(&json!({"error": reason}))
749                        .unwrap_or_default(),
750                    tool_calls: vec![],
751                    tool_result: Some(ToolResultMessage {
752                        tool_call_id: tool_call.id.clone(),
753                        content: json!({"error": reason}),
754                        success: false,
755                    }),
756                });
757                continue;
758            }
759
760            if let Some(ref callback) = on_tool_start {
761                callback(&tool_call.name);
762            }
763
764            if let Some(tool) = self.tools.get(&tool_call.name) {
765                let schema = tool.parameters_schema();
766                if let Ok(params) = thulp_core::ToolDefinition::parse_mcp_input_schema(&schema)
767                {
768                    let thulp_def = thulp_core::ToolDefinition {
769                        name: tool_call.name.clone(),
770                        description: String::new(),
771                        parameters: params,
772                    };
773                    if let Err(e) = thulp_def.validate_args(&tool_call.arguments) {
774                        tracing::warn!(tool = tool_call.name.as_str(), error = %e, "Tool argument validation failed (continuing anyway)");
775                    }
776                }
777            }
778
779            let tool = self.tools.get(&tool_call.name);
780            let is_mutating = tool.map(|t| t.mutating()).unwrap_or(false);
781            if is_mutating {
782                if let Some(ref callback) = on_permission {
783                    let args_summary = summarize_args(&tool_call.arguments);
784                    let request = PermissionRequest {
785                        tool_name: tool_call.name.clone(),
786                        args_summary,
787                    };
788                    let permission_rx = (callback)(request);
789                    match permission_rx.await {
790                        Ok(true) => {}
791                        Ok(false) => {
792                            let record = ToolCallRecord {
793                                id: tool_call.id.clone(),
794                                name: tool_call.name.clone(),
795                                arguments: tool_call.arguments.clone(),
796                                result: json!({"error": "Tool execution denied by user", "tool": tool_call.name}),
797                                success: false,
798                                duration_ms: 0,
799                            };
800                            if let Some(ref callback) = on_tool {
801                                callback(&record);
802                            }
803                            ordered_records[idx] = Some(record);
804                            ordered_tool_messages[idx] = Some(Message {
805                                role: Role::Tool,
806                                content: serde_json::to_string(&json!({"error": "Tool execution denied by user", "tool": tool_call.name})).unwrap_or_default(),
807                                tool_calls: vec![],
808                                tool_result: Some(ToolResultMessage {
809                                    tool_call_id: tool_call.id.clone(),
810                                    content: json!({"error": "Tool execution denied by user", "tool": tool_call.name}),
811                                    success: false,
812                                }),
813                            });
814                            continue;
815                        }
816                        Err(_) => {
817                            let record = ToolCallRecord {
818                                id: tool_call.id.clone(),
819                                name: tool_call.name.clone(),
820                                arguments: tool_call.arguments.clone(),
821                                result: json!({"error": "Permission channel closed", "tool": tool_call.name}),
822                                success: false,
823                                duration_ms: 0,
824                            };
825                            if let Some(ref callback) = on_tool {
826                                callback(&record);
827                            }
828                            ordered_records[idx] = Some(record);
829                            ordered_tool_messages[idx] = Some(Message {
830                                role: Role::Tool,
831                                content: serde_json::to_string(&json!({"error": "Permission channel closed", "tool": tool_call.name})).unwrap_or_default(),
832                                tool_calls: vec![],
833                                tool_result: Some(ToolResultMessage {
834                                    tool_call_id: tool_call.id.clone(),
835                                    content: json!({"error": "Permission channel closed", "tool": tool_call.name}),
836                                    success: false,
837                                }),
838                            });
839                            continue;
840                        }
841                    }
842                } else {
843                    tracing::warn!(
844                        tool = tool_call.name.as_str(),
845                        "No permission callback, auto-approving mutating tool"
846                    );
847                }
848            }
849
850            pending.push((idx, tool_call));
851        }
852
853        (pending, ordered_records, ordered_tool_messages, ordered_compile_gate)
854    }
855
856    /// Execute pending tool calls in parallel with per-tool timeout handling.
857    async fn execute_pending_tools(
858        tools: &ToolRegistry,
859        bash_timeout_secs: u64,
860        max_result_chars: usize,
861        pending: Vec<(usize, ToolCallRequest)>,
862        on_tool: Option<&ToolCallback>,
863    ) -> Vec<(usize, ToolCallRecord, Message, bool)> {
864        use futures::{stream, StreamExt};
865
866        let on_tool_cb = on_tool;
867        let max_parallel = std::cmp::max(1, 10);
868        stream::iter(pending)
869            .map(|(idx, tool_call)| async move {
870                let start = std::time::Instant::now();
871
872                let result = {
873                    let tool_future = tools.execute(&tool_call.name, tool_call.arguments.clone());
874                    let timeout_dur = if tool_call.name == "bash" {
875                        std::time::Duration::from_secs(bash_timeout_secs)
876                    } else {
877                        std::time::Duration::from_secs(30)
878                    };
879                    match tokio::time::timeout(timeout_dur, tool_future).await {
880                        Ok(inner) => inner,
881                        Err(_) => Err(PawanError::Tool(format!(
882                            "Tool {} timed out after {}s",
883                            tool_call.name,
884                            timeout_dur.as_secs()
885                        ))),
886                    }
887                };
888
889                let duration_ms = start.elapsed().as_millis() as u64;
890                let (mut result_value, success) = match result {
891                    Ok(v) => (v, true),
892                    Err(e) => {
893                        tracing::warn!(tool = tool_call.name.as_str(), error = %e, "Tool execution failed");
894                        (json!({"error": e.to_string(), "tool": tool_call.name, "hint": "Try a different approach or tool"}), false)
895                    }
896                };
897
898                result_value = truncate_tool_result(result_value, max_result_chars);
899
900                let record = ToolCallRecord {
901                    id: tool_call.id.clone(),
902                    name: tool_call.name.clone(),
903                    arguments: tool_call.arguments.clone(),
904                    result: result_value.clone(),
905                    success,
906                    duration_ms,
907                };
908
909                if let Some(ref cb) = on_tool_cb {
910                    cb(&record);
911                }
912
913                let tool_msg = Message {
914                    role: Role::Tool,
915                    content: serde_json::to_string(&result_value).unwrap_or_default(),
916                    tool_calls: vec![],
917                    tool_result: Some(ToolResultMessage {
918                        tool_call_id: tool_call.id.clone(),
919                        content: result_value,
920                        success,
921                    }),
922                };
923
924                let wrote_rs = success
925                    && tool_call.name == "write_file"
926                    && tool_call
927                        .arguments
928                        .get("path")
929                        .and_then(|p| p.as_str())
930                        .map(|p| p.ends_with(".rs"))
931                        .unwrap_or(false);
932
933                (idx, record, tool_msg, wrote_rs)
934            })
935            .buffer_unordered(max_parallel)
936            .collect::<Vec<_>>()
937            .await
938    }
939
940    /// Collect ordered tool results into the history and all_tool_calls list,
941    /// and apply compile gates (cargo check after .rs file writes).
942    async fn collect_tool_results(
943        &mut self,
944        all_tool_calls: &mut Vec<ToolCallRecord>,
945        ordered_records: &mut [Option<ToolCallRecord>],
946        ordered_tool_messages: &mut [Option<Message>],
947        ordered_compile_gate: &[bool],
948        tool_calls_count: usize,
949    ) {
950        for i in 0..tool_calls_count {
951            if let Some(record) = ordered_records[i].take() {
952                all_tool_calls.push(record);
953            }
954            if let Some(msg) = ordered_tool_messages[i].take() {
955                self.history.push(msg);
956            }
957
958            if ordered_compile_gate[i] {
959                let ws = self.workspace_root.clone();
960                let check_result = tokio::process::Command::new("cargo")
961                    .arg("check")
962                    .arg("--message-format=short")
963                    .current_dir(&ws)
964                    .output()
965                    .await;
966                match check_result {
967                    Ok(output) if !output.status.success() => {
968                        let stderr = String::from_utf8_lossy(&output.stderr);
969                        let err_msg: String = stderr.chars().take(1500).collect();
970                        tracing::info!("Compile-gate: cargo check failed after write_file, injecting errors");
971                        self.history.push(Message {
972                            role: Role::User,
973                            content: format!(
974                                "[SYSTEM] cargo check failed after your write_file. Fix the errors:\n{}",
975                                err_msg
976                            ),
977                            tool_calls: vec![],
978                            tool_result: None,
979                        });
980                    }
981                    Ok(_) => {
982                        tracing::debug!("Compile-gate: cargo check passed");
983                    }
984                    Err(e) => {
985                        tracing::warn!("Compile-gate: cargo check failed to run: {}", e);
986                    }
987                }
988            }
989        }
990    }
991
992    /// Execute using the ToolCoordinator instead of the built-in loop.
993    ///
994    /// This method provides an alternative implementation that uses the
995    /// ToolCoordinator for tool-calling loops, which offers:
996    /// - Parallel tool execution
997    /// - Per-tool timeout handling
998    /// - Consistent error handling
999    /// - Max iteration limits
1000    ///
1001    /// Note: This method does not support streaming callbacks or permission
1002    /// prompts - those are only available in the built-in loop.
1003    async fn execute_with_coordinator(&mut self, user_prompt: &str) -> Result<AgentResponse> {
1004        // Reset idle timeout for the new turn
1005        self.last_tool_call_time = None;
1006
1007        // Inject Eruka core memory before first LLM call
1008        if let Some(eruka) = &self.eruka {
1009            let before_inject = self.history.len();
1010            if let Err(e) = eruka.inject_core_memory(&mut self.history).await {
1011                tracing::warn!("Eruka memory injection failed (non-fatal): {}", e);
1012            }
1013
1014            for msg in self
1015                .history
1016                .iter_mut()
1017                .skip(before_inject)
1018                .filter(|m| m.role == Role::System)
1019            {
1020                let fenced = prepare_recalled_context("eruka_core_memory", &msg.content);
1021                if !fenced.is_empty() {
1022                    msg.content = fenced;
1023                }
1024            }
1025
1026            // Prefetch task-relevant context
1027            match eruka.prefetch(user_prompt, 2000).await {
1028                Ok(Some(ctx)) => {
1029                    let fenced = prepare_recalled_context("eruka_prefetch", &ctx);
1030                    if !fenced.is_empty() {
1031                        self.history.push(Message {
1032                            role: Role::System,
1033                            content: fenced,
1034                            tool_calls: vec![],
1035                            tool_result: None,
1036                        });
1037                    }
1038                }
1039                Ok(None) => {}
1040                Err(e) => tracing::warn!("Eruka prefetch failed (non-fatal): {}", e),
1041            }
1042        }
1043
1044        // Per-turn architecture context injection
1045
1046        if let Some(err) = &self.arch_context_error {
1047            return Err(PawanError::Config(err.clone()));
1048        }
1049
1050        let effective_prompt = match &self.arch_context {
1051            Some(ctx) => format!(
1052                "[Workspace Architecture]\n{ctx}\n[/Workspace Architecture]\n\n{user_prompt}"
1053            ),
1054            None => user_prompt.to_string(),
1055        };
1056
1057        // Build coordinator config from agent config
1058        let coordinator_config = ToolCallingConfig {
1059            max_iterations: self.config.max_tool_iterations,
1060            parallel_execution: true,
1061            max_parallel_tools: 10,
1062            tool_timeout: std::time::Duration::from_secs(self.config.bash_timeout_secs),
1063            stop_on_error: false,
1064        };
1065
1066        // Create a fresh backend for coordinator execution
1067        let system_prompt = self.config.get_system_prompt_checked()?;
1068        let backend = Self::create_backend(&self.config, &system_prompt);
1069        let backend = Arc::from(backend);
1070
1071        // Create a fresh tool registry for coordinator execution
1072        // Note: This will not include any MCP tools registered at runtime
1073        let registry = Arc::new(ToolRegistry::with_defaults(self.workspace_root.clone()));
1074
1075        // Create coordinator with backend and tool registry
1076        let coordinator = ToolCoordinator::new(backend, registry, coordinator_config);
1077
1078        // Execute with coordinator
1079        let result: CoordinatorResult = coordinator
1080            .execute(Some(&system_prompt), &effective_prompt)
1081            .await
1082            .map_err(|e| PawanError::Agent(format!("Coordinator execution failed: {}", e)))?;
1083
1084        // Convert CoordinatorResult to AgentResponse
1085        let content = result.content.clone();
1086        let agent_response = AgentResponse {
1087            content: result.content,
1088            tool_calls: result.tool_calls,
1089            iterations: result.iterations,
1090            usage: result.total_usage,
1091        };
1092
1093        // Sync turn to Eruka if enabled
1094        if let Some(eruka) = &self.eruka {
1095            if let Err(e) = eruka
1096                .sync_turn(user_prompt, &content, &self.session_id)
1097                .await
1098            {
1099                tracing::warn!("Eruka sync_turn failed (non-fatal): {}", e);
1100            }
1101        }
1102
1103        Ok(agent_response)
1104    }
1105
1106    /// Execute a healing task with real diagnostics
1107    pub async fn heal(&mut self) -> Result<AgentResponse> {
1108        let healer =
1109            crate::healing::Healer::new(self.workspace_root.clone(), self.config.healing.clone());
1110
1111        let diagnostics = healer.get_diagnostics().await?;
1112        let failed_tests = healer.get_failed_tests().await?;
1113
1114        let mut prompt = format!(
1115            "I need you to heal this Rust project at: {}
1116
1117",
1118            self.workspace_root.display()
1119        );
1120
1121        if !diagnostics.is_empty() {
1122            prompt.push_str(&format!(
1123                "## Compilation Issues ({} found)
1124{}
1125",
1126                diagnostics.len(),
1127                healer.format_diagnostics_for_prompt(&diagnostics)
1128            ));
1129        }
1130
1131        if !failed_tests.is_empty() {
1132            prompt.push_str(&format!(
1133                "## Failed Tests ({} found)
1134{}
1135",
1136                failed_tests.len(),
1137                healer.format_tests_for_prompt(&failed_tests)
1138            ));
1139        }
1140
1141        if diagnostics.is_empty() && failed_tests.is_empty() {
1142            prompt.push_str(
1143                "No issues found! Run cargo check and cargo test to verify.
1144",
1145            );
1146        }
1147
1148        prompt.push_str(
1149            "
1150Fix each issue one at a time. Verify with cargo check after each fix.",
1151        );
1152
1153        self.execute(&prompt).await
1154    }
    /// Execute healing with retries — calls heal(), checks for remaining errors, retries if needed.
    ///
    /// `heal()` is always run once up front. The retry loop then performs at most
    /// `max_attempts - 1` further rounds (none when `max_attempts` is 0 or 1), so
    /// `heal()` is invoked at most `max_attempts` times in total (once when
    /// `max_attempts` is 0).
    ///
    /// Two-stage gate, evaluated at the start of each round:
    ///   Stage 1 — `cargo check` (via `CompilerFixer::check`): must produce zero
    ///             `DiagnosticKind::Error` diagnostics before proceeding.
    ///   Stage 2 — `healing.verify_cmd` (optional): a user-supplied shell command
    ///             (e.g. `cargo test --workspace`).  If `run_verify_cmd` reports a
    ///             failure the loop runs `heal()` again and continues; if the
    ///             command cannot be run at all, Stage 2 is skipped and the
    ///             function returns.
    ///
    /// Anti-thrash guard: each Stage-1 error is identified by its `fingerprint()`
    /// and a per-fingerprint counter tracks how many consecutive rounds it has
    /// survived.  The loop is meant to halt once a fingerprint has survived
    /// `max_attempts` consecutive rounds.
    ///
    /// NOTE(review): as written the guard cannot fire. The loop body runs at most
    /// `max_attempts - 1` times and each counter grows by one per round, so no
    /// counter can reach `max_attempts`. Termination is currently provided by the
    /// loop bound alone — confirm the intended threshold.
    ///
    /// # Returns
    /// The response from the most recent `heal()` call, whether or not the
    /// workspace ended up clean.
    ///
    /// # Errors
    /// Propagates errors from `heal()` and from `CompilerFixer::check`.
    pub async fn heal_with_retries(&mut self, max_attempts: usize) -> Result<AgentResponse> {
        use std::collections::{HashMap, HashSet};

        // First healing pass; its response is the fallback return value.
        let mut last_response = self.heal().await?;
        // fingerprint → consecutive rounds this error has survived unchanged
        let mut stuck_counts: HashMap<u64, usize> = HashMap::new();

        // `attempt` starts at 1 because the initial heal() above was attempt 0.
        for attempt in 1..max_attempts {
            // Stage 1: cargo check must be error-free
            let fixer = crate::healing::CompilerFixer::new(self.workspace_root.clone());
            let remaining = fixer.check().await?;
            // Only hard errors gate progress; other diagnostic kinds are ignored here.
            let errors: Vec<_> = remaining
                .iter()
                .filter(|d| d.kind == crate::healing::DiagnosticKind::Error)
                .collect();

            if !errors.is_empty() {
                // Update fingerprint counts.
                // Drop entries for errors that were fixed; increment survivors.
                let current_fps: HashSet<u64> = errors.iter().map(|d| d.fingerprint()).collect();
                stuck_counts.retain(|fp, _| current_fps.contains(fp));
                // Newly seen fingerprints start at 1 (inserted as 0, then incremented).
                for fp in &current_fps {
                    *stuck_counts.entry(*fp).or_insert(0) += 1;
                }

                // Anti-thrash: halt if any error fingerprint has not budged
                // after max_attempts consecutive rounds.
                // NOTE(review): counts are bounded by the number of loop rounds
                // (at most max_attempts - 1), so this filter never matches.
                let thrashing: Vec<u64> = stuck_counts
                    .iter()
                    .filter_map(|(&fp, &count)| {
                        if count >= max_attempts {
                            Some(fp)
                        } else {
                            None
                        }
                    })
                    .collect();
                if !thrashing.is_empty() {
                    tracing::warn!(
                        stuck_fingerprints = thrashing.len(),
                        attempt,
                        "Anti-thrash: {} error(s) unchanged after {} attempts, halting heal loop",
                        thrashing.len(),
                        max_attempts
                    );
                    return Ok(last_response);
                }

                tracing::warn!(
                    errors = errors.len(),
                    attempt,
                    "Stage 1 (cargo check): errors remain, retrying"
                );
                // Another healing pass, then re-check at the top of the next round.
                last_response = self.heal().await?;
                continue;
            }

            // All Stage-1 errors cleared — reset thrash counters.
            stuck_counts.clear();

            // Stage 2: optional verify_cmd
            // Cloned so the command string does not keep `self` borrowed while
            // `self.heal()` (which needs `&mut self`) is called below.
            let verify_cmd = self.config.healing.verify_cmd.clone();
            if let Some(ref cmd) = verify_cmd {
                match crate::healing::run_verify_cmd(&self.workspace_root, cmd).await {
                    // No diagnostic returned: the verify command passed.
                    Ok(None) => {
                        tracing::info!(
                            attempts = attempt,
                            "Stage 2 (verify_cmd) passed, healing complete"
                        );
                        return Ok(last_response);
                    }
                    // The verify command reported a failure: heal again and loop.
                    Ok(Some(diag)) => {
                        tracing::warn!(
                            attempt,
                            cmd,
                            output = diag.raw,
                            "Stage 2 (verify_cmd) failed, retrying"
                        );
                        last_response = self.heal().await?;
                        continue;
                    }
                    Err(e) => {
                        // Cannot spawn the command — don't block healing on this
                        tracing::warn!(cmd, error = %e, "verify_cmd could not be run, skipping stage 2");
                        return Ok(last_response);
                    }
                }
            } else {
                // No verify command configured: a clean Stage 1 is sufficient.
                tracing::info!(
                    attempts = attempt,
                    "Stage 1 (cargo check) passed, healing complete"
                );
                return Ok(last_response);
            }
        }

        // Attempt budget exhausted; the result of the final heal() was not re-checked.
        tracing::info!(
            attempts = max_attempts,
            "Healing finished (may still have errors)"
        );
        Ok(last_response)
    }
1269    /// Execute a task with a specific prompt
1270    pub async fn task(&mut self, task_description: &str) -> Result<AgentResponse> {
1271        let prompt = format!(
1272            r#"I need you to complete the following coding task:
1273
1274{}
1275
1276The workspace is at: {}
1277
1278Please:
12791. First explore the codebase to understand the relevant code
12802. Make the necessary changes
12813. Verify the changes compile with `cargo check`
12824. Run relevant tests if applicable
1283
1284Explain your changes as you go."#,
1285            task_description,
1286            self.workspace_root.display()
1287        );
1288
1289        self.execute(&prompt).await
1290    }
1291
1292    /// Generate a commit message for current changes
1293    pub async fn generate_commit_message(&mut self) -> Result<String> {
1294        let prompt = r#"Please:
12951. Run `git status` to see what files are changed
12962. Run `git diff --cached` to see staged changes (or `git diff` for unstaged)
12973. Generate a concise, descriptive commit message following conventional commits format
1298
1299Only output the suggested commit message, nothing else."#;
1300
1301        let response = self.execute(prompt).await?;
1302        Ok(response.content)
1303    }
1304}