1use crate::agent::api::{build_request_with_tools, create_llm_client};
2use crate::agent::thread_identity::{current_agent_name, current_agent_type};
3use crate::chat_error::ChatError;
4use crate::context::compact::{CompactConfig, new_invoked_skills_map};
5use crate::infra::hook::HookManager;
6use crate::llm::{ChatResponse, ToolCall, ToolDefinition};
7use crate::message_types::AskRequest;
8use crate::permission::JcliConfig;
9use crate::permission::queue::{PendingAgentPerm, PermissionQueue};
10use crate::storage::{ChatMessage, DisplayHint, MessageRole, ModelProvider, ToolCallItem};
11use crate::tools::ToolRegistry;
12use crate::tools::background::BackgroundManager;
13use crate::tools::plan::{PlanApprovalQueue, PlanModeState};
14use crate::tools::task::TaskManager;
15use crate::util::log::write_info_log;
16use rand::Rng;
17use serde::{Deserialize, Serialize};
18use std::sync::{
19 Arc, Mutex,
20 atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
21 mpsc,
22};
23use std::time::Instant;
24
/// Lifecycle state of a sub-agent as shown in the sub-agent status display.
#[derive(Clone, Debug, PartialEq)]
pub enum SubAgentStatus {
    /// Initial state assigned when the agent is registered with the tracker.
    Initializing,
    /// Thinking phase. NOTE(review): presumably waiting on the LLM — confirm with callers.
    Thinking,
    /// Working phase. NOTE(review): presumably executing tools — confirm with callers.
    Working,
    /// Waiting before re-sending a failed request.
    Retrying {
        /// 1-based retry number.
        attempt: u32,
        /// Maximum number of retries allowed by the active policy.
        max_attempts: u32,
        /// Delay before the next attempt, in milliseconds.
        delay_ms: u64,
        /// Human-readable description of the error that triggered the retry.
        error: String,
    },
    /// Finished successfully (terminal).
    Completed,
    /// Stopped by cancellation (terminal).
    Cancelled,
    /// Failed with the given message (terminal).
    Error(String),
}

impl SubAgentStatus {
    /// Single-glyph indicator for this status; the "waiting" states share a
    /// half-filled circle and both failure states share a cross.
    pub fn icon(&self) -> &'static str {
        match self {
            Self::Initializing | Self::Thinking => "◐",
            Self::Working => "●",
            Self::Retrying { .. } => "↻",
            Self::Completed => "✓",
            Self::Cancelled | Self::Error(_) => "✗",
        }
    }

    /// Short human-readable (Chinese) label for this status.
    pub fn label(&self) -> &'static str {
        let text = match *self {
            Self::Initializing => "初始化",
            Self::Thinking => "思考中",
            Self::Working => "执行中",
            Self::Retrying { .. } => "重试中",
            Self::Completed => "已完成",
            Self::Cancelled => "已取消",
            Self::Error(_) => "错误",
        };
        text
    }

    /// Returns `true` for states the agent never leaves: `Completed`,
    /// `Cancelled` and `Error`.
    #[allow(dead_code)] // Public helper kept even when no caller in this crate uses it.
    pub fn is_terminal(&self) -> bool {
        match self {
            Self::Completed | Self::Cancelled | Self::Error(_) => true,
            Self::Initializing | Self::Thinking | Self::Working | Self::Retrying { .. } => false,
        }
    }
}
84
/// Tracker-side record of one registered sub-agent.
///
/// All `Arc` fields are shared with the `SubAgentHandle` returned by
/// `SubAgentTracker::register_with_id`, so the tracker observes the agent's
/// live state without copying it.
pub struct SubAgentSnapshot {
    /// Id the agent was registered under.
    pub id: String,
    /// Description passed at registration.
    pub description: String,
    /// Mode label passed at registration. NOTE(review): allowed values are not visible here.
    pub mode: &'static str,
    /// `true` from registration until the agent stops (also cleared by `clear_all`).
    pub is_running: Arc<AtomicBool>,
    /// The agent's system prompt (empty at registration).
    pub system_prompt: Arc<Mutex<String>>,
    /// The agent's conversation messages (empty at registration).
    pub messages: Arc<Mutex<Vec<ChatMessage>>>,
    /// Current lifecycle status (starts as `Initializing`).
    pub status: Arc<Mutex<SubAgentStatus>>,
    /// Presumably the name of the tool currently executing, if any — confirm with writers.
    pub current_tool: Arc<Mutex<Option<String>>>,
    /// Count of tool calls made so far (starts at 0).
    pub tool_calls_count: Arc<AtomicUsize>,
    /// Round counter (starts at 0). NOTE(review): presumably counts LLM turns.
    pub current_round: Arc<AtomicUsize>,
    /// Registration time; used for elapsed-time display and GC age.
    pub started_at: Instant,
}
104
/// Point-in-time, lock-free copy of a sub-agent's state for rendering,
/// produced by `SubAgentTracker::display_snapshots`.
#[derive(Clone, Debug)]
// NOTE(review): presumably some fields are not read by every UI path — confirm.
#[allow(dead_code)]
pub struct SubAgentDisplay {
    /// Agent id.
    pub id: String,
    /// Description given at registration.
    pub description: String,
    /// Mode label given at registration.
    pub mode: &'static str,
    /// Status at snapshot time.
    pub status: SubAgentStatus,
    /// Tool recorded as current at snapshot time, if any.
    pub current_tool: Option<String>,
    /// Tool-call counter at snapshot time.
    pub tool_calls_count: usize,
    /// Round counter at snapshot time.
    pub current_round: usize,
    /// Whole seconds since the agent was registered.
    pub elapsed_secs: u64,
}
118
/// Registry of sub-agents, used to allocate ids and to expose their live
/// state for display and dumping.
pub struct SubAgentTracker {
    /// One snapshot per registered (and not yet garbage-collected) agent.
    agents: Mutex<Vec<SubAgentSnapshot>>,
    /// Next numeric suffix handed out by `allocate_id`.
    counter: AtomicU64,
}
127
/// `(id, description, mode, system_prompt, messages)` of one running sub-agent,
/// as produced by `SubAgentTracker::snapshot_running`.
pub type RunningSubAgentDump = (String, String, &'static str, String, Vec<ChatMessage>);
130
/// Writer-side handle returned by `SubAgentTracker::register_with_id`.
///
/// Every `Arc` here is shared with the tracker's snapshot of the same agent,
/// so updates made through the handle are visible to the tracker.
// NOTE(review): presumably not every field is read by all callers — confirm.
#[allow(dead_code)]
pub struct SubAgentHandle {
    /// Agent id (same as the tracker snapshot's id).
    pub id: String,
    /// Running flag; starts `true`.
    pub is_running: Arc<AtomicBool>,
    /// System prompt slot; starts empty.
    pub system_prompt: Arc<Mutex<String>>,
    /// Conversation messages; starts empty.
    pub messages: Arc<Mutex<Vec<ChatMessage>>>,
    /// Lifecycle status; starts as `Initializing`.
    pub status: Arc<Mutex<SubAgentStatus>>,
    /// Current tool name slot; starts `None`.
    pub current_tool: Arc<Mutex<Option<String>>>,
    /// Tool-call counter; starts at 0.
    pub tool_calls_count: Arc<AtomicUsize>,
    /// Round counter; starts at 0.
    pub current_round: Arc<AtomicUsize>,
}
143
144impl SubAgentTracker {
145 pub fn new() -> Self {
147 Self {
148 agents: Mutex::new(Vec::new()),
149 counter: AtomicU64::new(1),
150 }
151 }
152
153 pub fn allocate_id(&self) -> String {
157 format!("sub_{:04}", self.counter.fetch_add(1, Ordering::Relaxed))
158 }
159
160 pub fn register_with_id(
162 &self,
163 id: String,
164 description: &str,
165 mode: &'static str,
166 ) -> SubAgentHandle {
167 let is_running = Arc::new(AtomicBool::new(true));
168 let system_prompt = Arc::new(Mutex::new(String::new()));
169 let messages = Arc::new(Mutex::new(Vec::new()));
170 let status = Arc::new(Mutex::new(SubAgentStatus::Initializing));
171 let current_tool = Arc::new(Mutex::new(None));
172 let tool_calls_count = Arc::new(AtomicUsize::new(0));
173 let current_round = Arc::new(AtomicUsize::new(0));
174 if let Ok(mut list) = self.agents.lock() {
175 list.push(SubAgentSnapshot {
176 id: id.clone(),
177 description: description.to_string(),
178 mode,
179 is_running: Arc::clone(&is_running),
180 system_prompt: Arc::clone(&system_prompt),
181 messages: Arc::clone(&messages),
182 status: Arc::clone(&status),
183 current_tool: Arc::clone(¤t_tool),
184 tool_calls_count: Arc::clone(&tool_calls_count),
185 current_round: Arc::clone(¤t_round),
186 started_at: Instant::now(),
187 });
188 }
189 SubAgentHandle {
190 id,
191 is_running,
192 system_prompt,
193 messages,
194 status,
195 current_tool,
196 tool_calls_count,
197 current_round,
198 }
199 }
200
201 pub fn snapshot_running(&self) -> Vec<RunningSubAgentDump> {
203 let list = match self.agents.lock() {
204 Ok(l) => l,
205 Err(_) => return Vec::new(),
206 };
207 list.iter()
208 .filter(|s| s.is_running.load(Ordering::Relaxed))
209 .map(|s| {
210 let sp = s
211 .system_prompt
212 .lock()
213 .map(|x| x.clone())
214 .unwrap_or_default();
215 let msgs = s.messages.lock().map(|x| x.clone()).unwrap_or_default();
216 (s.id.clone(), s.description.clone(), s.mode, sp, msgs)
217 })
218 .collect()
219 }
220
221 pub fn display_snapshots(&self) -> Vec<SubAgentDisplay> {
223 let list = match self.agents.lock() {
224 Ok(l) => l,
225 Err(_) => return Vec::new(),
226 };
227 list.iter()
228 .map(|s| {
229 let status = s
230 .status
231 .lock()
232 .map(|x| x.clone())
233 .unwrap_or(SubAgentStatus::Working);
234 let current_tool = s.current_tool.lock().ok().and_then(|t| t.clone());
235 SubAgentDisplay {
236 id: s.id.clone(),
237 description: s.description.clone(),
238 mode: s.mode,
239 status,
240 current_tool,
241 tool_calls_count: s.tool_calls_count.load(Ordering::Relaxed),
242 current_round: s.current_round.load(Ordering::Relaxed),
243 elapsed_secs: s.started_at.elapsed().as_secs(),
244 }
245 })
246 .collect()
247 }
248
249 pub fn gc_finished(&self) {
253 if let Ok(mut list) = self.agents.lock() {
254 list.retain(|s| {
255 if s.is_running.load(Ordering::Relaxed) {
256 return true;
257 }
258 s.started_at.elapsed().as_secs() < 30
260 || matches!(
261 s.status.lock().map(|x| x.clone()),
262 Ok(SubAgentStatus::Working)
263 | Ok(SubAgentStatus::Initializing)
264 | Ok(SubAgentStatus::Retrying { .. })
265 )
266 });
267 }
268 }
269
270 pub fn clear_all(&self) {
274 if let Ok(mut list) = self.agents.lock() {
275 for s in list.iter() {
276 s.is_running.store(false, Ordering::Relaxed);
277 }
278 list.clear();
279 }
280 self.counter.store(1, Ordering::Relaxed);
281 }
282}
283
284impl Default for SubAgentTracker {
285 fn default() -> Self {
286 Self::new()
287 }
288}
289
/// Aggregate sub-agent statistics (serializable; `Default` is all zeros / empty).
///
/// NOTE(review): these fields are not updated in the code visible in this
/// file; the meanings below are inferred from field names — confirm with writers.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SubAgentMetrics {
    /// Number of LLM calls made.
    pub total_llm_calls: u32,
    /// Number of tool calls made.
    pub total_tool_calls: u32,
    /// Summed input (prompt) tokens.
    pub total_input_tokens: u64,
    /// Summed output (completion) tokens.
    pub total_output_tokens: u64,
    /// Summed LLM wall-clock time, in milliseconds.
    pub total_llm_elapsed_ms: u64,
    /// Summed tool execution wall-clock time, in milliseconds.
    pub total_tool_elapsed_ms: u64,
    /// Latency of each individual LLM call, in milliseconds.
    pub llm_elapsed_ms_per_call: Vec<u64>,
}
313
/// Handles that a parent agent shares with the sub-agents it derives.
///
/// Every field is an `Arc`, so the derived `Clone` is cheap and all clones
/// refer to the same underlying objects.
#[derive(Clone)]
pub struct DerivedAgentShared {
    /// Background manager handed to child registries by `build_child_registry`.
    pub background_manager: Arc<BackgroundManager>,
    /// Model provider settings. NOTE(review): not read in the visible code.
    pub provider: Arc<Mutex<ModelProvider>>,
    /// Optional system prompt override. NOTE(review): not read in the visible code.
    pub system_prompt: Arc<Mutex<Option<String>>>,
    /// Permission rules (deny/allow) configuration.
    pub jcli_config: Arc<JcliConfig>,
    /// Hook manager handed to child registries by `build_child_registry`.
    pub hook_manager: Arc<Mutex<HookManager>>,
    /// Task manager handed to child registries by `build_child_registry`.
    pub task_manager: Arc<TaskManager>,
    /// Names of disabled tools (presumably excluded for sub-agents — confirm).
    pub disabled_tools: Arc<Vec<String>>,
    /// Names of deferred tools (semantics not visible here).
    pub deferred_tools: Arc<Mutex<Vec<String>>>,
    /// Deferred tools already loaded in this session (semantics not visible here).
    pub session_loaded_deferred: Arc<Mutex<Vec<String>>>,
    /// Permission queue installed on child registries by `build_child_registry`.
    pub permission_queue: Arc<PermissionQueue>,
    /// Plan-approval queue installed on child registries by `build_child_registry`.
    pub plan_approval_queue: Arc<PlanApprovalQueue>,
    /// Shared tracker of running sub-agents.
    pub sub_agent_tracker: Arc<SubAgentTracker>,
    /// Parent's display message list (usage not visible here).
    pub display_messages: Arc<Mutex<Vec<ChatMessage>>>,
    /// Parent's context message list (usage not visible here).
    pub context_messages: Arc<Mutex<Vec<ChatMessage>>>,
    /// Current session id.
    pub session_id: Arc<Mutex<String>>,
    /// Plan-mode state installed on child registries by `build_child_registry`.
    pub plan_mode_state: Arc<PlanModeState>,
    /// History / token limits and compaction settings for agent context.
    pub agent_context_config: Arc<Mutex<AgentContextConfig>>,
    /// Names of disabled hooks (semantics not visible here).
    pub disabled_hooks: Arc<Mutex<Vec<String>>>,
    /// Shared aggregate sub-agent metrics.
    pub sub_agent_metrics: Arc<Mutex<SubAgentMetrics>>,
}
358
/// Limits governing how much conversation context an agent keeps.
/// NOTE(review): field meanings inferred from names — confirm with readers.
#[derive(Clone, Debug)]
pub struct AgentContextConfig {
    /// Maximum number of history messages retained.
    pub max_history_messages: usize,
    /// Token budget for the context window.
    pub max_context_tokens: usize,
    /// Context compaction settings.
    pub compact: CompactConfig,
}
366
367impl DerivedAgentShared {
368 pub fn build_child_registry(
377 &self,
378 todos_file_path: std::path::PathBuf,
379 ) -> (ToolRegistry, mpsc::Receiver<AskRequest>) {
380 let (ask_tx, ask_rx) = mpsc::channel::<AskRequest>();
381
382 let mut registry = ToolRegistry::new(
383 vec![], ask_tx,
385 Arc::clone(&self.background_manager),
386 Arc::clone(&self.task_manager),
387 Arc::clone(&self.hook_manager),
388 new_invoked_skills_map(),
389 todos_file_path,
390 );
391 registry.permission_queue = Some(Arc::clone(&self.permission_queue));
393 registry.plan_approval_queue = Some(Arc::clone(&self.plan_approval_queue));
395 registry.plan_mode_state = Arc::clone(&self.plan_mode_state);
397 (registry, ask_rx)
398 }
399} pub fn create_runtime_and_client(
405 provider: &ModelProvider,
406) -> Result<(tokio::runtime::Runtime, crate::llm::LlmClient), String> {
407 let rt = tokio::runtime::Runtime::new()
408 .map_err(|e| format!("Failed to create async runtime: {}", e))?;
409 let client = create_llm_client(provider);
410 Ok((rt, client))
411}
412
/// Borrowed inputs for one `call_llm_non_stream` invocation.
pub struct LlmNonStreamRequest<'a> {
    /// Runtime used to block on the async chat completion.
    pub rt: &'a tokio::runtime::Runtime,
    /// Client that performs the chat completion.
    pub client: &'a crate::llm::LlmClient,
    /// Provider configuration used to build the request.
    pub provider: &'a ModelProvider,
    /// Conversation messages to send.
    pub messages: &'a [ChatMessage],
    /// Tool definitions offered to the model.
    pub tools: &'a [ToolDefinition],
    /// Optional system prompt.
    pub system_prompt: Option<&'a str>,
    /// Invoked before each retry sleep; see [`RetryCallback`].
    pub on_retry: Option<&'a RetryCallback>,
}
423
/// Retry notification: `(attempt, max_attempts, delay_ms, error_message)`,
/// where `attempt` is the 1-based retry number and `delay_ms` is the sleep
/// about to happen before the next try.
pub type RetryCallback = dyn Fn(u32, u32, u64, &str);
429
430pub fn call_llm_non_stream(req: &LlmNonStreamRequest) -> Result<ChatResponse, String> {
438 let request = build_request_with_tools(
439 req.provider,
440 req.messages,
441 req.tools.to_vec(),
442 req.system_prompt,
443 )
444 .map_err(|e| format!("Failed to build request: {}", e))?;
445
446 let mut attempt: u32 = 0;
447 loop {
448 attempt += 1;
449 match req
450 .rt
451 .block_on(async { req.client.chat_completion(&request).await })
452 {
453 Ok(response) => {
454 if response.choices.is_empty() {
456 return Err("[No response from API]".to_string());
457 }
458 return Ok(response);
459 }
460 Err(e) => {
461 let chat_err = ChatError::from(e);
462 if let Some(policy) = derived_retry_policy(&chat_err)
463 && attempt <= policy.max_attempts
464 {
465 let delay_ms = backoff_delay_ms(attempt, policy.base_ms, policy.cap_ms);
466 write_info_log(
467 "SubAgentLLM",
468 &format!(
469 "API 请求失败,{}ms 后重试 ({}/{})",
470 delay_ms, attempt, policy.max_attempts
471 ),
472 );
473 if let Some(cb) = req.on_retry {
474 cb(
475 attempt,
476 policy.max_attempts,
477 delay_ms,
478 &chat_err.display_message(),
479 );
480 }
481 std::thread::sleep(std::time::Duration::from_millis(delay_ms));
482 continue;
483 }
484 return Err(chat_err.display_message());
485 }
486 }
487 }
488}
489
/// Retry parameters chosen by `derived_retry_policy` for one error class.
struct DerivedRetryPolicy {
    /// Retries are allowed while the 1-based attempt number is `<= max_attempts`,
    /// i.e. up to `max_attempts` retries after the initial call.
    max_attempts: u32,
    /// Backoff base delay in milliseconds (delay for the first retry, before jitter).
    base_ms: u64,
    /// Upper bound in milliseconds on the exponential delay, before jitter is added.
    cap_ms: u64,
}
501
502fn derived_retry_policy(error: &ChatError) -> Option<DerivedRetryPolicy> {
507 match error {
508 ChatError::NetworkTimeout(_) | ChatError::NetworkError(_) => Some(DerivedRetryPolicy {
509 max_attempts: 8,
510 base_ms: 2_000,
511 cap_ms: 30_000,
512 }),
513 ChatError::ApiServerError { status, .. } => match status {
514 503 | 504 | 529 => Some(DerivedRetryPolicy {
515 max_attempts: 8,
516 base_ms: 3_000,
517 cap_ms: 30_000,
518 }),
519 500 | 502 => Some(DerivedRetryPolicy {
520 max_attempts: 8,
521 base_ms: 3_000,
522 cap_ms: 30_000,
523 }),
524 _ => None,
525 },
526 ChatError::ApiRateLimit { .. } => Some(DerivedRetryPolicy {
527 max_attempts: 8,
528 base_ms: 5_000,
529 cap_ms: 60_000,
530 }),
531 ChatError::AbnormalFinish(reason)
532 if matches!(reason.as_str(), "network_error" | "timeout" | "overloaded") =>
533 {
534 Some(DerivedRetryPolicy {
535 max_attempts: 8,
536 base_ms: 2_000,
537 cap_ms: 30_000,
538 })
539 }
540 ChatError::Other(msg)
541 if msg.contains("访问量过大")
542 || msg.contains("过载")
543 || msg.contains("overloaded")
544 || msg.contains("too busy")
545 || msg.contains("1305") =>
546 {
547 Some(DerivedRetryPolicy {
548 max_attempts: 8,
549 base_ms: 3_000,
550 cap_ms: 30_000,
551 })
552 }
553 _ => None,
554 }
555}
556
557fn backoff_delay_ms(attempt: u32, base_ms: u64, cap_ms: u64) -> u64 {
561 let shift = (attempt - 1).min(10) as u64;
562 let exp = base_ms.saturating_mul(1u64 << shift).min(cap_ms);
563 let jitter = rand::thread_rng().gen_range(0..=(exp / 5));
564 exp + jitter
565}
566
567pub fn extract_tool_items(tool_calls: &[ToolCall]) -> Vec<ToolCallItem> {
569 tool_calls
570 .iter()
571 .map(|tc| ToolCallItem {
572 id: tc.id.clone(),
573 name: tc.function.name.clone(),
574 arguments: tc.function.arguments.clone(),
575 })
576 .collect()
577}
578
/// Borrowed context for `execute_tool_with_permission`.
pub struct ToolExecContext<'a> {
    /// Registry used to look up tools, reach the permission queue, and execute calls.
    pub registry: &'a Arc<ToolRegistry>,
    /// Permission rules consulted via `is_denied` / `is_allowed`.
    pub jcli_config: &'a Arc<JcliConfig>,
    /// Cancellation flag; checked before running and forwarded to the tool.
    pub cancelled: &'a Arc<AtomicBool>,
    /// Tag passed to `write_info_log`.
    pub log_tag: &'a str,
    /// When `true`, extra diagnostic log lines are written.
    pub verbose: bool,
}
587
588pub fn execute_tool_with_permission(item: &ToolCallItem, ctx: &ToolExecContext) -> ChatMessage {
595 if ctx.cancelled.load(Ordering::Relaxed) {
596 return ChatMessage {
597 role: MessageRole::Tool,
598 content: "[Cancelled]".to_string(),
599 tool_calls: None,
600 tool_call_id: Some(item.id.clone()),
601 images: None,
602 reasoning_content: None,
603 sender_name: None,
604 recipient_name: None,
605 display_hint: DisplayHint::Normal,
606 };
607 }
608
609 if ctx.jcli_config.is_denied(&item.name, &item.arguments) {
611 if ctx.verbose {
612 write_info_log(
613 ctx.log_tag,
614 &format!("Tool denied by deny rule: {}", item.name),
615 );
616 }
617 return ChatMessage {
618 role: MessageRole::Tool,
619 content: format!("Tool '{}' was denied by permission rules.", item.name),
620 tool_calls: None,
621 tool_call_id: Some(item.id.clone()),
622 images: None,
623 reasoning_content: None,
624 sender_name: None,
625 recipient_name: None,
626 display_hint: DisplayHint::Normal,
627 };
628 }
629
630 let tool_ref = ctx.registry.get(&item.name);
632 let requires_confirm = tool_ref.map(|t| t.requires_confirmation()).unwrap_or(false);
633
634 if requires_confirm && !ctx.jcli_config.is_allowed(&item.name, &item.arguments) {
635 if let Some(queue) = ctx.registry.permission_queue.as_ref() {
637 let agent_type = current_agent_type();
638 let agent_name = current_agent_name();
639 let confirm_msg = tool_ref
640 .map(|t| t.confirmation_message(&item.arguments))
641 .unwrap_or_else(|| format!("调用工具 {}", item.name));
642 let req = PendingAgentPerm::new(agent_type, agent_name, item.name.clone(), confirm_msg);
643 write_info_log(
644 ctx.log_tag,
645 &format!(
646 "Tool '{}' queued for user permission (60s timeout)",
647 item.name
648 ),
649 );
650 let approved = queue.request_blocking(req);
651 if !approved {
652 write_info_log(ctx.log_tag, &format!("Tool '{}' denied by user", item.name));
653 return ChatMessage {
654 role: MessageRole::Tool,
655 content: format!("Tool '{}' was denied by the user.", item.name),
656 tool_calls: None,
657 tool_call_id: Some(item.id.clone()),
658 images: None,
659 reasoning_content: None,
660 sender_name: None,
661 recipient_name: None,
662 display_hint: DisplayHint::Normal,
663 };
664 }
665 } else {
667 if ctx.verbose {
668 write_info_log(
669 ctx.log_tag,
670 &format!(
671 "Tool '{}' requires confirmation but not auto-allowed, denying",
672 item.name
673 ),
674 );
675 }
676 return ChatMessage {
677 role: MessageRole::Tool,
678 content: format!(
679 "Tool '{}' requires user confirmation which is not available in sub-agent mode. \
680 Add a permission rule to allow this tool automatically.",
681 item.name
682 ),
683 tool_calls: None,
684 tool_call_id: Some(item.id.clone()),
685 images: None,
686 reasoning_content: None,
687 sender_name: None,
688 recipient_name: None,
689 display_hint: DisplayHint::Normal,
690 };
691 }
692 }
693
694 if ctx.verbose {
695 write_info_log(
696 ctx.log_tag,
697 &format!("Executing tool: {} args: {}", item.name, item.arguments),
698 );
699 }
700
701 let result = ctx
702 .registry
703 .execute(&item.name, &item.arguments, ctx.cancelled);
704
705 if ctx.verbose {
706 write_info_log(
707 ctx.log_tag,
708 &format!(
709 "Tool result: {} is_error={} len={}",
710 item.name,
711 result.is_error,
712 result.output.len()
713 ),
714 );
715 }
716
717 ChatMessage {
718 role: MessageRole::Tool,
719 content: result.output,
720 tool_calls: None,
721 tool_call_id: Some(item.id.clone()),
722 images: None,
723 reasoning_content: None,
724 sender_name: None,
725 recipient_name: None,
726 display_hint: DisplayHint::Normal,
727 }
728}