1use super::{
4 prepare_recalled_context, AgentResponse, LLMResponse, Message, PawanAgent, PermissionCallback,
5 PermissionRequest, Role, TokenCallback, TokenUsage, ToolCallRecord, ToolCallRequest,
6 ToolCallback, ToolResultMessage, ToolStartCallback,
7};
8use crate::coordinator::{CoordinatorResult, ToolCallingConfig, ToolCoordinator};
9use crate::tools::ToolRegistry;
10use crate::{PawanError, Result};
11use serde_json::{json, Value};
12use std::sync::Arc;
13use std::time::Instant;
14
15pub(crate) fn truncate_tool_result(value: Value, max_chars: usize) -> Value {
19 let serialized = serde_json::to_string(&value).unwrap_or_default();
20 if serialized.len() <= max_chars {
21 return value;
22 }
23
24 match value {
26 Value::Object(map) => {
27 let mut result = serde_json::Map::new();
28 let total = serialized.len();
29 for (k, v) in map {
30 if let Value::String(s) = &v {
31 if s.len() > 500 {
32 let target = s.len() * max_chars / total;
34 let target = target.max(200); let truncated: String = s.chars().take(target).collect();
36 result.insert(
37 k,
38 json!(format!(
39 "{}...[truncated from {} chars]",
40 truncated,
41 s.len()
42 )),
43 );
44 continue;
45 }
46 }
47 result.insert(k, truncate_tool_result(v, max_chars));
49 }
50 Value::Object(result)
51 }
52 Value::String(s) if s.len() > max_chars => {
53 let truncated: String = s.chars().take(max_chars).collect();
54 json!(format!(
55 "{}...[truncated from {} chars]",
56 truncated,
57 s.len()
58 ))
59 }
60 Value::Array(arr) if serialized.len() > max_chars => {
61 let mut result = Vec::new();
63 let mut running_len = 2; for item in arr {
65 let item_str = serde_json::to_string(&item).unwrap_or_default();
66 running_len += item_str.len() + 1; if running_len > max_chars {
68 result.push(json!(format!("...[{} more items truncated]", 0)));
69 break;
70 }
71 result.push(item);
72 }
73 Value::Array(result)
74 }
75 other => other,
76 }
77}
78
79pub(crate) fn summarize_args(args: &serde_json::Value) -> String {
81 match args {
82 serde_json::Value::Object(map) => {
83 let mut parts = Vec::new();
84 for (key, value) in map {
85 let value_str = match value {
86 serde_json::Value::String(s) if s.len() > 50 => {
87 format!("\"{}...\"", &s[..47])
88 }
89 serde_json::Value::String(s) => format!("\"{}\"", s),
90 serde_json::Value::Array(arr) if arr.len() > 3 => {
91 format!("[... {} items]", arr.len())
92 }
93 serde_json::Value::Array(arr) => {
94 let items: Vec<String> = arr
95 .iter()
96 .take(3)
97 .map(|v| match v {
98 serde_json::Value::String(s) => {
99 if s.len() > 20 {
100 format!("\"{}...\"", &s[..17])
101 } else {
102 format!("\"{}\"", s)
103 }
104 }
105 _ => v.to_string(),
106 })
107 .collect();
108 format!("[{}]", items.join(", "))
109 }
110 _ => value.to_string(),
111 };
112 parts.push(format!("{}: {}", key, value_str));
113 }
114 parts.join(", ")
115 }
116 serde_json::Value::String(s) => {
117 if s.len() > 100 {
118 format!("\"{}...\"", &s[..97])
119 } else {
120 format!("\"{}\"", s)
121 }
122 }
123 serde_json::Value::Array(arr) => {
124 format!("[{} items]", arr.len())
125 }
126 _ => args.to_string(),
127 }
128}
129
130impl PawanAgent {
131 pub async fn execute(&mut self, user_prompt: &str) -> Result<AgentResponse> {
133 self.execute_with_callbacks(user_prompt, None, None, None)
134 .await
135 }
136
137 pub async fn execute_with_callbacks(
139 &mut self,
140 user_prompt: &str,
141 on_token: Option<TokenCallback>,
142 on_tool: Option<ToolCallback>,
143 on_tool_start: Option<ToolStartCallback>,
144 ) -> Result<AgentResponse> {
145 self.execute_with_all_callbacks(user_prompt, on_token, on_tool, on_tool_start, None)
146 .await
147 }
148
149 pub async fn execute_with_all_callbacks(
151 &mut self,
152 user_prompt: &str,
153 on_token: Option<TokenCallback>,
154 on_tool: Option<ToolCallback>,
155 on_tool_start: Option<ToolStartCallback>,
156 on_permission: Option<PermissionCallback>,
157 ) -> Result<AgentResponse> {
158 if self.config.use_coordinator {
160 if on_token.is_some()
162 || on_tool.is_some()
163 || on_tool_start.is_some()
164 || on_permission.is_some()
165 {
166 tracing::warn!(
167 "Callbacks and permission prompts are not supported in coordinator mode; ignoring them"
168 );
169 }
170 return self.execute_with_coordinator(user_prompt).await;
171 }
172
173 self.last_tool_call_time = None;
175
176 self.inject_eruka_context(user_prompt).await;
178
179 let effective_prompt = self.build_user_prompt(user_prompt)?;
181 self.history.push(Message {
182 role: Role::User,
183 content: effective_prompt,
184 tool_calls: vec![],
185 tool_result: None,
186 });
187
188 let mut all_tool_calls = Vec::new();
189 let mut total_usage = TokenUsage::default();
190 let mut iterations = 0;
191 let max_iterations = self.config.max_tool_iterations;
192
193 loop {
194 if let Some(last_time) = self.last_tool_call_time {
196 let elapsed = last_time.elapsed().as_secs();
197 if elapsed > self.config.tool_call_idle_timeout_secs {
198 return Err(PawanError::Agent(format!(
199 "Tool idle timeout exceeded ({}s > {}s)",
200 elapsed, self.config.tool_call_idle_timeout_secs
201 )));
202 }
203 }
204
205 iterations += 1;
206 if iterations > max_iterations {
207 return Err(PawanError::Agent(format!(
208 "Max tool iterations ({}) exceeded",
209 max_iterations
210 )));
211 }
212
213 self.apply_iteration_budgets(iterations, max_iterations)
215 .await;
216
217 let latest_query = self
219 .history
220 .iter()
221 .rev()
222 .find(|m| m.role == Role::User)
223 .map(|m| m.content.as_str())
224 .unwrap_or("");
225 let tool_defs = self.tools.select_for_query(latest_query, 12);
226 if iterations == 1 {
227 let tool_names: Vec<&str> = tool_defs.iter().map(|t| t.name.as_str()).collect();
228 tracing::info!(tools = ?tool_names, count = tool_defs.len(), "Selected tools for query");
229 }
230
231 self.last_tool_call_time = Some(Instant::now());
233
234 let response = self
236 .call_llm_with_retry(&tool_defs, on_token.as_ref())
237 .await;
238
239 if let Some(ref usage) = response.usage {
241 Self::accumulate_token_usage(
242 usage,
243 &mut total_usage,
244 iterations,
245 self.config.thinking_budget,
246 );
247 }
248
249 let clean_content = Self::strip_thinking_blocks(&response.content);
251
252 if response.tool_calls.is_empty() {
253 if self
255 .handle_no_tool_response(
256 &clean_content,
257 user_prompt,
258 &tool_defs,
259 iterations,
260 max_iterations,
261 &response.finish_reason,
262 )
263 .await
264 {
265 self.history.push(Message {
266 role: Role::Assistant,
267 content: clean_content.clone(),
268 tool_calls: vec![],
269 tool_result: None,
270 });
271 if let Some(eruka) = &self.eruka {
273 if let Err(e) = eruka
274 .sync_turn(user_prompt, &clean_content, &self.session_id)
275 .await
276 {
277 tracing::warn!("Eruka sync_turn failed (non-fatal): {}", e);
278 }
279 }
280 return Ok(AgentResponse {
281 content: clean_content,
282 tool_calls: all_tool_calls,
283 iterations,
284 usage: total_usage,
285 });
286 }
287 continue;
288 }
289
290 self.history.push(Message {
292 role: Role::Assistant,
293 content: response.content.clone(),
294 tool_calls: response.tool_calls.clone(),
295 tool_result: None,
296 });
297
298 let (pending, mut ordered_records, mut ordered_tool_messages, mut ordered_compile_gate) =
300 self.check_tool_permissions(
301 &response.tool_calls,
302 on_permission.as_ref(),
303 on_tool.as_ref(),
304 on_tool_start.as_ref(),
305 )
306 .await;
307
308 if !pending.is_empty() {
310 let results = Self::execute_pending_tools(
311 &self.tools,
312 self.config.bash_timeout_secs,
313 self.config.max_result_chars,
314 pending,
315 on_tool.as_ref(),
316 )
317 .await;
318 for (idx, record, tool_msg, wrote_rs) in results {
319 ordered_records[idx] = Some(record);
320 ordered_tool_messages[idx] = Some(tool_msg);
321 ordered_compile_gate[idx] = wrote_rs;
322 }
323 }
324
325 self.collect_tool_results(
327 &mut all_tool_calls,
328 &mut ordered_records,
329 &mut ordered_tool_messages,
330 &ordered_compile_gate,
331 response.tool_calls.len(),
332 )
333 .await;
334 }
335 }
336 async fn inject_eruka_context(&mut self, user_prompt: &str) {
340 if let Some(eruka) = &self.eruka {
341 let before_inject = self.history.len();
342 if let Err(e) = eruka.inject_core_memory(&mut self.history).await {
343 tracing::warn!("Eruka memory injection failed (non-fatal): {}", e);
344 }
345
346 for msg in self
347 .history
348 .iter_mut()
349 .skip(before_inject)
350 .filter(|m| m.role == Role::System)
351 {
352 let fenced = prepare_recalled_context("eruka_core_memory", &msg.content);
353 if !fenced.is_empty() {
354 msg.content = fenced;
355 }
356 }
357
358 match eruka.prefetch(user_prompt, 2000).await {
362 Ok(Some(ctx)) => {
363 let fenced = prepare_recalled_context("eruka_prefetch", &ctx);
364 if !fenced.is_empty() {
365 self.history.push(Message {
366 role: Role::System,
367 content: fenced,
368 tool_calls: vec![],
369 tool_result: None,
370 });
371 }
372 }
373 Ok(None) => {}
374 Err(e) => tracing::warn!("Eruka prefetch failed (non-fatal): {}", e),
375 }
376 }
377 }
378
379 fn build_user_prompt(&self, user_prompt: &str) -> Result<String> {
381 if let Some(err) = &self.arch_context_error {
382 return Err(PawanError::Config(err.clone()));
383 }
384 Ok(match &self.arch_context {
385 Some(ctx) => format!(
386 "[Workspace Architecture]\n{ctx}\n[/Workspace Architecture]\n\n{user_prompt}"
387 ),
388 None => user_prompt.to_string(),
389 })
390 }
391
392 async fn apply_iteration_budgets(&mut self, iterations: usize, max_iterations: usize) {
395 let remaining = max_iterations.saturating_sub(iterations);
397 if remaining == 3 && iterations > 1 {
398 self.history.push(Message {
399 role: Role::User,
400 content: format!(
401 "[SYSTEM] You have {} tool iterations remaining. \
402 Stop exploring and write the most important output now. \
403 If you have code to write, write it immediately.",
404 remaining
405 ),
406 tool_calls: vec![],
407 tool_result: None,
408 });
409 }
410 self.context_tokens_estimate =
412 self.history.iter().map(|m| m.content.len()).sum::<usize>() / 4;
413 if self.context_tokens_estimate > self.config.max_context_tokens {
414 if let Some(eruka) = &self.eruka {
417 let snapshot = Self::history_snapshot_for_eruka(&self.history);
418 if let Err(e) = eruka.on_pre_compress(&snapshot, &self.session_id).await {
419 tracing::warn!("Eruka on_pre_compress failed (non-fatal): {}", e);
420 }
421 }
422 self.prune_history();
423 }
424 }
425
426 async fn call_llm_with_retry(
429 &mut self,
430 tool_defs: &[thulp_core::ToolDefinition],
431 on_token: Option<&TokenCallback>,
432 ) -> LLMResponse {
433 let max_llm_retries = 3;
434 let mut attempt = 0;
435 loop {
436 attempt += 1;
437 match self
438 .backend
439 .generate(&self.history, tool_defs, on_token)
440 .await
441 {
442 Ok(resp) => return resp,
443 Err(e) => {
444 let err_str = e.to_string();
445 let is_transient = err_str.contains("timeout")
446 || err_str.contains("connection")
447 || err_str.contains("429")
448 || err_str.contains("500")
449 || err_str.contains("502")
450 || err_str.contains("503")
451 || err_str.contains("504")
452 || err_str.contains("reset")
453 || err_str.contains("broken pipe");
454
455 if is_transient && attempt <= max_llm_retries {
456 let delay = std::time::Duration::from_secs(2u64.pow(attempt as u32));
457 tracing::warn!(
458 attempt = attempt,
459 delay_secs = delay.as_secs(),
460 error = err_str.as_str(),
461 "LLM call failed (transient) — retrying"
462 );
463 tokio::time::sleep(delay).await;
464
465 if err_str.contains("context") || err_str.contains("token") {
467 tracing::info!(
468 "Pruning history before retry (possible context overflow)"
469 );
470 if let Some(eruka) = &self.eruka {
471 let snapshot = Self::history_snapshot_for_eruka(&self.history);
472 if let Err(e) =
473 eruka.on_pre_compress(&snapshot, &self.session_id).await
474 {
475 tracing::warn!(
476 "Eruka on_pre_compress failed (non-fatal): {}",
477 e
478 );
479 }
480 }
481 self.prune_history();
482 }
483 continue;
484 }
485
486 tracing::error!(
488 attempt = attempt,
489 error = err_str.as_str(),
490 "LLM call failed permanently — returning error as content"
491 );
492 return LLMResponse {
493 content: format!(
494 "LLM error after {} attempts: {}. The task could not be completed.",
495 attempt, err_str
496 ),
497 reasoning: None,
498 tool_calls: vec![],
499 finish_reason: "error".to_string(),
500 usage: None,
501 };
502 }
503 }
504 }
505 }
506
507 fn accumulate_token_usage(
509 usage: &TokenUsage,
510 total_usage: &mut TokenUsage,
511 iterations: usize,
512 thinking_budget: usize,
513 ) {
514 total_usage.prompt_tokens += usage.prompt_tokens;
515 total_usage.completion_tokens += usage.completion_tokens;
516 total_usage.total_tokens += usage.total_tokens;
517 total_usage.reasoning_tokens += usage.reasoning_tokens;
518 total_usage.action_tokens += usage.action_tokens;
519
520 if usage.reasoning_tokens > 0 {
522 tracing::info!(
523 iteration = iterations,
524 think = usage.reasoning_tokens,
525 act = usage.action_tokens,
526 total = usage.completion_tokens,
527 "Token budget: think:{} act:{} (total:{})",
528 usage.reasoning_tokens,
529 usage.action_tokens,
530 usage.completion_tokens
531 );
532 }
533
534 if thinking_budget > 0 && usage.reasoning_tokens > thinking_budget as u64 {
536 tracing::warn!(
537 budget = thinking_budget,
538 actual = usage.reasoning_tokens,
539 "Thinking budget exceeded ({}/{} tokens)",
540 usage.reasoning_tokens,
541 thinking_budget
542 );
543 }
544 }
545
546 fn strip_thinking_blocks(content: &str) -> String {
548 let mut s = content.to_string();
549 loop {
550 let lower = s.to_lowercase();
551 let open = lower.find("<think>");
552 let close = lower.find("</think>");
553 match (open, close) {
554 (Some(i), Some(j)) if j > i => {
555 let before = s[..i].trim_end().to_string();
556 let after = if s.len() > j + 8 {
557 s[j + 8..].trim_start().to_string()
558 } else {
559 String::new()
560 };
561 s = if before.is_empty() {
562 after
563 } else if after.is_empty() {
564 before
565 } else {
566 format!("{}\n{}", before, after)
567 };
568 }
569 _ => break,
570 }
571 }
572 s
573 }
574
575 async fn handle_no_tool_response(
580 &mut self,
581 clean_content: &str,
582 _user_prompt: &str,
583 tool_defs: &[thulp_core::ToolDefinition],
584 iterations: usize,
585 max_iterations: usize,
586 finish_reason: &str,
587 ) -> bool {
588 let has_tools = !tool_defs.is_empty();
591 let lower = clean_content.to_lowercase();
592 let planning_prefix = lower.starts_with("let me")
593 || lower.starts_with("i'll help")
594 || lower.starts_with("i will help")
595 || lower.starts_with("sure, i")
596 || lower.starts_with("okay, i");
597 let looks_like_planning =
598 clean_content.len() > 200 || (planning_prefix && clean_content.len() > 50);
599 if has_tools
600 && looks_like_planning
601 && iterations == 1
602 && iterations < max_iterations
603 && finish_reason != "error"
604 {
605 tracing::warn!(
606 "No tool calls at iteration {} (content: {}B) — nudging model to use tools",
607 iterations,
608 clean_content.len()
609 );
610 self.history.push(Message {
611 role: Role::Assistant,
612 content: clean_content.to_string(),
613 tool_calls: vec![],
614 tool_result: None,
615 });
616 self.history.push(Message {
617 role: Role::User,
618 content: "You must use tools to complete this task. Do NOT just describe what you would do — actually call the tools. Start with bash or read_file.".to_string(),
619 tool_calls: vec![],
620 tool_result: None,
621 });
622 return false;
623 }
624
625 if iterations > 1 {
627 let prev_assistant = self
628 .history
629 .iter()
630 .rev()
631 .find(|m| m.role == Role::Assistant && !m.content.is_empty());
632 if let Some(prev) = prev_assistant {
633 if prev.content.trim() == clean_content.trim() && iterations < max_iterations {
634 tracing::warn!(
635 "Repeated response detected at iteration {} — injecting correction",
636 iterations
637 );
638 self.history.push(Message {
639 role: Role::Assistant,
640 content: clean_content.to_string(),
641 tool_calls: vec![],
642 tool_result: None,
643 });
644 self.history.push(Message {
645 role: Role::User,
646 content: "You gave the same response as before. Try a different approach. Use anchor_text in edit_file_lines, or use insert_after, or use bash with sed.".to_string(),
647 tool_calls: vec![],
648 tool_result: None,
649 });
650 return false;
651 }
652 }
653 }
654
655 true
656 }
657
658 async fn check_tool_permissions(
661 &mut self,
662 tool_calls: &[ToolCallRequest],
663 on_permission: Option<&PermissionCallback>,
664 on_tool: Option<&ToolCallback>,
665 on_tool_start: Option<&ToolStartCallback>,
666 ) -> (
667 Vec<(usize, ToolCallRequest)>,
668 Vec<Option<ToolCallRecord>>,
669 Vec<Option<Message>>,
670 Vec<bool>,
671 ) {
672 let mut ordered_records: Vec<Option<ToolCallRecord>> = vec![None; tool_calls.len()];
673 let mut ordered_tool_messages: Vec<Option<Message>> = vec![None; tool_calls.len()];
674 let ordered_compile_gate: Vec<bool> = vec![false; tool_calls.len()];
675 let mut pending: Vec<(usize, ToolCallRequest)> = Vec::new();
676
677 for (idx, tool_call) in tool_calls.iter().cloned().enumerate() {
678 self.tools.activate(&tool_call.name);
679
680 let perm =
681 crate::config::ToolPermission::resolve(&tool_call.name, &self.config.permissions);
682 let denied = match perm {
683 crate::config::ToolPermission::Deny => Some("Tool denied by permission policy"),
684 crate::config::ToolPermission::Prompt => {
685 if tool_call.name == "bash" {
686 if let Some(cmd) =
687 tool_call.arguments.get("command").and_then(|v| v.as_str())
688 {
689 if crate::tools::bash::is_read_only(cmd) {
690 tracing::debug!(
691 command = cmd,
692 "Auto-allowing read-only bash command under Prompt permission"
693 );
694 None
695 } else if let Some(ref perm_cb) = on_permission {
696 let args_summary = cmd.chars().take(120).collect::<String>();
697 let rx = perm_cb(PermissionRequest {
698 tool_name: tool_call.name.clone(),
699 args_summary,
700 });
701 match rx.await {
702 Ok(true) => None,
703 _ => Some("User denied tool execution"),
704 }
705 } else {
706 Some("Bash command requires user approval (read-only commands auto-allowed)")
707 }
708 } else {
709 Some("Tool requires user approval")
710 }
711 } else if let Some(ref perm_cb) = on_permission {
712 let args_summary = tool_call
713 .arguments
714 .to_string()
715 .chars()
716 .take(120)
717 .collect::<String>();
718 let rx = perm_cb(PermissionRequest {
719 tool_name: tool_call.name.clone(),
720 args_summary,
721 });
722 match rx.await {
723 Ok(true) => None,
724 _ => Some("User denied tool execution"),
725 }
726 } else {
727 Some(
728 "Tool requires user approval (set permission to allow or use TUI mode)",
729 )
730 }
731 }
732 crate::config::ToolPermission::Allow => None,
733 };
734
735 if let Some(reason) = denied {
736 let record = ToolCallRecord {
737 id: tool_call.id.clone(),
738 name: tool_call.name.clone(),
739 arguments: tool_call.arguments.clone(),
740 result: json!({"error": reason}),
741 success: false,
742 duration_ms: 0,
743 };
744 if let Some(ref callback) = on_tool {
745 callback(&record);
746 }
747 ordered_records[idx] = Some(record);
748 ordered_tool_messages[idx] = Some(Message {
749 role: Role::Tool,
750 content: serde_json::to_string(&json!({"error": reason})).unwrap_or_default(),
751 tool_calls: vec![],
752 tool_result: Some(ToolResultMessage {
753 tool_call_id: tool_call.id.clone(),
754 content: json!({"error": reason}),
755 success: false,
756 }),
757 });
758 continue;
759 }
760
761 if let Some(ref callback) = on_tool_start {
762 callback(&tool_call.name);
763 }
764
765 if let Some(tool) = self.tools.get(&tool_call.name) {
766 let schema = tool.parameters_schema();
767 if let Ok(params) = thulp_core::ToolDefinition::parse_mcp_input_schema(&schema) {
768 let thulp_def = thulp_core::ToolDefinition {
769 name: tool_call.name.clone(),
770 description: String::new(),
771 parameters: params,
772 };
773 if let Err(e) = thulp_def.validate_args(&tool_call.arguments) {
774 tracing::warn!(tool = tool_call.name.as_str(), error = %e, "Tool argument validation failed (continuing anyway)");
775 }
776 }
777 }
778
779 let tool = self.tools.get(&tool_call.name);
780 let is_mutating = tool.map(|t| t.mutating()).unwrap_or(false);
781 if is_mutating {
782 if let Some(ref callback) = on_permission {
783 let args_summary = summarize_args(&tool_call.arguments);
784 let request = PermissionRequest {
785 tool_name: tool_call.name.clone(),
786 args_summary,
787 };
788 let permission_rx = (callback)(request);
789 match permission_rx.await {
790 Ok(true) => {}
791 Ok(false) => {
792 let record = ToolCallRecord {
793 id: tool_call.id.clone(),
794 name: tool_call.name.clone(),
795 arguments: tool_call.arguments.clone(),
796 result: json!({"error": "Tool execution denied by user", "tool": tool_call.name}),
797 success: false,
798 duration_ms: 0,
799 };
800 if let Some(ref callback) = on_tool {
801 callback(&record);
802 }
803 ordered_records[idx] = Some(record);
804 ordered_tool_messages[idx] = Some(Message {
805 role: Role::Tool,
806 content: serde_json::to_string(&json!({"error": "Tool execution denied by user", "tool": tool_call.name})).unwrap_or_default(),
807 tool_calls: vec![],
808 tool_result: Some(ToolResultMessage {
809 tool_call_id: tool_call.id.clone(),
810 content: json!({"error": "Tool execution denied by user", "tool": tool_call.name}),
811 success: false,
812 }),
813 });
814 continue;
815 }
816 Err(_) => {
817 let record = ToolCallRecord {
818 id: tool_call.id.clone(),
819 name: tool_call.name.clone(),
820 arguments: tool_call.arguments.clone(),
821 result: json!({"error": "Permission channel closed", "tool": tool_call.name}),
822 success: false,
823 duration_ms: 0,
824 };
825 if let Some(ref callback) = on_tool {
826 callback(&record);
827 }
828 ordered_records[idx] = Some(record);
829 ordered_tool_messages[idx] = Some(Message {
830 role: Role::Tool,
831 content: serde_json::to_string(&json!({"error": "Permission channel closed", "tool": tool_call.name})).unwrap_or_default(),
832 tool_calls: vec![],
833 tool_result: Some(ToolResultMessage {
834 tool_call_id: tool_call.id.clone(),
835 content: json!({"error": "Permission channel closed", "tool": tool_call.name}),
836 success: false,
837 }),
838 });
839 continue;
840 }
841 }
842 } else {
843 tracing::warn!(
844 tool = tool_call.name.as_str(),
845 "No permission callback, auto-approving mutating tool"
846 );
847 }
848 }
849
850 pending.push((idx, tool_call));
851 }
852
853 (
854 pending,
855 ordered_records,
856 ordered_tool_messages,
857 ordered_compile_gate,
858 )
859 }
860
861 async fn execute_pending_tools(
863 tools: &ToolRegistry,
864 bash_timeout_secs: u64,
865 max_result_chars: usize,
866 pending: Vec<(usize, ToolCallRequest)>,
867 on_tool: Option<&ToolCallback>,
868 ) -> Vec<(usize, ToolCallRecord, Message, bool)> {
869 use futures::{stream, StreamExt};
870
871 let on_tool_cb = on_tool;
872 let max_parallel = std::cmp::max(1, 10);
873 stream::iter(pending)
874 .map(|(idx, tool_call)| async move {
875 let start = std::time::Instant::now();
876
877 let result = {
878 let tool_future = tools.execute(&tool_call.name, tool_call.arguments.clone());
879 let timeout_dur = if tool_call.name == "bash" {
880 std::time::Duration::from_secs(bash_timeout_secs)
881 } else {
882 std::time::Duration::from_secs(30)
883 };
884 match tokio::time::timeout(timeout_dur, tool_future).await {
885 Ok(inner) => inner,
886 Err(_) => Err(PawanError::Tool(format!(
887 "Tool {} timed out after {}s",
888 tool_call.name,
889 timeout_dur.as_secs()
890 ))),
891 }
892 };
893
894 let duration_ms = start.elapsed().as_millis() as u64;
895 let (mut result_value, success) = match result {
896 Ok(v) => (v, true),
897 Err(e) => {
898 tracing::warn!(tool = tool_call.name.as_str(), error = %e, "Tool execution failed");
899 (json!({"error": e.to_string(), "tool": tool_call.name, "hint": "Try a different approach or tool"}), false)
900 }
901 };
902
903 result_value = truncate_tool_result(result_value, max_result_chars);
904
905 let record = ToolCallRecord {
906 id: tool_call.id.clone(),
907 name: tool_call.name.clone(),
908 arguments: tool_call.arguments.clone(),
909 result: result_value.clone(),
910 success,
911 duration_ms,
912 };
913
914 if let Some(ref cb) = on_tool_cb {
915 cb(&record);
916 }
917
918 let tool_msg = Message {
919 role: Role::Tool,
920 content: serde_json::to_string(&result_value).unwrap_or_default(),
921 tool_calls: vec![],
922 tool_result: Some(ToolResultMessage {
923 tool_call_id: tool_call.id.clone(),
924 content: result_value,
925 success,
926 }),
927 };
928
929 let wrote_rs = success
930 && tool_call.name == "write_file"
931 && tool_call
932 .arguments
933 .get("path")
934 .and_then(|p| p.as_str())
935 .map(|p| p.ends_with(".rs"))
936 .unwrap_or(false);
937
938 (idx, record, tool_msg, wrote_rs)
939 })
940 .buffer_unordered(max_parallel)
941 .collect::<Vec<_>>()
942 .await
943 }
944
945 async fn collect_tool_results(
948 &mut self,
949 all_tool_calls: &mut Vec<ToolCallRecord>,
950 ordered_records: &mut [Option<ToolCallRecord>],
951 ordered_tool_messages: &mut [Option<Message>],
952 ordered_compile_gate: &[bool],
953 tool_calls_count: usize,
954 ) {
955 for i in 0..tool_calls_count {
956 if let Some(record) = ordered_records[i].take() {
957 all_tool_calls.push(record);
958 }
959 if let Some(msg) = ordered_tool_messages[i].take() {
960 self.history.push(msg);
961 }
962
963 if ordered_compile_gate[i] {
964 let ws = self.workspace_root.clone();
965 let check_result = tokio::process::Command::new("cargo")
966 .arg("check")
967 .arg("--message-format=short")
968 .current_dir(&ws)
969 .output()
970 .await;
971 match check_result {
972 Ok(output) if !output.status.success() => {
973 let stderr = String::from_utf8_lossy(&output.stderr);
974 let err_msg: String = stderr.chars().take(1500).collect();
975 tracing::info!(
976 "Compile-gate: cargo check failed after write_file, injecting errors"
977 );
978 self.history.push(Message {
979 role: Role::User,
980 content: format!(
981 "[SYSTEM] cargo check failed after your write_file. Fix the errors:\n{}",
982 err_msg
983 ),
984 tool_calls: vec![],
985 tool_result: None,
986 });
987 }
988 Ok(_) => {
989 tracing::debug!("Compile-gate: cargo check passed");
990 }
991 Err(e) => {
992 tracing::warn!("Compile-gate: cargo check failed to run: {}", e);
993 }
994 }
995 }
996 }
997 }
998
999 async fn execute_with_coordinator(&mut self, user_prompt: &str) -> Result<AgentResponse> {
1011 self.last_tool_call_time = None;
1013
1014 if let Some(eruka) = &self.eruka {
1016 let before_inject = self.history.len();
1017 if let Err(e) = eruka.inject_core_memory(&mut self.history).await {
1018 tracing::warn!("Eruka memory injection failed (non-fatal): {}", e);
1019 }
1020
1021 for msg in self
1022 .history
1023 .iter_mut()
1024 .skip(before_inject)
1025 .filter(|m| m.role == Role::System)
1026 {
1027 let fenced = prepare_recalled_context("eruka_core_memory", &msg.content);
1028 if !fenced.is_empty() {
1029 msg.content = fenced;
1030 }
1031 }
1032
1033 match eruka.prefetch(user_prompt, 2000).await {
1035 Ok(Some(ctx)) => {
1036 let fenced = prepare_recalled_context("eruka_prefetch", &ctx);
1037 if !fenced.is_empty() {
1038 self.history.push(Message {
1039 role: Role::System,
1040 content: fenced,
1041 tool_calls: vec![],
1042 tool_result: None,
1043 });
1044 }
1045 }
1046 Ok(None) => {}
1047 Err(e) => tracing::warn!("Eruka prefetch failed (non-fatal): {}", e),
1048 }
1049 }
1050
1051 if let Some(err) = &self.arch_context_error {
1054 return Err(PawanError::Config(err.clone()));
1055 }
1056
1057 let effective_prompt = match &self.arch_context {
1058 Some(ctx) => format!(
1059 "[Workspace Architecture]\n{ctx}\n[/Workspace Architecture]\n\n{user_prompt}"
1060 ),
1061 None => user_prompt.to_string(),
1062 };
1063
1064 let coordinator_config = ToolCallingConfig {
1066 max_iterations: self.config.max_tool_iterations,
1067 parallel_execution: true,
1068 max_parallel_tools: 10,
1069 tool_timeout: std::time::Duration::from_secs(self.config.bash_timeout_secs),
1070 stop_on_error: false,
1071 };
1072
1073 let system_prompt = self.config.get_system_prompt_checked()?;
1075 let backend = Self::create_backend(&self.config, &system_prompt);
1076 let backend = Arc::from(backend);
1077
1078 let registry = Arc::new(ToolRegistry::with_defaults(self.workspace_root.clone()));
1081
1082 let coordinator = ToolCoordinator::new(backend, registry, coordinator_config);
1084
1085 let result: CoordinatorResult = coordinator
1087 .execute(Some(&system_prompt), &effective_prompt)
1088 .await
1089 .map_err(|e| PawanError::Agent(format!("Coordinator execution failed: {}", e)))?;
1090
1091 let content = result.content.clone();
1093 let agent_response = AgentResponse {
1094 content: result.content,
1095 tool_calls: result.tool_calls,
1096 iterations: result.iterations,
1097 usage: result.total_usage,
1098 };
1099
1100 if let Some(eruka) = &self.eruka {
1102 if let Err(e) = eruka
1103 .sync_turn(user_prompt, &content, &self.session_id)
1104 .await
1105 {
1106 tracing::warn!("Eruka sync_turn failed (non-fatal): {}", e);
1107 }
1108 }
1109
1110 Ok(agent_response)
1111 }
1112
1113 pub async fn heal(&mut self) -> Result<AgentResponse> {
1115 let healer =
1116 crate::healing::Healer::new(self.workspace_root.clone(), self.config.healing.clone());
1117
1118 let diagnostics = healer.get_diagnostics().await?;
1119 let failed_tests = healer.get_failed_tests().await?;
1120
1121 let mut prompt = format!(
1122 "I need you to heal this Rust project at: {}
1123
1124",
1125 self.workspace_root.display()
1126 );
1127
1128 if !diagnostics.is_empty() {
1129 prompt.push_str(&format!(
1130 "## Compilation Issues ({} found)
1131{}
1132",
1133 diagnostics.len(),
1134 healer.format_diagnostics_for_prompt(&diagnostics)
1135 ));
1136 }
1137
1138 if !failed_tests.is_empty() {
1139 prompt.push_str(&format!(
1140 "## Failed Tests ({} found)
1141{}
1142",
1143 failed_tests.len(),
1144 healer.format_tests_for_prompt(&failed_tests)
1145 ));
1146 }
1147
1148 if diagnostics.is_empty() && failed_tests.is_empty() {
1149 prompt.push_str(
1150 "No issues found! Run cargo check and cargo test to verify.
1151",
1152 );
1153 }
1154
1155 prompt.push_str(
1156 "
1157Fix each issue one at a time. Verify with cargo check after each fix.",
1158 );
1159
1160 self.execute(&prompt).await
1161 }
1162 pub async fn heal_with_retries(&mut self, max_attempts: usize) -> Result<AgentResponse> {
1175 use std::collections::{HashMap, HashSet};
1176
1177 let mut last_response = self.heal().await?;
1178 let mut stuck_counts: HashMap<u64, usize> = HashMap::new();
1180
1181 for attempt in 1..max_attempts {
1182 let fixer = crate::healing::CompilerFixer::new(self.workspace_root.clone());
1184 let remaining = fixer.check().await?;
1185 let errors: Vec<_> = remaining
1186 .iter()
1187 .filter(|d| d.kind == crate::healing::DiagnosticKind::Error)
1188 .collect();
1189
1190 if !errors.is_empty() {
1191 let current_fps: HashSet<u64> = errors.iter().map(|d| d.fingerprint()).collect();
1194 stuck_counts.retain(|fp, _| current_fps.contains(fp));
1195 for fp in ¤t_fps {
1196 *stuck_counts.entry(*fp).or_insert(0) += 1;
1197 }
1198
1199 let thrashing: Vec<u64> = stuck_counts
1202 .iter()
1203 .filter_map(|(&fp, &count)| {
1204 if count >= max_attempts {
1205 Some(fp)
1206 } else {
1207 None
1208 }
1209 })
1210 .collect();
1211 if !thrashing.is_empty() {
1212 tracing::warn!(
1213 stuck_fingerprints = thrashing.len(),
1214 attempt,
1215 "Anti-thrash: {} error(s) unchanged after {} attempts, halting heal loop",
1216 thrashing.len(),
1217 max_attempts
1218 );
1219 return Ok(last_response);
1220 }
1221
1222 tracing::warn!(
1223 errors = errors.len(),
1224 attempt,
1225 "Stage 1 (cargo check): errors remain, retrying"
1226 );
1227 last_response = self.heal().await?;
1228 continue;
1229 }
1230
1231 stuck_counts.clear();
1233
1234 let verify_cmd = self.config.healing.verify_cmd.clone();
1236 if let Some(ref cmd) = verify_cmd {
1237 match crate::healing::run_verify_cmd(&self.workspace_root, cmd).await {
1238 Ok(None) => {
1239 tracing::info!(
1240 attempts = attempt,
1241 "Stage 2 (verify_cmd) passed, healing complete"
1242 );
1243 return Ok(last_response);
1244 }
1245 Ok(Some(diag)) => {
1246 tracing::warn!(
1247 attempt,
1248 cmd,
1249 output = diag.raw,
1250 "Stage 2 (verify_cmd) failed, retrying"
1251 );
1252 last_response = self.heal().await?;
1253 continue;
1254 }
1255 Err(e) => {
1256 tracing::warn!(cmd, error = %e, "verify_cmd could not be run, skipping stage 2");
1258 return Ok(last_response);
1259 }
1260 }
1261 } else {
1262 tracing::info!(
1263 attempts = attempt,
1264 "Stage 1 (cargo check) passed, healing complete"
1265 );
1266 return Ok(last_response);
1267 }
1268 }
1269
1270 tracing::info!(
1271 attempts = max_attempts,
1272 "Healing finished (may still have errors)"
1273 );
1274 Ok(last_response)
1275 }
1276 pub async fn task(&mut self, task_description: &str) -> Result<AgentResponse> {
1278 let prompt = format!(
1279 r#"I need you to complete the following coding task:
1280
1281{}
1282
1283The workspace is at: {}
1284
1285Please:
12861. First explore the codebase to understand the relevant code
12872. Make the necessary changes
12883. Verify the changes compile with `cargo check`
12894. Run relevant tests if applicable
1290
1291Explain your changes as you go."#,
1292 task_description,
1293 self.workspace_root.display()
1294 );
1295
1296 self.execute(&prompt).await
1297 }
1298
1299 pub async fn generate_commit_message(&mut self) -> Result<String> {
1301 let prompt = r#"Please:
13021. Run `git status` to see what files are changed
13032. Run `git diff --cached` to see staged changes (or `git diff` for unstaged)
13043. Generate a concise, descriptive commit message following conventional commits format
1305
1306Only output the suggested commit message, nothing else."#;
1307
1308 let response = self.execute(prompt).await?;
1309 Ok(response.content)
1310 }
1311}