1use super::{
4 prepare_recalled_context, AgentResponse, LLMResponse, Message, PawanAgent, PermissionCallback,
5 PermissionRequest, Role, TokenCallback, TokenUsage, ToolCallback, ToolCallRecord,
6 ToolCallRequest, ToolResultMessage, ToolStartCallback,
7};
8use crate::coordinator::{CoordinatorResult, ToolCallingConfig, ToolCoordinator};
9use crate::tools::ToolRegistry;
10use crate::{PawanError, Result};
11use serde_json::{json, Value};
12use std::sync::Arc;
13use std::time::Instant;
14
15pub(crate) fn truncate_tool_result(value: Value, max_chars: usize) -> Value {
19 let serialized = serde_json::to_string(&value).unwrap_or_default();
20 if serialized.len() <= max_chars {
21 return value;
22 }
23
24 match value {
26 Value::Object(map) => {
27 let mut result = serde_json::Map::new();
28 let total = serialized.len();
29 for (k, v) in map {
30 if let Value::String(s) = &v {
31 if s.len() > 500 {
32 let target = s.len() * max_chars / total;
34 let target = target.max(200); let truncated: String = s.chars().take(target).collect();
36 result.insert(
37 k,
38 json!(format!(
39 "{}...[truncated from {} chars]",
40 truncated,
41 s.len()
42 )),
43 );
44 continue;
45 }
46 }
47 result.insert(k, truncate_tool_result(v, max_chars));
49 }
50 Value::Object(result)
51 }
52 Value::String(s) if s.len() > max_chars => {
53 let truncated: String = s.chars().take(max_chars).collect();
54 json!(format!(
55 "{}...[truncated from {} chars]",
56 truncated,
57 s.len()
58 ))
59 }
60 Value::Array(arr) if serialized.len() > max_chars => {
61 let mut result = Vec::new();
63 let mut running_len = 2; for item in arr {
65 let item_str = serde_json::to_string(&item).unwrap_or_default();
66 running_len += item_str.len() + 1; if running_len > max_chars {
68 result.push(json!(format!("...[{} more items truncated]", 0)));
69 break;
70 }
71 result.push(item);
72 }
73 Value::Array(result)
74 }
75 other => other,
76 }
77}
78
79pub(crate) fn summarize_args(args: &serde_json::Value) -> String {
81 match args {
82 serde_json::Value::Object(map) => {
83 let mut parts = Vec::new();
84 for (key, value) in map {
85 let value_str = match value {
86 serde_json::Value::String(s) if s.len() > 50 => {
87 format!("\"{}...\"", &s[..47])
88 }
89 serde_json::Value::String(s) => format!("\"{}\"", s),
90 serde_json::Value::Array(arr) if arr.len() > 3 => {
91 format!("[... {} items]", arr.len())
92 }
93 serde_json::Value::Array(arr) => {
94 let items: Vec<String> = arr
95 .iter()
96 .take(3)
97 .map(|v| match v {
98 serde_json::Value::String(s) => {
99 if s.len() > 20 {
100 format!("\"{}...\"", &s[..17])
101 } else {
102 format!("\"{}\"", s)
103 }
104 }
105 _ => v.to_string(),
106 })
107 .collect();
108 format!("[{}]", items.join(", "))
109 }
110 _ => value.to_string(),
111 };
112 parts.push(format!("{}: {}", key, value_str));
113 }
114 parts.join(", ")
115 }
116 serde_json::Value::String(s) => {
117 if s.len() > 100 {
118 format!("\"{}...\"", &s[..97])
119 } else {
120 format!("\"{}\"", s)
121 }
122 }
123 serde_json::Value::Array(arr) => {
124 format!("[{} items]", arr.len())
125 }
126 _ => args.to_string(),
127 }
128}
129
130
131impl PawanAgent {
132 pub async fn execute(&mut self, user_prompt: &str) -> Result<AgentResponse> {
134 self.execute_with_callbacks(user_prompt, None, None, None)
135 .await
136 }
137
138 pub async fn execute_with_callbacks(
140 &mut self,
141 user_prompt: &str,
142 on_token: Option<TokenCallback>,
143 on_tool: Option<ToolCallback>,
144 on_tool_start: Option<ToolStartCallback>,
145 ) -> Result<AgentResponse> {
146 self.execute_with_all_callbacks(user_prompt, on_token, on_tool, on_tool_start, None)
147 .await
148 }
149
    /// Runs one agent turn for `user_prompt`, alternating between LLM calls and
    /// tool execution until the model answers without requesting tools.
    ///
    /// When `config.use_coordinator` is set, the call is delegated to
    /// `execute_with_coordinator` and all callbacks are ignored (a warning is
    /// logged if any were supplied).
    ///
    /// Otherwise the prompt (wrapped by `build_user_prompt`) is appended to the
    /// history and the loop below runs. Each iteration selects tools, calls the
    /// LLM, and either finishes the turn or executes the requested tools and
    /// appends their results to the history.
    ///
    /// # Errors
    /// * `PawanError::Agent` when the idle timeout is exceeded or more than
    ///   `config.max_tool_iterations` iterations are needed.
    /// * Any error returned by `build_user_prompt`.
    pub async fn execute_with_all_callbacks(
        &mut self,
        user_prompt: &str,
        on_token: Option<TokenCallback>,
        on_tool: Option<ToolCallback>,
        on_tool_start: Option<ToolStartCallback>,
        on_permission: Option<PermissionCallback>,
    ) -> Result<AgentResponse> {
        if self.config.use_coordinator {
            if on_token.is_some()
                || on_tool.is_some()
                || on_tool_start.is_some()
                || on_permission.is_some()
            {
                tracing::warn!(
                    "Callbacks and permission prompts are not supported in coordinator mode; ignoring them"
                );
            }
            return self.execute_with_coordinator(user_prompt).await;
        }

        // Start each turn with a clean idle timer.
        self.last_tool_call_time = None;

        self.inject_eruka_context(user_prompt).await;

        let effective_prompt = self.build_user_prompt(user_prompt)?;
        self.history.push(Message {
            role: Role::User,
            content: effective_prompt,
            tool_calls: vec![],
            tool_result: None,
        });

        let mut all_tool_calls = Vec::new();
        let mut total_usage = TokenUsage::default();
        let mut iterations = 0;
        let max_iterations = self.config.max_tool_iterations;

        loop {
            // The timestamp is refreshed just before every LLM call below, so
            // this measures the time the previous iteration took in total
            // (LLM call plus tool execution).
            if let Some(last_time) = self.last_tool_call_time {
                let elapsed = last_time.elapsed().as_secs();
                if elapsed > self.config.tool_call_idle_timeout_secs {
                    return Err(PawanError::Agent(format!(
                        "Tool idle timeout exceeded ({}s > {}s)",
                        elapsed, self.config.tool_call_idle_timeout_secs
                    )));
                }
            }

            iterations += 1;
            if iterations > max_iterations {
                return Err(PawanError::Agent(format!(
                    "Max tool iterations ({}) exceeded",
                    max_iterations
                )));
            }

            // May append a "[SYSTEM]" nudge and prune the history.
            self.apply_iteration_budgets(iterations, max_iterations).await;

            // Tool selection is keyed on the most recent `Role::User` message.
            // NOTE(review): nudges injected by this loop are also pushed with
            // `Role::User`, so they can become the selection query.
            let latest_query = self
                .history
                .iter()
                .rev()
                .find(|m| m.role == Role::User)
                .map(|m| m.content.as_str())
                .unwrap_or("");
            let tool_defs = self.tools.select_for_query(latest_query, 12);
            if iterations == 1 {
                let tool_names: Vec<&str> = tool_defs.iter().map(|t| t.name.as_str()).collect();
                tracing::info!(tools = ?tool_names, count = tool_defs.len(), "Selected tools for query");
            }

            self.last_tool_call_time = Some(Instant::now());

            // Never fails: permanent LLM errors come back as a response whose
            // `finish_reason` is "error" and whose content describes the failure.
            let response = self.call_llm_with_retry(&tool_defs, on_token.as_ref()).await;

            if let Some(ref usage) = response.usage {
                Self::accumulate_token_usage(
                    usage, &mut total_usage, iterations, self.config.thinking_budget,
                );
            }

            let clean_content = Self::strip_thinking_blocks(&response.content);

            if response.tool_calls.is_empty() {
                // `true` means "accept this as the final answer"; `false`
                // means a corrective message was pushed and the loop should
                // ask the model again.
                if self
                    .handle_no_tool_response(
                        &clean_content,
                        user_prompt,
                        &tool_defs,
                        iterations,
                        max_iterations,
                        &response.finish_reason,
                    )
                    .await
                {
                    self.history.push(Message {
                        role: Role::Assistant,
                        content: clean_content.clone(),
                        tool_calls: vec![],
                        tool_result: None,
                    });
                    if let Some(eruka) = &self.eruka {
                        if let Err(e) = eruka
                            .sync_turn(user_prompt, &clean_content, &self.session_id)
                            .await
                        {
                            tracing::warn!("Eruka sync_turn failed (non-fatal): {}", e);
                        }
                    }
                    return Ok(AgentResponse {
                        content: clean_content,
                        tool_calls: all_tool_calls,
                        iterations,
                        usage: total_usage,
                    });
                }
                continue;
            }

            // Record the assistant turn with its raw (unstripped) content
            // together with the tool calls it requested.
            self.history.push(Message {
                role: Role::Assistant,
                content: response.content.clone(),
                tool_calls: response.tool_calls.clone(),
                tool_result: None,
            });

            // The three `ordered_*` vectors are indexed by the position of the
            // call in `response.tool_calls`; denied calls are already filled in.
            let (pending, mut ordered_records, mut ordered_tool_messages, mut ordered_compile_gate) =
                self.check_tool_permissions(
                    &response.tool_calls,
                    on_permission.as_ref(),
                    on_tool.as_ref(),
                    on_tool_start.as_ref(),
                )
                .await;

            if !pending.is_empty() {
                let results = Self::execute_pending_tools(
                    &self.tools,
                    self.config.bash_timeout_secs,
                    self.config.max_result_chars,
                    pending,
                    on_tool.as_ref(),
                )
                .await;
                // Results may arrive in any order; `idx` restores the slot.
                for (idx, record, tool_msg, wrote_rs) in results {
                    ordered_records[idx] = Some(record);
                    ordered_tool_messages[idx] = Some(tool_msg);
                    ordered_compile_gate[idx] = wrote_rs;
                }
            }

            self.collect_tool_results(
                &mut all_tool_calls,
                &mut ordered_records,
                &mut ordered_tool_messages,
                &ordered_compile_gate,
                response.tool_calls.len(),
            )
            .await;
        }
    }
331 async fn inject_eruka_context(&mut self, user_prompt: &str) {
335 if let Some(eruka) = &self.eruka {
336 let before_inject = self.history.len();
337 if let Err(e) = eruka.inject_core_memory(&mut self.history).await {
338 tracing::warn!("Eruka memory injection failed (non-fatal): {}", e);
339 }
340
341 for msg in self
342 .history
343 .iter_mut()
344 .skip(before_inject)
345 .filter(|m| m.role == Role::System)
346 {
347 let fenced = prepare_recalled_context("eruka_core_memory", &msg.content);
348 if !fenced.is_empty() {
349 msg.content = fenced;
350 }
351 }
352
353 match eruka.prefetch(user_prompt, 2000).await {
357 Ok(Some(ctx)) => {
358 let fenced = prepare_recalled_context("eruka_prefetch", &ctx);
359 if !fenced.is_empty() {
360 self.history.push(Message {
361 role: Role::System,
362 content: fenced,
363 tool_calls: vec![],
364 tool_result: None,
365 });
366 }
367 }
368 Ok(None) => {}
369 Err(e) => tracing::warn!("Eruka prefetch failed (non-fatal): {}", e),
370 }
371 }
372 }
373
374 fn build_user_prompt(&self, user_prompt: &str) -> Result<String> {
376 if let Some(err) = &self.arch_context_error {
377 return Err(PawanError::Config(err.clone()));
378 }
379 Ok(match &self.arch_context {
380 Some(ctx) => format!(
381 "[Workspace Architecture]\n{ctx}\n[/Workspace Architecture]\n\n{user_prompt}"
382 ),
383 None => user_prompt.to_string(),
384 })
385 }
386
387 async fn apply_iteration_budgets(&mut self, iterations: usize, max_iterations: usize) {
390 let remaining = max_iterations.saturating_sub(iterations);
392 if remaining == 3 && iterations > 1 {
393 self.history.push(Message {
394 role: Role::User,
395 content: format!(
396 "[SYSTEM] You have {} tool iterations remaining. \
397 Stop exploring and write the most important output now. \
398 If you have code to write, write it immediately.",
399 remaining
400 ),
401 tool_calls: vec![],
402 tool_result: None,
403 });
404 }
405 self.context_tokens_estimate =
407 self.history.iter().map(|m| m.content.len()).sum::<usize>() / 4;
408 if self.context_tokens_estimate > self.config.max_context_tokens {
409 if let Some(eruka) = &self.eruka {
412 let snapshot = Self::history_snapshot_for_eruka(&self.history);
413 if let Err(e) = eruka.on_pre_compress(&snapshot, &self.session_id).await {
414 tracing::warn!("Eruka on_pre_compress failed (non-fatal): {}", e);
415 }
416 }
417 self.prune_history();
418 }
419 }
420
    /// Calls the LLM backend, retrying transient failures with exponential
    /// backoff.
    ///
    /// An error is treated as transient when its text contains one of the
    /// substrings checked below (timeouts, connection problems, 429/5xx codes).
    /// Up to three retries are made, sleeping `2^attempt` seconds before each
    /// one (2s, 4s, 8s).
    ///
    /// This function never returns an error: when the failure is not transient
    /// or the retries are exhausted, it returns an `LLMResponse` whose content
    /// describes the failure, with no tool calls and `finish_reason` set to
    /// `"error"`.
    async fn call_llm_with_retry(
        &mut self,
        tool_defs: &[thulp_core::ToolDefinition],
        on_token: Option<&TokenCallback>,
    ) -> LLMResponse {
        let max_llm_retries = 3;
        let mut attempt = 0;
        loop {
            attempt += 1;
            match self
                .backend
                .generate(&self.history, tool_defs, on_token)
                .await
            {
                Ok(resp) => return resp,
                Err(e) => {
                    let err_str = e.to_string();
                    // Classification is purely textual (substring match on the
                    // error message).
                    let is_transient = err_str.contains("timeout")
                        || err_str.contains("connection")
                        || err_str.contains("429")
                        || err_str.contains("500")
                        || err_str.contains("502")
                        || err_str.contains("503")
                        || err_str.contains("504")
                        || err_str.contains("reset")
                        || err_str.contains("broken pipe");

                    if is_transient && attempt <= max_llm_retries {
                        let delay =
                            std::time::Duration::from_secs(2u64.pow(attempt as u32));
                        tracing::warn!(
                            attempt = attempt,
                            delay_secs = delay.as_secs(),
                            error = err_str.as_str(),
                            "LLM call failed (transient) — retrying"
                        );
                        tokio::time::sleep(delay).await;

                        // NOTE(review): this pruning only runs for errors that
                        // were already classified as transient above; an error
                        // mentioning only "context"/"token" is not retried.
                        if err_str.contains("context") || err_str.contains("token") {
                            tracing::info!(
                                "Pruning history before retry (possible context overflow)"
                            );
                            if let Some(eruka) = &self.eruka {
                                let snapshot =
                                    Self::history_snapshot_for_eruka(&self.history);
                                if let Err(e) =
                                    eruka.on_pre_compress(&snapshot, &self.session_id).await
                                {
                                    tracing::warn!(
                                        "Eruka on_pre_compress failed (non-fatal): {}",
                                        e
                                    );
                                }
                            }
                            self.prune_history();
                        }
                        continue;
                    }

                    // Permanent failure: surface the error as response content
                    // so the caller's loop can end the turn normally.
                    tracing::error!(
                        attempt = attempt,
                        error = err_str.as_str(),
                        "LLM call failed permanently — returning error as content"
                    );
                    return LLMResponse {
                        content: format!(
                            "LLM error after {} attempts: {}. The task could not be completed.",
                            attempt, err_str
                        ),
                        reasoning: None,
                        tool_calls: vec![],
                        finish_reason: "error".to_string(),
                        usage: None,
                    };
                }
            }
        }
    }
503
504 fn accumulate_token_usage(
506 usage: &TokenUsage,
507 total_usage: &mut TokenUsage,
508 iterations: usize,
509 thinking_budget: usize,
510 ) {
511 total_usage.prompt_tokens += usage.prompt_tokens;
512 total_usage.completion_tokens += usage.completion_tokens;
513 total_usage.total_tokens += usage.total_tokens;
514 total_usage.reasoning_tokens += usage.reasoning_tokens;
515 total_usage.action_tokens += usage.action_tokens;
516
517 if usage.reasoning_tokens > 0 {
519 tracing::info!(
520 iteration = iterations,
521 think = usage.reasoning_tokens,
522 act = usage.action_tokens,
523 total = usage.completion_tokens,
524 "Token budget: think:{} act:{} (total:{})",
525 usage.reasoning_tokens,
526 usage.action_tokens,
527 usage.completion_tokens
528 );
529 }
530
531 if thinking_budget > 0 && usage.reasoning_tokens > thinking_budget as u64 {
533 tracing::warn!(
534 budget = thinking_budget,
535 actual = usage.reasoning_tokens,
536 "Thinking budget exceeded ({}/{} tokens)",
537 usage.reasoning_tokens,
538 thinking_budget
539 );
540 }
541 }
542
543 fn strip_thinking_blocks(content: &str) -> String {
545 let mut s = content.to_string();
546 loop {
547 let lower = s.to_lowercase();
548 let open = lower.find("<think>");
549 let close = lower.find("</think>");
550 match (open, close) {
551 (Some(i), Some(j)) if j > i => {
552 let before = s[..i].trim_end().to_string();
553 let after = if s.len() > j + 8 {
554 s[j + 8..].trim_start().to_string()
555 } else {
556 String::new()
557 };
558 s = if before.is_empty() {
559 after
560 } else if after.is_empty() {
561 before
562 } else {
563 format!("{}\n{}", before, after)
564 };
565 }
566 _ => break,
567 }
568 }
569 s
570 }
571
572 async fn handle_no_tool_response(
577 &mut self,
578 clean_content: &str,
579 _user_prompt: &str,
580 tool_defs: &[thulp_core::ToolDefinition],
581 iterations: usize,
582 max_iterations: usize,
583 finish_reason: &str,
584 ) -> bool {
585 let has_tools = !tool_defs.is_empty();
588 let lower = clean_content.to_lowercase();
589 let planning_prefix = lower.starts_with("let me")
590 || lower.starts_with("i'll help")
591 || lower.starts_with("i will help")
592 || lower.starts_with("sure, i")
593 || lower.starts_with("okay, i");
594 let looks_like_planning =
595 clean_content.len() > 200 || (planning_prefix && clean_content.len() > 50);
596 if has_tools
597 && looks_like_planning
598 && iterations == 1
599 && iterations < max_iterations
600 && finish_reason != "error"
601 {
602 tracing::warn!(
603 "No tool calls at iteration {} (content: {}B) — nudging model to use tools",
604 iterations,
605 clean_content.len()
606 );
607 self.history.push(Message {
608 role: Role::Assistant,
609 content: clean_content.to_string(),
610 tool_calls: vec![],
611 tool_result: None,
612 });
613 self.history.push(Message {
614 role: Role::User,
615 content: "You must use tools to complete this task. Do NOT just describe what you would do — actually call the tools. Start with bash or read_file.".to_string(),
616 tool_calls: vec![],
617 tool_result: None,
618 });
619 return false;
620 }
621
622 if iterations > 1 {
624 let prev_assistant = self
625 .history
626 .iter()
627 .rev()
628 .find(|m| m.role == Role::Assistant && !m.content.is_empty());
629 if let Some(prev) = prev_assistant {
630 if prev.content.trim() == clean_content.trim()
631 && iterations < max_iterations
632 {
633 tracing::warn!(
634 "Repeated response detected at iteration {} — injecting correction",
635 iterations
636 );
637 self.history.push(Message {
638 role: Role::Assistant,
639 content: clean_content.to_string(),
640 tool_calls: vec![],
641 tool_result: None,
642 });
643 self.history.push(Message {
644 role: Role::User,
645 content: "You gave the same response as before. Try a different approach. Use anchor_text in edit_file_lines, or use insert_after, or use bash with sed.".to_string(),
646 tool_calls: vec![],
647 tool_result: None,
648 });
649 return false;
650 }
651 }
652 }
653
654 true
655 }
656
    /// Applies the permission policy to every requested tool call.
    ///
    /// Returns a tuple of:
    /// 1. `pending` – `(index, call)` pairs that were approved and still have
    ///    to be executed;
    /// 2. per-index tool-call records, pre-filled for denied calls;
    /// 3. per-index tool messages, pre-filled for denied calls;
    /// 4. per-index compile-gate flags, all `false` here.
    ///
    /// All three vectors have one slot per entry in `tool_calls`, in the same
    /// order. `on_tool` is invoked for every denied call, `on_tool_start` for
    /// every call that passes the policy check.
    async fn check_tool_permissions(
        &mut self,
        tool_calls: &[ToolCallRequest],
        on_permission: Option<&PermissionCallback>,
        on_tool: Option<&ToolCallback>,
        on_tool_start: Option<&ToolStartCallback>,
    ) -> (
        Vec<(usize, ToolCallRequest)>,
        Vec<Option<ToolCallRecord>>,
        Vec<Option<Message>>,
        Vec<bool>,
    ) {
        let mut ordered_records: Vec<Option<ToolCallRecord>> =
            vec![None; tool_calls.len()];
        let mut ordered_tool_messages: Vec<Option<Message>> =
            vec![None; tool_calls.len()];
        let ordered_compile_gate: Vec<bool> = vec![false; tool_calls.len()];
        let mut pending: Vec<(usize, ToolCallRequest)> = Vec::new();

        for (idx, tool_call) in tool_calls.iter().cloned().enumerate() {
            self.tools.activate(&tool_call.name);

            // Stage 1: configured policy. `denied` carries the reason text
            // when the call must not run.
            let perm = crate::config::ToolPermission::resolve(
                &tool_call.name,
                &self.config.permissions,
            );
            let denied = match perm {
                crate::config::ToolPermission::Deny => Some("Tool denied by permission policy"),
                crate::config::ToolPermission::Prompt => {
                    if tool_call.name == "bash" {
                        if let Some(cmd) =
                            tool_call.arguments.get("command").and_then(|v| v.as_str())
                        {
                            // Read-only bash commands skip the prompt.
                            if crate::tools::bash::is_read_only(cmd) {
                                tracing::debug!(command = cmd, "Auto-allowing read-only bash command under Prompt permission");
                                None
                            } else if let Some(ref perm_cb) = on_permission {
                                let args_summary = cmd.chars().take(120).collect::<String>();
                                let rx = perm_cb(PermissionRequest {
                                    tool_name: tool_call.name.clone(),
                                    args_summary,
                                });
                                // Anything but an explicit `true` (including a
                                // receive error) counts as a denial.
                                match rx.await {
                                    Ok(true) => None,
                                    _ => Some("User denied tool execution"),
                                }
                            } else {
                                Some("Bash command requires user approval (read-only commands auto-allowed)")
                            }
                        } else {
                            // No string `command` argument to inspect.
                            Some("Tool requires user approval")
                        }
                    } else if let Some(ref perm_cb) = on_permission {
                        let args_summary = tool_call
                            .arguments
                            .to_string()
                            .chars()
                            .take(120)
                            .collect::<String>();
                        let rx = perm_cb(PermissionRequest {
                            tool_name: tool_call.name.clone(),
                            args_summary,
                        });
                        match rx.await {
                            Ok(true) => None,
                            _ => Some("User denied tool execution"),
                        }
                    } else {
                        Some("Tool requires user approval (set permission to allow or use TUI mode)")
                    }
                }
                crate::config::ToolPermission::Allow => None,
            };

            if let Some(reason) = denied {
                let record = ToolCallRecord {
                    id: tool_call.id.clone(),
                    name: tool_call.name.clone(),
                    arguments: tool_call.arguments.clone(),
                    result: json!({"error": reason}),
                    success: false,
                    duration_ms: 0,
                };
                if let Some(ref callback) = on_tool {
                    callback(&record);
                }
                ordered_records[idx] = Some(record);
                ordered_tool_messages[idx] = Some(Message {
                    role: Role::Tool,
                    content: serde_json::to_string(&json!({"error": reason}))
                        .unwrap_or_default(),
                    tool_calls: vec![],
                    tool_result: Some(ToolResultMessage {
                        tool_call_id: tool_call.id.clone(),
                        content: json!({"error": reason}),
                        success: false,
                    }),
                });
                continue;
            }

            if let Some(ref callback) = on_tool_start {
                callback(&tool_call.name);
            }

            // Stage 2: argument validation. Failures are only logged; the
            // call proceeds regardless.
            if let Some(tool) = self.tools.get(&tool_call.name) {
                let schema = tool.parameters_schema();
                if let Ok(params) = thulp_core::ToolDefinition::parse_mcp_input_schema(&schema)
                {
                    let thulp_def = thulp_core::ToolDefinition {
                        name: tool_call.name.clone(),
                        description: String::new(),
                        parameters: params,
                    };
                    if let Err(e) = thulp_def.validate_args(&tool_call.arguments) {
                        tracing::warn!(tool = tool_call.name.as_str(), error = %e, "Tool argument validation failed (continuing anyway)");
                    }
                }
            }

            // Stage 3: mutating tools get an additional confirmation whenever
            // a permission callback exists.
            // NOTE(review): a mutating tool under `Prompt` policy is therefore
            // confirmed twice (once above, once here); confirm this is intended.
            let tool = self.tools.get(&tool_call.name);
            let is_mutating = tool.map(|t| t.mutating()).unwrap_or(false);
            if is_mutating {
                if let Some(ref callback) = on_permission {
                    let args_summary = summarize_args(&tool_call.arguments);
                    let request = PermissionRequest {
                        tool_name: tool_call.name.clone(),
                        args_summary,
                    };
                    let permission_rx = (callback)(request);
                    match permission_rx.await {
                        Ok(true) => {}
                        Ok(false) => {
                            let record = ToolCallRecord {
                                id: tool_call.id.clone(),
                                name: tool_call.name.clone(),
                                arguments: tool_call.arguments.clone(),
                                result: json!({"error": "Tool execution denied by user", "tool": tool_call.name}),
                                success: false,
                                duration_ms: 0,
                            };
                            if let Some(ref callback) = on_tool {
                                callback(&record);
                            }
                            ordered_records[idx] = Some(record);
                            ordered_tool_messages[idx] = Some(Message {
                                role: Role::Tool,
                                content: serde_json::to_string(&json!({"error": "Tool execution denied by user", "tool": tool_call.name})).unwrap_or_default(),
                                tool_calls: vec![],
                                tool_result: Some(ToolResultMessage {
                                    tool_call_id: tool_call.id.clone(),
                                    content: json!({"error": "Tool execution denied by user", "tool": tool_call.name}),
                                    success: false,
                                }),
                            });
                            continue;
                        }
                        // The sender was dropped without answering.
                        Err(_) => {
                            let record = ToolCallRecord {
                                id: tool_call.id.clone(),
                                name: tool_call.name.clone(),
                                arguments: tool_call.arguments.clone(),
                                result: json!({"error": "Permission channel closed", "tool": tool_call.name}),
                                success: false,
                                duration_ms: 0,
                            };
                            if let Some(ref callback) = on_tool {
                                callback(&record);
                            }
                            ordered_records[idx] = Some(record);
                            ordered_tool_messages[idx] = Some(Message {
                                role: Role::Tool,
                                content: serde_json::to_string(&json!({"error": "Permission channel closed", "tool": tool_call.name})).unwrap_or_default(),
                                tool_calls: vec![],
                                tool_result: Some(ToolResultMessage {
                                    tool_call_id: tool_call.id.clone(),
                                    content: json!({"error": "Permission channel closed", "tool": tool_call.name}),
                                    success: false,
                                }),
                            });
                            continue;
                        }
                    }
                } else {
                    tracing::warn!(
                        tool = tool_call.name.as_str(),
                        "No permission callback, auto-approving mutating tool"
                    );
                }
            }

            pending.push((idx, tool_call));
        }

        (pending, ordered_records, ordered_tool_messages, ordered_compile_gate)
    }
855
856 async fn execute_pending_tools(
858 tools: &ToolRegistry,
859 bash_timeout_secs: u64,
860 max_result_chars: usize,
861 pending: Vec<(usize, ToolCallRequest)>,
862 on_tool: Option<&ToolCallback>,
863 ) -> Vec<(usize, ToolCallRecord, Message, bool)> {
864 use futures::{stream, StreamExt};
865
866 let on_tool_cb = on_tool;
867 let max_parallel = std::cmp::max(1, 10);
868 stream::iter(pending)
869 .map(|(idx, tool_call)| async move {
870 let start = std::time::Instant::now();
871
872 let result = {
873 let tool_future = tools.execute(&tool_call.name, tool_call.arguments.clone());
874 let timeout_dur = if tool_call.name == "bash" {
875 std::time::Duration::from_secs(bash_timeout_secs)
876 } else {
877 std::time::Duration::from_secs(30)
878 };
879 match tokio::time::timeout(timeout_dur, tool_future).await {
880 Ok(inner) => inner,
881 Err(_) => Err(PawanError::Tool(format!(
882 "Tool {} timed out after {}s",
883 tool_call.name,
884 timeout_dur.as_secs()
885 ))),
886 }
887 };
888
889 let duration_ms = start.elapsed().as_millis() as u64;
890 let (mut result_value, success) = match result {
891 Ok(v) => (v, true),
892 Err(e) => {
893 tracing::warn!(tool = tool_call.name.as_str(), error = %e, "Tool execution failed");
894 (json!({"error": e.to_string(), "tool": tool_call.name, "hint": "Try a different approach or tool"}), false)
895 }
896 };
897
898 result_value = truncate_tool_result(result_value, max_result_chars);
899
900 let record = ToolCallRecord {
901 id: tool_call.id.clone(),
902 name: tool_call.name.clone(),
903 arguments: tool_call.arguments.clone(),
904 result: result_value.clone(),
905 success,
906 duration_ms,
907 };
908
909 if let Some(ref cb) = on_tool_cb {
910 cb(&record);
911 }
912
913 let tool_msg = Message {
914 role: Role::Tool,
915 content: serde_json::to_string(&result_value).unwrap_or_default(),
916 tool_calls: vec![],
917 tool_result: Some(ToolResultMessage {
918 tool_call_id: tool_call.id.clone(),
919 content: result_value,
920 success,
921 }),
922 };
923
924 let wrote_rs = success
925 && tool_call.name == "write_file"
926 && tool_call
927 .arguments
928 .get("path")
929 .and_then(|p| p.as_str())
930 .map(|p| p.ends_with(".rs"))
931 .unwrap_or(false);
932
933 (idx, record, tool_msg, wrote_rs)
934 })
935 .buffer_unordered(max_parallel)
936 .collect::<Vec<_>>()
937 .await
938 }
939
940 async fn collect_tool_results(
943 &mut self,
944 all_tool_calls: &mut Vec<ToolCallRecord>,
945 ordered_records: &mut [Option<ToolCallRecord>],
946 ordered_tool_messages: &mut [Option<Message>],
947 ordered_compile_gate: &[bool],
948 tool_calls_count: usize,
949 ) {
950 for i in 0..tool_calls_count {
951 if let Some(record) = ordered_records[i].take() {
952 all_tool_calls.push(record);
953 }
954 if let Some(msg) = ordered_tool_messages[i].take() {
955 self.history.push(msg);
956 }
957
958 if ordered_compile_gate[i] {
959 let ws = self.workspace_root.clone();
960 let check_result = tokio::process::Command::new("cargo")
961 .arg("check")
962 .arg("--message-format=short")
963 .current_dir(&ws)
964 .output()
965 .await;
966 match check_result {
967 Ok(output) if !output.status.success() => {
968 let stderr = String::from_utf8_lossy(&output.stderr);
969 let err_msg: String = stderr.chars().take(1500).collect();
970 tracing::info!("Compile-gate: cargo check failed after write_file, injecting errors");
971 self.history.push(Message {
972 role: Role::User,
973 content: format!(
974 "[SYSTEM] cargo check failed after your write_file. Fix the errors:\n{}",
975 err_msg
976 ),
977 tool_calls: vec![],
978 tool_result: None,
979 });
980 }
981 Ok(_) => {
982 tracing::debug!("Compile-gate: cargo check passed");
983 }
984 Err(e) => {
985 tracing::warn!("Compile-gate: cargo check failed to run: {}", e);
986 }
987 }
988 }
989 }
990 }
991
992 async fn execute_with_coordinator(&mut self, user_prompt: &str) -> Result<AgentResponse> {
1004 self.last_tool_call_time = None;
1006
1007 if let Some(eruka) = &self.eruka {
1009 let before_inject = self.history.len();
1010 if let Err(e) = eruka.inject_core_memory(&mut self.history).await {
1011 tracing::warn!("Eruka memory injection failed (non-fatal): {}", e);
1012 }
1013
1014 for msg in self
1015 .history
1016 .iter_mut()
1017 .skip(before_inject)
1018 .filter(|m| m.role == Role::System)
1019 {
1020 let fenced = prepare_recalled_context("eruka_core_memory", &msg.content);
1021 if !fenced.is_empty() {
1022 msg.content = fenced;
1023 }
1024 }
1025
1026 match eruka.prefetch(user_prompt, 2000).await {
1028 Ok(Some(ctx)) => {
1029 let fenced = prepare_recalled_context("eruka_prefetch", &ctx);
1030 if !fenced.is_empty() {
1031 self.history.push(Message {
1032 role: Role::System,
1033 content: fenced,
1034 tool_calls: vec![],
1035 tool_result: None,
1036 });
1037 }
1038 }
1039 Ok(None) => {}
1040 Err(e) => tracing::warn!("Eruka prefetch failed (non-fatal): {}", e),
1041 }
1042 }
1043
1044 if let Some(err) = &self.arch_context_error {
1047 return Err(PawanError::Config(err.clone()));
1048 }
1049
1050 let effective_prompt = match &self.arch_context {
1051 Some(ctx) => format!(
1052 "[Workspace Architecture]\n{ctx}\n[/Workspace Architecture]\n\n{user_prompt}"
1053 ),
1054 None => user_prompt.to_string(),
1055 };
1056
1057 let coordinator_config = ToolCallingConfig {
1059 max_iterations: self.config.max_tool_iterations,
1060 parallel_execution: true,
1061 max_parallel_tools: 10,
1062 tool_timeout: std::time::Duration::from_secs(self.config.bash_timeout_secs),
1063 stop_on_error: false,
1064 };
1065
1066 let system_prompt = self.config.get_system_prompt_checked()?;
1068 let backend = Self::create_backend(&self.config, &system_prompt);
1069 let backend = Arc::from(backend);
1070
1071 let registry = Arc::new(ToolRegistry::with_defaults(self.workspace_root.clone()));
1074
1075 let coordinator = ToolCoordinator::new(backend, registry, coordinator_config);
1077
1078 let result: CoordinatorResult = coordinator
1080 .execute(Some(&system_prompt), &effective_prompt)
1081 .await
1082 .map_err(|e| PawanError::Agent(format!("Coordinator execution failed: {}", e)))?;
1083
1084 let content = result.content.clone();
1086 let agent_response = AgentResponse {
1087 content: result.content,
1088 tool_calls: result.tool_calls,
1089 iterations: result.iterations,
1090 usage: result.total_usage,
1091 };
1092
1093 if let Some(eruka) = &self.eruka {
1095 if let Err(e) = eruka
1096 .sync_turn(user_prompt, &content, &self.session_id)
1097 .await
1098 {
1099 tracing::warn!("Eruka sync_turn failed (non-fatal): {}", e);
1100 }
1101 }
1102
1103 Ok(agent_response)
1104 }
1105
1106 pub async fn heal(&mut self) -> Result<AgentResponse> {
1108 let healer =
1109 crate::healing::Healer::new(self.workspace_root.clone(), self.config.healing.clone());
1110
1111 let diagnostics = healer.get_diagnostics().await?;
1112 let failed_tests = healer.get_failed_tests().await?;
1113
1114 let mut prompt = format!(
1115 "I need you to heal this Rust project at: {}
1116
1117",
1118 self.workspace_root.display()
1119 );
1120
1121 if !diagnostics.is_empty() {
1122 prompt.push_str(&format!(
1123 "## Compilation Issues ({} found)
1124{}
1125",
1126 diagnostics.len(),
1127 healer.format_diagnostics_for_prompt(&diagnostics)
1128 ));
1129 }
1130
1131 if !failed_tests.is_empty() {
1132 prompt.push_str(&format!(
1133 "## Failed Tests ({} found)
1134{}
1135",
1136 failed_tests.len(),
1137 healer.format_tests_for_prompt(&failed_tests)
1138 ));
1139 }
1140
1141 if diagnostics.is_empty() && failed_tests.is_empty() {
1142 prompt.push_str(
1143 "No issues found! Run cargo check and cargo test to verify.
1144",
1145 );
1146 }
1147
1148 prompt.push_str(
1149 "
1150Fix each issue one at a time. Verify with cargo check after each fix.",
1151 );
1152
1153 self.execute(&prompt).await
1154 }
1155 pub async fn heal_with_retries(&mut self, max_attempts: usize) -> Result<AgentResponse> {
1168 use std::collections::{HashMap, HashSet};
1169
1170 let mut last_response = self.heal().await?;
1171 let mut stuck_counts: HashMap<u64, usize> = HashMap::new();
1173
1174 for attempt in 1..max_attempts {
1175 let fixer = crate::healing::CompilerFixer::new(self.workspace_root.clone());
1177 let remaining = fixer.check().await?;
1178 let errors: Vec<_> = remaining
1179 .iter()
1180 .filter(|d| d.kind == crate::healing::DiagnosticKind::Error)
1181 .collect();
1182
1183 if !errors.is_empty() {
1184 let current_fps: HashSet<u64> = errors.iter().map(|d| d.fingerprint()).collect();
1187 stuck_counts.retain(|fp, _| current_fps.contains(fp));
1188 for fp in ¤t_fps {
1189 *stuck_counts.entry(*fp).or_insert(0) += 1;
1190 }
1191
1192 let thrashing: Vec<u64> = stuck_counts
1195 .iter()
1196 .filter_map(|(&fp, &count)| {
1197 if count >= max_attempts {
1198 Some(fp)
1199 } else {
1200 None
1201 }
1202 })
1203 .collect();
1204 if !thrashing.is_empty() {
1205 tracing::warn!(
1206 stuck_fingerprints = thrashing.len(),
1207 attempt,
1208 "Anti-thrash: {} error(s) unchanged after {} attempts, halting heal loop",
1209 thrashing.len(),
1210 max_attempts
1211 );
1212 return Ok(last_response);
1213 }
1214
1215 tracing::warn!(
1216 errors = errors.len(),
1217 attempt,
1218 "Stage 1 (cargo check): errors remain, retrying"
1219 );
1220 last_response = self.heal().await?;
1221 continue;
1222 }
1223
1224 stuck_counts.clear();
1226
1227 let verify_cmd = self.config.healing.verify_cmd.clone();
1229 if let Some(ref cmd) = verify_cmd {
1230 match crate::healing::run_verify_cmd(&self.workspace_root, cmd).await {
1231 Ok(None) => {
1232 tracing::info!(
1233 attempts = attempt,
1234 "Stage 2 (verify_cmd) passed, healing complete"
1235 );
1236 return Ok(last_response);
1237 }
1238 Ok(Some(diag)) => {
1239 tracing::warn!(
1240 attempt,
1241 cmd,
1242 output = diag.raw,
1243 "Stage 2 (verify_cmd) failed, retrying"
1244 );
1245 last_response = self.heal().await?;
1246 continue;
1247 }
1248 Err(e) => {
1249 tracing::warn!(cmd, error = %e, "verify_cmd could not be run, skipping stage 2");
1251 return Ok(last_response);
1252 }
1253 }
1254 } else {
1255 tracing::info!(
1256 attempts = attempt,
1257 "Stage 1 (cargo check) passed, healing complete"
1258 );
1259 return Ok(last_response);
1260 }
1261 }
1262
1263 tracing::info!(
1264 attempts = max_attempts,
1265 "Healing finished (may still have errors)"
1266 );
1267 Ok(last_response)
1268 }
1269 pub async fn task(&mut self, task_description: &str) -> Result<AgentResponse> {
1271 let prompt = format!(
1272 r#"I need you to complete the following coding task:
1273
1274{}
1275
1276The workspace is at: {}
1277
1278Please:
12791. First explore the codebase to understand the relevant code
12802. Make the necessary changes
12813. Verify the changes compile with `cargo check`
12824. Run relevant tests if applicable
1283
1284Explain your changes as you go."#,
1285 task_description,
1286 self.workspace_root.display()
1287 );
1288
1289 self.execute(&prompt).await
1290 }
1291
1292 pub async fn generate_commit_message(&mut self) -> Result<String> {
1294 let prompt = r#"Please:
12951. Run `git status` to see what files are changed
12962. Run `git diff --cached` to see staged changes (or `git diff` for unstaged)
12973. Generate a concise, descriptive commit message following conventional commits format
1298
1299Only output the suggested commit message, nothing else."#;
1300
1301 let response = self.execute(prompt).await?;
1302 Ok(response.content)
1303 }
1304}