1use crate::storage::{ChatMessage, MessageRole, TeammateSnapshotPersist};
2use crate::util::log::write_info_log;
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::path::PathBuf;
6use std::sync::{
7 Arc, Mutex, OnceLock,
8 atomic::{AtomicBool, AtomicUsize, Ordering},
9};
10use tokio_util::sync::CancellationToken;
11
/// Upper bound, in bytes, on how much of a broadcast message is echoed into the
/// info log by `TeammateManager::broadcast`. The cut is moved back to the nearest
/// UTF-8 character boundary, so the logged preview may be slightly shorter.
const BROADCAST_LOG_MAX_LEN: usize = 100;
14
/// Runtime lifecycle state of a teammate agent.
///
/// `Completed`, `Cancelled` and `Error` are the terminal states (see
/// `TeammateStatus::is_terminal`); every other variant is non-terminal.
#[derive(Clone, Debug, PartialEq)]
pub enum TeammateStatus {
    /// Labelled "initializing"; also the fallback that `teammate_snapshots`
    /// reports when the status mutex cannot be locked.
    Initializing,
    /// Labelled "thinking".
    Thinking,
    /// Labelled "working".
    Working,
    /// Labelled "waiting".
    WaitingForMessage,
    /// Terminal: labelled "completed".
    Completed,
    /// Terminal: labelled "cancelled".
    Cancelled,
    /// Labelled "retrying".
    Retrying {
        /// Number of the current attempt — presumably 1-based; confirm with the retry loop.
        attempt: u32,
        /// Attempt limit — presumably the point at which retrying gives up; confirm.
        max_attempts: u32,
        /// Delay associated with the retry, in milliseconds (per the field name).
        delay_ms: u64,
        /// Presumably the error text that triggered the retry — confirm with the producer.
        error: String,
    },
    /// Terminal: labelled "error". The payload is presumably a human-readable message.
    Error(String),
}
42
/// Serde-serializable mirror of `TeammateStatus`.
///
/// The variants and their fields match `TeammateStatus` one for one; the two
/// `From` impls in this file convert losslessly in both directions.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum TeammateStatusPersist {
    /// Mirrors `TeammateStatus::Initializing`.
    Initializing,
    /// Mirrors `TeammateStatus::Thinking`.
    Thinking,
    /// Mirrors `TeammateStatus::Working`.
    Working,
    /// Mirrors `TeammateStatus::WaitingForMessage`.
    WaitingForMessage,
    /// Mirrors `TeammateStatus::Completed`.
    Completed,
    /// Mirrors `TeammateStatus::Cancelled`.
    Cancelled,
    /// Mirrors `TeammateStatus::Retrying`, field for field.
    Retrying {
        attempt: u32,
        max_attempts: u32,
        delay_ms: u64,
        error: String,
    },
    /// Mirrors `TeammateStatus::Error`.
    Error(String),
}
60
61impl From<TeammateStatus> for TeammateStatusPersist {
62 fn from(status: TeammateStatus) -> Self {
63 match status {
64 TeammateStatus::Initializing => Self::Initializing,
65 TeammateStatus::Thinking => Self::Thinking,
66 TeammateStatus::Working => Self::Working,
67 TeammateStatus::WaitingForMessage => Self::WaitingForMessage,
68 TeammateStatus::Completed => Self::Completed,
69 TeammateStatus::Cancelled => Self::Cancelled,
70 TeammateStatus::Retrying {
71 attempt,
72 max_attempts,
73 delay_ms,
74 error,
75 } => Self::Retrying {
76 attempt,
77 max_attempts,
78 delay_ms,
79 error,
80 },
81 TeammateStatus::Error(e) => Self::Error(e),
82 }
83 }
84}
85
86impl From<TeammateStatusPersist> for TeammateStatus {
87 fn from(status: TeammateStatusPersist) -> Self {
88 match status {
89 TeammateStatusPersist::Initializing => Self::Initializing,
90 TeammateStatusPersist::Thinking => Self::Thinking,
91 TeammateStatusPersist::Working => Self::Working,
92 TeammateStatusPersist::WaitingForMessage => Self::WaitingForMessage,
93 TeammateStatusPersist::Completed => Self::Completed,
94 TeammateStatusPersist::Cancelled => Self::Cancelled,
95 TeammateStatusPersist::Retrying {
96 attempt,
97 max_attempts,
98 delay_ms,
99 error,
100 } => Self::Retrying {
101 attempt,
102 max_attempts,
103 delay_ms,
104 error,
105 },
106 TeammateStatusPersist::Error(e) => Self::Error(e),
107 }
108 }
109}
110
111impl TeammateStatus {
112 pub fn icon(&self) -> &'static str {
114 match self {
115 Self::Initializing => "◐",
116 Self::Thinking => "◐",
117 Self::Working => "●",
118 Self::WaitingForMessage => "○",
119 Self::Retrying { .. } => "↻",
120 Self::Completed => "✓",
121 Self::Cancelled => "✗",
122 Self::Error(_) => "✗",
123 }
124 }
125
126 pub fn label(&self) -> &'static str {
128 match self {
129 Self::Initializing => "初始化",
130 Self::Thinking => "思考中",
131 Self::Working => "执行中",
132 Self::WaitingForMessage => "等待中",
133 Self::Retrying { .. } => "重试中",
134 Self::Completed => "已完成",
135 Self::Cancelled => "已取消",
136 Self::Error(_) => "错误",
137 }
138 }
139
140 pub fn label_en(&self) -> &'static str {
142 match self {
143 Self::Initializing => "initializing",
144 Self::Thinking => "thinking",
145 Self::Working => "working",
146 Self::WaitingForMessage => "waiting",
147 Self::Retrying { .. } => "retrying",
148 Self::Completed => "completed",
149 Self::Cancelled => "cancelled",
150 Self::Error(_) => "error",
151 }
152 }
153
154 pub fn is_terminal(&self) -> bool {
156 matches!(self, Self::Completed | Self::Cancelled | Self::Error(_))
157 }
158}
159
/// Point-in-time, owned copy of one teammate's observable state, as produced by
/// `TeammateManager::teammate_snapshots`.
#[derive(Clone, Debug)]
pub struct TeammateSnapshot {
    /// Key under which the teammate is registered in the manager.
    pub name: String,
    /// Copy of the handle's `role`.
    pub role: String,
    /// Status at snapshot time (`Initializing` if the status mutex could not be locked).
    pub status: TeammateStatus,
    /// Copy of the handle's `current_tool` (`None` if unset or if its mutex could not be locked).
    pub current_tool: Option<String>,
    /// Value of the handle's `tool_calls_count` counter at snapshot time.
    pub tool_calls_count: usize,
}
169
/// Process-wide registry of file locks: locked path → name of the agent holding it.
static GLOBAL_FILE_LOCKS: OnceLock<Mutex<HashMap<PathBuf, String>>> = OnceLock::new();

/// Returns the shared lock registry, creating the empty map on first use.
fn global_file_locks() -> &'static Mutex<HashMap<PathBuf, String>> {
    // `Mutex::default` wraps `HashMap::default()`, i.e. an empty map.
    GLOBAL_FILE_LOCKS.get_or_init(Mutex::default)
}
178
179pub fn acquire_global_file_lock(
182 path: &std::path::Path,
183 agent_name: &str,
184) -> Result<FileLockGuard, String> {
185 let canonical = path.to_path_buf();
186 let mut map = global_file_locks()
187 .lock()
188 .map_err(|_| "file_locks mutex poisoned".to_string())?;
189
190 if let Some(holder) = map.get(&canonical)
191 && holder != agent_name
192 {
193 return Err(holder.clone());
194 }
195
196 map.insert(canonical.clone(), agent_name.to_string());
197 Ok(FileLockGuard {
198 path: canonical,
199 agent_name: agent_name.to_string(),
200 })
201}
202
/// Shared handles onto one teammate agent, stored by `TeammateManager`.
///
/// NOTE(review): field descriptions are derived from how this file uses them;
/// fields this file never touches are marked as assumptions to confirm.
#[allow(dead_code)]
pub struct TeammateHandle {
    /// Teammate name; `register_teammate` uses it as the key in `TeammateManager::teammates`.
    pub name: String,
    /// Role text shown next to the name in `team_summary`.
    pub role: String,
    /// Receives every message passed to `TeammateManager::broadcast`, except the teammate's own.
    pub broadcast_inbox: Arc<Mutex<Vec<ChatMessage>>>,
    /// Presumably the text the teammate is currently streaming — not read in this file; confirm.
    pub streaming_content: Arc<Mutex<String>>,
    /// Token triggered by `cancel()`.
    pub cancel_token: CancellationToken,
    /// Flag returned by `running()`; presumably maintained by the worker thread — confirm.
    pub is_running: Arc<AtomicBool>,
    /// Join handle of the worker thread, if any; joined by `cleanup_finished`.
    pub thread_handle: Option<std::thread::JoinHandle<()>>,
    /// Presumably a copy of the teammate's system prompt — not read in this file; confirm.
    pub system_prompt_snapshot: Arc<Mutex<String>>,
    /// Presumably a copy of the teammate's conversation — not read in this file; confirm.
    pub messages_snapshot: Arc<Mutex<Vec<ChatMessage>>>,
    /// Current lifecycle status; read by `team_summary`, `teammate_snapshots` and
    /// `has_active_teammates`.
    pub status: Arc<Mutex<TeammateStatus>>,
    /// Counter copied into `TeammateSnapshot::tool_calls_count`.
    pub tool_calls_count: Arc<AtomicUsize>,
    /// Value copied into `TeammateSnapshot::current_tool`.
    pub current_tool: Arc<Mutex<Option<String>>>,
    /// Set to `true` by `broadcast` when the message comes from "Main" or targets this
    /// teammate; presumably polled by the worker — confirm.
    pub wake_flag: Arc<AtomicBool>,
    /// Not used in this file; presumably signals that the teammate finished its work — confirm.
    pub work_done: Arc<AtomicBool>,
}
240
241#[allow(dead_code)]
242impl TeammateHandle {
243 pub fn running(&self) -> bool {
245 self.is_running.load(Ordering::Relaxed)
246 }
247
248 pub fn cancel(&self) {
250 self.cancel_token.cancel();
251 }
252}
253
/// Guard returned by `acquire_global_file_lock`.
///
/// While it is alive the global registry maps `path` to `agent_name`; its `Drop`
/// impl removes that entry again, provided the entry still names this agent.
pub struct FileLockGuard {
    /// Registry key of the locked file, exactly as it was passed to `acquire_global_file_lock`.
    path: PathBuf,
    /// Name of the agent on whose behalf the lock was taken.
    agent_name: String,
}
261
262impl Drop for FileLockGuard {
263 fn drop(&mut self) {
264 if let Ok(mut map) = global_file_locks().lock()
265 && map.get(&self.path).map(|s| s.as_str()) == Some(self.agent_name.as_str())
266 {
267 map.remove(&self.path);
268 }
269 }
270}
271
/// Registry of teammate agents plus the shared message buffers used to route
/// broadcasts between them and the main agent.
#[allow(dead_code)]
pub struct TeammateManager {
    /// Registered teammates, keyed by `TeammateHandle::name`.
    pub teammates: HashMap<String, TeammateHandle>,
    /// Receives a fixed `<system_reminder>` message whenever `broadcast` is called
    /// with a sender other than "Main".
    pub main_agent_inbox: Arc<Mutex<Vec<ChatMessage>>>,
    /// Receives the raw text of every non-"Main" broadcast, tagged with sender and
    /// (if any) recipient — presumably what the UI renders; confirm.
    pub display_messages: Arc<Mutex<Vec<ChatMessage>>>,
    /// Receives the tag-wrapped form of every non-"Main" broadcast — presumably the
    /// main agent's model context; confirm.
    pub context_messages: Arc<Mutex<Vec<ChatMessage>>>,
    /// Persisted snapshots installed via `set_recovered_teammates`, keyed by name;
    /// listed in `team_summary` with a session-recovery marker.
    recovered_teammates: HashMap<String, TeammateSnapshotPersist>,
}
289
290#[allow(dead_code)]
291impl TeammateManager {
292 pub fn new(
294 main_agent_inbox: Arc<Mutex<Vec<ChatMessage>>>,
295 display_messages: Arc<Mutex<Vec<ChatMessage>>>,
296 context_messages: Arc<Mutex<Vec<ChatMessage>>>,
297 ) -> Self {
298 Self {
299 teammates: HashMap::new(),
300 main_agent_inbox,
301 display_messages,
302 context_messages,
303 recovered_teammates: HashMap::new(),
304 }
305 }
306
307 pub fn broadcast(&self, from: &str, text: &str, at_target: Option<&str>) {
316 let broadcast_message = if let Some(target) = at_target {
317 format!("<{}> @{} {} </{}>", from, target, text, from)
318 } else {
319 format!("<{}> {} </{}>", from, text, from)
320 };
321
322 write_info_log(
323 "TeammateManager",
324 &format!(
325 "broadcast from={}: {}",
326 from,
327 &broadcast_message[..{
328 let mut b = broadcast_message.len().min(BROADCAST_LOG_MAX_LEN);
329 while b > 0 && !broadcast_message.is_char_boundary(b) {
330 b -= 1;
331 }
332 b
333 }]
334 ),
335 );
336
337 if from != "Main"
340 && let Ok(mut pending) = self.main_agent_inbox.lock()
341 {
342 pending.push(ChatMessage::text(MessageRole::User, "<system_reminder>A teammate has sent a new message. The full content is already in your context via <Teammate@Name> tags. No action needed for this reminder.</system_reminder>"));
343 }
344
345 for (name, handle) in &self.teammates {
349 if from == format!("Teammate@{}", name) {
351 continue; }
353 if let Ok(mut inbox) = handle.broadcast_inbox.lock() {
354 inbox.push(ChatMessage::text(MessageRole::User, &broadcast_message));
355 }
356 let should_wake = from == "Main" || at_target == Some(name.as_str());
357 if should_wake {
358 handle.wake_flag.store(true, Ordering::Relaxed);
359 }
360 }
361
362 if from != "Main" {
367 let mut display_msg = ChatMessage::text(MessageRole::Assistant, text).with_sender(from);
370 if let Some(target) = at_target {
371 display_msg = display_msg.with_recipient(target);
372 }
373 let context_msg =
374 ChatMessage::text(MessageRole::Assistant, &broadcast_message).with_sender(from);
375 if let Ok(mut context) = self.context_messages.lock() {
376 context.push(context_msg);
377 }
378 if let Ok(mut display) = self.display_messages.lock() {
379 display.push(display_msg);
380 }
381 }
382 }
383
384 pub fn team_summary(&self) -> String {
386 if self.teammates.is_empty() && self.recovered_teammates.is_empty() {
387 return String::new();
388 }
389
390 let mut summary = String::from("## Teammates\n\nCurrent team members:\n");
391 summary.push_str("- Main (coordinator)\n");
392 for (name, handle) in &self.teammates {
393 let status = handle
394 .status
395 .lock()
396 .map(|status_val| format!("{} {}", status_val.icon(), status_val.label_en()))
397 .unwrap_or_else(|_| {
398 if handle.running() {
399 "● working".to_string()
400 } else {
401 "○ idle".to_string()
402 }
403 });
404 summary.push_str(&format!("- {} ({}) [{}]\n", name, handle.role, status));
405 }
406 for (name, snapshot) in &self.recovered_teammates {
408 let status: TeammateStatus = snapshot.status.clone().into();
409 summary.push_str(&format!(
410 "- {} ({}) [{} 🔄session-recovery]\n",
411 name,
412 snapshot.role,
413 status.label_en()
414 ));
415 }
416 summary.push_str(
417 "\nUse the SendMessage tool to send messages to other agents. Use @AgentName to specify a target.\n\n\
418 IMPORTANT: All broadcast messages are visible to all agents. Therefore:\n\
419 - Teammates can communicate directly — do **not** relay messages through Main\n\
420 - If you need A and B to collaborate, tell A to contact B directly instead of relaying\n\
421 - Your role is to assign tasks and coordinate direction, not to act as a message relay\n",
422 );
423 summary
424 }
425
426 pub fn all_names(&self) -> Vec<String> {
428 let mut names = Vec::with_capacity(self.teammates.len() + 1);
429 names.push("Main".to_string());
430 names.extend(self.teammates.keys().cloned());
431 names
432 }
433
434 pub fn teammate_snapshots(&self) -> Vec<TeammateSnapshot> {
436 self.teammates
437 .iter()
438 .map(|(name, handle)| {
439 let status = handle
440 .status
441 .lock()
442 .map(|s| s.clone())
443 .unwrap_or(TeammateStatus::Initializing);
444 let current_tool = handle.current_tool.lock().ok().and_then(|t| t.clone());
445 let tool_calls_count = handle.tool_calls_count.load(Ordering::Relaxed);
446 TeammateSnapshot {
447 name: name.clone(),
448 role: handle.role.clone(),
449 status,
450 current_tool,
451 tool_calls_count,
452 }
453 })
454 .collect()
455 }
456
457 pub fn stop_teammate(&mut self, name: &str) {
459 if let Some(handle) = self.teammates.get(name) {
460 handle.cancel();
461 write_info_log("TeammateManager", &format!("stopped teammate: {}", name));
462 }
463 }
464
465 pub fn stop_all(&mut self) {
467 for (name, handle) in &self.teammates {
468 handle.cancel();
469 write_info_log("TeammateManager", &format!("stopping teammate: {}", name));
470 }
471 }
472
473 pub fn cleanup_finished(&mut self) {
475 let finished: Vec<String> = self
476 .teammates
477 .iter()
478 .filter(|(_, h)| {
479 !h.running()
480 && h.thread_handle
481 .as_ref()
482 .map(|t| t.is_finished())
483 .unwrap_or(true)
484 })
485 .map(|(name, _)| name.clone())
486 .collect();
487
488 for name in finished {
489 if let Some(mut handle) = self.teammates.remove(&name) {
490 if let Some(thread) = handle.thread_handle.take() {
491 let _ = thread.join();
492 }
493 write_info_log("TeammateManager", &format!("cleaned up teammate: {}", name));
494 }
495 }
496 }
497
498 pub fn clear_all(&mut self) {
503 for (name, mut handle) in self.teammates.drain() {
504 handle.cancel();
505 if let Some(thread) = handle.thread_handle.take() {
507 drop(thread);
508 }
509 write_info_log(
510 "TeammateManager",
511 &format!("force cleared teammate: {}", name),
512 );
513 }
514 }
515
516 pub fn register_teammate(&mut self, handle: TeammateHandle) {
518 write_info_log(
519 "TeammateManager",
520 &format!("registered teammate: {} ({})", handle.name, handle.role),
521 );
522 self.teammates.insert(handle.name.clone(), handle);
523 }
524
525 pub fn has_active_teammates(&self) -> bool {
527 self.teammates
528 .iter()
529 .any(|(_, h)| h.running() && h.status.lock().map(|s| !s.is_terminal()).unwrap_or(false))
530 }
531}
532
533impl Default for TeammateManager {
534 fn default() -> Self {
535 Self {
536 teammates: HashMap::new(),
537 main_agent_inbox: Arc::new(Mutex::new(Vec::new())),
538 display_messages: Arc::new(Mutex::new(Vec::new())),
539 context_messages: Arc::new(Mutex::new(Vec::new())),
540 recovered_teammates: HashMap::new(),
541 }
542 }
543}
544
545impl TeammateManager {
548 pub fn set_recovered_teammates(&mut self, teammates: Vec<TeammateSnapshotPersist>) {
550 self.recovered_teammates = teammates.into_iter().map(|t| (t.name.clone(), t)).collect();
551 }
552
553 pub fn clear_recovered_teammates(&mut self) {
555 self.recovered_teammates.clear();
556 }
557
558 pub fn recovered_teammates_snapshot(&self) -> HashMap<String, TeammateSnapshotPersist> {
560 self.recovered_teammates.clone()
561 }
562
563 pub fn get_recovered_teammate(&self, name: &str) -> Option<TeammateSnapshotPersist> {
565 self.recovered_teammates.get(name).cloned()
566 }
567
568 pub fn remove_recovered_teammate(&mut self, name: &str) {
570 self.recovered_teammates.remove(name);
571 }
572}