Skip to main content

pawan/agent/
execute.rs

1//! Agent execution loop — tool calling, streaming, and coordinator dispatch.
2
3use super::{
4    prepare_recalled_context, AgentResponse, LLMResponse, Message, PawanAgent, PermissionCallback,
5    PermissionRequest, Role, TokenCallback, TokenUsage, ToolCallback, ToolCallRecord,
6    ToolCallRequest, ToolResultMessage, ToolStartCallback,
7};
8use crate::coordinator::{CoordinatorResult, ToolCallingConfig, ToolCoordinator};
9use crate::tools::ToolRegistry;
10use crate::{PawanError, Result};
11use serde_json::{json, Value};
12use std::sync::Arc;
13use std::time::Instant;
14
15/// Truncate a tool result JSON value to fit within max_chars.
16/// Unlike naive string truncation (which breaks JSON), this truncates string
17/// *values* within the JSON structure, preserving valid JSON output.
18pub(crate) fn truncate_tool_result(value: Value, max_chars: usize) -> Value {
19    let serialized = serde_json::to_string(&value).unwrap_or_default();
20    if serialized.len() <= max_chars {
21        return value;
22    }
23
24    // Strategy: find the largest string values and truncate them
25    match value {
26        Value::Object(map) => {
27            let mut result = serde_json::Map::new();
28            let total = serialized.len();
29            for (k, v) in map {
30                if let Value::String(s) = &v {
31                    if s.len() > 500 {
32                        // Proportional truncation: shrink large strings
33                        let target = s.len() * max_chars / total;
34                        let target = target.max(200); // Keep at least 200 chars
35                        let truncated: String = s.chars().take(target).collect();
36                        result.insert(
37                            k,
38                            json!(format!(
39                                "{}...[truncated from {} chars]",
40                                truncated,
41                                s.len()
42                            )),
43                        );
44                        continue;
45                    }
46                }
47                // Recursively truncate nested structures
48                result.insert(k, truncate_tool_result(v, max_chars));
49            }
50            Value::Object(result)
51        }
52        Value::String(s) if s.len() > max_chars => {
53            let truncated: String = s.chars().take(max_chars).collect();
54            json!(format!(
55                "{}...[truncated from {} chars]",
56                truncated,
57                s.len()
58            ))
59        }
60        Value::Array(arr) if serialized.len() > max_chars => {
61            // Truncate array: keep first N items that fit
62            let mut result = Vec::new();
63            let mut running_len = 2; // "[]"
64            for item in arr {
65                let item_str = serde_json::to_string(&item).unwrap_or_default();
66                running_len += item_str.len() + 1; // +1 for comma
67                if running_len > max_chars {
68                    result.push(json!(format!("...[{} more items truncated]", 0)));
69                    break;
70                }
71                result.push(item);
72            }
73            Value::Array(result)
74        }
75        other => other,
76    }
77}
78
79/// Summarize tool arguments for permission requests
80pub(crate) fn summarize_args(args: &serde_json::Value) -> String {
81    match args {
82        serde_json::Value::Object(map) => {
83            let mut parts = Vec::new();
84            for (key, value) in map {
85                let value_str = match value {
86                    serde_json::Value::String(s) if s.len() > 50 => {
87                        format!("\"{}...\"", &s[..47])
88                    }
89                    serde_json::Value::String(s) => format!("\"{}\"", s),
90                    serde_json::Value::Array(arr) if arr.len() > 3 => {
91                        format!("[... {} items]", arr.len())
92                    }
93                    serde_json::Value::Array(arr) => {
94                        let items: Vec<String> = arr
95                            .iter()
96                            .take(3)
97                            .map(|v| match v {
98                                serde_json::Value::String(s) => {
99                                    if s.len() > 20 {
100                                        format!("\"{}...\"", &s[..17])
101                                    } else {
102                                        format!("\"{}\"", s)
103                                    }
104                                }
105                                _ => v.to_string(),
106                            })
107                            .collect();
108                        format!("[{}]", items.join(", "))
109                    }
110                    _ => value.to_string(),
111                };
112                parts.push(format!("{}: {}", key, value_str));
113            }
114            parts.join(", ")
115        }
116        serde_json::Value::String(s) => {
117            if s.len() > 100 {
118                format!("\"{}...\"", &s[..97])
119            } else {
120                format!("\"{}\"", s)
121            }
122        }
123        serde_json::Value::Array(arr) => {
124            format!("[{} items]", arr.len())
125        }
126        _ => args.to_string(),
127    }
128}
129
130
131impl PawanAgent {
132    /// Execute a single prompt with tool calling support
133    pub async fn execute(&mut self, user_prompt: &str) -> Result<AgentResponse> {
134        self.execute_with_callbacks(user_prompt, None, None, None)
135            .await
136    }
137
138    /// Execute with optional callbacks for streaming
139    pub async fn execute_with_callbacks(
140        &mut self,
141        user_prompt: &str,
142        on_token: Option<TokenCallback>,
143        on_tool: Option<ToolCallback>,
144        on_tool_start: Option<ToolStartCallback>,
145    ) -> Result<AgentResponse> {
146        self.execute_with_all_callbacks(user_prompt, on_token, on_tool, on_tool_start, None)
147            .await
148    }
149
150    /// Execute with all callbacks, including permission prompt.
151    pub async fn execute_with_all_callbacks(
152        &mut self,
153        user_prompt: &str,
154        on_token: Option<TokenCallback>,
155        on_tool: Option<ToolCallback>,
156        on_tool_start: Option<ToolStartCallback>,
157        on_permission: Option<PermissionCallback>,
158    ) -> Result<AgentResponse> {
159        // Check if coordinator mode is enabled
160        if self.config.use_coordinator {
161            // Coordinator mode does not support callbacks or permission prompts
162            if on_token.is_some()
163                || on_tool.is_some()
164                || on_tool_start.is_some()
165                || on_permission.is_some()
166            {
167                tracing::warn!(
168                    "Callbacks and permission prompts are not supported in coordinator mode; ignoring them"
169                );
170            }
171            return self.execute_with_coordinator(user_prompt).await;
172        }
173
174        // Reset idle timeout for the new turn
175        self.last_tool_call_time = None;
176
177        // Inject Eruka core memory before first LLM call
178        if let Some(eruka) = &self.eruka {
179            let before_inject = self.history.len();
180            if let Err(e) = eruka.inject_core_memory(&mut self.history).await {
181                tracing::warn!("Eruka memory injection failed (non-fatal): {}", e);
182            }
183
184            for msg in self
185                .history
186                .iter_mut()
187                .skip(before_inject)
188                .filter(|m| m.role == Role::System)
189            {
190                let fenced = prepare_recalled_context("eruka_core_memory", &msg.content);
191                if !fenced.is_empty() {
192                    msg.content = fenced;
193                }
194            }
195
196            // Prefetch task-relevant context: semantic search + compressed
197            // general context. Inject as a system message so the LLM can
198            // draw on prior-session context for the same query. Non-fatal.
199            match eruka.prefetch(user_prompt, 2000).await {
200                Ok(Some(ctx)) => {
201                    let fenced = prepare_recalled_context("eruka_prefetch", &ctx);
202                    if !fenced.is_empty() {
203                        self.history.push(Message {
204                            role: Role::System,
205                            content: fenced,
206                            tool_calls: vec![],
207                            tool_result: None,
208                        });
209                    }
210                }
211                Ok(None) => {}
212                Err(e) => tracing::warn!("Eruka prefetch failed (non-fatal): {}", e),
213            }
214        }
215
216        // Per-turn architecture context injection: prepend .pawan/arch.md content
217        // so key constraints stay visible even as tool-call history grows long.
218
219        if let Some(err) = &self.arch_context_error {
220            return Err(PawanError::Config(err.clone()));
221        }
222
223        let effective_prompt = match &self.arch_context {
224            Some(ctx) => format!(
225                "[Workspace Architecture]\n{ctx}\n[/Workspace Architecture]\n\n{user_prompt}"
226            ),
227            None => user_prompt.to_string(),
228        };
229
230        self.history.push(Message {
231            role: Role::User,
232            content: effective_prompt,
233            tool_calls: vec![],
234            tool_result: None,
235        });
236
237        let mut all_tool_calls = Vec::new();
238        let mut total_usage = TokenUsage::default();
239        let mut iterations = 0;
240        let max_iterations = self.config.max_tool_iterations;
241
242        loop {
243            // Check idle timeout
244            if let Some(last_time) = self.last_tool_call_time {
245                let elapsed = last_time.elapsed().as_secs();
246                if elapsed > self.config.tool_call_idle_timeout_secs {
247                    return Err(PawanError::Agent(format!(
248                        "Tool idle timeout exceeded ({}s > {}s)",
249                        elapsed, self.config.tool_call_idle_timeout_secs
250                    )));
251                }
252            }
253
254            iterations += 1;
255            if iterations > max_iterations {
256                return Err(PawanError::Agent(format!(
257                    "Max tool iterations ({}) exceeded",
258                    max_iterations
259                )));
260            }
261
262            // Budget awareness: when running low on iterations, nudge the model
263            let remaining = max_iterations.saturating_sub(iterations);
264            if remaining == 3 && iterations > 1 {
265                self.history.push(Message {
266                    role: Role::User,
267                    content: format!(
268                        "[SYSTEM] You have {} tool iterations remaining. \
269                         Stop exploring and write the most important output now. \
270                         If you have code to write, write it immediately.",
271                        remaining
272                    ),
273                    tool_calls: vec![],
274                    tool_result: None,
275                });
276            }
277            // Estimate context tokens
278            self.context_tokens_estimate =
279                self.history.iter().map(|m| m.content.len()).sum::<usize>() / 4;
280            if self.context_tokens_estimate > self.config.max_context_tokens {
281                // Snapshot pre-compression content to Eruka so the facts
282                // being discarded survive the prune. Non-fatal.
283                if let Some(eruka) = &self.eruka {
284                    let snapshot = Self::history_snapshot_for_eruka(&self.history);
285                    if let Err(e) = eruka.on_pre_compress(&snapshot, &self.session_id).await {
286                        tracing::warn!("Eruka on_pre_compress failed (non-fatal): {}", e);
287                    }
288                }
289                self.prune_history();
290            }
291
292            // Dynamic tool selection: pick the most relevant tools for this query
293            // Extract latest user message for keyword matching
294            let latest_query = self
295                .history
296                .iter()
297                .rev()
298                .find(|m| m.role == Role::User)
299                .map(|m| m.content.as_str())
300                .unwrap_or("");
301            let tool_defs = self.tools.select_for_query(latest_query, 12);
302            if iterations == 1 {
303                let tool_names: Vec<&str> = tool_defs.iter().map(|t| t.name.as_str()).collect();
304                tracing::info!(tools = ?tool_names, count = tool_defs.len(), "Selected tools for query");
305            }
306
307            // Update idle timeout tracker before LLM call to track time spent in generation
308            self.last_tool_call_time = Some(Instant::now());
309
310            // --- Resilient LLM call: retry on transient failures instead of crashing ---
311            let response = {
312                #[allow(unused_assignments)]
313                let mut last_err = None;
314                let max_llm_retries = 3;
315                let mut attempt = 0;
316                loop {
317                    attempt += 1;
318                    match self
319                        .backend
320                        .generate(&self.history, &tool_defs, on_token.as_ref())
321                        .await
322                    {
323                        Ok(resp) => break resp,
324                        Err(e) => {
325                            let err_str = e.to_string();
326                            let is_transient = err_str.contains("timeout")
327                                || err_str.contains("connection")
328                                || err_str.contains("429")
329                                || err_str.contains("500")
330                                || err_str.contains("502")
331                                || err_str.contains("503")
332                                || err_str.contains("504")
333                                || err_str.contains("reset")
334                                || err_str.contains("broken pipe");
335
336                            if is_transient && attempt <= max_llm_retries {
337                                let delay =
338                                    std::time::Duration::from_secs(2u64.pow(attempt as u32));
339                                tracing::warn!(
340                                    attempt = attempt,
341                                    delay_secs = delay.as_secs(),
342                                    error = err_str.as_str(),
343                                    "LLM call failed (transient) — retrying"
344                                );
345                                tokio::time::sleep(delay).await;
346
347                                // If context is too large, prune before retry
348                                if err_str.contains("context") || err_str.contains("token") {
349                                    tracing::info!(
350                                        "Pruning history before retry (possible context overflow)"
351                                    );
352                                    if let Some(eruka) = &self.eruka {
353                                        let snapshot =
354                                            Self::history_snapshot_for_eruka(&self.history);
355                                        if let Err(e) =
356                                            eruka.on_pre_compress(&snapshot, &self.session_id).await
357                                        {
358                                            tracing::warn!(
359                                                "Eruka on_pre_compress failed (non-fatal): {}",
360                                                e
361                                            );
362                                        }
363                                    }
364                                    self.prune_history();
365                                }
366                                continue;
367                            }
368
369                            // Non-transient or max retries exhausted
370                            last_err = Some(e);
371                            break {
372                                // Return a synthetic "give up" response instead of crashing
373                                tracing::error!(
374                                    attempt = attempt,
375                                    error = last_err
376                                        .as_ref()
377                                        .map(|e| e.to_string())
378                                        .unwrap_or_default()
379                                        .as_str(),
380                                    "LLM call failed permanently — returning error as content"
381                                );
382                                LLMResponse {
383                                    content: format!(
384                                        "LLM error after {} attempts: {}. The task could not be completed.",
385                                        attempt,
386                                        last_err.as_ref().map(|e| e.to_string()).unwrap_or_default()
387                                    ),
388                                    reasoning: None,
389                                    tool_calls: vec![],
390                                    finish_reason: "error".to_string(),
391                                    usage: None,
392                                }
393                            };
394                        }
395                    }
396                }
397            };
398
399            // Accumulate token usage with thinking/action split
400            if let Some(ref usage) = response.usage {
401                total_usage.prompt_tokens += usage.prompt_tokens;
402                total_usage.completion_tokens += usage.completion_tokens;
403                total_usage.total_tokens += usage.total_tokens;
404                total_usage.reasoning_tokens += usage.reasoning_tokens;
405                total_usage.action_tokens += usage.action_tokens;
406
407                // Log token budget split per iteration
408                if usage.reasoning_tokens > 0 {
409                    tracing::info!(
410                        iteration = iterations,
411                        think = usage.reasoning_tokens,
412                        act = usage.action_tokens,
413                        total = usage.completion_tokens,
414                        "Token budget: think:{} act:{} (total:{})",
415                        usage.reasoning_tokens,
416                        usage.action_tokens,
417                        usage.completion_tokens
418                    );
419                }
420
421                // Thinking budget enforcement
422                let thinking_budget = self.config.thinking_budget;
423                if thinking_budget > 0 && usage.reasoning_tokens > thinking_budget as u64 {
424                    tracing::warn!(
425                        budget = thinking_budget,
426                        actual = usage.reasoning_tokens,
427                        "Thinking budget exceeded ({}/{} tokens)",
428                        usage.reasoning_tokens,
429                        thinking_budget
430                    );
431                }
432            }
433
434            // --- Guardrail: strip thinking blocks from content ---
435            let clean_content = {
436                let mut s = response.content.clone();
437                loop {
438                    let lower = s.to_lowercase();
439                    let open = lower.find("<think>");
440                    let close = lower.find("</think>");
441                    match (open, close) {
442                        (Some(i), Some(j)) if j > i => {
443                            let before = s[..i].trim_end().to_string();
444                            let after = if s.len() > j + 8 {
445                                s[j + 8..].trim_start().to_string()
446                            } else {
447                                String::new()
448                            };
449                            s = if before.is_empty() {
450                                after
451                            } else if after.is_empty() {
452                                before
453                            } else {
454                                format!("{}\n{}", before, after)
455                            };
456                        }
457                        _ => break,
458                    }
459                }
460                s
461            };
462
463            if response.tool_calls.is_empty() {
464                // --- Guardrail: detect chatty no-op (content but no tools on early iterations) ---
465                // Only nudge if tools are available AND response looks like planning text (not a real answer)
466                let has_tools = !tool_defs.is_empty();
467                let lower = clean_content.to_lowercase();
468                let planning_prefix = lower.starts_with("let me")
469                    || lower.starts_with("i'll help")
470                    || lower.starts_with("i will help")
471                    || lower.starts_with("sure, i")
472                    || lower.starts_with("okay, i");
473                let looks_like_planning =
474                    clean_content.len() > 200 || (planning_prefix && clean_content.len() > 50);
475                if has_tools
476                    && looks_like_planning
477                    && iterations == 1
478                    && iterations < max_iterations
479                    && response.finish_reason != "error"
480                {
481                    tracing::warn!(
482                        "No tool calls at iteration {} (content: {}B) — nudging model to use tools",
483                        iterations,
484                        clean_content.len()
485                    );
486                    self.history.push(Message {
487                        role: Role::Assistant,
488                        content: clean_content.clone(),
489                        tool_calls: vec![],
490                        tool_result: None,
491                    });
492                    self.history.push(Message {
493                        role: Role::User,
494                        content: "You must use tools to complete this task. Do NOT just describe what you would do — actually call the tools. Start with bash or read_file.".to_string(),
495                        tool_calls: vec![],
496                        tool_result: None,
497                    });
498                    continue;
499                }
500
501                // --- Guardrail: detect repeated responses ---
502                if iterations > 1 {
503                    let prev_assistant = self
504                        .history
505                        .iter()
506                        .rev()
507                        .find(|m| m.role == Role::Assistant && !m.content.is_empty());
508                    if let Some(prev) = prev_assistant {
509                        if prev.content.trim() == clean_content.trim()
510                            && iterations < max_iterations
511                        {
512                            tracing::warn!(
513                                "Repeated response detected at iteration {} — injecting correction",
514                                iterations
515                            );
516                            self.history.push(Message {
517                                role: Role::Assistant,
518                                content: clean_content.clone(),
519                                tool_calls: vec![],
520                                tool_result: None,
521                            });
522                            self.history.push(Message {
523                                role: Role::User,
524                                content: "You gave the same response as before. Try a different approach. Use anchor_text in edit_file_lines, or use insert_after, or use bash with sed.".to_string(),
525                                tool_calls: vec![],
526                                tool_result: None,
527                            });
528                            continue;
529                        }
530                    }
531                }
532
533                self.history.push(Message {
534                    role: Role::Assistant,
535                    content: clean_content.clone(),
536                    tool_calls: vec![],
537                    tool_result: None,
538                });
539
540                // Persist this completed turn to Eruka so future prefetches
541                // and sessions can pull from it. Non-fatal on any error.
542                if let Some(eruka) = &self.eruka {
543                    if let Err(e) = eruka
544                        .sync_turn(user_prompt, &clean_content, &self.session_id)
545                        .await
546                    {
547                        tracing::warn!("Eruka sync_turn failed (non-fatal): {}", e);
548                    }
549                }
550
551                return Ok(AgentResponse {
552                    content: clean_content,
553                    tool_calls: all_tool_calls,
554                    iterations,
555                    usage: total_usage,
556                });
557            }
558
559            self.history.push(Message {
560                role: Role::Assistant,
561                content: response.content.clone(),
562                tool_calls: response.tool_calls.clone(),
563                tool_result: None,
564            });
565
566            // Execute tool calls (parallel when multiple tool calls are returned)
567            let max_parallel_tools: usize = 10;
568
569            let mut ordered_records: Vec<Option<ToolCallRecord>> =
570                vec![None; response.tool_calls.len()];
571            let mut ordered_tool_messages: Vec<Option<Message>> =
572                vec![None; response.tool_calls.len()];
573            let mut ordered_compile_gate: Vec<bool> = vec![false; response.tool_calls.len()];
574
575            // Phase 1: validate / permission-check / emit start events immediately.
576            let mut pending: Vec<(usize, ToolCallRequest)> = Vec::new();
577            for (idx, tool_call) in response.tool_calls.iter().cloned().enumerate() {
578                self.tools.activate(&tool_call.name);
579
580                let perm = crate::config::ToolPermission::resolve(
581                    &tool_call.name,
582                    &self.config.permissions,
583                );
584                let denied = match perm {
585                    crate::config::ToolPermission::Deny => Some("Tool denied by permission policy"),
586                    crate::config::ToolPermission::Prompt => {
587                        if tool_call.name == "bash" {
588                            if let Some(cmd) =
589                                tool_call.arguments.get("command").and_then(|v| v.as_str())
590                            {
591                                if crate::tools::bash::is_read_only(cmd) {
592                                    tracing::debug!(command = cmd, "Auto-allowing read-only bash command under Prompt permission");
593                                    None
594                                } else if let Some(ref perm_cb) = on_permission {
595                                    let args_summary = cmd.chars().take(120).collect::<String>();
596                                    let rx = perm_cb(PermissionRequest {
597                                        tool_name: tool_call.name.clone(),
598                                        args_summary,
599                                    });
600                                    match rx.await {
601                                        Ok(true) => None,
602                                        _ => Some("User denied tool execution"),
603                                    }
604                                } else {
605                                    Some("Bash command requires user approval (read-only commands auto-allowed)")
606                                }
607                            } else {
608                                Some("Tool requires user approval")
609                            }
610                        } else if let Some(ref perm_cb) = on_permission {
611                            let args_summary = tool_call
612                                .arguments
613                                .to_string()
614                                .chars()
615                                .take(120)
616                                .collect::<String>();
617                            let rx = perm_cb(PermissionRequest {
618                                tool_name: tool_call.name.clone(),
619                                args_summary,
620                            });
621                            match rx.await {
622                                Ok(true) => None,
623                                _ => Some("User denied tool execution"),
624                            }
625                        } else {
626                            Some("Tool requires user approval (set permission to allow or use TUI mode)")
627                        }
628                    }
629                    crate::config::ToolPermission::Allow => None,
630                };
631
632                if let Some(reason) = denied {
633                    let record = ToolCallRecord {
634                        id: tool_call.id.clone(),
635                        name: tool_call.name.clone(),
636                        arguments: tool_call.arguments.clone(),
637                        result: json!({"error": reason}),
638                        success: false,
639                        duration_ms: 0,
640                    };
641                    if let Some(ref callback) = on_tool {
642                        callback(&record);
643                    }
644                    ordered_records[idx] = Some(record);
645                    ordered_tool_messages[idx] = Some(Message {
646                        role: Role::Tool,
647                        content: serde_json::to_string(&json!({"error": reason}))
648                            .unwrap_or_default(),
649                        tool_calls: vec![],
650                        tool_result: Some(ToolResultMessage {
651                            tool_call_id: tool_call.id.clone(),
652                            content: json!({"error": reason}),
653                            success: false,
654                        }),
655                    });
656                    continue;
657                }
658
659                if let Some(ref callback) = on_tool_start {
660                    callback(&tool_call.name);
661                }
662
663                if let Some(tool) = self.tools.get(&tool_call.name) {
664                    let schema = tool.parameters_schema();
665                    if let Ok(params) = thulp_core::ToolDefinition::parse_mcp_input_schema(&schema)
666                    {
667                        let thulp_def = thulp_core::ToolDefinition {
668                            name: tool_call.name.clone(),
669                            description: String::new(),
670                            parameters: params,
671                        };
672                        if let Err(e) = thulp_def.validate_args(&tool_call.arguments) {
673                            tracing::warn!(tool = tool_call.name.as_str(), error = %e, "Tool argument validation failed (continuing anyway)");
674                        }
675                    }
676                }
677
678                let tool = self.tools.get(&tool_call.name);
679                let is_mutating = tool.map(|t| t.mutating()).unwrap_or(false);
680                if is_mutating {
681                    if let Some(ref callback) = on_permission {
682                        let args_summary = summarize_args(&tool_call.arguments);
683                        let request = PermissionRequest {
684                            tool_name: tool_call.name.clone(),
685                            args_summary,
686                        };
687                        let permission_rx = (callback)(request);
688                        match permission_rx.await {
689                            Ok(true) => {}
690                            Ok(false) => {
691                                let record = ToolCallRecord {
692                                    id: tool_call.id.clone(),
693                                    name: tool_call.name.clone(),
694                                    arguments: tool_call.arguments.clone(),
695                                    result: json!({"error": "Tool execution denied by user", "tool": tool_call.name}),
696                                    success: false,
697                                    duration_ms: 0,
698                                };
699                                if let Some(ref callback) = on_tool {
700                                    callback(&record);
701                                }
702                                ordered_records[idx] = Some(record);
703                                ordered_tool_messages[idx] = Some(Message {
704                                    role: Role::Tool,
705                                    content: serde_json::to_string(&json!({"error": "Tool execution denied by user", "tool": tool_call.name})).unwrap_or_default(),
706                                    tool_calls: vec![],
707                                    tool_result: Some(ToolResultMessage {
708                                        tool_call_id: tool_call.id.clone(),
709                                        content: json!({"error": "Tool execution denied by user", "tool": tool_call.name}),
710                                        success: false,
711                                    }),
712                                });
713                                continue;
714                            }
715                            Err(_) => {
716                                let record = ToolCallRecord {
717                                    id: tool_call.id.clone(),
718                                    name: tool_call.name.clone(),
719                                    arguments: tool_call.arguments.clone(),
720                                    result: json!({"error": "Permission channel closed", "tool": tool_call.name}),
721                                    success: false,
722                                    duration_ms: 0,
723                                };
724                                if let Some(ref callback) = on_tool {
725                                    callback(&record);
726                                }
727                                ordered_records[idx] = Some(record);
728                                ordered_tool_messages[idx] = Some(Message {
729                                    role: Role::Tool,
730                                    content: serde_json::to_string(&json!({"error": "Permission channel closed", "tool": tool_call.name})).unwrap_or_default(),
731                                    tool_calls: vec![],
732                                    tool_result: Some(ToolResultMessage {
733                                        tool_call_id: tool_call.id.clone(),
734                                        content: json!({"error": "Permission channel closed", "tool": tool_call.name}),
735                                        success: false,
736                                    }),
737                                });
738                                continue;
739                            }
740                        }
741                    } else {
742                        tracing::warn!(
743                            tool = tool_call.name.as_str(),
744                            "No permission callback, auto-approving mutating tool"
745                        );
746                    }
747                }
748
749                pending.push((idx, tool_call));
750            }
751
752            if !pending.is_empty() {
753                use futures::{stream, StreamExt};
754
755                let tools = &self.tools;
756                let bash_timeout_secs = self.config.bash_timeout_secs;
757                let max_result_chars = self.config.max_result_chars;
758                let on_tool_cb = on_tool.as_ref();
759
760                let max_parallel = std::cmp::max(1, max_parallel_tools);
761                let results = stream::iter(pending)
762                    .map(|(idx, tool_call)| async move {
763                        let start = std::time::Instant::now();
764
765                        let result = {
766                            let tool_future = tools.execute(&tool_call.name, tool_call.arguments.clone());
767                            let timeout_dur = if tool_call.name == "bash" {
768                                std::time::Duration::from_secs(bash_timeout_secs)
769                            } else {
770                                std::time::Duration::from_secs(30)
771                            };
772                            match tokio::time::timeout(timeout_dur, tool_future).await {
773                                Ok(inner) => inner,
774                                Err(_) => Err(PawanError::Tool(format!(
775                                    "Tool {} timed out after {}s",
776                                    tool_call.name,
777                                    timeout_dur.as_secs()
778                                ))),
779                            }
780                        };
781
782                        let duration_ms = start.elapsed().as_millis() as u64;
783                        let (mut result_value, success) = match result {
784                            Ok(v) => (v, true),
785                            Err(e) => {
786                                tracing::warn!(tool = tool_call.name.as_str(), error = %e, "Tool execution failed");
787                                (json!({"error": e.to_string(), "tool": tool_call.name, "hint": "Try a different approach or tool"}), false)
788                            }
789                        };
790
791                        result_value = truncate_tool_result(result_value, max_result_chars);
792
793                        let record = ToolCallRecord {
794                            id: tool_call.id.clone(),
795                            name: tool_call.name.clone(),
796                            arguments: tool_call.arguments.clone(),
797                            result: result_value.clone(),
798                            success,
799                            duration_ms,
800                        };
801
802                        if let Some(ref cb) = on_tool_cb {
803                            cb(&record);
804                        }
805
806                        let tool_msg = Message {
807                            role: Role::Tool,
808                            content: serde_json::to_string(&result_value).unwrap_or_default(),
809                            tool_calls: vec![],
810                            tool_result: Some(ToolResultMessage {
811                                tool_call_id: tool_call.id.clone(),
812                                content: result_value,
813                                success,
814                            }),
815                        };
816
817                        let wrote_rs = success
818                            && tool_call.name == "write_file"
819                            && tool_call
820                                .arguments
821                                .get("path")
822                                .and_then(|p| p.as_str())
823                                .map(|p| p.ends_with(".rs"))
824                                .unwrap_or(false);
825
826                        (idx, record, tool_msg, wrote_rs)
827                    })
828                    .buffer_unordered(max_parallel)
829                    .collect::<Vec<_>>()
830                    .await;
831
832                for (idx, record, tool_msg, wrote_rs) in results {
833                    ordered_records[idx] = Some(record);
834                    ordered_tool_messages[idx] = Some(tool_msg);
835                    ordered_compile_gate[idx] = wrote_rs;
836                }
837            }
838
839            for i in 0..response.tool_calls.len() {
840                if let Some(record) = ordered_records[i].take() {
841                    all_tool_calls.push(record);
842                }
843                if let Some(msg) = ordered_tool_messages[i].take() {
844                    self.history.push(msg);
845                }
846
847                if ordered_compile_gate[i] {
848                    let ws = self.workspace_root.clone();
849                    let check_result = tokio::process::Command::new("cargo")
850                        .arg("check")
851                        .arg("--message-format=short")
852                        .current_dir(&ws)
853                        .output()
854                        .await;
855                    match check_result {
856                        Ok(output) if !output.status.success() => {
857                            let stderr = String::from_utf8_lossy(&output.stderr);
858                            let err_msg: String = stderr.chars().take(1500).collect();
859                            tracing::info!("Compile-gate: cargo check failed after write_file, injecting errors");
860                            self.history.push(Message {
861                                role: Role::User,
862                                content: format!(
863                                    "[SYSTEM] cargo check failed after your write_file. Fix the errors:\n{}",
864                                    err_msg
865                                ),
866                                tool_calls: vec![],
867                                tool_result: None,
868                            });
869                        }
870                        Ok(_) => {
871                            tracing::debug!("Compile-gate: cargo check passed");
872                        }
873                        Err(e) => {
874                            tracing::warn!("Compile-gate: cargo check failed to run: {}", e);
875                        }
876                    }
877                }
878            }
879        }
880    }
881
882    /// Execute using the ToolCoordinator instead of the built-in loop.
883    ///
884    /// This method provides an alternative implementation that uses the
885    /// ToolCoordinator for tool-calling loops, which offers:
886    /// - Parallel tool execution
887    /// - Per-tool timeout handling
888    /// - Consistent error handling
889    /// - Max iteration limits
890    ///
891    /// Note: This method does not support streaming callbacks or permission
892    /// prompts - those are only available in the built-in loop.
893    async fn execute_with_coordinator(&mut self, user_prompt: &str) -> Result<AgentResponse> {
894        // Reset idle timeout for the new turn
895        self.last_tool_call_time = None;
896
897        // Inject Eruka core memory before first LLM call
898        if let Some(eruka) = &self.eruka {
899            let before_inject = self.history.len();
900            if let Err(e) = eruka.inject_core_memory(&mut self.history).await {
901                tracing::warn!("Eruka memory injection failed (non-fatal): {}", e);
902            }
903
904            for msg in self
905                .history
906                .iter_mut()
907                .skip(before_inject)
908                .filter(|m| m.role == Role::System)
909            {
910                let fenced = prepare_recalled_context("eruka_core_memory", &msg.content);
911                if !fenced.is_empty() {
912                    msg.content = fenced;
913                }
914            }
915
916            // Prefetch task-relevant context
917            match eruka.prefetch(user_prompt, 2000).await {
918                Ok(Some(ctx)) => {
919                    let fenced = prepare_recalled_context("eruka_prefetch", &ctx);
920                    if !fenced.is_empty() {
921                        self.history.push(Message {
922                            role: Role::System,
923                            content: fenced,
924                            tool_calls: vec![],
925                            tool_result: None,
926                        });
927                    }
928                }
929                Ok(None) => {}
930                Err(e) => tracing::warn!("Eruka prefetch failed (non-fatal): {}", e),
931            }
932        }
933
934        // Per-turn architecture context injection
935
936        if let Some(err) = &self.arch_context_error {
937            return Err(PawanError::Config(err.clone()));
938        }
939
940        let effective_prompt = match &self.arch_context {
941            Some(ctx) => format!(
942                "[Workspace Architecture]\n{ctx}\n[/Workspace Architecture]\n\n{user_prompt}"
943            ),
944            None => user_prompt.to_string(),
945        };
946
947        // Build coordinator config from agent config
948        let coordinator_config = ToolCallingConfig {
949            max_iterations: self.config.max_tool_iterations,
950            parallel_execution: true,
951            max_parallel_tools: 10,
952            tool_timeout: std::time::Duration::from_secs(self.config.bash_timeout_secs),
953            stop_on_error: false,
954        };
955
956        // Create a fresh backend for coordinator execution
957        let system_prompt = self.config.get_system_prompt_checked()?;
958        let backend = Self::create_backend(&self.config, &system_prompt);
959        let backend = Arc::from(backend);
960
961        // Create a fresh tool registry for coordinator execution
962        // Note: This will not include any MCP tools registered at runtime
963        let registry = Arc::new(ToolRegistry::with_defaults(self.workspace_root.clone()));
964
965        // Create coordinator with backend and tool registry
966        let coordinator = ToolCoordinator::new(backend, registry, coordinator_config);
967
968        // Execute with coordinator
969        let result: CoordinatorResult = coordinator
970            .execute(Some(&system_prompt), &effective_prompt)
971            .await
972            .map_err(|e| PawanError::Agent(format!("Coordinator execution failed: {}", e)))?;
973
974        // Convert CoordinatorResult to AgentResponse
975        let content = result.content.clone();
976        let agent_response = AgentResponse {
977            content: result.content,
978            tool_calls: result.tool_calls,
979            iterations: result.iterations,
980            usage: result.total_usage,
981        };
982
983        // Sync turn to Eruka if enabled
984        if let Some(eruka) = &self.eruka {
985            if let Err(e) = eruka
986                .sync_turn(user_prompt, &content, &self.session_id)
987                .await
988            {
989                tracing::warn!("Eruka sync_turn failed (non-fatal): {}", e);
990            }
991        }
992
993        Ok(agent_response)
994    }
995
996    /// Execute a healing task with real diagnostics
997    pub async fn heal(&mut self) -> Result<AgentResponse> {
998        let healer =
999            crate::healing::Healer::new(self.workspace_root.clone(), self.config.healing.clone());
1000
1001        let diagnostics = healer.get_diagnostics().await?;
1002        let failed_tests = healer.get_failed_tests().await?;
1003
1004        let mut prompt = format!(
1005            "I need you to heal this Rust project at: {}
1006
1007",
1008            self.workspace_root.display()
1009        );
1010
1011        if !diagnostics.is_empty() {
1012            prompt.push_str(&format!(
1013                "## Compilation Issues ({} found)
1014{}
1015",
1016                diagnostics.len(),
1017                healer.format_diagnostics_for_prompt(&diagnostics)
1018            ));
1019        }
1020
1021        if !failed_tests.is_empty() {
1022            prompt.push_str(&format!(
1023                "## Failed Tests ({} found)
1024{}
1025",
1026                failed_tests.len(),
1027                healer.format_tests_for_prompt(&failed_tests)
1028            ));
1029        }
1030
1031        if diagnostics.is_empty() && failed_tests.is_empty() {
1032            prompt.push_str(
1033                "No issues found! Run cargo check and cargo test to verify.
1034",
1035            );
1036        }
1037
1038        prompt.push_str(
1039            "
1040Fix each issue one at a time. Verify with cargo check after each fix.",
1041        );
1042
1043        self.execute(&prompt).await
1044    }
1045    /// Execute healing with retries — calls heal(), checks for remaining errors, retries if needed.
1046    ///
1047    /// Two-stage gate:
1048    ///   Stage 1 — `cargo check`: must produce zero errors before proceeding.
1049    ///   Stage 2 — `healing.verify_cmd` (optional): a user-supplied shell command
1050    ///             (e.g. `cargo test --workspace`).  If it exits non-zero the loop
1051    ///             continues so the LLM can address the reported failures.
1052    ///
1053    /// Anti-thrash guard: each Stage-1 error is fingerprinted (kind + code +
1054    /// message prefix).  If the same fingerprint survives `max_attempts`
1055    /// consecutive rounds unchanged the loop halts rather than spinning
1056    /// indefinitely on an error the LLM cannot fix.
1057    pub async fn heal_with_retries(&mut self, max_attempts: usize) -> Result<AgentResponse> {
1058        use std::collections::{HashMap, HashSet};
1059
1060        let mut last_response = self.heal().await?;
1061        // fingerprint → consecutive rounds this error has survived unchanged
1062        let mut stuck_counts: HashMap<u64, usize> = HashMap::new();
1063
1064        for attempt in 1..max_attempts {
1065            // Stage 1: cargo check must be error-free
1066            let fixer = crate::healing::CompilerFixer::new(self.workspace_root.clone());
1067            let remaining = fixer.check().await?;
1068            let errors: Vec<_> = remaining
1069                .iter()
1070                .filter(|d| d.kind == crate::healing::DiagnosticKind::Error)
1071                .collect();
1072
1073            if !errors.is_empty() {
1074                // Update fingerprint counts.
1075                // Drop entries for errors that were fixed; increment survivors.
1076                let current_fps: HashSet<u64> = errors.iter().map(|d| d.fingerprint()).collect();
1077                stuck_counts.retain(|fp, _| current_fps.contains(fp));
1078                for fp in &current_fps {
1079                    *stuck_counts.entry(*fp).or_insert(0) += 1;
1080                }
1081
1082                // Anti-thrash: halt if any error fingerprint has not budged
1083                // after max_attempts consecutive rounds.
1084                let thrashing: Vec<u64> = stuck_counts
1085                    .iter()
1086                    .filter_map(|(&fp, &count)| {
1087                        if count >= max_attempts {
1088                            Some(fp)
1089                        } else {
1090                            None
1091                        }
1092                    })
1093                    .collect();
1094                if !thrashing.is_empty() {
1095                    tracing::warn!(
1096                        stuck_fingerprints = thrashing.len(),
1097                        attempt,
1098                        "Anti-thrash: {} error(s) unchanged after {} attempts, halting heal loop",
1099                        thrashing.len(),
1100                        max_attempts
1101                    );
1102                    return Ok(last_response);
1103                }
1104
1105                tracing::warn!(
1106                    errors = errors.len(),
1107                    attempt,
1108                    "Stage 1 (cargo check): errors remain, retrying"
1109                );
1110                last_response = self.heal().await?;
1111                continue;
1112            }
1113
1114            // All Stage-1 errors cleared — reset thrash counters.
1115            stuck_counts.clear();
1116
1117            // Stage 2: optional verify_cmd
1118            let verify_cmd = self.config.healing.verify_cmd.clone();
1119            if let Some(ref cmd) = verify_cmd {
1120                match crate::healing::run_verify_cmd(&self.workspace_root, cmd).await {
1121                    Ok(None) => {
1122                        tracing::info!(
1123                            attempts = attempt,
1124                            "Stage 2 (verify_cmd) passed, healing complete"
1125                        );
1126                        return Ok(last_response);
1127                    }
1128                    Ok(Some(diag)) => {
1129                        tracing::warn!(
1130                            attempt,
1131                            cmd,
1132                            output = diag.raw,
1133                            "Stage 2 (verify_cmd) failed, retrying"
1134                        );
1135                        last_response = self.heal().await?;
1136                        continue;
1137                    }
1138                    Err(e) => {
1139                        // Cannot spawn the command — don't block healing on this
1140                        tracing::warn!(cmd, error = %e, "verify_cmd could not be run, skipping stage 2");
1141                        return Ok(last_response);
1142                    }
1143                }
1144            } else {
1145                tracing::info!(
1146                    attempts = attempt,
1147                    "Stage 1 (cargo check) passed, healing complete"
1148                );
1149                return Ok(last_response);
1150            }
1151        }
1152
1153        tracing::info!(
1154            attempts = max_attempts,
1155            "Healing finished (may still have errors)"
1156        );
1157        Ok(last_response)
1158    }
1159    /// Execute a task with a specific prompt
1160    pub async fn task(&mut self, task_description: &str) -> Result<AgentResponse> {
1161        let prompt = format!(
1162            r#"I need you to complete the following coding task:
1163
1164{}
1165
1166The workspace is at: {}
1167
1168Please:
11691. First explore the codebase to understand the relevant code
11702. Make the necessary changes
11713. Verify the changes compile with `cargo check`
11724. Run relevant tests if applicable
1173
1174Explain your changes as you go."#,
1175            task_description,
1176            self.workspace_root.display()
1177        );
1178
1179        self.execute(&prompt).await
1180    }
1181
1182    /// Generate a commit message for current changes
1183    pub async fn generate_commit_message(&mut self) -> Result<String> {
1184        let prompt = r#"Please:
11851. Run `git status` to see what files are changed
11862. Run `git diff --cached` to see staged changes (or `git diff` for unstaged)
11873. Generate a concise, descriptive commit message following conventional commits format
1188
1189Only output the suggested commit message, nothing else."#;
1190
1191        let response = self.execute(prompt).await?;
1192        Ok(response.content)
1193    }
1194}