use crate::command::chat::agent::api::{build_request_with_tools, create_openai_client};
use crate::command::chat::agent::compact::new_invoked_skills_map;
use crate::command::chat::agent::thread_identity::{current_agent_name, current_agent_type};
use crate::command::chat::app::AskRequest;
use crate::command::chat::error::ChatError;
use crate::command::chat::infra::hook::HookManager;
use crate::command::chat::permission::JcliConfig;
use crate::command::chat::permission::queue::{PendingAgentPerm, PermissionQueue};
use crate::command::chat::storage::{ChatMessage, ModelProvider, ToolCallItem};
use crate::command::chat::tools::ToolRegistry;
use crate::command::chat::tools::background::BackgroundManager;
use crate::command::chat::tools::plan::PlanApprovalQueue;
use crate::command::chat::tools::task::TaskManager;
use crate::util::log::write_info_log;
use rand::Rng;
use std::sync::{
Arc, Mutex,
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
mpsc,
};
use std::time::Instant;
/// Lifecycle state of a spawned sub-agent, as shown in the status UI.
#[derive(Clone, Debug, PartialEq)]
pub enum SubAgentStatus {
    /// Registered but has not started producing work yet.
    Initializing,
    /// Actively making LLM calls and/or running tools.
    Working,
    /// Finished successfully.
    Completed,
    /// Stopped before completion (e.g. by user request).
    Cancelled,
    /// Failed; the payload carries a human-readable error description.
    Error(String),
}
impl SubAgentStatus {
pub fn icon(&self) -> &'static str {
match self {
Self::Initializing => "◐",
Self::Working => "●",
Self::Completed => "✓",
Self::Cancelled => "✗",
Self::Error(_) => "✗",
}
}
pub fn label(&self) -> &'static str {
match self {
Self::Initializing => "初始化",
Self::Working => "工作中",
Self::Completed => "已完成",
Self::Cancelled => "已取消",
Self::Error(_) => "错误",
}
}
#[allow(dead_code)]
pub fn is_terminal(&self) -> bool {
matches!(self, Self::Completed | Self::Cancelled | Self::Error(_))
}
}
/// Shared, live view of one sub-agent held by the tracker. All mutable
/// fields are `Arc`-shared with the worker thread so the UI side can poll
/// progress without owning the agent.
pub struct SubAgentSnapshot {
    pub id: String,
    pub description: String,
    /// Static mode tag for display (stored as `&'static str`).
    pub mode: &'static str,
    /// Cleared by the worker when the agent terminates.
    pub is_running: Arc<AtomicBool>,
    pub system_prompt: Arc<Mutex<String>>,
    pub messages: Arc<Mutex<Vec<ChatMessage>>>,
    pub status: Arc<Mutex<SubAgentStatus>>,
    /// Name of the tool currently executing, if any.
    pub current_tool: Arc<Mutex<Option<String>>>,
    pub tool_calls_count: Arc<AtomicUsize>,
    pub current_round: Arc<AtomicUsize>,
    /// Wall-clock registration time; used for elapsed display and GC.
    pub started_at: Instant,
}
/// Point-in-time, owned copy of a sub-agent's state for rendering;
/// produced by `SubAgentTracker::display_snapshots`.
#[derive(Clone, Debug)]
#[allow(dead_code)]
pub struct SubAgentDisplay {
    pub id: String,
    pub description: String,
    pub mode: &'static str,
    pub status: SubAgentStatus,
    pub current_tool: Option<String>,
    pub tool_calls_count: usize,
    pub current_round: usize,
    /// Whole seconds elapsed since the agent was registered.
    pub elapsed_secs: u64,
}
/// Registry of all spawned sub-agents plus a monotonically increasing id
/// counter. Uses interior mutability so it can be shared behind an `Arc`.
pub struct SubAgentTracker {
    agents: Mutex<Vec<SubAgentSnapshot>>,
    // Starts at 1 so the first allocated id is "sub_0001".
    counter: AtomicU64,
}
/// Dump of one running agent: (id, description, mode, system_prompt, messages).
pub type RunningSubAgentDump = (String, String, &'static str, String, Vec<ChatMessage>);
/// Worker-side handle returned by `SubAgentTracker::register_with_id`;
/// shares the same `Arc`s as the tracker's `SubAgentSnapshot`, so updates
/// made through the handle are immediately visible to snapshot readers.
#[allow(dead_code)]
pub struct SubAgentHandle {
    pub id: String,
    pub is_running: Arc<AtomicBool>,
    pub system_prompt: Arc<Mutex<String>>,
    pub messages: Arc<Mutex<Vec<ChatMessage>>>,
    pub status: Arc<Mutex<SubAgentStatus>>,
    pub current_tool: Arc<Mutex<Option<String>>>,
    pub tool_calls_count: Arc<AtomicUsize>,
    pub current_round: Arc<AtomicUsize>,
}
impl SubAgentTracker {
    /// Creates an empty tracker; ids start at `sub_0001`.
    pub fn new() -> Self {
        Self {
            agents: Mutex::new(Vec::new()),
            counter: AtomicU64::new(1),
        }
    }

    /// Returns a fresh unique id of the form `sub_0001`.
    pub fn allocate_id(&self) -> String {
        format!("sub_{:04}", self.counter.fetch_add(1, Ordering::Relaxed))
    }

    /// Registers a new sub-agent under a caller-chosen id and returns the
    /// worker handle. Tracker and handle share the same `Arc`s, so any
    /// update through the handle is visible to snapshot readers.
    pub fn register_with_id(
        &self,
        id: String,
        description: &str,
        mode: &'static str,
    ) -> SubAgentHandle {
        let is_running = Arc::new(AtomicBool::new(true));
        let system_prompt = Arc::new(Mutex::new(String::new()));
        let messages = Arc::new(Mutex::new(Vec::new()));
        let status = Arc::new(Mutex::new(SubAgentStatus::Initializing));
        let current_tool = Arc::new(Mutex::new(None));
        let tool_calls_count = Arc::new(AtomicUsize::new(0));
        let current_round = Arc::new(AtomicUsize::new(0));
        // A poisoned lock only skips tracker registration; the returned
        // handle still works for the worker itself.
        if let Ok(mut list) = self.agents.lock() {
            list.push(SubAgentSnapshot {
                id: id.clone(),
                description: description.to_string(),
                mode,
                is_running: Arc::clone(&is_running),
                system_prompt: Arc::clone(&system_prompt),
                messages: Arc::clone(&messages),
                status: Arc::clone(&status),
                current_tool: Arc::clone(&current_tool),
                tool_calls_count: Arc::clone(&tool_calls_count),
                current_round: Arc::clone(&current_round),
                started_at: Instant::now(),
            });
        }
        SubAgentHandle {
            id,
            is_running,
            system_prompt,
            messages,
            status,
            current_tool,
            tool_calls_count,
            current_round,
        }
    }

    /// Dumps id/description/mode/system-prompt/messages for every agent
    /// still marked running. Poisoned inner locks yield empty defaults;
    /// a poisoned list lock yields an empty result.
    pub fn snapshot_running(&self) -> Vec<RunningSubAgentDump> {
        let list = match self.agents.lock() {
            Ok(l) => l,
            Err(_) => return Vec::new(),
        };
        list.iter()
            .filter(|s| s.is_running.load(Ordering::Relaxed))
            .map(|s| {
                let sp = s
                    .system_prompt
                    .lock()
                    .map(|x| x.clone())
                    .unwrap_or_default();
                let msgs = s.messages.lock().map(|x| x.clone()).unwrap_or_default();
                (s.id.clone(), s.description.clone(), s.mode, sp, msgs)
            })
            .collect()
    }

    /// Owned display snapshots of every tracked agent (running or not).
    /// A poisoned status lock is reported as `Working`.
    pub fn display_snapshots(&self) -> Vec<SubAgentDisplay> {
        let list = match self.agents.lock() {
            Ok(l) => l,
            Err(_) => return Vec::new(),
        };
        list.iter()
            .map(|s| {
                let status = s
                    .status
                    .lock()
                    .map(|x| x.clone())
                    .unwrap_or(SubAgentStatus::Working);
                let current_tool = s.current_tool.lock().ok().and_then(|t| t.clone());
                SubAgentDisplay {
                    id: s.id.clone(),
                    description: s.description.clone(),
                    mode: s.mode,
                    status,
                    current_tool,
                    tool_calls_count: s.tool_calls_count.load(Ordering::Relaxed),
                    current_round: s.current_round.load(Ordering::Relaxed),
                    elapsed_secs: s.started_at.elapsed().as_secs(),
                }
            })
            .collect()
    }

    /// Removes finished agents, keeping recently started ones around so
    /// the UI can still show their terminal state for a short while.
    ///
    /// NOTE(review): the 30-second window is measured from `started_at`
    /// (registration), not from completion — a long-running agent may be
    /// collected immediately after finishing. Confirm this is intended.
    pub fn gc_finished(&self) {
        if let Ok(mut list) = self.agents.lock() {
            list.retain(|s| {
                if s.is_running.load(Ordering::Relaxed) {
                    return true;
                }
                s.started_at.elapsed().as_secs() < 30
                    || matches!(
                        s.status.lock().map(|x| x.clone()),
                        Ok(SubAgentStatus::Working) | Ok(SubAgentStatus::Initializing)
                    )
            });
        }
    }
}
impl Default for SubAgentTracker {
    /// Equivalent to [`SubAgentTracker::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Bundle of shared state a parent agent hands to derived (sub-)agents so
/// they reuse the same managers, queues, and session context. Cloning is
/// cheap: every field is an `Arc`.
#[derive(Clone)]
pub struct DerivedAgentShared {
    pub background_manager: Arc<BackgroundManager>,
    pub provider: Arc<Mutex<ModelProvider>>,
    pub system_prompt: Arc<Mutex<Option<String>>>,
    pub jcli_config: Arc<JcliConfig>,
    pub hook_manager: Arc<Mutex<HookManager>>,
    pub task_manager: Arc<TaskManager>,
    // NOTE(review): presumably tool names to exclude from child
    // registries — not consumed in this file; confirm at call sites.
    pub disabled_tools: Arc<Vec<String>>,
    pub permission_queue: Arc<PermissionQueue>,
    pub plan_approval_queue: Arc<PlanApprovalQueue>,
    pub sub_agent_tracker: Arc<SubAgentTracker>,
    // NOTE(review): appears to be the parent UI's message list, shared for
    // live updates — confirm against the owning UI code.
    pub ui_messages: Arc<Mutex<Vec<ChatMessage>>>,
    pub session_id: Arc<Mutex<String>>,
}
impl DerivedAgentShared {
    /// Builds a `ToolRegistry` for a child agent wired to this shared
    /// state, along with the receiving end of the registry's ask-channel.
    pub fn build_child_registry(
        &self,
        todos_file_path: std::path::PathBuf,
    ) -> (ToolRegistry, mpsc::Receiver<AskRequest>) {
        let (ask_tx, ask_rx) = mpsc::channel::<AskRequest>();
        let background = Arc::clone(&self.background_manager);
        let tasks = Arc::clone(&self.task_manager);
        let hooks = Arc::clone(&self.hook_manager);
        let mut registry = ToolRegistry::new(
            Vec::new(),
            ask_tx,
            background,
            tasks,
            hooks,
            new_invoked_skills_map(),
            todos_file_path,
        );
        // Share the parent's interactive queues so child tool calls can
        // request permissions / plan approval through the same channels.
        registry.permission_queue = Some(Arc::clone(&self.permission_queue));
        registry.plan_approval_queue = Some(Arc::clone(&self.plan_approval_queue));
        (registry, ask_rx)
    }
}
/// Creates a fresh Tokio runtime together with an OpenAI-compatible client
/// configured from `provider`.
///
/// # Errors
/// Returns a formatted message when the runtime cannot be constructed.
pub fn create_runtime_and_client(
    provider: &ModelProvider,
) -> Result<
    (
        tokio::runtime::Runtime,
        async_openai::Client<async_openai::config::OpenAIConfig>,
    ),
    String,
> {
    match tokio::runtime::Runtime::new() {
        Ok(rt) => Ok((rt, create_openai_client(provider))),
        Err(e) => Err(format!("Failed to create async runtime: {}", e)),
    }
}
/// Sends a non-streaming chat-completion request, retrying transient
/// failures according to `derived_retry_policy` with exponential backoff.
///
/// Blocks the calling thread (`block_on` plus `thread::sleep`), so it must
/// not be invoked from inside an async context.
///
/// # Errors
/// Returns the request-build error, `"[No response from API]"` when the
/// response contains no choices, or the display message of the final
/// `ChatError` once the retry budget is exhausted.
pub fn call_llm_non_stream(
    rt: &tokio::runtime::Runtime,
    client: &async_openai::Client<async_openai::config::OpenAIConfig>,
    provider: &ModelProvider,
    messages: &[ChatMessage],
    tools: &[async_openai::types::chat::ChatCompletionTools],
    system_prompt: Option<&str>,
) -> Result<async_openai::types::chat::ChatChoice, String> {
    // Build the request once; it is cloned for each attempt below.
    let request = build_request_with_tools(provider, messages, tools.to_vec(), system_prompt)
        .map_err(|e| format!("Failed to build request: {}", e))?;
    let mut attempt: u32 = 0;
    loop {
        attempt += 1;
        match rt.block_on(async { client.chat().create(request.clone()).await }) {
            Ok(response) => {
                // Only the first choice is used.
                return response
                    .choices
                    .into_iter()
                    .next()
                    .ok_or_else(|| "[No response from API]".to_string());
            }
            Err(e) => {
                let chat_err = ChatError::from(e);
                // Retry only errors classified as transient, and only while
                // within the policy's attempt budget (let-chain syntax).
                if let Some(policy) = derived_retry_policy(&chat_err)
                    && attempt <= policy.max_attempts
                {
                    let delay_ms = backoff_delay_ms(attempt, policy.base_ms, policy.cap_ms);
                    write_info_log(
                        "SubAgentLLM",
                        &format!(
                            "API 请求失败,{}ms 后重试 ({}/{})",
                            delay_ms, attempt, policy.max_attempts
                        ),
                    );
                    std::thread::sleep(std::time::Duration::from_millis(delay_ms));
                    continue;
                }
                // Non-retryable error or budget exhausted.
                return Err(chat_err.display_message());
            }
        }
    }
}
/// Retry parameters derived from an error class: the number of retry
/// attempts allowed and the exponential-backoff base/cap in milliseconds.
struct DerivedRetryPolicy {
    max_attempts: u32,
    base_ms: u64,
    cap_ms: u64,
}
/// Classifies an error and returns the matching retry policy, or `None`
/// when the error is not considered transient and should fail immediately.
fn derived_retry_policy(error: &ChatError) -> Option<DerivedRetryPolicy> {
    // Small constructor to keep the match arms compact.
    let policy = |max_attempts: u32, base_ms: u64, cap_ms: u64| DerivedRetryPolicy {
        max_attempts,
        base_ms,
        cap_ms,
    };
    match error {
        // Transient network failures: retry fairly quickly.
        ChatError::NetworkTimeout(_) | ChatError::NetworkError(_) => {
            Some(policy(2, 2_000, 15_000))
        }
        // Server overload / gateway statuses: usually clear up on retry.
        ChatError::ApiServerError {
            status: 503 | 504 | 529,
            ..
        } => Some(policy(2, 3_000, 15_000)),
        // Generic 5xx: allow a single retry.
        ChatError::ApiServerError {
            status: 500 | 502, ..
        } => Some(policy(1, 3_000, 15_000)),
        // Any other server status is treated as permanent.
        ChatError::ApiServerError { .. } => None,
        // Rate limits: back off longer between attempts.
        ChatError::ApiRateLimit { .. } => Some(policy(2, 5_000, 30_000)),
        // Abnormal stream termination caused by transient conditions.
        ChatError::AbnormalFinish(reason)
            if matches!(reason.as_str(), "network_error" | "timeout" | "overloaded") =>
        {
            Some(policy(2, 2_000, 15_000))
        }
        // Provider-specific overload messages (Chinese and English variants).
        ChatError::Other(msg)
            if msg.contains("访问量过大")
                || msg.contains("过载")
                || msg.contains("overloaded")
                || msg.contains("too busy")
                || msg.contains("1305") =>
        {
            Some(policy(2, 3_000, 15_000))
        }
        _ => None,
    }
}
/// Computes an exponential-backoff delay with jitter.
///
/// The delay doubles per attempt starting from `base_ms`, is capped at
/// `cap_ms`, then gains up to 20% random jitter (inclusive) to avoid
/// synchronized retries. `attempt` is 1-based; `0` is now treated like `1`
/// instead of panicking on integer underflow.
fn backoff_delay_ms(attempt: u32, base_ms: u64, cap_ms: u64) -> u64 {
    // saturating_sub guards against attempt == 0; min(10) keeps the shift
    // well inside u64 range.
    let shift = attempt.saturating_sub(1).min(10) as u64;
    let exp = base_ms.saturating_mul(1u64 << shift).min(cap_ms);
    // gen_range(0..=0) is valid, so exp < 5 is safe.
    let jitter = rand::thread_rng().gen_range(0..=(exp / 5));
    exp.saturating_add(jitter)
}
/// Converts API tool-call payloads into lightweight `ToolCallItem`s,
/// keeping only calls of the `Function` variant.
pub fn extract_tool_items(
    tool_calls: &[async_openai::types::chat::ChatCompletionMessageToolCalls],
) -> Vec<ToolCallItem> {
    use async_openai::types::chat::ChatCompletionMessageToolCalls;
    let mut items = Vec::with_capacity(tool_calls.len());
    for tc in tool_calls {
        if let ChatCompletionMessageToolCalls::Function(f) = tc {
            items.push(ToolCallItem {
                id: f.id.clone(),
                name: f.function.name.clone(),
                arguments: f.function.arguments.clone(),
            });
        }
    }
    items
}
/// Executes a single tool call on behalf of a sub-agent, enforcing the
/// permission model in order:
///
/// 1. A set `cancelled` flag short-circuits with a `[Cancelled]` result.
/// 2. Deny rules in `jcli_config` reject the call outright.
/// 3. A tool requiring confirmation that is not auto-allowed is queued for
///    interactive user approval when a `PermissionQueue` is attached;
///    without a queue it is denied (sub-agents cannot prompt directly).
/// 4. Otherwise the tool is executed through the registry.
///
/// Always returns a `role == "tool"` message tied to `item.id`, so the
/// conversation history stays well-formed even on denial or cancellation.
pub fn execute_tool_with_permission(
    item: &ToolCallItem,
    registry: &Arc<ToolRegistry>,
    jcli_config: &Arc<JcliConfig>,
    cancelled: &Arc<AtomicBool>,
    log_tag: &str,
    verbose: bool,
) -> ChatMessage {
    // Bail out early if the parent run was cancelled.
    if cancelled.load(Ordering::Relaxed) {
        return ChatMessage {
            role: "tool".to_string(),
            content: "[Cancelled]".to_string(),
            tool_calls: None,
            tool_call_id: Some(item.id.clone()),
            images: None,
        };
    }
    // Hard deny rules take precedence over everything else.
    if jcli_config.is_denied(&item.name, &item.arguments) {
        if verbose {
            write_info_log(log_tag, &format!("Tool denied by deny rule: {}", item.name));
        }
        return ChatMessage {
            role: "tool".to_string(),
            content: format!("Tool '{}' was denied by permission rules.", item.name),
            tool_calls: None,
            tool_call_id: Some(item.id.clone()),
            images: None,
        };
    }
    // An unknown tool is treated as not requiring confirmation; the
    // registry's execute() below is left to report the failure.
    let tool_ref = registry.get(&item.name);
    let requires_confirm = tool_ref.map(|t| t.requires_confirmation()).unwrap_or(false);
    if requires_confirm && !jcli_config.is_allowed(&item.name, &item.arguments) {
        if let Some(queue) = registry.permission_queue.as_ref() {
            // Route the request to the interactive permission queue,
            // tagged with the current agent's identity.
            let agent_type = current_agent_type();
            let agent_name = current_agent_name();
            let confirm_msg = tool_ref
                .map(|t| t.confirmation_message(&item.arguments))
                .unwrap_or_else(|| format!("调用工具 {}", item.name));
            let req = PendingAgentPerm::new(
                agent_type,
                agent_name,
                item.name.clone(),
                item.arguments.clone(),
                confirm_msg,
            );
            write_info_log(
                log_tag,
                &format!(
                    "Tool '{}' queued for user permission (60s timeout)",
                    item.name
                ),
            );
            // Blocks this worker thread until the user answers or the
            // queue times out (treated as denial).
            let approved = queue.request_blocking(req);
            if !approved {
                write_info_log(log_tag, &format!("Tool '{}' denied by user", item.name));
                return ChatMessage {
                    role: "tool".to_string(),
                    content: format!("Tool '{}' was denied by the user.", item.name),
                    tool_calls: None,
                    tool_call_id: Some(item.id.clone()),
                    images: None,
                };
            }
        } else {
            // No permission queue attached: there is no way to ask the
            // user, so the call must be refused.
            if verbose {
                write_info_log(
                    log_tag,
                    &format!(
                        "Tool '{}' requires confirmation but not auto-allowed, denying",
                        item.name
                    ),
                );
            }
            return ChatMessage {
                role: "tool".to_string(),
                content: format!(
                    "Tool '{}' requires user confirmation which is not available in sub-agent mode. \
                     Add a permission rule to allow this tool automatically.",
                    item.name
                ),
                tool_calls: None,
                tool_call_id: Some(item.id.clone()),
                images: None,
            };
        }
    }
    if verbose {
        write_info_log(
            log_tag,
            &format!("Executing tool: {} args: {}", item.name, item.arguments),
        );
    }
    // Permission granted (or not needed): run the tool for real.
    let result = registry.execute(&item.name, &item.arguments, cancelled);
    if verbose {
        write_info_log(
            log_tag,
            &format!(
                "Tool result: {} is_error={} len={}",
                item.name,
                result.is_error,
                result.output.len()
            ),
        );
    }
    ChatMessage {
        role: "tool".to_string(),
        content: result.output,
        tool_calls: None,
        tool_call_id: Some(item.id.clone()),
        images: None,
    }
}