use crate::command::chat::permission::JcliConfig;
use crate::command::chat::storage::{
ChatMessage, MessageRole, ModelProvider, SessionEvent, SessionPaths, append_event_to_path,
sanitize_filename,
};
use crate::command::chat::teammate::{TeammateManager, TeammateStatus};
use crate::command::chat::tools::ToolRegistry;
use crate::command::chat::tools::derived_shared::{
call_llm_non_stream, create_runtime_and_client, execute_tool_with_permission,
extract_tool_items,
};
use crate::util::log::write_info_log;
use async_openai::types::chat::ChatCompletionTools;
use std::path::PathBuf;
use std::sync::{
Arc, Mutex,
atomic::{AtomicBool, AtomicUsize, Ordering},
};
use tokio_util::sync::CancellationToken;
/// Everything a teammate worker needs to run [`run_teammate_loop`]:
/// identity, model configuration, tool access, and the shared handles the
/// main session uses to observe and steer the loop.
pub struct TeammateLoopConfig {
    /// Teammate display name; used in logs, `<name>`-prefixed UI messages,
    /// and (sanitized) as the transcript filename.
    pub name: String,
    /// Role description injected into the teammate's system prompt.
    pub role: String,
    /// First user message that seeds the teammate's conversation.
    pub initial_prompt: String,
    /// LLM provider used to create the runtime/client and issue completions.
    pub provider: ModelProvider,
    /// Optional base system prompt; a generic assistant prompt is used when `None`.
    pub base_system_prompt: Option<String>,
    /// Current session id; read to resolve the on-disk transcript path.
    pub session_id: Arc<Mutex<String>>,
    /// Tool schemas advertised to the model on every completion call.
    pub tools: Vec<ChatCompletionTools>,
    /// Registry used to actually execute the tools the model requests.
    pub registry: Arc<ToolRegistry>,
    /// CLI configuration consulted when executing tools with permission checks.
    pub jcli_config: Arc<JcliConfig>,
    /// Shared team state; provides the team summary for the system prompt and
    /// the `ui_messages` sink for chat-room output.
    pub teammate_manager: Arc<Mutex<TeammateManager>>,
    /// Inbox of broadcast messages queued by other agents; drained into the
    /// conversation each round.
    pub pending_user_messages: Arc<Mutex<Vec<ChatMessage>>>,
    /// Cooperative cancellation; when triggered the loop exits with `Cancelled`.
    pub cancel_token: CancellationToken,
    /// Observer snapshot of the system prompt actually in use.
    pub system_prompt_snapshot: Arc<Mutex<String>>,
    /// Observer snapshot of the full conversation, refreshed every round.
    pub messages_snapshot: Arc<Mutex<Vec<ChatMessage>>>,
    /// Externally visible lifecycle status (Initializing/Working/…).
    pub status: Arc<Mutex<TeammateStatus>>,
    /// Running count of tool invocations performed by this teammate.
    pub tool_calls_count: Arc<AtomicUsize>,
    /// Name of the tool currently executing, or `None` between calls.
    pub current_tool: Arc<Mutex<Option<String>>>,
    /// Set by an @mention; while `work_done` is set, a pending wake revokes
    /// the done state and re-activates the teammate.
    pub wake_flag: Arc<AtomicBool>,
    /// Set when the teammate declares its work finished (the prompt's
    /// `WorkDone` protocol); makes the loop exit unless a wake revokes it.
    pub work_done: Arc<AtomicBool>,
}
/// Blocking agent loop for a single teammate.
///
/// Repeatedly: drains broadcast messages into the conversation, calls the LLM
/// once (non-streaming), executes any requested tools, and idle-polls when the
/// model has nothing to do. The loop ends on cancellation, LLM error, an
/// un-revoked `work_done` flag, an empty tool-call round, the round cap, or an
/// idle timeout. Returns the last assistant text (or a status string
/// describing why the loop ended).
pub fn run_teammate_loop(config: TeammateLoopConfig) -> String {
    // Destructure once so the rest of the function uses plain bindings.
    let TeammateLoopConfig {
        name,
        role,
        initial_prompt,
        provider,
        base_system_prompt,
        session_id,
        tools,
        registry,
        jcli_config,
        teammate_manager,
        pending_user_messages,
        cancel_token,
        system_prompt_snapshot,
        messages_snapshot,
        status,
        tool_calls_count,
        current_tool,
        wake_flag,
        work_done,
    } = config;
    // Resolves this teammate's transcript path; a poisoned session-id lock
    // falls back to an "unknown" session rather than panicking.
    let transcript_path = |name: &str| -> PathBuf {
        let sid = session_id
            .lock()
            .map(|s| s.clone())
            .unwrap_or_else(|_| "unknown".to_string());
        SessionPaths::new(&sid).teammate_transcript(&sanitize_filename(name))
    };
    // Best-effort append of messages to the transcript file (write errors ignored).
    let append_messages = |msgs: &[ChatMessage]| {
        let path = transcript_path(&name);
        for m in msgs {
            let _ = append_event_to_path(&path, &SessionEvent::msg(m.clone()));
        }
    };
    // Best-effort status publication for observers; silently skipped if poisoned.
    let set_status = |new_status: TeammateStatus| {
        if let Ok(mut s) = status.lock() {
            *s = new_status;
        }
    };
    set_status(TeammateStatus::Initializing);
    // Hard cap on LLM rounds; 120 idle polls at ~1s each (10 * 100ms inner
    // waits) give the "~2min" idle timeout mentioned in the exit log below.
    let max_rounds: u32 = 200;
    let max_consecutive_idle_polls: u32 = 120;
    let (rt, client) = match create_runtime_and_client(&provider) {
        Ok(pair) => pair,
        Err(e) => return e,
    };
    let system_prompt = build_teammate_system_prompt(
        &name,
        &role,
        base_system_prompt.as_deref(),
        &teammate_manager,
    );
    // Expose the prompt actually in use to observers.
    if let Ok(mut sp) = system_prompt_snapshot.lock() {
        *sp = system_prompt.clone();
    }
    // Seed the conversation with the initial task as a user message.
    let mut messages: Vec<ChatMessage> = vec![ChatMessage {
        role: MessageRole::User,
        content: initial_prompt,
        tool_calls: None,
        tool_call_id: None,
        images: None,
    }];
    append_messages(&messages);
    // Mirror the full conversation into the shared snapshot for observers.
    let sync_messages = |msgs: &Vec<ChatMessage>| {
        if let Ok(mut snap) = messages_snapshot.lock() {
            *snap = msgs.clone();
        }
    };
    sync_messages(&messages);
    let mut last_assistant_text = String::new();
    let mut consecutive_idle_polls = 0;
    // Local flag shared with tool execution (see execute_tool_with_permission).
    let cancel_flag = Arc::new(AtomicBool::new(false));
    for round in 0..max_rounds {
        // Exit on external cancellation or the locally shared cancel flag.
        if cancel_token.is_cancelled() || cancel_flag.load(Ordering::Relaxed) {
            set_status(TeammateStatus::Cancelled);
            return format!("{}\n[Teammate '{}' cancelled]", last_assistant_text, name);
        }
        // work_done set and not revoked by a wake: leave the loop.
        if work_done.load(Ordering::Relaxed) {
            write_info_log(
                "TeammateLoop",
                &format!("{}: WorkDone flag set, exiting", name),
            );
            break;
        }
        // Pull broadcasts that arrived since last round into the conversation
        // and persist only the newly appended tail.
        let len_before_drain = messages.len();
        let _ = drain_broadcast_messages(&mut messages, &pending_user_messages);
        if messages.len() > len_before_drain {
            append_messages(&messages[len_before_drain..]);
        }
        sync_messages(&messages);
        write_info_log(
            "TeammateLoop",
            &format!(
                "{}: Round {}/{}, messages={}",
                name,
                round + 1,
                max_rounds,
                messages.len(),
            ),
        );
        set_status(TeammateStatus::Working);
        // One non-streaming completion per round; an LLM error ends the loop.
        let response_choice = match call_llm_non_stream(
            &rt,
            &client,
            &provider,
            &messages,
            &tools,
            Some(&system_prompt),
        ) {
            Ok(c) => c,
            Err(e) => {
                set_status(TeammateStatus::Error(e.clone()));
                return format!("{}\n{}", last_assistant_text, e);
            }
        };
        let assistant_text = response_choice.message.content.clone().unwrap_or_default();
        if !assistant_text.is_empty() {
            last_assistant_text = assistant_text.clone();
            // Mirror the visible reply into the shared chat-room UI feed.
            if let Ok(manager) = teammate_manager.lock()
                && let Ok(mut shared) = manager.ui_messages.lock()
            {
                shared.push(ChatMessage::text(
                    MessageRole::Assistant,
                    format!("<{}> {}", name, &assistant_text),
                ));
            }
        }
        let has_tool_calls = matches!(
            response_choice.finish_reason,
            Some(async_openai::types::chat::FinishReason::ToolCalls)
        );
        // No tool calls: the teammate has nothing active to do. Record the
        // reply, then idle-poll for new messages instead of burning rounds.
        if !has_tool_calls || response_choice.message.tool_calls.is_none() {
            set_status(TeammateStatus::WaitingForMessage);
            if !assistant_text.is_empty() {
                messages.push(ChatMessage::text(
                    MessageRole::Assistant,
                    assistant_text.clone(),
                ));
                if let Some(last) = messages.last() {
                    append_messages(std::slice::from_ref(last));
                }
            }
            let len_before_drain = messages.len();
            let _ = drain_broadcast_messages(&mut messages, &pending_user_messages);
            if messages.len() > len_before_drain {
                append_messages(&messages[len_before_drain..]);
            }
            let has_new = messages.len() > len_before_drain;
            if has_new {
                if work_done.load(Ordering::Relaxed) {
                    // Done, but an @mention (wake_flag) revokes work_done and
                    // puts the teammate back to work on the new messages.
                    if wake_flag.swap(false, Ordering::Relaxed) {
                        work_done.store(false, Ordering::Relaxed);
                        write_info_log(
                            "TeammateLoop",
                            &format!("{}: re-activated after WorkDone by @mention", name),
                        );
                        consecutive_idle_polls = 0;
                        continue;
                    }
                } else {
                    // Not done: any new message starts a fresh active round.
                    let _ = wake_flag.swap(false, Ordering::Relaxed);
                    consecutive_idle_polls = 0;
                    continue;
                }
            }
            // No actionable message: consume any stray wake, count an idle poll.
            let _ = wake_flag.swap(false, Ordering::Relaxed);
            consecutive_idle_polls += 1;
            if consecutive_idle_polls >= max_consecutive_idle_polls {
                write_info_log(
                    "TeammateLoop",
                    &format!(
                        "{}: idle for {} rounds (~2min), exiting",
                        name, consecutive_idle_polls
                    ),
                );
                break;
            }
            // Inner wait: up to 10 * 100ms, checking cancellation, work_done
            // and incoming broadcasts between sleeps.
            for _ in 0..10 {
                if cancel_token.is_cancelled() {
                    set_status(TeammateStatus::Cancelled);
                    return format!("{}\n[Teammate '{}' cancelled]", last_assistant_text, name);
                }
                if work_done.load(Ordering::Relaxed) {
                    break;
                }
                std::thread::sleep(std::time::Duration::from_millis(100));
                let len_before_drain = messages.len();
                let _ = drain_broadcast_messages(&mut messages, &pending_user_messages);
                if messages.len() > len_before_drain {
                    append_messages(&messages[len_before_drain..]);
                    // Same wake/revocation rule as above, inside the wait loop.
                    if work_done.load(Ordering::Relaxed) {
                        if wake_flag.swap(false, Ordering::Relaxed) {
                            work_done.store(false, Ordering::Relaxed);
                            write_info_log(
                                "TeammateLoop",
                                &format!("{}: re-activated after WorkDone by @mention", name),
                            );
                            consecutive_idle_polls = 0;
                            break;
                        }
                    } else {
                        let _ = wake_flag.swap(false, Ordering::Relaxed);
                        consecutive_idle_polls = 0;
                        break;
                    }
                }
                let _ = wake_flag.swap(false, Ordering::Relaxed);
            }
            continue;
        }
        // Tool-call path. Defensive re-check: the branch above already
        // guarantees tool_calls is Some when we reach here.
        let Some(tool_calls) = response_choice.message.tool_calls.as_ref() else {
            continue;
        };
        let tool_items = extract_tool_items(tool_calls);
        if tool_items.is_empty() {
            break;
        }
        consecutive_idle_polls = 0;
        // Record the assistant turn together with its tool calls.
        messages.push(ChatMessage {
            role: MessageRole::Assistant,
            content: assistant_text,
            tool_calls: Some(tool_items.clone()),
            tool_call_id: None,
            images: None,
        });
        if let Some(last) = messages.last() {
            append_messages(std::slice::from_ref(last));
        }
        // Surface tool usage in the chat-room UI, except SendMessage (its
        // effect is already visible as a chat message).
        if let Ok(manager) = teammate_manager.lock()
            && let Ok(mut shared) = manager.ui_messages.lock()
        {
            for item in &tool_items {
                if item.name != "SendMessage" {
                    shared.push(ChatMessage::text(
                        MessageRole::Assistant,
                        format!("<{}> [调用工具 {}]", name, item.name),
                    ));
                }
            }
        }
        // Execute each requested tool in order, appending one Tool message per
        // call so every tool_call_id gets a paired result.
        for item in &tool_items {
            if cancel_token.is_cancelled() {
                // Still emit a Tool result so the transcript stays well-formed.
                messages.push(ChatMessage {
                    role: MessageRole::Tool,
                    content: "[Cancelled]".to_string(),
                    tool_calls: None,
                    tool_call_id: Some(item.id.clone()),
                    images: None,
                });
                if let Some(last) = messages.last() {
                    append_messages(std::slice::from_ref(last));
                }
                continue;
            }
            // Publish the tool in flight for observers, then count it.
            if let Ok(mut ct) = current_tool.lock() {
                *ct = Some(item.name.clone());
            }
            tool_calls_count.fetch_add(1, Ordering::Relaxed);
            let result_msg = execute_tool_with_permission(
                item,
                &registry,
                &jcli_config,
                &cancel_flag,
                "TeammateLoop",
                false,
            );
            messages.push(result_msg);
            if let Some(last) = messages.last() {
                append_messages(std::slice::from_ref(last));
            }
            if let Ok(mut ct) = current_tool.lock() {
                *ct = None;
            }
        }
        sync_messages(&messages);
    }
    set_status(TeammateStatus::Completed);
    // Announce completion in the UI only when the loop ended without an
    // explicit work_done (e.g. idle timeout or an empty tool-call round).
    if !work_done.load(Ordering::Relaxed)
        && let Ok(manager) = teammate_manager.lock()
        && let Ok(mut shared) = manager.ui_messages.lock()
    {
        shared.push(ChatMessage::text(
            MessageRole::Assistant,
            format!("<{}> [已完成工作]", name),
        ));
        let done_msg = ChatMessage::text(MessageRole::Assistant, "[已完成工作]".to_string());
        append_messages(std::slice::from_ref(&done_msg));
    }
    if last_assistant_text.is_empty() {
        format!("[Teammate '{}' completed with no output]", name)
    } else {
        last_assistant_text
    }
}
/// Builds the full system prompt for a teammate agent.
///
/// The prompt is the optional `base_prompt` (defaulting to a generic
/// assistant prompt) followed by fixed sections covering the teammate's
/// identity (`name` / `role`), the current team summary, communication
/// rules, message-wake semantics, and the `WorkDone` completion protocol.
fn build_teammate_system_prompt(
    name: &str,
    role: &str,
    base_prompt: Option<&str>,
    teammate_manager: &Arc<Mutex<TeammateManager>>,
) -> String {
    // Best-effort: a poisoned manager lock yields an empty team summary
    // instead of propagating the panic into this worker.
    let team_summary = teammate_manager
        .lock()
        .map(|m| m.team_summary())
        .unwrap_or_default();
    let base = base_prompt.unwrap_or("You are a helpful assistant.");
    // Every template line ends with a `\n\` continuation (never a bare
    // newline): a bare newline inside the literal would pull any leading
    // whitespace of the next source line into the prompt if this block is
    // ever re-indented.
    format!(
        "{base}\n\n\
         ## Your Identity\n\
         你是团队中的 **{name}**,角色: {role}。\n\
         你的名字是 `{name}`,在发送消息和被提及时使用这个名字。\n\n\
         {team_summary}\n\
         ## Communication\n\
         - 使用 `SendMessage` 工具与其他 agent 通信\n\
         - 收到的广播消息以 `<AgentName>` 前缀出现在对话中\n\
         - 用 `@AgentName` 指定消息接收者(消息仍广播给所有人,但只有 @目标 会被真正「唤醒」)\n\n\
         ## Message Wake Semantics(重要)\n\
         聊天室里你会看到三类消息,处理方式不同:\n\
         - **@你自己 的消息** 或 **来自 @Main 的消息**:会立即唤醒你去思考和回复\n\
         - **你不是接收者的其他 agent 间广播**:也会唤醒你(保持上下文感知),但**不要**主动回复无关消息,否则会造成无限循环\n\
         - 旁听消息只是让你了解团队动态;除非其中包含你必须处理的信息,否则简单确认后继续工作\n\n\
         ## Completing Your Work(重要)\n\
         - 任务做完后:先用 SendMessage 告知 @Main 结果摘要,然后调用 `WorkDone` 工具退出\n\
         - `WorkDone` 调用后你将进入完成状态,普通消息不再唤醒你\n\
         - **但如果有人 @你**,你会被重新激活(WorkDone 被撤销),可以继续工作\n\
         - 如果任务还可能需要你配合,**不要**调用 WorkDone,保持空闲等待即可\n",
    )
}
/// Moves every queued broadcast message from `pending` into `messages`.
///
/// Returns `true` when at least one message was transferred. A poisoned
/// lock or an empty queue both leave `messages` untouched and return
/// `false`. Draining empties the shared queue in place (its allocation is
/// retained for reuse).
fn drain_broadcast_messages(
    messages: &mut Vec<ChatMessage>,
    pending: &Arc<Mutex<Vec<ChatMessage>>>,
) -> bool {
    // Guard clauses: bail out on a poisoned lock or an empty inbox.
    let Ok(mut queue) = pending.lock() else {
        return false;
    };
    if queue.is_empty() {
        return false;
    }
    messages.append(&mut *queue);
    true
}