Skip to main content

j_agent/teammate/
manager.rs

1use crate::storage::{ChatMessage, MessageRole, TeammateSnapshotPersist};
2use crate::util::log::write_info_log;
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::path::PathBuf;
6use std::sync::{
7    Arc, Mutex, OnceLock,
8    atomic::{AtomicBool, AtomicUsize, Ordering},
9};
10use tokio_util::sync::CancellationToken;
11
/// Upper bound, in bytes, on how much of a broadcast message is echoed into the info log.
const BROADCAST_LOG_MAX_LEN: usize = 100;
14
// ========== Teammate status enums ==========
16
/// Fine-grained runtime state of a teammate.
#[derive(Clone, Debug, PartialEq)]
pub enum TeammateStatus {
    /// Just created; has not started running yet.
    Initializing,
    /// Calling the LLM and waiting for the model's reply.
    Thinking,
    /// Executing a tool.
    Working,
    /// Idle, polling for new messages.
    WaitingForMessage,
    /// Finished normally.
    Completed,
    /// Cancelled.
    Cancelled,
    /// An LLM call is being retried.
    Retrying {
        /// Number of the attempt in progress.
        attempt: u32,
        /// Maximum number of attempts.
        max_attempts: u32,
        /// Delay associated with the retry, in milliseconds.
        delay_ms: u64,
        /// Text of the error that triggered the retry.
        error: String,
    },
    /// Failed; the payload carries the error text.
    Error(String),
}
42
43/// Teammate 状态的可序列化版本(用于 session 持久化)
44#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
45pub enum TeammateStatusPersist {
46    Initializing,
47    Thinking,
48    Working,
49    WaitingForMessage,
50    Completed,
51    Cancelled,
52    Retrying {
53        attempt: u32,
54        max_attempts: u32,
55        delay_ms: u64,
56        error: String,
57    },
58    Error(String),
59}
60
61impl From<TeammateStatus> for TeammateStatusPersist {
62    fn from(status: TeammateStatus) -> Self {
63        match status {
64            TeammateStatus::Initializing => Self::Initializing,
65            TeammateStatus::Thinking => Self::Thinking,
66            TeammateStatus::Working => Self::Working,
67            TeammateStatus::WaitingForMessage => Self::WaitingForMessage,
68            TeammateStatus::Completed => Self::Completed,
69            TeammateStatus::Cancelled => Self::Cancelled,
70            TeammateStatus::Retrying {
71                attempt,
72                max_attempts,
73                delay_ms,
74                error,
75            } => Self::Retrying {
76                attempt,
77                max_attempts,
78                delay_ms,
79                error,
80            },
81            TeammateStatus::Error(e) => Self::Error(e),
82        }
83    }
84}
85
86impl From<TeammateStatusPersist> for TeammateStatus {
87    fn from(status: TeammateStatusPersist) -> Self {
88        match status {
89            TeammateStatusPersist::Initializing => Self::Initializing,
90            TeammateStatusPersist::Thinking => Self::Thinking,
91            TeammateStatusPersist::Working => Self::Working,
92            TeammateStatusPersist::WaitingForMessage => Self::WaitingForMessage,
93            TeammateStatusPersist::Completed => Self::Completed,
94            TeammateStatusPersist::Cancelled => Self::Cancelled,
95            TeammateStatusPersist::Retrying {
96                attempt,
97                max_attempts,
98                delay_ms,
99                error,
100            } => Self::Retrying {
101                attempt,
102                max_attempts,
103                delay_ms,
104                error,
105            },
106            TeammateStatusPersist::Error(e) => Self::Error(e),
107        }
108    }
109}
110
111impl TeammateStatus {
112    /// 状态符号(极简风格)
113    pub fn icon(&self) -> &'static str {
114        match self {
115            Self::Initializing => "◐",
116            Self::Thinking => "◐",
117            Self::Working => "●",
118            Self::WaitingForMessage => "○",
119            Self::Retrying { .. } => "↻",
120            Self::Completed => "✓",
121            Self::Cancelled => "✗",
122            Self::Error(_) => "✗",
123        }
124    }
125
126    /// 状态文字(中文,供 TUI 界面使用)
127    pub fn label(&self) -> &'static str {
128        match self {
129            Self::Initializing => "初始化",
130            Self::Thinking => "思考中",
131            Self::Working => "执行中",
132            Self::WaitingForMessage => "等待中",
133            Self::Retrying { .. } => "重试中",
134            Self::Completed => "已完成",
135            Self::Cancelled => "已取消",
136            Self::Error(_) => "错误",
137        }
138    }
139
140    /// Status label in English (for system prompt injection)
141    pub fn label_en(&self) -> &'static str {
142        match self {
143            Self::Initializing => "initializing",
144            Self::Thinking => "thinking",
145            Self::Working => "working",
146            Self::WaitingForMessage => "waiting",
147            Self::Retrying { .. } => "retrying",
148            Self::Completed => "completed",
149            Self::Cancelled => "cancelled",
150            Self::Error(_) => "error",
151        }
152    }
153
154    /// 是否为终态(不会再变化)
155    pub fn is_terminal(&self) -> bool {
156        matches!(self, Self::Completed | Self::Cancelled | Self::Error(_))
157    }
158}
159
160/// Teammate 状态快照(供 UI 渲染用,无锁)
161#[derive(Clone, Debug)]
162pub struct TeammateSnapshot {
163    pub name: String,
164    pub role: String,
165    pub status: TeammateStatus,
166    pub current_tool: Option<String>,
167    pub tool_calls_count: usize,
168}
169
// ========== Global file edit locks ==========
171
/// Process-wide table of file edit locks shared by every agent.
///
/// Maps a file path to the name of the agent currently holding the lock on it.
static GLOBAL_FILE_LOCKS: OnceLock<Mutex<HashMap<PathBuf, String>>> = OnceLock::new();

/// Returns the global lock table, creating an empty one on first use.
fn global_file_locks() -> &'static Mutex<HashMap<PathBuf, String>> {
    GLOBAL_FILE_LOCKS.get_or_init(Default::default)
}
178
179/// 尝试获取全局文件编辑锁
180/// 返回 Ok(FileLockGuard) 成功,Err(holder_name) 表示被其他 agent 持有
181pub fn acquire_global_file_lock(
182    path: &std::path::Path,
183    agent_name: &str,
184) -> Result<FileLockGuard, String> {
185    let canonical = path.to_path_buf();
186    let mut map = global_file_locks()
187        .lock()
188        .map_err(|_| "file_locks mutex poisoned".to_string())?;
189
190    if let Some(holder) = map.get(&canonical)
191        && holder != agent_name
192    {
193        return Err(holder.clone());
194    }
195
196    map.insert(canonical.clone(), agent_name.to_string());
197    Ok(FileLockGuard {
198        path: canonical,
199        agent_name: agent_name.to_string(),
200    })
201}
202
// ========== TeammateHandle ==========
204
205// NOTE: Cannot derive Debug - contains JoinHandle<()> and CancellationToken which do not implement Debug
206/// 单个 Teammate 的句柄(持有其 agent loop 的引用和通道)
207#[allow(dead_code)]
208pub struct TeammateHandle {
209    /// Teammate 名称(如 "Frontend", "Backend")
210    pub name: String,
211    /// 角色描述(如 "React frontend developer")
212    pub role: String,
213    /// Teammate 的广播收件箱(其他 agent 的广播消息注入到这里)
214    pub broadcast_inbox: Arc<Mutex<Vec<ChatMessage>>>,
215    /// Teammate 的流式内容缓冲区
216    pub streaming_content: Arc<Mutex<String>>,
217    /// 取消令牌
218    pub cancel_token: CancellationToken,
219    /// 是否正在运行
220    pub is_running: Arc<AtomicBool>,
221    /// agent loop 线程句柄
222    pub thread_handle: Option<std::thread::JoinHandle<()>>,
223    /// Teammate 当前 system prompt 快照(由 agent loop 在启动时写入,供 /dump 读取)
224    pub system_prompt_snapshot: Arc<Mutex<String>>,
225    /// Teammate 当前 messages 快照(由 agent loop 每轮同步,供 /dump 读取)
226    pub messages_snapshot: Arc<Mutex<Vec<ChatMessage>>>,
227    /// 细粒度运行状态
228    pub status: Arc<Mutex<TeammateStatus>>,
229    /// 累计工具调用次数
230    pub tool_calls_count: Arc<AtomicUsize>,
231    /// 当前正在执行的工具名(None 表示未在执行工具)
232    pub current_tool: Arc<Mutex<Option<String>>>,
233    /// 唤醒标志:@自己 或来自 Main 时 set。
234    /// 未 WorkDone 时,任何 pending 消息都会唤醒 teammate;
235    /// WorkDone 后,只有 @self 才能重新激活(清除 work_done)。
236    pub wake_flag: Arc<AtomicBool>,
237    /// WorkDone 终态标志:WorkDone 工具调用后 set,teammate_loop 读到后立即进入 Completed。
238    pub work_done: Arc<AtomicBool>,
239}
240
241#[allow(dead_code)]
242impl TeammateHandle {
243    /// 检查 teammate 是否仍在运行
244    pub fn running(&self) -> bool {
245        self.is_running.load(Ordering::Relaxed)
246    }
247
248    /// 取消 teammate 的 agent loop
249    pub fn cancel(&self) {
250        self.cancel_token.cancel();
251    }
252}
253
// ========== FileLockGuard ==========
255
/// RAII guard for a global file edit lock: the lock is released automatically when the guard is
/// dropped.
pub struct FileLockGuard {
    /// Key of the locked entry in the global lock table.
    path: PathBuf,
    /// Name of the agent this guard was issued to.
    agent_name: String,
}
261
262impl Drop for FileLockGuard {
263    fn drop(&mut self) {
264        if let Ok(mut map) = global_file_locks().lock()
265            && map.get(&self.path).map(|s| s.as_str()) == Some(self.agent_name.as_str())
266        {
267            map.remove(&self.path);
268        }
269    }
270}
271
// ========== TeammateManager ==========
273
274// NOTE: Cannot derive Debug - contains TeammateHandle which has JoinHandle and CancellationToken
275/// Teammate 管理器:管理所有 teammate 实例、消息广播
276#[allow(dead_code)]
277pub struct TeammateManager {
278    /// 所有 teammate 的句柄(key = name)
279    pub teammates: HashMap<String, TeammateHandle>,
280    /// Teammate → Main agent LLM 上下文通道(broadcast 时注入,Main Agent 通过 context_messages 同步消费)
281    pub main_agent_inbox: Arc<Mutex<Vec<ChatMessage>>>,
282    /// Agent/Teammate → UI 显示通道(仅 UI 渲染用,不作为 LLM context 数据源)
283    pub display_messages: Arc<Mutex<Vec<ChatMessage>>>,
284    /// Agent/Teammate → LLM context 同步通道(显式注入 Main Agent context)
285    pub context_messages: Arc<Mutex<Vec<ChatMessage>>>,
286    /// 从 session 恢复的 teammate 快照(只读展示,无活跃线程)
287    recovered_teammates: HashMap<String, TeammateSnapshotPersist>,
288}
289
290#[allow(dead_code)]
291impl TeammateManager {
292    /// 创建管理器
293    pub fn new(
294        main_agent_inbox: Arc<Mutex<Vec<ChatMessage>>>,
295        display_messages: Arc<Mutex<Vec<ChatMessage>>>,
296        context_messages: Arc<Mutex<Vec<ChatMessage>>>,
297    ) -> Self {
298        Self {
299            teammates: HashMap::new(),
300            main_agent_inbox,
301            display_messages,
302            context_messages,
303            recovered_teammates: HashMap::new(),
304        }
305    }
306
307    /// 广播消息到所有其他 agent 的 broadcast_inbox
308    ///
309    /// - `from`: 发送者名称
310    /// - `text`: 消息内容
311    /// - `at_target`: 可选的 @目标(消息仍广播给所有人,但带 @前缀)
312    ///
313    /// 消息格式: `<FromAgent> @Target text` 或 `<FromAgent> text`
314    /// 以 user 角色注入(和用户 append 消息走同一个 drain 机制)
315    pub fn broadcast(&self, from: &str, text: &str, at_target: Option<&str>) {
316        let broadcast_message = if let Some(target) = at_target {
317            format!("<{}> @{} {} </{}>", from, target, text, from)
318        } else {
319            format!("<{}> {} </{}>", from, text, from)
320        };
321
322        write_info_log(
323            "TeammateManager",
324            &format!(
325                "broadcast from={}: {}",
326                from,
327                &broadcast_message[..{
328                    let mut b = broadcast_message.len().min(BROADCAST_LOG_MAX_LEN);
329                    while b > 0 && !broadcast_message.is_char_boundary(b) {
330                        b -= 1;
331                    }
332                    b
333                }]
334            ),
335        );
336
337        // 注入到主 agent 的 inbox 作为唤醒信号(如果发送者不是主 agent)
338        // 完整广播内容已通过 context_messages 同步,inbox 只需非空即可触发 wake
339        if from != "Main"
340            && let Ok(mut pending) = self.main_agent_inbox.lock()
341        {
342            pending.push(ChatMessage::text(MessageRole::User, "<system_reminder>A teammate has sent a new message. The full content is already in your context via <Teammate@Name> tags. No action needed for this reminder.</system_reminder>"));
343        }
344
345        // 注入到所有其他 teammate 的 pending
346        // 唤醒语义:@self 或 from==Main 时 set wake_flag(用于 WorkDone 后重新激活判断)
347        // 非 WorkDone 状态下,pending 有消息就唤醒,不依赖 wake_flag
348        for (name, handle) in &self.teammates {
349            // from 含类型前缀(如 "Teammate@Frontend"),精确匹配完整身份
350            if from == format!("Teammate@{}", name) {
351                continue; // 不给自己发
352            }
353            if let Ok(mut inbox) = handle.broadcast_inbox.lock() {
354                inbox.push(ChatMessage::text(MessageRole::User, &broadcast_message));
355            }
356            let should_wake = from == "Main" || at_target == Some(name.as_str());
357            if should_wake {
358                handle.wake_flag.store(true, Ordering::Relaxed);
359            }
360        }
361
362        // Teammate 发出的消息写入 display_messages 以在 TUI 中显示
363        // Main agent 的消息不需要(Main 的工具调用本身已通过 agent loop 显示)
364        // ★ context 通道:XML 包裹文本(Main Agent LLM context 需要 <Name> 标签识别来源)
365        // ★ display 通道:纯文本(sender_name 已标注来源,XML 标签多余)
366        if from != "Main" {
367            // display 用纯文本(不含 <from> XML 前缀,不重复 @target 前缀)
368            // recipient_name 字段 + label 行 → Target 已标示目标,content 不再冗余
369            let mut display_msg = ChatMessage::text(MessageRole::Assistant, text).with_sender(from);
370            if let Some(target) = at_target {
371                display_msg = display_msg.with_recipient(target);
372            }
373            let context_msg =
374                ChatMessage::text(MessageRole::Assistant, &broadcast_message).with_sender(from);
375            if let Ok(mut context) = self.context_messages.lock() {
376                context.push(context_msg);
377            }
378            if let Ok(mut display) = self.display_messages.lock() {
379                display.push(display_msg);
380            }
381        }
382    }
383
384    /// 获取团队成员摘要(供 system prompt 使用)
385    pub fn team_summary(&self) -> String {
386        if self.teammates.is_empty() && self.recovered_teammates.is_empty() {
387            return String::new();
388        }
389
390        let mut summary = String::from("## Teammates\n\nCurrent team members:\n");
391        summary.push_str("- Main (coordinator)\n");
392        for (name, handle) in &self.teammates {
393            let status = handle
394                .status
395                .lock()
396                .map(|status_val| format!("{} {}", status_val.icon(), status_val.label_en()))
397                .unwrap_or_else(|_| {
398                    if handle.running() {
399                        "● working".to_string()
400                    } else {
401                        "○ idle".to_string()
402                    }
403                });
404            summary.push_str(&format!("- {} ({}) [{}]\n", name, handle.role, status));
405        }
406        // Show recovered teammates from session (read-only history)
407        for (name, snapshot) in &self.recovered_teammates {
408            let status: TeammateStatus = snapshot.status.clone().into();
409            summary.push_str(&format!(
410                "- {} ({}) [{} 🔄session-recovery]\n",
411                name,
412                snapshot.role,
413                status.label_en()
414            ));
415        }
416        summary.push_str(
417            "\nUse the SendMessage tool to send messages to other agents. Use @AgentName to specify a target.\n\n\
418             IMPORTANT: All broadcast messages are visible to all agents. Therefore:\n\
419             - Teammates can communicate directly — do **not** relay messages through Main\n\
420             - If you need A and B to collaborate, tell A to contact B directly instead of relaying\n\
421             - Your role is to assign tasks and coordinate direction, not to act as a message relay\n",
422        );
423        summary
424    }
425
426    /// 获取所有 teammate 名称列表(包含 "Main")
427    pub fn all_names(&self) -> Vec<String> {
428        let mut names = Vec::with_capacity(self.teammates.len() + 1);
429        names.push("Main".to_string());
430        names.extend(self.teammates.keys().cloned());
431        names
432    }
433
434    /// 获取所有 teammate 的状态快照(供 UI 渲染用,无锁拷贝)
435    pub fn teammate_snapshots(&self) -> Vec<TeammateSnapshot> {
436        self.teammates
437            .iter()
438            .map(|(name, handle)| {
439                let status = handle
440                    .status
441                    .lock()
442                    .map(|s| s.clone())
443                    .unwrap_or(TeammateStatus::Initializing);
444                let current_tool = handle.current_tool.lock().ok().and_then(|t| t.clone());
445                let tool_calls_count = handle.tool_calls_count.load(Ordering::Relaxed);
446                TeammateSnapshot {
447                    name: name.clone(),
448                    role: handle.role.clone(),
449                    status,
450                    current_tool,
451                    tool_calls_count,
452                }
453            })
454            .collect()
455    }
456
457    /// 停止指定 teammate
458    pub fn stop_teammate(&mut self, name: &str) {
459        if let Some(handle) = self.teammates.get(name) {
460            handle.cancel();
461            write_info_log("TeammateManager", &format!("stopped teammate: {}", name));
462        }
463    }
464
465    /// 停止所有 teammates
466    pub fn stop_all(&mut self) {
467        for (name, handle) in &self.teammates {
468            handle.cancel();
469            write_info_log("TeammateManager", &format!("stopping teammate: {}", name));
470        }
471    }
472
473    /// 清理已完成的 teammate(回收 thread handle)
474    pub fn cleanup_finished(&mut self) {
475        let finished: Vec<String> = self
476            .teammates
477            .iter()
478            .filter(|(_, h)| {
479                !h.running()
480                    && h.thread_handle
481                        .as_ref()
482                        .map(|t| t.is_finished())
483                        .unwrap_or(true)
484            })
485            .map(|(name, _)| name.clone())
486            .collect();
487
488        for name in finished {
489            if let Some(mut handle) = self.teammates.remove(&name) {
490                if let Some(thread) = handle.thread_handle.take() {
491                    let _ = thread.join();
492                }
493                write_info_log("TeammateManager", &format!("cleaned up teammate: {}", name));
494            }
495        }
496    }
497
498    /// 强制清除所有 teammates(发送取消信号后立即移除,不等待线程结束)
499    ///
500    /// 用于 /clear 等场景:需要立即清空 teammate 列表,
501    /// 线程会在检测到 cancel_token 后自行退出。
502    pub fn clear_all(&mut self) {
503        for (name, mut handle) in self.teammates.drain() {
504            handle.cancel();
505            // detach 线程(不 join),线程会在 cancel_token 响应后自行退出
506            if let Some(thread) = handle.thread_handle.take() {
507                drop(thread);
508            }
509            write_info_log(
510                "TeammateManager",
511                &format!("force cleared teammate: {}", name),
512            );
513        }
514    }
515
516    /// 注册一个 teammate(由 TeammateTool 或 teammate_loop 调用)
517    pub fn register_teammate(&mut self, handle: TeammateHandle) {
518        write_info_log(
519            "TeammateManager",
520            &format!("registered teammate: {} ({})", handle.name, handle.role),
521        );
522        self.teammates.insert(handle.name.clone(), handle);
523    }
524
525    /// 是否存在活跃的 teammate(running 且非终态)
526    pub fn has_active_teammates(&self) -> bool {
527        self.teammates
528            .iter()
529            .any(|(_, h)| h.running() && h.status.lock().map(|s| !s.is_terminal()).unwrap_or(false))
530    }
531}
532
533impl Default for TeammateManager {
534    fn default() -> Self {
535        Self {
536            teammates: HashMap::new(),
537            main_agent_inbox: Arc::new(Mutex::new(Vec::new())),
538            display_messages: Arc::new(Mutex::new(Vec::new())),
539            context_messages: Arc::new(Mutex::new(Vec::new())),
540            recovered_teammates: HashMap::new(),
541        }
542    }
543}
544
// ========== Recovered teammates methods ==========
546
547impl TeammateManager {
548    /// 设置从 session 恢复的 teammate 快照
549    pub fn set_recovered_teammates(&mut self, teammates: Vec<TeammateSnapshotPersist>) {
550        self.recovered_teammates = teammates.into_iter().map(|t| (t.name.clone(), t)).collect();
551    }
552
553    /// 清除所有 recovered teammates
554    pub fn clear_recovered_teammates(&mut self) {
555        self.recovered_teammates.clear();
556    }
557
558    /// 获取 recovered teammates 的快照引用(用于 save 时合并 prompt 信息)
559    pub fn recovered_teammates_snapshot(&self) -> HashMap<String, TeammateSnapshotPersist> {
560        self.recovered_teammates.clone()
561    }
562
563    /// 获取指定名称的 recovered teammate(用于 RespawnTeammate)
564    pub fn get_recovered_teammate(&self, name: &str) -> Option<TeammateSnapshotPersist> {
565        self.recovered_teammates.get(name).cloned()
566    }
567
568    /// 移除一个 recovered teammate(respawn 成功后)
569    pub fn remove_recovered_teammate(&mut self, name: &str) {
570        self.recovered_teammates.remove(name);
571    }
572}