use crate::command::chat::context::message_compress::{
DEFAULT_OTHER_AGENT_TOOLCALL_THRESHOLD, compress_other_agent_toolcalls,
};
use crate::command::chat::permission::JcliConfig;
use crate::command::chat::storage::{
ChatMessage, MessageRole, ModelProvider, SessionEvent, SessionPaths, append_event_to_path,
sanitize_filename,
};
use crate::command::chat::teammate::{TeammateManager, TeammateStatus};
use crate::command::chat::tools::ToolRegistry;
use crate::command::chat::tools::derived_shared::{
call_llm_non_stream, create_runtime_and_client, execute_tool_with_permission,
extract_tool_items,
};
use crate::llm::ToolDefinition;
use crate::util::log::write_info_log;
use std::path::PathBuf;
use std::sync::{
Arc, Mutex,
atomic::{AtomicBool, AtomicUsize, Ordering},
};
use tokio_util::sync::CancellationToken;
/// Bundles every parameter and shared handle needed to drive one
/// teammate's agent loop (consumed by `run_teammate_loop`).
pub struct TeammateLoopConfig {
    /// Display name of the teammate; used in logs, transcripts, and `<name>` tags.
    pub name: String,
    /// Role description substituted into the teammate system prompt.
    pub role: String,
    /// First user message that seeds the teammate's conversation.
    pub initial_prompt: String,
    /// LLM provider used for this teammate's calls.
    pub provider: ModelProvider,
    /// Optional base system prompt; a generic assistant prompt is used when `None`.
    pub base_system_prompt: Option<String>,
    /// Current session id, shared under a lock (used to locate the transcript file).
    pub session_id: Arc<Mutex<String>>,
    /// Tool definitions advertised to the LLM.
    pub tools: Vec<ToolDefinition>,
    /// Registry used to execute tool calls.
    pub registry: Arc<ToolRegistry>,
    /// Permission/configuration settings consulted before tool execution.
    pub jcli_config: Arc<JcliConfig>,
    /// Shared manager holding the team-wide display/context message streams.
    pub teammate_manager: Arc<Mutex<TeammateManager>>,
    /// Inbox of broadcast messages drained into this teammate's context each round.
    pub pending_user_messages: Arc<Mutex<Vec<ChatMessage>>>,
    /// Cooperative cancellation signal for the whole loop.
    pub cancel_token: CancellationToken,
    /// Observable snapshot of the system prompt actually in use.
    pub system_prompt_snapshot: Arc<Mutex<String>>,
    /// Observable snapshot of the teammate's current message list.
    pub messages_snapshot: Arc<Mutex<Vec<ChatMessage>>>,
    /// Observable loop status (Initializing, Working, WaitingForMessage, ...).
    pub status: Arc<Mutex<TeammateStatus>>,
    /// Running count of executed tool calls.
    pub tool_calls_count: Arc<AtomicUsize>,
    /// Name of the tool currently executing, if any.
    pub current_tool: Arc<Mutex<Option<String>>>,
    /// Set by an @mention; allows re-activating a teammate after `work_done`.
    pub wake_flag: Arc<AtomicBool>,
    /// Set when the teammate's work is complete; causes the loop to exit.
    pub work_done: Arc<AtomicBool>,
}
/// Runs the synchronous agent loop for a single teammate.
///
/// Each round: drains broadcast messages, calls the LLM, then either
/// executes the returned tool calls or idle-polls for new messages.
/// The loop exits when `work_done` is set, the cancel token fires, the
/// idle budget (~2 minutes of consecutive idle polls) is exhausted, or
/// `max_rounds` is reached.
///
/// Returns the teammate's last assistant text, or a status string on
/// cancellation / LLM error / empty output.
pub fn run_teammate_loop(config: TeammateLoopConfig) -> String {
    let TeammateLoopConfig {
        name,
        role,
        initial_prompt,
        provider,
        base_system_prompt,
        session_id,
        tools,
        registry,
        jcli_config,
        teammate_manager,
        pending_user_messages,
        cancel_token,
        system_prompt_snapshot,
        messages_snapshot,
        status,
        tool_calls_count,
        current_tool,
        wake_flag,
        work_done,
    } = config;
    // Resolve the transcript file path for this teammate under the current session.
    let transcript_path = |name: &str| -> PathBuf {
        let sid = session_id
            .lock()
            .map(|s| s.clone())
            .unwrap_or_else(|_| "unknown".to_string());
        SessionPaths::new(&sid).teammate_transcript(&sanitize_filename(name))
    };
    // Append messages to the on-disk transcript (best-effort; write errors ignored).
    let append_messages = |msgs: &[ChatMessage]| {
        let path = transcript_path(&name);
        for m in msgs {
            let _ = append_event_to_path(&path, &SessionEvent::msg(m.clone()));
        }
    };
    let set_status = |new_status: TeammateStatus| {
        if let Ok(mut s) = status.lock() {
            *s = new_status;
        }
    };
    set_status(TeammateStatus::Initializing);
    // 120 idle polls at up to ~1s each gives roughly a 2-minute idle budget.
    let max_rounds: u32 = 200;
    let max_consecutive_idle_polls: u32 = 120;
    let (rt, client) = match create_runtime_and_client(&provider) {
        Ok(pair) => pair,
        Err(e) => return e,
    };
    // Build the system prompt once and expose it for observers.
    let system_prompt = build_teammate_system_prompt(
        &name,
        &role,
        base_system_prompt.as_deref(),
        &teammate_manager,
    );
    if let Ok(mut sp) = system_prompt_snapshot.lock() {
        *sp = system_prompt.clone();
    }
    // Seed the conversation with the initial prompt as a user message.
    let mut messages: Vec<ChatMessage> = vec![ChatMessage {
        role: MessageRole::User,
        content: initial_prompt,
        tool_calls: None,
        tool_call_id: None,
        images: None,
        reasoning_content: None,
    }];
    append_messages(&messages);
    // Publish the current message list so the manager/UI can inspect it.
    let sync_messages = |msgs: &Vec<ChatMessage>| {
        if let Ok(mut snap) = messages_snapshot.lock() {
            *snap = msgs.clone();
        }
    };
    sync_messages(&messages);
    let mut last_assistant_text = String::new();
    let mut consecutive_idle_polls = 0;
    // Shared flag passed into tool execution so a tool-side cancel is
    // observed by this loop on the next round.
    let cancel_flag = Arc::new(AtomicBool::new(false));
    for round in 0..max_rounds {
        if cancel_token.is_cancelled() || cancel_flag.load(Ordering::Relaxed) {
            set_status(TeammateStatus::Cancelled);
            return format!("{}\n[Teammate '{}' cancelled]", last_assistant_text, name);
        }
        if work_done.load(Ordering::Relaxed) {
            write_info_log(
                "TeammateLoop",
                &format!("{}: WorkDone flag set, exiting", name),
            );
            break;
        }
        // Pull any broadcast messages that arrived since last round; persist
        // only the newly added tail.
        let len_before_drain = messages.len();
        let _ = drain_broadcast_messages(&mut messages, &pending_user_messages);
        if messages.len() > len_before_drain {
            append_messages(&messages[len_before_drain..]);
        }
        sync_messages(&messages);
        write_info_log(
            "TeammateLoop",
            &format!(
                "{}: Round {}/{}, messages={}",
                name,
                round + 1,
                max_rounds,
                messages.len(),
            ),
        );
        set_status(TeammateStatus::Working);
        // Compress other agents' tool-call history before sending context to the LLM.
        let compressed_messages = compress_other_agent_toolcalls(
            &messages,
            &name,
            DEFAULT_OTHER_AGENT_TOOLCALL_THRESHOLD,
        );
        let response_choice = match call_llm_non_stream(
            &rt,
            &client,
            &provider,
            &compressed_messages,
            &tools,
            Some(&system_prompt),
            None,
        ) {
            Ok(c) => c,
            Err(e) => {
                set_status(TeammateStatus::Error(e.clone()));
                return format!("{}\n{}", last_assistant_text, e);
            }
        };
        let assistant_text = response_choice.message.content.clone().unwrap_or_default();
        let reasoning_content = response_choice.message.reasoning_content.clone();
        if !assistant_text.is_empty() {
            last_assistant_text = assistant_text.clone();
            // Mirror the teammate's visible reply into the shared team
            // display/context streams, tagged with the teammate name.
            if let Ok(manager) = teammate_manager.lock() {
                let msg = ChatMessage::text(
                    MessageRole::Assistant,
                    format!("<{}> {}", name, &assistant_text),
                );
                if let Ok(mut display) = manager.display_messages.lock() {
                    display.push(msg.clone());
                }
                if let Ok(mut context) = manager.context_messages.lock() {
                    context.push(msg);
                }
            }
        }
        let has_tool_calls = response_choice.finish_reason.as_deref() == Some("tool_calls");
        if !has_tool_calls || response_choice.message.tool_calls.is_none() {
            // No tool calls: go idle and wait for new messages.
            set_status(TeammateStatus::WaitingForMessage);
            if !assistant_text.is_empty() {
                messages.push(ChatMessage::text(
                    MessageRole::Assistant,
                    assistant_text.clone(),
                ));
                if let Some(last) = messages.last() {
                    append_messages(std::slice::from_ref(last));
                }
            }
            let len_before_drain = messages.len();
            let _ = drain_broadcast_messages(&mut messages, &pending_user_messages);
            if messages.len() > len_before_drain {
                append_messages(&messages[len_before_drain..]);
            }
            let has_new = messages.len() > len_before_drain;
            if has_new {
                if work_done.load(Ordering::Relaxed) {
                    // Only an explicit @mention (wake_flag) may re-activate a
                    // teammate that already declared its work done.
                    if wake_flag.swap(false, Ordering::Relaxed) {
                        work_done.store(false, Ordering::Relaxed);
                        write_info_log(
                            "TeammateLoop",
                            &format!("{}: re-activated after WorkDone by @mention", name),
                        );
                        consecutive_idle_polls = 0;
                        continue;
                    }
                } else {
                    let _ = wake_flag.swap(false, Ordering::Relaxed);
                    consecutive_idle_polls = 0;
                    continue;
                }
            }
            let _ = wake_flag.swap(false, Ordering::Relaxed);
            consecutive_idle_polls += 1;
            if consecutive_idle_polls >= max_consecutive_idle_polls {
                write_info_log(
                    "TeammateLoop",
                    &format!(
                        "{}: idle for {} rounds (~2min), exiting",
                        name, consecutive_idle_polls
                    ),
                );
                break;
            }
            // Sleep-poll for up to ~1s (10 x 100ms), waking early on
            // cancellation, work-done, or newly drained messages.
            for _ in 0..10 {
                if cancel_token.is_cancelled() {
                    set_status(TeammateStatus::Cancelled);
                    return format!("{}\n[Teammate '{}' cancelled]", last_assistant_text, name);
                }
                if work_done.load(Ordering::Relaxed) {
                    break;
                }
                std::thread::sleep(std::time::Duration::from_millis(100));
                let len_before_drain = messages.len();
                let _ = drain_broadcast_messages(&mut messages, &pending_user_messages);
                if messages.len() > len_before_drain {
                    append_messages(&messages[len_before_drain..]);
                    if work_done.load(Ordering::Relaxed) {
                        // Same wake rule as above: after WorkDone, only an
                        // @mention restarts the loop.
                        if wake_flag.swap(false, Ordering::Relaxed) {
                            work_done.store(false, Ordering::Relaxed);
                            write_info_log(
                                "TeammateLoop",
                                &format!("{}: re-activated after WorkDone by @mention", name),
                            );
                            consecutive_idle_polls = 0;
                            break;
                        }
                    } else {
                        let _ = wake_flag.swap(false, Ordering::Relaxed);
                        consecutive_idle_polls = 0;
                        break;
                    }
                }
                let _ = wake_flag.swap(false, Ordering::Relaxed);
            }
            continue;
        }
        let Some(tool_calls) = response_choice.message.tool_calls.as_ref() else {
            continue;
        };
        let tool_items = extract_tool_items(tool_calls);
        if tool_items.is_empty() {
            break;
        }
        consecutive_idle_polls = 0;
        messages.push(ChatMessage {
            role: MessageRole::Assistant,
            content: assistant_text,
            tool_calls: Some(tool_items.clone()),
            tool_call_id: None,
            images: None,
            reasoning_content,
        });
        if let Some(last) = messages.last() {
            append_messages(std::slice::from_ref(last));
        }
        // Surface non-SendMessage tool invocations in the shared team view.
        if let Ok(manager) = teammate_manager.lock() {
            for item in &tool_items {
                if item.name != "SendMessage" {
                    let msg = ChatMessage::text(
                        MessageRole::Assistant,
                        format!("<{}> [调用工具 {}]", name, item.name),
                    );
                    if let Ok(mut display) = manager.display_messages.lock() {
                        display.push(msg.clone());
                    }
                    if let Ok(mut context) = manager.context_messages.lock() {
                        context.push(msg);
                    }
                }
            }
        }
        for item in &tool_items {
            if cancel_token.is_cancelled() {
                // Record a cancelled tool result so every emitted tool_call id
                // still gets a matching Tool message in the transcript.
                messages.push(ChatMessage {
                    role: MessageRole::Tool,
                    content: "[Cancelled]".to_string(),
                    tool_calls: None,
                    tool_call_id: Some(item.id.clone()),
                    images: None,
                    reasoning_content: None,
                });
                if let Some(last) = messages.last() {
                    append_messages(std::slice::from_ref(last));
                }
                continue;
            }
            if let Ok(mut ct) = current_tool.lock() {
                *ct = Some(item.name.clone());
            }
            tool_calls_count.fetch_add(1, Ordering::Relaxed);
            let result_msg = execute_tool_with_permission(
                item,
                &registry,
                &jcli_config,
                &cancel_flag,
                "TeammateLoop",
                false,
            );
            messages.push(result_msg);
            if let Some(last) = messages.last() {
                append_messages(std::slice::from_ref(last));
            }
            if let Ok(mut ct) = current_tool.lock() {
                *ct = None;
            }
        }
        sync_messages(&messages);
    }
    set_status(TeammateStatus::Completed);
    // Announce completion to the team only if work_done was not set
    // (NOTE(review): presumably the WorkDone path announces elsewhere — confirm).
    if !work_done.load(Ordering::Relaxed)
        && let Ok(manager) = teammate_manager.lock()
    {
        let msg = ChatMessage::text(MessageRole::Assistant, format!("<{}> [已完成工作]", name));
        if let Ok(mut display) = manager.display_messages.lock() {
            display.push(msg.clone());
        }
        if let Ok(mut context) = manager.context_messages.lock() {
            context.push(msg);
        }
        let done_msg = ChatMessage::text(MessageRole::Assistant, "[已完成工作]".to_string());
        append_messages(std::slice::from_ref(&done_msg));
    }
    if last_assistant_text.is_empty() {
        format!("[Teammate '{}' completed with no output]", name)
    } else {
        last_assistant_text
    }
}
/// Renders the teammate system prompt from the bundled template,
/// substituting the base prompt, teammate name/role, and a summary of
/// the current team.
///
/// A poisoned `teammate_manager` lock degrades to an empty team summary.
fn build_teammate_system_prompt(
    name: &str,
    role: &str,
    base_prompt: Option<&str>,
    teammate_manager: &Arc<Mutex<TeammateManager>>,
) -> String {
    let template = crate::assets::teammate_system_prompt_template();
    let base = base_prompt.unwrap_or("You are a helpful assistant.");
    let team_summary = match teammate_manager.lock() {
        Ok(manager) => manager.team_summary(),
        Err(_) => String::new(),
    };
    // Substitution order matters: later placeholders may appear inside
    // earlier substituted values, matching the original replace chain.
    let substitutions = [
        ("{{.base_prompt}}", base),
        ("{{.name}}", name),
        ("{{.role}}", role),
        ("{{.team_summary}}", team_summary.as_str()),
    ];
    let mut rendered = template.as_ref().to_string();
    for (placeholder, value) in substitutions {
        rendered = rendered.replace(placeholder, value);
    }
    rendered
}
/// Moves all queued broadcast messages out of `pending` and onto the end
/// of `messages`.
///
/// Returns `true` when at least one message was transferred; returns
/// `false` when the queue was empty or its lock was poisoned.
fn drain_broadcast_messages(
    messages: &mut Vec<ChatMessage>,
    pending: &Arc<Mutex<Vec<ChatMessage>>>,
) -> bool {
    // A poisoned lock is treated as "nothing to drain".
    let Ok(mut queue) = pending.lock() else {
        return false;
    };
    if queue.is_empty() {
        return false;
    }
    messages.append(&mut *queue);
    true
}