1use crate::infra::hook::{HookContext, HookEvent, HookManager};
2use crate::permission::JcliConfig;
3use crate::storage::{
4 ChatMessage, DisplayHint, MessageRole, ModelProvider, SessionEvent, SessionPaths,
5 append_event_to_path, sanitize_filename,
6};
7use crate::teammate::{TeammateManager, TeammateStatus};
8use crate::tools::ToolRegistry;
9use crate::tools::derived_shared::{
10 AgentContextConfig, LlmNonStreamRequest, SubAgentMetrics, ToolExecContext, call_llm_non_stream,
11 create_runtime_and_client, execute_tool_with_permission, extract_tool_items,
12};
13use crate::util::log::write_info_log;
14use std::path::PathBuf;
15use std::sync::{
16 Arc, Mutex,
17 atomic::{AtomicBool, AtomicUsize, Ordering},
18};
19use tokio_util::sync::CancellationToken;
20
21const MAX_TEAMMATE_ROUNDS: u32 = 200;
23const MAX_CONSECUTIVE_IDLE_POLLS: u32 = 120;
25const POLL_CHECK_INTERVAL: u32 = 10;
27const POLL_SLEEP_MILLIS: u64 = 100;
29const MAX_SEND_GATE_RETRIES: u32 = 2;
31
32pub struct TeammateLoopConfig {
34 pub name: String,
35 pub role: String,
36 pub initial_prompt: String,
37 pub provider: ModelProvider,
38 pub base_system_prompt: Option<String>,
39 pub session_id: Arc<Mutex<String>>,
41 pub disabled_tools: Vec<String>,
43 pub deferred_tools: Arc<Mutex<Vec<String>>>,
45 #[allow(dead_code)]
47 pub session_loaded_deferred: Arc<Mutex<Vec<String>>>,
48 pub registry: Arc<ToolRegistry>,
49 pub jcli_config: Arc<JcliConfig>,
50 pub teammate_manager: Arc<Mutex<TeammateManager>>,
51 pub broadcast_inbox: Arc<Mutex<Vec<ChatMessage>>>,
53 pub cancel_token: CancellationToken,
54 pub system_prompt_snapshot: Arc<Mutex<String>>,
56 pub messages_snapshot: Arc<Mutex<Vec<ChatMessage>>>,
58 pub status: Arc<Mutex<TeammateStatus>>,
60 pub tool_calls_count: Arc<AtomicUsize>,
62 pub current_tool: Arc<Mutex<Option<String>>>,
64 pub wake_flag: Arc<AtomicBool>,
67 pub work_done: Arc<AtomicBool>,
69 pub hook_manager: Arc<Mutex<HookManager>>,
71 pub disabled_hooks: Arc<Mutex<Vec<String>>>,
73 pub context_config: Arc<Mutex<AgentContextConfig>>,
75 pub sub_agent_metrics: Arc<Mutex<SubAgentMetrics>>,
77}
78
79pub fn run_teammate_loop(config: TeammateLoopConfig) -> String {
88 let TeammateLoopConfig {
89 name,
90 role,
91 initial_prompt,
92 provider,
93 base_system_prompt,
94 session_id,
95 disabled_tools,
96 deferred_tools,
97 session_loaded_deferred: _,
98 registry,
99 jcli_config,
100 teammate_manager,
101 broadcast_inbox,
102 cancel_token,
103 system_prompt_snapshot,
104 messages_snapshot,
105 status,
106 tool_calls_count,
107 current_tool,
108 wake_flag,
109 work_done,
110 hook_manager,
111 disabled_hooks,
112 context_config,
113 sub_agent_metrics,
114 } = config;
115
116 let transcript_path = |name: &str| -> PathBuf {
118 let sid = session_id
119 .lock()
120 .map(|s| s.clone())
121 .unwrap_or_else(|_| "unknown".to_string());
122 SessionPaths::new(&sid).teammate_transcript(&sanitize_filename(name))
123 };
124
125 let append_messages = |msgs: &[ChatMessage]| {
126 let path = transcript_path(&name);
127 for m in msgs {
128 let _ = append_event_to_path(&path, &SessionEvent::msg(m.clone()));
129 }
130 };
131
132 let set_status = |new_status: TeammateStatus| {
134 if let Ok(mut s) = status.lock() {
135 *s = new_status;
136 }
137 };
138
139 set_status(TeammateStatus::Initializing);
140
141 let (rt, client) = match create_runtime_and_client(&provider) {
142 Ok(pair) => pair,
143 Err(e) => return e,
144 };
145
146 let system_prompt = build_teammate_system_prompt(
148 &name,
149 &role,
150 base_system_prompt.as_deref(),
151 &teammate_manager,
152 );
153
154 if let Ok(mut sp) = system_prompt_snapshot.lock() {
156 *sp = system_prompt.clone();
157 }
158
159 let mut messages: Vec<ChatMessage> = Vec::with_capacity(1 + initial_prompt.len());
160 messages.push(ChatMessage {
161 role: MessageRole::User,
162 content: initial_prompt,
163 tool_calls: None,
164 tool_call_id: None,
165 images: None,
166 reasoning_content: None,
167 sender_name: None,
168 recipient_name: None,
169 display_hint: DisplayHint::Normal,
170 });
171 append_messages(&messages);
173
174 let sync_messages = |msgs: &Vec<ChatMessage>| {
176 if let Ok(mut snap) = messages_snapshot.lock() {
177 *snap = msgs.clone();
178 }
179 };
180 sync_messages(&messages);
181
182 let mut last_assistant_text = String::new();
183 let mut consecutive_idle_polls = 0;
184 let mut send_gate_retries: u32 = 0;
185
186 let cancel_flag = Arc::new(AtomicBool::new(false));
188
189 for round in 0..MAX_TEAMMATE_ROUNDS {
190 let deferred_snapshot = match deferred_tools.lock() {
193 Ok(guard) => guard.clone(),
194 Err(e) => e.into_inner().clone(),
195 };
196 let tools = registry.to_llm_tools_non_deferred(&disabled_tools, &deferred_snapshot);
197
198 if cancel_token.is_cancelled() || cancel_flag.load(Ordering::Relaxed) {
200 set_status(TeammateStatus::Cancelled);
201 return format!("{}\n[Teammate '{}' cancelled]", last_assistant_text, name);
202 }
203
204 if work_done.load(Ordering::Relaxed) {
206 write_info_log(
207 "TeammateLoop",
208 &format!("{}: WorkDone flag set, exiting", name),
209 );
210 break;
211 }
212
213 let len_before_drain = messages.len();
217 let _ = drain_broadcast_messages(&mut messages, &broadcast_inbox);
218 if messages.len() > len_before_drain {
219 append_messages(&messages[len_before_drain..]);
220 }
221
222 sync_messages(&messages);
224
225 write_info_log(
226 "TeammateLoop",
227 &format!(
228 "{}: Round {}/{}, messages={}",
229 name,
230 round + 1,
231 MAX_TEAMMATE_ROUNDS,
232 messages.len(),
233 ),
234 );
235
236 set_status(TeammateStatus::Thinking);
238
239 let ctx_cfg = match context_config.lock() {
242 Ok(g) => g.clone(),
243 Err(e) => {
244 set_status(TeammateStatus::Error(format!("context_config lock: {}", e)));
245 return format!("{}\ncontext_config lock poisoned", last_assistant_text);
246 }
247 };
248 let mut api_messages = crate::context::window::select_messages(
249 &messages,
250 ctx_cfg.max_history_messages,
251 ctx_cfg.max_context_tokens,
252 ctx_cfg.compact.keep_recent,
253 &ctx_cfg.compact.micro_compact_exempt_tools,
254 );
255 if ctx_cfg.compact.enabled {
256 crate::context::compact::micro_compact(
257 &mut api_messages,
258 ctx_cfg.compact.keep_recent,
259 &ctx_cfg.compact.micro_compact_exempt_tools,
260 );
261 }
262
263 let mut effective_system_prompt = system_prompt.clone();
265 {
266 let hook_mgr = match hook_manager.lock() {
267 Ok(g) => g,
268 Err(e) => {
269 set_status(TeammateStatus::Error(format!("hook_manager lock: {}", e)));
270 return format!("{}\nhook_manager lock poisoned", last_assistant_text);
271 }
272 };
273 if hook_mgr.has_hooks_for(HookEvent::PreLlmRequest) {
274 let disabled_snapshot: Vec<String> =
275 disabled_hooks.lock().map(|g| g.clone()).unwrap_or_default();
276 let ctx = HookContext {
277 event: HookEvent::PreLlmRequest,
278 messages: Some(api_messages.clone()),
279 system_prompt: Some(effective_system_prompt.clone()),
280 model: Some(provider.model.clone()),
281 session_id: session_id.lock().ok().map(|g| g.clone()),
282 cwd: std::env::current_dir()
283 .map(|p| p.display().to_string())
284 .unwrap_or_else(|_| ".".to_string()),
285 ..Default::default()
286 };
287 if let Some(result) =
288 hook_mgr.execute(HookEvent::PreLlmRequest, ctx, &disabled_snapshot)
289 {
290 if result.is_stop() {
291 set_status(TeammateStatus::Error("hook requested stop".to_string()));
292 return format!(
293 "{}\n[Teammate halted by PreLlmRequest hook]",
294 last_assistant_text
295 );
296 }
297 if let Some(new_msgs) = result.messages {
298 api_messages = new_msgs;
299 }
300 if let Some(new_prompt) = result.system_prompt {
301 effective_system_prompt = new_prompt;
302 }
303 if let Some(inject) = result.inject_messages {
304 api_messages.extend(inject);
305 }
306 }
307 }
308 }
309
310 let status_for_retry = Arc::clone(&status);
311 let retry_callback = move |attempt: u32, max_attempts: u32, delay_ms: u64, error: &str| {
312 if let Ok(mut s) = status_for_retry.lock() {
313 *s = TeammateStatus::Retrying {
314 attempt,
315 max_attempts,
316 delay_ms,
317 error: error.to_string(),
318 };
319 }
320 };
321
322 let response = match call_llm_non_stream(&LlmNonStreamRequest {
323 rt: &rt,
324 client: &client,
325 provider: &provider,
326 messages: &api_messages,
327 tools: &tools,
328 system_prompt: Some(&effective_system_prompt),
329 on_retry: Some(&retry_callback),
330 }) {
331 Ok(r) => r,
332 Err(e) => {
333 set_status(TeammateStatus::Error(e.clone()));
334 return format!("{}\n{}", last_assistant_text, e);
335 }
336 };
337
338 let response_choice = response
340 .choices
341 .into_iter()
342 .next()
343 .expect("call_llm_non_stream validates non-empty choices");
344
345 if let Some(usage) = response.usage
347 && let Ok(mut m) = sub_agent_metrics.lock()
348 {
349 m.total_llm_calls += 1;
350 m.total_input_tokens += usage.prompt_tokens;
351 m.total_output_tokens += usage.completion_tokens;
352 }
353
354 let assistant_text = response_choice.message.content.clone().unwrap_or_default();
355 let reasoning_content = response_choice.message.reasoning_content.clone();
356
357 let is_ignore_only = response_choice
360 .message
361 .tool_calls
362 .as_ref()
363 .map(|calls| {
364 !calls.is_empty() && calls.iter().all(|c| c.function.name == "IgnoreMessage")
365 })
366 .unwrap_or(false);
367
368 if !assistant_text.is_empty() && !is_ignore_only {
369 last_assistant_text = assistant_text.clone();
370 if let Ok(manager) = teammate_manager.lock() {
374 let sender_label = format!("Teammate@{}", name);
375 let display_msg = ChatMessage::text(MessageRole::Assistant, &assistant_text)
376 .with_sender(&sender_label)
377 .with_display_hint(DisplayHint::Draft);
378 if let Ok(mut display) = manager.display_messages.lock() {
379 display.push(display_msg);
380 }
381 }
382 }
383
384 let has_tool_calls = response_choice.finish_reason.as_deref() == Some("tool_calls");
386
387 if !has_tool_calls || response_choice.message.tool_calls.is_none() {
388 set_status(TeammateStatus::WaitingForMessage);
390
391 if !assistant_text.is_empty() {
394 messages.push(ChatMessage::text(
395 MessageRole::Assistant,
396 assistant_text.clone(),
397 ));
398 if let Some(last) = messages.last() {
399 append_messages(std::slice::from_ref(last));
400 }
401 }
402
403 let len_before_drain = messages.len();
405 let _ = drain_broadcast_messages(&mut messages, &broadcast_inbox);
406 if messages.len() > len_before_drain {
407 append_messages(&messages[len_before_drain..]);
408 }
409
410 if wake_flag.swap(false, Ordering::Relaxed) {
415 if work_done.load(Ordering::Relaxed) {
416 work_done.store(false, Ordering::Relaxed);
417 write_info_log(
418 "TeammateLoop",
419 &format!("{}: re-activated after WorkDone by @mention", name),
420 );
421 }
422 consecutive_idle_polls = 0;
423 continue;
424 }
425
426 consecutive_idle_polls += 1;
427 if consecutive_idle_polls >= MAX_CONSECUTIVE_IDLE_POLLS {
428 write_info_log(
429 "TeammateLoop",
430 &format!(
431 "{}: idle for {} rounds (~2min), exiting",
432 name, consecutive_idle_polls
433 ),
434 );
435 break;
436 }
437
438 for _ in 0..POLL_CHECK_INTERVAL {
440 if cancel_token.is_cancelled() {
441 set_status(TeammateStatus::Cancelled);
442 return format!("{}\n[Teammate '{}' cancelled]", last_assistant_text, name);
443 }
444 if work_done.load(Ordering::Relaxed) {
445 break;
446 }
447 std::thread::sleep(std::time::Duration::from_millis(POLL_SLEEP_MILLIS));
448
449 let len_before_drain = messages.len();
452 let _ = drain_broadcast_messages(&mut messages, &broadcast_inbox);
453 if messages.len() > len_before_drain {
454 append_messages(&messages[len_before_drain..]);
455 }
456
457 if wake_flag.swap(false, Ordering::Relaxed) {
458 if work_done.load(Ordering::Relaxed) {
459 work_done.store(false, Ordering::Relaxed);
460 write_info_log(
461 "TeammateLoop",
462 &format!("{}: re-activated after WorkDone by @mention", name),
463 );
464 }
465 consecutive_idle_polls = 0;
466 break;
467 }
468 }
469 continue;
470 }
471
472 let Some(tool_calls) = response_choice.message.tool_calls.as_ref() else {
474 continue;
475 };
476 let tool_items = extract_tool_items(tool_calls);
477 if tool_items.is_empty() {
478 break;
479 }
480
481 consecutive_idle_polls = 0;
483
484 messages.push(ChatMessage {
485 role: MessageRole::Assistant,
486 content: assistant_text,
487 tool_calls: Some(tool_items.clone()),
488 tool_call_id: None,
489 images: None,
490 reasoning_content,
491 sender_name: None,
492 recipient_name: None,
493 display_hint: DisplayHint::Normal,
494 });
495 if let Some(last) = messages.last() {
496 append_messages(std::slice::from_ref(last));
497 }
498
499 if let Ok(manager) = teammate_manager.lock() {
505 let sender_label = format!("Teammate@{}", name);
506 for item in &tool_items {
507 if !matches!(item.name.as_str(), "SendMessage" | "IgnoreMessage") {
509 let context_msg = ChatMessage::text(
510 MessageRole::Assistant,
511 format!(
512 "<{}>[called tool {}]</{}>",
513 sender_label, item.name, sender_label
514 ),
515 )
516 .with_sender(&sender_label);
517 if let Ok(mut context) = manager.context_messages.lock() {
518 context.push(context_msg);
519 }
520 }
521 if let Ok(mut display) = manager.display_messages.lock() {
523 display.push(ChatMessage {
524 role: MessageRole::Assistant,
525 content: String::new(),
526 tool_calls: Some(vec![item.clone()]),
527 tool_call_id: None,
528 images: None,
529 reasoning_content: None,
530 sender_name: Some(sender_label.clone()),
531 recipient_name: None,
532 display_hint: DisplayHint::Normal,
533 });
534 }
535 }
536 }
537
538 let mut gate_triggered = false;
540 let mut gated_send_content: Option<String> = None;
541
542 for item in &tool_items {
543 if cancel_token.is_cancelled() {
544 messages.push(ChatMessage {
545 role: MessageRole::Tool,
546 content: "[Cancelled]".to_string(),
547 tool_calls: None,
548 tool_call_id: Some(item.id.clone()),
549 images: None,
550 reasoning_content: None,
551 sender_name: None,
552 recipient_name: None,
553 display_hint: DisplayHint::Normal,
554 });
555 if let Some(last) = messages.last() {
556 append_messages(std::slice::from_ref(last));
557 }
558 continue;
559 }
560
561 if item.name == "SendMessage" {
563 let inbox_has_new = broadcast_inbox
564 .lock()
565 .map(|inbox| !inbox.is_empty())
566 .unwrap_or(false);
567
568 if inbox_has_new && send_gate_retries < MAX_SEND_GATE_RETRIES {
569 gate_triggered = true;
571 gated_send_content = Some(item.arguments.clone());
572 send_gate_retries += 1;
573
574 write_info_log(
575 "TeammateLoop",
576 &format!(
577 "{}: SendMessage gated (attempt {}/{}), new messages in inbox",
578 name, send_gate_retries, MAX_SEND_GATE_RETRIES
579 ),
580 );
581
582 messages.push(ChatMessage {
584 role: MessageRole::Tool,
585 content: "[SendMessage held: new messages arrived during your thinking, please re-evaluate]".to_string(),
586 tool_calls: None,
587 tool_call_id: Some(item.id.clone()),
588 images: None,
589 reasoning_content: None,
590 sender_name: None,
591 recipient_name: None,
592 display_hint: DisplayHint::Normal,
593 });
594 if let Some(last) = messages.last() {
595 append_messages(std::slice::from_ref(last));
596 }
597 continue; } else if !inbox_has_new {
599 send_gate_retries = 0;
601 }
602 }
604
605 if let Ok(mut ct) = current_tool.lock() {
607 *ct = Some(item.name.clone());
608 }
609 set_status(TeammateStatus::Working);
610 tool_calls_count.fetch_add(1, Ordering::Relaxed);
611
612 let result_msg = execute_tool_with_permission(
613 item,
614 &ToolExecContext {
615 registry: ®istry,
616 jcli_config: &jcli_config,
617 cancelled: &cancel_flag,
618 log_tag: "TeammateLoop",
619 verbose: false,
620 },
621 );
622
623 if let Ok(mut m) = sub_agent_metrics.lock() {
625 m.total_tool_calls += 1;
626 }
627
628 if let Ok(manager) = teammate_manager.lock()
630 && let Ok(mut display) = manager.display_messages.lock()
631 {
632 display.push(ChatMessage {
633 role: MessageRole::Tool,
634 content: result_msg.content.clone(),
635 tool_calls: None,
636 tool_call_id: result_msg.tool_call_id.clone(),
637 images: None,
638 reasoning_content: None,
639 sender_name: Some(format!("Teammate@{}", name)),
640 recipient_name: None,
641 display_hint: DisplayHint::Normal,
642 });
643 }
644 messages.push(result_msg);
645 if let Some(last) = messages.last() {
646 append_messages(std::slice::from_ref(last));
647 }
648
649 if let Ok(mut ct) = current_tool.lock() {
651 *ct = None;
652 }
653 set_status(TeammateStatus::Thinking);
654 }
655
656 if gate_triggered {
658 let len_before = messages.len();
659 let _ = drain_broadcast_messages(&mut messages, &broadcast_inbox);
660 if messages.len() > len_before {
661 append_messages(&messages[len_before..]);
662 }
663
664 if let Some(ref content) = gated_send_content {
665 let reminder = format!(
666 "<system_reminder>\
667 New messages arrived while you were thinking. Your pending SendMessage was held (attempt {}/{}).\n\
668 Held content: {}\n\n\
669 The new messages have been injected above. Please review and decide:\n\
670 - Call SendMessage again (possibly revised) to send\n\
671 - Or don't call SendMessage to discard the held message\
672 </system_reminder>",
673 send_gate_retries, MAX_SEND_GATE_RETRIES, content
674 );
675 messages.push(ChatMessage::text(MessageRole::User, &reminder));
676 if let Some(last) = messages.last() {
677 append_messages(std::slice::from_ref(last));
678 }
679 }
680 }
681
682 sync_messages(&messages);
684 }
685
686 set_status(TeammateStatus::Completed);
688 if !work_done.load(Ordering::Relaxed)
691 && let Ok(manager) = teammate_manager.lock()
692 {
693 let sender_label = format!("Teammate@{}", name);
694 let display_msg = ChatMessage::text(MessageRole::Assistant, "[work completed]")
695 .with_sender(&sender_label);
696 let context_msg = ChatMessage::text(
697 MessageRole::Assistant,
698 format!("<{}>[work completed]</{}>", sender_label, sender_label),
699 )
700 .with_sender(&sender_label);
701 if let Ok(mut display) = manager.display_messages.lock() {
702 display.push(display_msg);
703 }
704 if let Ok(mut context) = manager.context_messages.lock() {
705 context.push(context_msg);
706 }
707 let done_msg = ChatMessage::text(MessageRole::Assistant, "[work completed]".to_string());
709 append_messages(std::slice::from_ref(&done_msg));
710 }
711
712 if last_assistant_text.is_empty() {
713 format!("[Teammate '{}' completed with no output]", name)
714 } else {
715 last_assistant_text
716 }
717}
718
719fn build_teammate_system_prompt(
727 name: &str,
728 role: &str,
729 base_prompt: Option<&str>,
730 teammate_manager: &Arc<Mutex<TeammateManager>>,
731) -> String {
732 let template = crate::template::teammate_system_prompt_template();
733 let base = base_prompt.unwrap_or("You are a helpful assistant.");
734 let team_summary = teammate_manager
735 .lock()
736 .map(|m| m.team_summary())
737 .unwrap_or_default();
738
739 template
740 .replace("{{.base_prompt}}", base)
741 .replace("{{.name}}", name)
742 .replace("{{.role}}", role)
743 .replace("{{.team_summary}}", &team_summary)
744}
745
746fn drain_broadcast_messages(
749 messages: &mut Vec<ChatMessage>,
750 pending: &Arc<Mutex<Vec<ChatMessage>>>,
751) -> bool {
752 if let Ok(mut pending_msgs) = pending.lock() {
753 if pending_msgs.is_empty() {
754 return false;
755 }
756 messages.append(&mut *pending_msgs);
757 true
758 } else {
759 false
760 }
761}