use crate::command::chat::infra::hook::{HookContext, HookEvent, HookManager};
use crate::command::chat::permission::JcliConfig;
use crate::command::chat::storage::{
ChatMessage, MessageRole, ModelProvider, SessionEvent, SessionPaths, append_event_to_path,
sanitize_filename,
};
use crate::command::chat::teammate::{TeammateManager, TeammateStatus};
use crate::command::chat::tools::ToolRegistry;
use crate::command::chat::tools::derived_shared::{
AgentContextConfig, call_llm_non_stream, create_runtime_and_client,
execute_tool_with_permission, extract_tool_items,
};
use crate::llm::ToolDefinition;
use crate::util::log::write_info_log;
use std::path::PathBuf;
use std::sync::{
Arc, Mutex,
atomic::{AtomicBool, AtomicUsize, Ordering},
};
use tokio_util::sync::CancellationToken;
/// Hard upper bound on LLM request rounds per teammate run.
const MAX_TEAMMATE_ROUNDS: u32 = 200;
/// Idle poll rounds tolerated before the loop exits (log says ~2 min at current timing).
const MAX_CONSECUTIVE_IDLE_POLLS: u32 = 120;
/// Number of sleep slices per idle-poll round.
const POLL_CHECK_INTERVAL: u32 = 10;
/// Length of each idle-poll sleep slice, in milliseconds.
const POLL_SLEEP_MILLIS: u64 = 100;
/// Everything a spawned teammate needs to run its agent loop: identity,
/// model/tool access, and the shared state cells the parent session uses
/// to observe and control the teammate.
pub struct TeammateLoopConfig {
    /// Teammate display name; also used (sanitized) for the transcript filename.
    pub name: String,
    /// Role description substituted into the system prompt template.
    pub role: String,
    /// First user message seeding the conversation.
    pub initial_prompt: String,
    /// LLM provider/model settings.
    pub provider: ModelProvider,
    /// Optional base system prompt; a generic default is used when absent.
    pub base_system_prompt: Option<String>,
    /// Shared session id, used to resolve session storage paths.
    pub session_id: Arc<Mutex<String>>,
    /// Tool definitions advertised to the LLM.
    pub tools: Vec<ToolDefinition>,
    /// Registry used to execute tool calls.
    pub registry: Arc<ToolRegistry>,
    /// Permission configuration consulted before running tools.
    pub jcli_config: Arc<JcliConfig>,
    /// Team-wide manager (display/context message feeds, team summary).
    pub teammate_manager: Arc<Mutex<TeammateManager>>,
    /// Inbox of broadcast messages drained into this teammate's conversation.
    pub pending_user_messages: Arc<Mutex<Vec<ChatMessage>>>,
    /// Cooperative cancellation signal from the parent session.
    pub cancel_token: CancellationToken,
    /// Observer copy of the effective system prompt.
    pub system_prompt_snapshot: Arc<Mutex<String>>,
    /// Observer copy of the full conversation.
    pub messages_snapshot: Arc<Mutex<Vec<ChatMessage>>>,
    /// Current lifecycle status, visible to observers.
    pub status: Arc<Mutex<TeammateStatus>>,
    /// Running count of executed tool calls.
    pub tool_calls_count: Arc<AtomicUsize>,
    /// Name of the tool currently executing, if any.
    pub current_tool: Arc<Mutex<Option<String>>>,
    /// Set externally (per the loop's logs, by an @mention) to re-activate a
    /// teammate that has already declared WorkDone.
    pub wake_flag: Arc<AtomicBool>,
    /// Set when the teammate has declared its work finished.
    pub work_done: Arc<AtomicBool>,
    /// Hooks fired around LLM requests (e.g. PreLlmRequest).
    pub hook_manager: Arc<Mutex<HookManager>>,
    /// Names of hooks currently disabled.
    pub disabled_hooks: Arc<Mutex<Vec<String>>>,
    /// Context-window and compaction settings.
    pub context_config: Arc<Mutex<AgentContextConfig>>,
}
/// Runs the autonomous teammate agent loop until the teammate declares its
/// work done, is cancelled, errors out, or exhausts its round/idle budgets.
///
/// Returns the last assistant text produced, or a bracketed status string
/// when the teammate was cancelled, halted by a hook, or produced no output.
pub fn run_teammate_loop(config: TeammateLoopConfig) -> String {
    let TeammateLoopConfig {
        name,
        role,
        initial_prompt,
        provider,
        base_system_prompt,
        session_id,
        tools,
        registry,
        jcli_config,
        teammate_manager,
        pending_user_messages,
        cancel_token,
        system_prompt_snapshot,
        messages_snapshot,
        status,
        tool_calls_count,
        current_tool,
        wake_flag,
        work_done,
        hook_manager,
        disabled_hooks,
        context_config,
    } = config;

    // Resolve this teammate's transcript file under the current session.
    let transcript_path = |name: &str| -> PathBuf {
        let sid = session_id
            .lock()
            .map(|s| s.clone())
            .unwrap_or_else(|_| "unknown".to_string());
        SessionPaths::new(&sid).teammate_transcript(&sanitize_filename(name))
    };
    // Best-effort append of messages to the on-disk transcript.
    let append_messages = |msgs: &[ChatMessage]| {
        let path = transcript_path(&name);
        for m in msgs {
            let _ = append_event_to_path(&path, &SessionEvent::msg(m.clone()));
        }
    };
    // Best-effort status publication; a poisoned lock is silently skipped.
    let set_status = |new_status: TeammateStatus| {
        if let Ok(mut s) = status.lock() {
            *s = new_status;
        }
    };

    set_status(TeammateStatus::Initializing);
    let (rt, client) = match create_runtime_and_client(&provider) {
        Ok(pair) => pair,
        Err(e) => return e,
    };

    let system_prompt = build_teammate_system_prompt(
        &name,
        &role,
        base_system_prompt.as_deref(),
        &teammate_manager,
    );
    if let Ok(mut sp) = system_prompt_snapshot.lock() {
        *sp = system_prompt.clone();
    }

    // Seed the conversation with the initial prompt. (The previous capacity of
    // `1 + initial_prompt.len()` over-reserved by the prompt's byte length —
    // only one element is pushed here.)
    let mut messages: Vec<ChatMessage> = Vec::with_capacity(1);
    messages.push(ChatMessage {
        role: MessageRole::User,
        content: initial_prompt,
        tool_calls: None,
        tool_call_id: None,
        images: None,
        reasoning_content: None,
    });
    append_messages(&messages);

    // Mirror the full conversation into the shared snapshot for observers.
    let sync_messages = |msgs: &[ChatMessage]| {
        if let Ok(mut snap) = messages_snapshot.lock() {
            *snap = msgs.to_vec();
        }
    };
    sync_messages(&messages);

    let mut last_assistant_text = String::new();
    let mut consecutive_idle_polls = 0;
    let cancel_flag = Arc::new(AtomicBool::new(false));

    for round in 0..MAX_TEAMMATE_ROUNDS {
        // Honor external cancellation before doing any work this round.
        if cancel_token.is_cancelled() || cancel_flag.load(Ordering::Relaxed) {
            set_status(TeammateStatus::Cancelled);
            return format!("{}\n[Teammate '{}' cancelled]", last_assistant_text, name);
        }
        if work_done.load(Ordering::Relaxed) {
            write_info_log(
                "TeammateLoop",
                &format!("{}: WorkDone flag set, exiting", name),
            );
            break;
        }

        // Pull any broadcast messages queued since the last round.
        let len_before_drain = messages.len();
        let _ = drain_broadcast_messages(&mut messages, &pending_user_messages);
        if messages.len() > len_before_drain {
            append_messages(&messages[len_before_drain..]);
        }
        sync_messages(&messages);

        write_info_log(
            "TeammateLoop",
            &format!(
                "{}: Round {}/{}, messages={}",
                name,
                round + 1,
                MAX_TEAMMATE_ROUNDS,
                messages.len(),
            ),
        );
        set_status(TeammateStatus::Working);

        // Snapshot the context-window config under a short-lived lock.
        let ctx_cfg = match context_config.lock() {
            Ok(g) => g.clone(),
            Err(e) => {
                set_status(TeammateStatus::Error(format!("context_config lock: {}", e)));
                return format!("{}\ncontext_config lock poisoned", last_assistant_text);
            }
        };
        // Trim/compact history to the configured window before calling the LLM.
        let mut api_messages = crate::command::chat::context::window::select_messages(
            &messages,
            ctx_cfg.max_history_messages,
            ctx_cfg.max_context_tokens,
            ctx_cfg.compact.keep_recent,
            &ctx_cfg.compact.micro_compact_exempt_tools,
        );
        if ctx_cfg.compact.enabled {
            crate::command::chat::context::compact::micro_compact(
                &mut api_messages,
                ctx_cfg.compact.keep_recent,
                &ctx_cfg.compact.micro_compact_exempt_tools,
            );
        }

        // Give PreLlmRequest hooks a chance to rewrite or veto the request.
        let mut effective_system_prompt = system_prompt.clone();
        {
            let hook_mgr = match hook_manager.lock() {
                Ok(g) => g,
                Err(e) => {
                    set_status(TeammateStatus::Error(format!("hook_manager lock: {}", e)));
                    return format!("{}\nhook_manager lock poisoned", last_assistant_text);
                }
            };
            if hook_mgr.has_hooks_for(HookEvent::PreLlmRequest) {
                let disabled_snapshot: Vec<String> =
                    disabled_hooks.lock().map(|g| g.clone()).unwrap_or_default();
                let ctx = HookContext {
                    event: HookEvent::PreLlmRequest,
                    messages: Some(api_messages.clone()),
                    system_prompt: Some(effective_system_prompt.clone()),
                    model: Some(provider.model.clone()),
                    session_id: session_id.lock().ok().map(|g| g.clone()),
                    cwd: std::env::current_dir()
                        .map(|p| p.display().to_string())
                        .unwrap_or_else(|_| ".".to_string()),
                    ..Default::default()
                };
                if let Some(result) =
                    hook_mgr.execute(HookEvent::PreLlmRequest, ctx, &disabled_snapshot)
                {
                    if result.is_stop() {
                        set_status(TeammateStatus::Error("hook requested stop".to_string()));
                        return format!(
                            "{}\n[Teammate halted by PreLlmRequest hook]",
                            last_assistant_text
                        );
                    }
                    if let Some(new_msgs) = result.messages {
                        api_messages = new_msgs;
                    }
                    if let Some(new_prompt) = result.system_prompt {
                        effective_system_prompt = new_prompt;
                    }
                    if let Some(inject) = result.inject_messages {
                        api_messages.extend(inject);
                    }
                }
            }
        }

        // Surface retry progress through the shared status slot.
        let status_for_retry = Arc::clone(&status);
        let retry_callback = move |attempt: u32, max_attempts: u32, delay_ms: u64, error: &str| {
            if let Ok(mut s) = status_for_retry.lock() {
                *s = TeammateStatus::Retrying {
                    attempt,
                    max_attempts,
                    delay_ms,
                    error: error.to_string(),
                };
            }
        };
        let response_choice = match call_llm_non_stream(
            &rt,
            &client,
            &provider,
            &api_messages,
            &tools,
            Some(&effective_system_prompt),
            Some(&retry_callback),
        ) {
            Ok(c) => c,
            Err(e) => {
                set_status(TeammateStatus::Error(e.clone()));
                return format!("{}\n{}", last_assistant_text, e);
            }
        };

        let assistant_text = response_choice.message.content.clone().unwrap_or_default();
        let reasoning_content = response_choice.message.reasoning_content.clone();
        if !assistant_text.is_empty() {
            last_assistant_text = assistant_text.clone();
            // Mirror the reply into the team-wide display/context feeds.
            if let Ok(manager) = teammate_manager.lock() {
                let msg = ChatMessage::text(
                    MessageRole::Assistant,
                    format!("<Teammate@{}> {}", name, &assistant_text),
                );
                if let Ok(mut display) = manager.display_messages.lock() {
                    display.push(msg.clone());
                }
                if let Ok(mut context) = manager.context_messages.lock() {
                    context.push(msg);
                }
            }
        }

        let has_tool_calls = response_choice.finish_reason.as_deref() == Some("tool_calls");
        if !has_tool_calls || response_choice.message.tool_calls.is_none() {
            // No tool calls: record the reply, then idle-poll for new input.
            set_status(TeammateStatus::WaitingForMessage);
            if !assistant_text.is_empty() {
                messages.push(ChatMessage::text(
                    MessageRole::Assistant,
                    assistant_text.clone(),
                ));
                if let Some(last) = messages.last() {
                    append_messages(std::slice::from_ref(last));
                }
            }
            let len_before_drain = messages.len();
            let _ = drain_broadcast_messages(&mut messages, &pending_user_messages);
            let has_new = messages.len() > len_before_drain;
            if has_new {
                append_messages(&messages[len_before_drain..]);
            }
            if has_new {
                if work_done.load(Ordering::Relaxed) {
                    // A teammate that already declared WorkDone is only revived
                    // when the wake flag was set (per the log, by an @mention).
                    if wake_flag.swap(false, Ordering::Relaxed) {
                        work_done.store(false, Ordering::Relaxed);
                        write_info_log(
                            "TeammateLoop",
                            &format!("{}: re-activated after WorkDone by @mention", name),
                        );
                        consecutive_idle_polls = 0;
                        continue;
                    }
                } else {
                    let _ = wake_flag.swap(false, Ordering::Relaxed);
                    consecutive_idle_polls = 0;
                    continue;
                }
            }
            let _ = wake_flag.swap(false, Ordering::Relaxed);
            consecutive_idle_polls += 1;
            if consecutive_idle_polls >= MAX_CONSECUTIVE_IDLE_POLLS {
                write_info_log(
                    "TeammateLoop",
                    &format!(
                        "{}: idle for {} rounds (~2min), exiting",
                        name, consecutive_idle_polls
                    ),
                );
                break;
            }
            // Sleep in small slices so cancellation, WorkDone, and new
            // broadcasts are noticed promptly.
            for _ in 0..POLL_CHECK_INTERVAL {
                if cancel_token.is_cancelled() {
                    set_status(TeammateStatus::Cancelled);
                    return format!("{}\n[Teammate '{}' cancelled]", last_assistant_text, name);
                }
                if work_done.load(Ordering::Relaxed) {
                    break;
                }
                std::thread::sleep(std::time::Duration::from_millis(POLL_SLEEP_MILLIS));
                let len_before_drain = messages.len();
                let _ = drain_broadcast_messages(&mut messages, &pending_user_messages);
                if messages.len() > len_before_drain {
                    append_messages(&messages[len_before_drain..]);
                    if work_done.load(Ordering::Relaxed) {
                        if wake_flag.swap(false, Ordering::Relaxed) {
                            work_done.store(false, Ordering::Relaxed);
                            write_info_log(
                                "TeammateLoop",
                                &format!("{}: re-activated after WorkDone by @mention", name),
                            );
                            consecutive_idle_polls = 0;
                            break;
                        }
                    } else {
                        let _ = wake_flag.swap(false, Ordering::Relaxed);
                        consecutive_idle_polls = 0;
                        break;
                    }
                }
                let _ = wake_flag.swap(false, Ordering::Relaxed);
            }
            continue;
        }

        let Some(tool_calls) = response_choice.message.tool_calls.as_ref() else {
            continue;
        };
        let tool_items = extract_tool_items(tool_calls);
        if tool_items.is_empty() {
            break;
        }
        consecutive_idle_polls = 0;

        // Record the assistant turn (with its tool calls) before executing.
        messages.push(ChatMessage {
            role: MessageRole::Assistant,
            content: assistant_text,
            tool_calls: Some(tool_items.clone()),
            tool_call_id: None,
            images: None,
            reasoning_content,
        });
        if let Some(last) = messages.last() {
            append_messages(std::slice::from_ref(last));
        }
        // Announce tool invocations to the team feeds (SendMessage excepted).
        if let Ok(manager) = teammate_manager.lock() {
            for item in &tool_items {
                if item.name != "SendMessage" {
                    let msg = ChatMessage::text(
                        MessageRole::Assistant,
                        format!("<Teammate@{}> [调用工具 {}]", name, item.name),
                    );
                    if let Ok(mut display) = manager.display_messages.lock() {
                        display.push(msg.clone());
                    }
                    if let Ok(mut context) = manager.context_messages.lock() {
                        context.push(msg);
                    }
                }
            }
        }

        for item in &tool_items {
            // After cancellation, still emit one Tool message per pending call
            // so every tool_call_id gets a matching result in the transcript.
            if cancel_token.is_cancelled() {
                messages.push(ChatMessage {
                    role: MessageRole::Tool,
                    content: "[Cancelled]".to_string(),
                    tool_calls: None,
                    tool_call_id: Some(item.id.clone()),
                    images: None,
                    reasoning_content: None,
                });
                if let Some(last) = messages.last() {
                    append_messages(std::slice::from_ref(last));
                }
                continue;
            }
            if let Ok(mut ct) = current_tool.lock() {
                *ct = Some(item.name.clone());
            }
            tool_calls_count.fetch_add(1, Ordering::Relaxed);
            let result_msg = execute_tool_with_permission(
                item,
                &registry,
                &jcli_config,
                &cancel_flag,
                "TeammateLoop",
                false,
            );
            messages.push(result_msg);
            if let Some(last) = messages.last() {
                append_messages(std::slice::from_ref(last));
            }
            if let Ok(mut ct) = current_tool.lock() {
                *ct = None;
            }
        }
        sync_messages(&messages);
    }

    set_status(TeammateStatus::Completed);
    // Announce completion to the team unless the loop exited via WorkDone.
    if !work_done.load(Ordering::Relaxed)
        && let Ok(manager) = teammate_manager.lock()
    {
        let msg = ChatMessage::text(
            MessageRole::Assistant,
            format!("<Teammate@{}> [已完成工作]", name),
        );
        if let Ok(mut display) = manager.display_messages.lock() {
            display.push(msg.clone());
        }
        if let Ok(mut context) = manager.context_messages.lock() {
            context.push(msg);
        }
        let done_msg = ChatMessage::text(MessageRole::Assistant, "[已完成工作]".to_string());
        append_messages(std::slice::from_ref(&done_msg));
    }

    if last_assistant_text.is_empty() {
        format!("[Teammate '{}' completed with no output]", name)
    } else {
        last_assistant_text
    }
}
/// Renders the teammate system prompt template, filling in the base prompt,
/// the teammate's identity, and a summary of the current team roster.
fn build_teammate_system_prompt(
    name: &str,
    role: &str,
    base_prompt: Option<&str>,
    teammate_manager: &Arc<Mutex<TeammateManager>>,
) -> String {
    let template = crate::assets::teammate_system_prompt_template();
    let base = base_prompt.unwrap_or("You are a helpful assistant.");
    // Fall back to an empty summary on a poisoned lock — the prompt is still usable.
    let team_summary = teammate_manager
        .lock()
        .map(|m| m.team_summary())
        .unwrap_or_default();
    // Substitute placeholders in the same order as always: the base prompt
    // first, then the remaining fields.
    let mut rendered = template.as_ref().replace("{{.base_prompt}}", base);
    for (placeholder, value) in [
        ("{{.name}}", name),
        ("{{.role}}", role),
        ("{{.team_summary}}", team_summary.as_str()),
    ] {
        rendered = rendered.replace(placeholder, value);
    }
    rendered
}
/// Moves every queued broadcast message into `messages`.
///
/// Returns `true` when at least one message was transferred; `false` when
/// the queue was empty or its lock was poisoned (best-effort, never panics).
fn drain_broadcast_messages(
    messages: &mut Vec<ChatMessage>,
    pending: &Arc<Mutex<Vec<ChatMessage>>>,
) -> bool {
    let Ok(mut queued) = pending.lock() else {
        return false;
    };
    if queued.is_empty() {
        return false;
    }
    messages.append(&mut *queued);
    true
}