1use anyhow::Result;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicU8, Ordering};
6use tokio::sync::mpsc;
7
8use crate::approval::ApproveMode;
9use crate::cancel::CancellationToken;
10use crate::compress::{CompressionConfig, CompressionStrategy, compress_messages, estimate_total_tokens, should_compress};
11use crate::event::{AgentEvent, EventData, EventType};
12use crate::prompt::{PromptProfile, preprocess::{preprocess_with_skills, ProcessResult}};
13use crate::providers::{ChatRequest, Message, MessageContent, Role};
14use crate::skills::Skill;
15use crate::tools::Tool;
16use crate::tools::ToolDefinition;
17use crate::tools::toolproxy::{ProxyToolDef, ProxyToolExecutor};
18
19use super::core::{AgentConfig, AgentState};
20use super::context::AgentContext;
21use super::session::SessionManager;
22use super::types::{Agent, AgentBuilder, MAX_ITERATIONS};
23
24#[allow(dead_code)]
25impl Agent {
26 pub(crate) fn new(builder: AgentBuilder) -> Self {
27 let event_tx = builder.event_tx.unwrap_or_else(|| {
29 let (tx, _) = mpsc::channel(100);
30 tx
31 });
32
33 let config = AgentConfig::new(
35 builder.max_tokens,
36 builder.context_size_override,
37 builder.think,
38 builder.compression_config,
39 );
40
41 let state = AgentState::new();
42
43 let context = AgentContext::with_context(
45 builder.profile,
46 builder.skills,
47 builder.project_overview,
48 builder.memory_summary,
49 builder.project_path,
50 );
51
52 let session = SessionManager::with_all_channels(
54 event_tx.clone(),
55 None, builder.pending_input_rx,
57 );
58
59 Self {
60 config,
62 state,
63 context,
64 session,
65
66 provider: builder.provider,
68 model_name: builder.model_name,
69 tools: builder.tools,
70
71 event_tx,
73
74 approve_mode: Arc::new(AtomicU8::new(builder.approve_mode.to_u8())),
76
77 proxy_tool_defs: builder.proxy_tool_defs,
79 proxy_executor: builder.proxy_executor,
80
81 mcp_registry: builder.mcp_registry,
83 lsp_registry: builder.lsp_registry,
84 }
85 }
86
87 pub(crate) fn messages(&self) -> &Vec<Message> {
92 self.state.messages()
93 }
94
95 pub(crate) fn messages_mut(&mut self) -> &mut Vec<Message> {
97 self.state.messages_mut()
98 }
99
100 pub(crate) fn system_prompt(&self) -> &str {
102 self.context.system_prompt()
103 }
104
105 pub(crate) fn max_tokens(&self) -> u32 {
107 self.config.max_tokens()
108 }
109
110 pub(crate) fn context_size_override(&self) -> Option<u32> {
112 self.config.context_size_override()
113 }
114
115 pub(crate) fn think(&self) -> bool {
117 self.config.think()
118 }
119
120 pub(crate) fn compression_config(&self) -> &CompressionConfig {
122 self.config.compression_config()
123 }
124
125 pub(crate) fn compression_config_mut(&mut self) -> &mut CompressionConfig {
127 self.config.compression_config_mut()
128 }
129
130 pub(crate) fn cancel_token(&self) -> Option<&CancellationToken> {
132 self.session.cancel_token()
133 }
134
135 pub(crate) fn event_tx(&self) -> &mpsc::Sender<AgentEvent> {
137 &self.event_tx
138 }
139
140 pub(crate) fn skills(&self) -> &[Skill] {
142 self.context.skills()
143 }
144
145 pub(crate) fn profile(&self) -> &PromptProfile {
147 self.context.profile()
148 }
149
150 pub(crate) fn project_overview(&self) -> Option<&str> {
152 self.context.project_overview()
153 }
154
155 pub(crate) fn memory_summary(&self) -> Option<&str> {
157 self.context.memory_summary()
158 }
159
160 pub(crate) fn project_path(&self) -> Option<&std::path::PathBuf> {
162 self.context.project_path()
163 }
164
165 pub(crate) fn is_cancelled(&self) -> bool {
167 self.session.is_cancelled()
168 }
169
170 pub(crate) fn total_input_tokens(&self) -> u64 {
172 self.state.total_input_tokens()
173 }
174
175 pub(crate) fn total_output_tokens(&self) -> u64 {
177 self.state.total_output_tokens()
178 }
179
180 pub(crate) fn last_input_tokens(&self) -> u64 {
182 self.state.last_input_tokens()
183 }
184
185 pub(crate) fn todo_reminder_count(&self) -> &std::collections::HashMap<String, usize> {
187 self.state.todo_reminder_count_map()
188 }
189
190 pub(crate) fn todo_reminder_count_mut(&mut self) -> &mut std::collections::HashMap<String, usize> {
192 self.state.todo_reminder_count_map_mut()
193 }
194
195 pub(crate) fn pending_inputs(&self) -> &Vec<String> {
197 self.state.pending_inputs_vec()
198 }
199
200 pub(crate) fn pending_inputs_mut(&mut self) -> &mut Vec<String> {
202 self.state.pending_inputs_vec_mut()
203 }
204
205 pub(crate) fn ask_rx(&mut self) -> Option<&mut mpsc::Receiver<String>> {
207 self.session.ask_rx()
208 }
209
210 pub(crate) fn effective_context_size(&self) -> Option<u32> {
212 self.config.context_size_override()
213 .or_else(|| self.provider.context_size())
214 }
215
216 pub fn event_sender(&self) -> mpsc::Sender<AgentEvent> {
218 self.event_tx.clone()
219 }
220
221 pub fn set_ask_channel(&mut self, rx: mpsc::Receiver<String>) {
223 self.session.set_ask_channel(rx);
224 }
225
226 pub(crate) fn has_ask_channel(&self) -> bool {
228 self.session.has_ask_channel()
229 }
230
231 pub(crate) fn ask_channel(&mut self) -> Option<&mut mpsc::Receiver<String>> {
233 self.session.ask_rx()
234 }
235
236 pub fn set_proxy_executor(
238 &mut self,
239 executor: Arc<dyn ProxyToolExecutor>,
240 tool_defs: Vec<ProxyToolDef>,
241 ) {
242 self.proxy_executor = Some(executor);
243 self.proxy_tool_defs = tool_defs;
244 }
245
246 pub fn set_cancel_token(&mut self, token: CancellationToken) {
248 self.session.set_cancel_token(token);
249 }
250
251 pub(crate) fn get_cancel_token(&self) -> Option<&CancellationToken> {
253 self.session.cancel_token()
254 }
255
256 pub fn set_approve_mode(&mut self, mode: ApproveMode) {
258 let old = ApproveMode::from_u8(self.approve_mode.load(Ordering::Relaxed));
259 log::info!("Agent approve mode changed: {} -> {}", old, mode);
260 self.approve_mode.store(mode.to_u8(), Ordering::Relaxed);
261 }
262
263 pub fn approve_mode_shared(&self) -> Arc<AtomicU8> {
265 self.approve_mode.clone()
266 }
267
268 pub fn set_approve_mode_shared(&mut self, shared: Arc<AtomicU8>) {
270 self.approve_mode = shared;
271 }
272
273 pub fn update_memory_summary(&mut self, summary: Option<String>) {
276 self.context.update_memory(summary);
277 }
279
280 pub fn refresh_codegraph_tools(&mut self) {
284 if let Some(path) = self.context.project_path() {
285 let should_have_codegraph =
287 crate::tools::codegraph::should_inject_codegraph_tools(path);
288
289 let has_codegraph = self.tools.iter().any(|t| {
291 let name = t.definition().name;
292 name.starts_with("code_") && name != "code_review"
293 });
294
295 if should_have_codegraph != has_codegraph {
297 if should_have_codegraph {
299 let codegraph_tools = crate::tools::codegraph::codegraph_tools(path);
300 for tool in codegraph_tools {
301 self.tools.push(Arc::from(tool));
302 }
303 } else {
304 self.tools.retain(|t| {
306 let name = t.definition().name;
307 !name.starts_with("code_") || name == "code_review"
308 });
309 }
310 self.context.rebuild_system_prompt_with_workflows(Some(path.clone()));
312 }
313 }
314 }
315
316 pub async fn run(&mut self, user_input: String) -> Result<Vec<AgentEvent>> {
318 self.emit(AgentEvent::session_started())?;
319
320 let preprocess_result = self.preprocess_input(&user_input);
322
323 let processed_input = match preprocess_result {
325 ProcessResult::SkillTriggered {
326 skill_id,
327 confidence,
328 skill_body,
329 } => {
330 log::info!(
331 "Skill triggered: {} (confidence: {:.2})",
332 skill_id,
333 confidence
334 );
335 self.emit(AgentEvent::progress(
336 format!("🎯 触发技能: {}", skill_id),
337 None,
338 ))?;
339
340 if let Some(body) = skill_body {
342 let enhanced_input = format!(
344 "<command-name>{}</command-name>\n\n{}\n\n---\n\nUser request: {}",
345 skill_id,
346 body,
347 user_input
348 );
349 enhanced_input
350 } else {
351 let enhanced_input = format!(
353 "User invoked skill '{}'. Use the `skill` tool with name '{}' to load its instructions before proceeding.\n\nUser request: {}",
354 skill_id,
355 skill_id,
356 user_input
357 );
358 enhanced_input
359 }
360 }
361 ProcessResult::WorkflowTriggered {
362 workflow_id,
363 inputs,
364 } => {
365 log::info!("Workflow triggered: {} with inputs: {:?}", workflow_id, inputs);
366 self.emit(AgentEvent::progress(
367 format!("🔄 触发工作流: {}", workflow_id),
368 None,
369 ))?;
370 let inputs_json = serde_json::to_string_pretty(&inputs).unwrap_or_default();
372 let enhanced_input = format!(
373 "Workflow '{}' triggered with extracted inputs:\n{}\n\nUser request: {}",
374 workflow_id,
375 inputs_json,
376 user_input
377 );
378 enhanced_input
379 }
380 ProcessResult::Continue => {
381 user_input
383 }
384 };
385
386 self.state.add_message(Message {
388 role: Role::User,
389 content: MessageContent::Text(processed_input),
390 });
391
392 let mut iterations = 0;
393 let mut should_continue = true;
394 const ITERATION_WARNING_THRESHOLD: usize = MAX_ITERATIONS - 10;
395
396 while should_continue && iterations < MAX_ITERATIONS {
397 iterations += 1;
398
399 self.drain_pending_inputs();
402 if self.has_pending_inputs() {
403 let pending = self.take_pending_inputs();
404 let count = pending.len();
405 let merged = pending.join("\n\n---\n\n");
406 log::info!("Adding {} pending input messages to request", count);
407
408 self.emit(AgentEvent::queue_processed(count, pending.clone()))?;
410
411 self.state.add_message(Message {
412 role: Role::User,
413 content: MessageContent::Text(merged),
414 });
415 }
416
417 if self.session.is_cancelled() {
418 self.emit(AgentEvent::error(
419 crate::prompt::MSG_OPERATION_CANCELLED.to_string(),
420 None,
421 None,
422 ))?;
423 break;
424 }
425
426 if iterations == ITERATION_WARNING_THRESHOLD {
428 self.emit(AgentEvent::progress(
429 crate::prompt::MSG_ITERATION_WARNING_UI
430 .replace("{iterations}", &iterations.to_string())
431 .replace("{max_iterations}", &MAX_ITERATIONS.to_string()),
432 None,
433 ))?;
434 }
435
436 let context_size = self.effective_context_size();
439 let estimated_tokens = estimate_total_tokens(self.state.messages());
440
441 if should_compress(estimated_tokens, context_size, self.config.compression_config()) {
442 self.emit(AgentEvent::progress("⚠️ 上下文过大,正在预压缩...", None))?;
443
444 match compress_messages(
445 self.state.messages(),
446 CompressionStrategy::SlidingWindow,
447 self.config.compression_config(),
448 ) {
449 Ok(compressed) => {
450 let compressed_tokens = estimate_total_tokens(&compressed);
451 self.state.set_messages(compressed);
452 crate::debug::debug_log().compression(
453 estimated_tokens,
454 compressed_tokens,
455 compressed_tokens as f32 / estimated_tokens as f32,
456 );
457 }
458 Err(e) => {
459 self.emit(AgentEvent::progress(format!("预压缩失败: {}", e), None))?;
460 }
461 }
462 }
463
464 let tool_defs: Vec<ToolDefinition> = {
466 let mut defs: Vec<ToolDefinition> = self
467 .tools
468 .iter()
469 .map(|t| {
470 let def = t.definition();
471 let description = def.description_for_llm();
472 ToolDefinition {
473 name: def.name,
474 description,
475 parameters: def.parameters,
476 is_priority: def.is_priority,
477 }
478 })
479 .collect();
480 defs.extend(self.proxy_tool_defs.iter().map(|t| {
482 let def = &t.definition;
483 let description = def.description_for_llm();
484 ToolDefinition {
485 name: def.name.clone(),
486 description,
487 parameters: def.parameters.clone(),
488 is_priority: def.is_priority,
489 }
490 }));
491 defs
492 };
493 let request = ChatRequest {
494 system: Some(self.system_prompt().to_string()),
495 messages: self.state.messages().clone(),
496 max_tokens: self.max_tokens(),
497 tools: tool_defs,
498 think: self.think(),
499 enable_caching: true,
500 server_tools: Vec::new(),
501 };
502
503 let response = self.call_streaming(&request).await?;
504
505 self.track_usage(&response.usage);
506
507 crate::debug::debug_log().api_call(
508 &self.model_name,
509 response.usage.input_tokens,
510 response.usage.cache_read_input_tokens > 0,
511 );
512
513 should_continue = self.process_response(&response).await?;
514
515 if !should_continue && iterations < MAX_ITERATIONS - 1 {
518 self.drain_pending_inputs();
520
521 if self.has_pending_inputs() {
522 log::info!("Agent: found pending inputs at session end, continuing loop");
523 should_continue = true;
524 continue; }
526
527 if self.last_message_was_todo_reminder() {
530 log::info!("Skipping todo check: reminder already sent in recent messages");
531 } else {
532 const MAX_TODO_REMINDERS: usize = 2;
533
534 let reminder_count_clone = self.state.todo_reminder_count_map().clone();
536 let (pending, all_at_limit) = self.get_pending_todos_with_limit(
537 &reminder_count_clone,
538 MAX_TODO_REMINDERS
539 );
540
541 if !pending.is_empty() {
542 for (_, content) in &pending {
544 self.state.increment_todo_reminder(content.clone());
545 }
546
547 let pending_list = pending
548 .iter()
549 .map(|(status, content)| {
550 let marker = match status.as_str() {
551 "in_progress" => "[~]",
552 "pending" => "[ ]",
553 _ => "[?]",
554 };
555 format!(" {} {}", marker, content)
556 })
557 .collect::<Vec<_>>()
558 .join("\n");
559
560 let reminder = format!(
561 "📋 任务尚未完成。以下待办项需要处理:\n{}\n\n请继续执行,或在 todo_write 中标记为 completed。如遇阻塞请说明原因。",
562 pending_list
563 );
564
565 self.state.add_message(Message {
566 role: Role::User,
567 content: MessageContent::Text(reminder),
568 });
569 should_continue = true;
570 } else if all_at_limit && !self.state.todo_reminder_count_map().is_empty() {
571 let remaining_count = self.state.todo_reminder_count_map().len();
574 self.emit(AgentEvent::progress(
575 format!(
576 "⚠️ 会话结束:{} 个待办项未完成(已提醒 {} 次,达到上限)",
577 remaining_count, MAX_TODO_REMINDERS
578 ),
579 None,
580 ))?;
581 log::warn!(
582 "Session ending with {} incomplete todos (reminder limit reached)",
583 remaining_count
584 );
585 }
586 }
587 }
588
589 let context_size = self.effective_context_size();
590 let api_tokens = self.state.last_input_tokens() as u32;
591 let estimated_tokens = estimate_total_tokens(self.state.messages());
592
593 let current_tokens = if api_tokens > 0 {
598 api_tokens
599 } else {
600 estimated_tokens
601 };
602
603 if let Some(ctx_size) = context_size {
606 self.emit(AgentEvent::with_data(
608 EventType::ContextSize,
609 EventData::ContextSize {
610 context_size: ctx_size as u64,
611 },
612 ))?;
613
614 let usage_ratio = current_tokens as f64 / ctx_size as f64;
615 if usage_ratio >= 0.3 {
616 crate::debug::debug_log().log(
617 "checkcompress",
618 &format!(
619 "usage={:.1}%, tokens={}, context={}, threshold={}%",
620 usage_ratio * 100.0,
621 current_tokens,
622 ctx_size,
623 self.config.compression_config().threshold * 100.0
624 ),
625 );
626 }
627 }
628
629 if should_compress(current_tokens, context_size, self.config.compression_config()) {
630 self.emit(AgentEvent::progress(crate::prompt::MSG_COMPRESSING_CONTEXT, None))?;
631
632 let original_tokens = current_tokens;
633
634 match compress_messages(
635 self.state.messages(),
636 CompressionStrategy::SlidingWindow,
637 self.config.compression_config(),
638 ) {
639 Ok(compressed) => {
640 let compressed_tokens = estimate_total_tokens(&compressed);
641 self.state.set_messages(compressed);
642 self.state.set_total_input_tokens(compressed_tokens as u64);
643 self.state.set_last_input_tokens(compressed_tokens as u64);
644
645 let ratio = compressed_tokens as f32 / original_tokens as f32;
646 crate::debug::debug_log().compression(
647 original_tokens,
648 compressed_tokens,
649 ratio,
650 );
651
652 self.emit(AgentEvent::with_data(
653 EventType::CompressionCompleted,
654 EventData::Compression {
655 original_tokens: original_tokens as u64,
656 compressed_tokens: compressed_tokens as u64,
657 ratio: compressed_tokens as f32 / original_tokens as f32,
658 },
659 ))?;
660 }
661 Err(e) => {
662 self.emit(AgentEvent::progress(
663 format!("{}{}", crate::prompt::MSG_COMPRESSION_FAILED, e),
664 None,
665 ))?;
666 }
667 }
668 }
669 }
670
671 if iterations >= MAX_ITERATIONS && should_continue {
673 self.emit(AgentEvent::error(
674 crate::prompt::MSG_MAX_ITERATIONS_REACHED
675 .replace("{max_iterations}", &MAX_ITERATIONS.to_string())
676 .replace("{iterations}", &iterations.to_string()),
677 Some("MAX_ITERATIONS_REACHED".to_string()),
678 Some("agent/run.rs".to_string()),
679 ))?;
680 }
681
682 self.emit(AgentEvent::usage_with_cache(
683 self.state.total_input_tokens(),
684 self.state.total_output_tokens(),
685 0,
686 0,
687 ))?;
688
689 self.emit(AgentEvent::session_ended())?;
690
691 Ok(Vec::new())
692 }
693
694 pub fn set_messages(&mut self, messages: Vec<Message>) {
696 self.state.set_messages(messages);
697 }
698
699 pub fn get_messages(&self) -> &[Message] {
701 self.messages()
702 }
703
704 pub fn get_tools(&self) -> &[Arc<dyn Tool>] {
706 &self.tools
707 }
708
709 pub fn get_system_prompt(&self) -> &str {
711 self.system_prompt()
712 }
713
714 pub fn get_token_counts(&self) -> (u64, u64) {
716 (
717 self.state.total_input_tokens(),
718 self.state.total_output_tokens(),
719 )
720 }
721
722 pub fn clear_history(&mut self) {
724 self.messages_mut().clear();
725 self.state.set_total_input_tokens(0);
726 self.state.set_total_output_tokens(0);
727 self.state.set_last_input_tokens(0);
728 }
729
730 pub fn message_count(&self) -> usize {
732 self.messages().len()
733 }
734
735 pub fn preprocess_input(&self, user_input: &str) -> ProcessResult {
751 preprocess_with_skills(user_input, self.skills())
753 }
754
755 pub fn inject_skill_context(&self, skill_id: &str, skill_body: Option<&str>) -> String {
767 if let Some(body) = skill_body {
768 format!(
769 "<command-name>{}</command-name>\n\n{}\n\n**Important**: Follow the skill instructions above before responding to the user request below.",
770 skill_id,
771 body.trim_end()
772 )
773 } else {
774 format!(
775 "Skill '{}' was triggered but not auto-loaded. The model should call the `skill` tool with name '{}' to load its instructions.",
776 skill_id,
777 skill_id
778 )
779 }
780 }
781
782 pub async fn add_mcp_server(
796 &mut self,
797 name: &str,
798 config: crate::mcp::McpServerConfig,
799 ) -> Result<()> {
800 if let Some(registry) = &self.mcp_registry {
801 let mut reg = registry.write().await;
802 reg.add_server(name.to_string(), config);
803 log::info!("MCP server '{}' added to registry", name);
804 } else {
805 log::warn!("MCP registry not initialized, cannot add server '{}'", name);
806 }
807 Ok(())
808 }
809
810 pub async fn remove_mcp_server(&mut self, name: &str) -> Result<()> {
812 if let Some(registry) = &self.mcp_registry {
813 let mut reg = registry.write().await;
814 reg.remove_server(name).await?;
815 log::info!("MCP server '{}' removed from registry", name);
816 }
817 Ok(())
818 }
819
820 pub async fn mcp_server_status(&self) -> Vec<crate::mcp::ServerStatus> {
822 if let Some(registry) = &self.mcp_registry {
823 let reg = registry.read().await;
824 reg.server_status().await.values().cloned().collect()
825 } else {
826 Vec::new()
827 }
828 }
829
830 pub async fn start_mcp_server(
832 &self,
833 name: &str,
834 ) -> Result<Vec<Arc<crate::mcp::McpToolWrapper>>> {
835 if let Some(registry) = &self.mcp_registry {
836 let reg = registry.read().await;
837 if let Some(placeholder) = reg.get_server(name) {
838 let tools = placeholder.start().await?;
839 log::info!("MCP server '{}' started with {} tools", name, tools.len());
840 Ok(tools)
841 } else {
842 Err(anyhow::anyhow!(
843 "MCP server '{}' not found in registry",
844 name
845 ))
846 }
847 } else {
848 Err(anyhow::anyhow!("MCP registry not initialized"))
849 }
850 }
851}