1use anyhow::Result;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicU8, Ordering};
6use tokio::sync::mpsc;
7
8use crate::approval::ApproveMode;
9use crate::cancel::CancellationToken;
10use crate::compress::{CompressionConfig, CompressionStrategy, compress_messages, estimate_total_tokens, should_compress};
11use crate::event::{AgentEvent, EventData, EventType};
12use crate::prompt::{PromptProfile, preprocess::{preprocess_with_skills, ProcessResult}};
13use crate::providers::{ChatRequest, Message, MessageContent, Role};
14use crate::skills::Skill;
15use crate::tools::Tool;
16use crate::tools::ToolDefinition;
17use crate::tools::toolproxy::{ProxyToolDef, ProxyToolExecutor};
18
19use super::core::{AgentConfig, AgentState};
20use super::context::AgentContext;
21use super::session::SessionManager;
22use super::types::{Agent, AgentBuilder, MAX_ITERATIONS};
23
24#[allow(dead_code)]
25impl Agent {
26 pub(crate) fn new(builder: AgentBuilder) -> Self {
27 let event_tx = builder.event_tx.unwrap_or_else(|| {
29 let (tx, _) = mpsc::channel(100);
30 tx
31 });
32
33 let config = AgentConfig::new(
35 builder.max_tokens,
36 builder.context_size_override,
37 builder.think,
38 builder.compression_config,
39 );
40
41 let state = AgentState::new();
42
43 let context = AgentContext::with_context(
45 builder.profile,
46 builder.skills,
47 builder.project_overview,
48 builder.memory_summary,
49 builder.project_path,
50 );
51
52 let session = SessionManager::with_all_channels(
54 event_tx.clone(),
55 None, builder.pending_input_rx,
57 );
58
59 Self {
60 config,
62 state,
63 context,
64 session,
65
66 provider: builder.provider,
68 model_name: builder.model_name,
69 tools: builder.tools,
70
71 event_tx,
73
74 approve_mode: Arc::new(AtomicU8::new(builder.approve_mode.to_u8())),
76
77 proxy_tool_defs: builder.proxy_tool_defs,
79 proxy_executor: builder.proxy_executor,
80
81 mcp_registry: builder.mcp_registry,
83 lsp_registry: builder.lsp_registry,
84 }
85 }
86
87 pub(crate) fn messages(&self) -> &Vec<Message> {
92 self.state.messages()
93 }
94
95 pub(crate) fn messages_mut(&mut self) -> &mut Vec<Message> {
97 self.state.messages_mut()
98 }
99
100 pub(crate) fn system_prompt(&self) -> &str {
102 self.context.system_prompt()
103 }
104
105 pub(crate) fn max_tokens(&self) -> u32 {
107 self.config.max_tokens()
108 }
109
110 pub(crate) fn context_size_override(&self) -> Option<u32> {
112 self.config.context_size_override()
113 }
114
115 pub(crate) fn think(&self) -> bool {
117 self.config.think()
118 }
119
120 pub(crate) fn compression_config(&self) -> &CompressionConfig {
122 self.config.compression_config()
123 }
124
125 pub(crate) fn compression_config_mut(&mut self) -> &mut CompressionConfig {
127 self.config.compression_config_mut()
128 }
129
130 pub(crate) fn cancel_token(&self) -> Option<&CancellationToken> {
132 self.session.cancel_token()
133 }
134
135 pub(crate) fn event_tx(&self) -> &mpsc::Sender<AgentEvent> {
137 &self.event_tx
138 }
139
140 pub(crate) fn skills(&self) -> &[Skill] {
142 self.context.skills()
143 }
144
145 pub(crate) fn profile(&self) -> &PromptProfile {
147 self.context.profile()
148 }
149
150 pub(crate) fn project_overview(&self) -> Option<&str> {
152 self.context.project_overview()
153 }
154
155 pub(crate) fn memory_summary(&self) -> Option<&str> {
157 self.context.memory_summary()
158 }
159
160 pub(crate) fn project_path(&self) -> Option<&std::path::PathBuf> {
162 self.context.project_path()
163 }
164
165 pub(crate) fn is_cancelled(&self) -> bool {
167 self.session.is_cancelled()
168 }
169
170 pub(crate) fn total_input_tokens(&self) -> u64 {
172 self.state.total_input_tokens()
173 }
174
175 pub(crate) fn total_output_tokens(&self) -> u64 {
177 self.state.total_output_tokens()
178 }
179
180 pub(crate) fn last_input_tokens(&self) -> u64 {
182 self.state.last_input_tokens()
183 }
184
185 pub(crate) fn todo_reminder_count(&self) -> &std::collections::HashMap<String, usize> {
187 self.state.todo_reminder_count_map()
188 }
189
190 pub(crate) fn todo_reminder_count_mut(&mut self) -> &mut std::collections::HashMap<String, usize> {
192 self.state.todo_reminder_count_map_mut()
193 }
194
195 pub(crate) fn pending_inputs(&self) -> &Vec<String> {
197 self.state.pending_inputs_vec()
198 }
199
200 pub(crate) fn pending_inputs_mut(&mut self) -> &mut Vec<String> {
202 self.state.pending_inputs_vec_mut()
203 }
204
205 pub(crate) fn ask_rx(&mut self) -> Option<&mut mpsc::Receiver<String>> {
207 self.session.ask_rx()
208 }
209
210 pub(crate) fn effective_context_size(&self) -> Option<u32> {
212 self.config.context_size_override()
213 .or_else(|| self.provider.context_size())
214 }
215
216 pub fn event_sender(&self) -> mpsc::Sender<AgentEvent> {
218 self.event_tx.clone()
219 }
220
221 pub fn set_ask_channel(&mut self, rx: mpsc::Receiver<String>) {
223 self.session.set_ask_channel(rx);
224 }
225
226 pub(crate) fn has_ask_channel(&self) -> bool {
228 self.session.has_ask_channel()
229 }
230
231 pub(crate) fn ask_channel(&mut self) -> Option<&mut mpsc::Receiver<String>> {
233 self.session.ask_rx()
234 }
235
236 pub fn set_proxy_executor(
238 &mut self,
239 executor: Arc<dyn ProxyToolExecutor>,
240 tool_defs: Vec<ProxyToolDef>,
241 ) {
242 self.proxy_executor = Some(executor);
243 self.proxy_tool_defs = tool_defs;
244 }
245
246 pub fn set_cancel_token(&mut self, token: CancellationToken) {
248 self.session.set_cancel_token(token);
249 }
250
251 pub(crate) fn get_cancel_token(&self) -> Option<&CancellationToken> {
253 self.session.cancel_token()
254 }
255
256 pub fn set_approve_mode(&mut self, mode: ApproveMode) {
258 let old = ApproveMode::from_u8(self.approve_mode.load(Ordering::Relaxed));
259 log::info!("Agent approve mode changed: {} -> {}", old, mode);
260 self.approve_mode.store(mode.to_u8(), Ordering::Relaxed);
261 }
262
263 pub fn approve_mode_shared(&self) -> Arc<AtomicU8> {
265 self.approve_mode.clone()
266 }
267
268 pub fn set_approve_mode_shared(&mut self, shared: Arc<AtomicU8>) {
270 self.approve_mode = shared;
271 }
272
273 pub fn update_memory_summary(&mut self, summary: Option<String>) {
276 self.context.update_memory(summary);
277 }
279
280 pub fn refresh_codegraph_tools(&mut self) {
284 if let Some(path) = self.context.project_path() {
285 let should_have_codegraph =
287 crate::tools::codegraph::should_inject_codegraph_tools(path);
288
289 let has_codegraph = self.tools.iter().any(|t| {
291 let name = t.definition().name;
292 name.starts_with("code_") && name != "code_review"
293 });
294
295 if should_have_codegraph != has_codegraph {
297 if should_have_codegraph {
299 let codegraph_tools = crate::tools::codegraph::codegraph_tools(path);
300 for tool in codegraph_tools {
301 self.tools.push(Arc::from(tool));
302 }
303 } else {
304 self.tools.retain(|t| {
306 let name = t.definition().name;
307 !name.starts_with("code_") || name == "code_review"
308 });
309 }
310 self.context.rebuild_system_prompt_with_workflows(Some(path.clone()));
312 }
313 }
314 }
315
316 pub async fn run(&mut self, user_input: String) -> Result<Vec<AgentEvent>> {
318 self.emit(AgentEvent::session_started())?;
319
320 let preprocess_result = self.preprocess_input(&user_input);
322
323 let processed_input = match preprocess_result {
325 ProcessResult::SkillTriggered {
326 skill_id,
327 confidence,
328 skill_body,
329 } => {
330 log::info!(
331 "Skill triggered: {} (confidence: {:.2})",
332 skill_id,
333 confidence
334 );
335 self.emit(AgentEvent::progress(
336 format!("🎯 触发技能: {}", skill_id),
337 None,
338 ))?;
339
340 if let Some(body) = skill_body {
342 let enhanced_input = format!(
344 "<command-name>{}</command-name>\n\n{}\n\n---\n\nUser request: {}",
345 skill_id,
346 body,
347 user_input
348 );
349 enhanced_input
350 } else {
351 let enhanced_input = format!(
353 "User invoked skill '{}'. Use the `skill` tool with name '{}' to load its instructions before proceeding.\n\nUser request: {}",
354 skill_id,
355 skill_id,
356 user_input
357 );
358 enhanced_input
359 }
360 }
361 ProcessResult::WorkflowTriggered {
362 workflow_id,
363 inputs,
364 } => {
365 log::info!("Workflow triggered: {} with inputs: {:?}", workflow_id, inputs);
366 self.emit(AgentEvent::progress(
367 format!("🔄 触发工作流: {}", workflow_id),
368 None,
369 ))?;
370 let inputs_json = serde_json::to_string_pretty(&inputs).unwrap_or_default();
372 let enhanced_input = format!(
373 "Workflow '{}' triggered with extracted inputs:\n{}\n\nUser request: {}",
374 workflow_id,
375 inputs_json,
376 user_input
377 );
378 enhanced_input
379 }
380 ProcessResult::Continue => {
381 user_input
383 }
384 };
385
386 self.state.add_message(Message {
388 role: Role::User,
389 content: MessageContent::Text(processed_input),
390 });
391
392 let mut iterations = 0;
393 let mut should_continue = true;
394 const ITERATION_WARNING_THRESHOLD: usize = MAX_ITERATIONS - 10;
395
396 while should_continue && iterations < MAX_ITERATIONS {
397 iterations += 1;
398
399 self.drain_pending_inputs();
402 if self.has_pending_inputs() {
403 let pending = self.take_pending_inputs();
404 let count = pending.len();
405 let merged = pending.join("\n\n---\n\n");
406 log::info!("Adding {} pending input messages to request", count);
407
408 self.emit(AgentEvent::queue_processed(count, pending.clone()))?;
410
411 self.state.add_message(Message {
412 role: Role::User,
413 content: MessageContent::Text(merged),
414 });
415 }
416
417 if self.session.is_cancelled() {
418 self.emit(AgentEvent::error(
419 crate::prompt::MSG_OPERATION_CANCELLED.to_string(),
420 None,
421 None,
422 ))?;
423 break;
424 }
425
426 if iterations == ITERATION_WARNING_THRESHOLD {
428 self.emit(AgentEvent::progress(
429 crate::prompt::MSG_ITERATION_WARNING_UI
430 .replace("{iterations}", &iterations.to_string())
431 .replace("{max_iterations}", &MAX_ITERATIONS.to_string()),
432 None,
433 ))?;
434 }
435
436 let context_size = self.effective_context_size();
439 let estimated_tokens = estimate_total_tokens(self.state.messages());
440
441 if should_compress(estimated_tokens, context_size, self.config.compression_config()) {
442 self.emit(AgentEvent::progress("⚠️ 上下文过大,正在预压缩...", None))?;
443
444 match compress_messages(
445 self.state.messages(),
446 CompressionStrategy::SlidingWindow,
447 self.config.compression_config(),
448 ) {
449 Ok(compressed) => {
450 let compressed_tokens = estimate_total_tokens(&compressed);
451 self.state.set_messages(compressed);
452 crate::debug::debug_log().compression(
453 estimated_tokens,
454 compressed_tokens,
455 compressed_tokens as f32 / estimated_tokens as f32,
456 );
457 }
458 Err(e) => {
459 self.emit(AgentEvent::progress(format!("预压缩失败: {}", e), None))?;
460 }
461 }
462 }
463
464 let tool_defs: Vec<ToolDefinition> = {
466 let mut defs: Vec<ToolDefinition> = self
467 .tools
468 .iter()
469 .map(|t| {
470 let def = t.definition();
471 let description = def.description_for_llm();
472 ToolDefinition {
473 name: def.name,
474 description,
475 parameters: def.parameters,
476 is_priority: def.is_priority,
477 }
478 })
479 .collect();
480 defs.extend(self.proxy_tool_defs.iter().map(|t| {
482 let def = &t.definition;
483 let description = def.description_for_llm();
484 ToolDefinition {
485 name: def.name.clone(),
486 description,
487 parameters: def.parameters.clone(),
488 is_priority: def.is_priority,
489 }
490 }));
491 defs
492 };
493 let request = ChatRequest {
494 system: Some(self.system_prompt().to_string()),
495 messages: self.state.messages().clone(),
496 max_tokens: self.max_tokens(),
497 tools: tool_defs,
498 think: self.think(),
499 enable_caching: true,
500 server_tools: Vec::new(),
501 };
502
503 let response = self.call_streaming(&request).await?;
504
505 self.track_usage(&response.usage);
506
507 crate::debug::debug_log().api_call(
508 &self.model_name,
509 response.usage.input_tokens,
510 response.usage.cache_read_input_tokens > 0,
511 );
512
513 should_continue = self.process_response(&response).await?;
514
515 if !should_continue && iterations < MAX_ITERATIONS - 1 {
518 self.drain_pending_inputs();
520
521 if self.has_pending_inputs() {
522 log::info!("Agent: found pending inputs at session end, continuing loop");
523 should_continue = true;
524 continue; }
526
527 if self.last_message_was_todo_reminder() {
530 log::info!("Skipping todo check: reminder already sent in recent messages");
531 } else {
532 const MAX_TODO_REMINDERS: usize = 2;
533
534 let reminder_count_clone = self.state.todo_reminder_count_map().clone();
536 let (pending, all_at_limit) = self.get_pending_todos_with_limit(
537 &reminder_count_clone,
538 MAX_TODO_REMINDERS
539 );
540
541 if !pending.is_empty() {
542 for (_, content) in &pending {
544 self.state.increment_todo_reminder(content.clone());
545 }
546
547 let pending_list = pending
548 .iter()
549 .map(|(status, content)| {
550 let marker = match status.as_str() {
551 "in_progress" => "[~]",
552 "pending" => "[ ]",
553 _ => "[?]",
554 };
555 format!(" {} {}", marker, content)
556 })
557 .collect::<Vec<_>>()
558 .join("\n");
559
560 let reminder = format!(
561 "📋 任务尚未完成。以下待办项需要处理:\n{}\n\n请继续执行,或在 todo_write 中标记为 completed。如遇阻塞请说明原因。",
562 pending_list
563 );
564
565 self.state.add_message(Message {
566 role: Role::User,
567 content: MessageContent::Text(reminder),
568 });
569 should_continue = true;
570 } else if all_at_limit && !self.state.todo_reminder_count_map().is_empty() {
571 let remaining_count = self.state.todo_reminder_count_map().len();
574 self.emit(AgentEvent::progress(
575 format!(
576 "⚠️ 会话结束:{} 个待办项未完成(已提醒 {} 次,达到上限)",
577 remaining_count, MAX_TODO_REMINDERS
578 ),
579 None,
580 ))?;
581 log::warn!(
582 "Session ending with {} incomplete todos (reminder limit reached)",
583 remaining_count
584 );
585 }
586 }
587 }
588
589 let context_size = self.effective_context_size();
590 let api_tokens = self.state.last_input_tokens() as u32;
591 let estimated_tokens = estimate_total_tokens(self.state.messages());
592
593 let current_tokens = if api_tokens > 0 && api_tokens >= estimated_tokens / 2 {
594 api_tokens
595 } else {
596 estimated_tokens
597 };
598
599 if let Some(ctx_size) = context_size {
602 self.emit(AgentEvent::with_data(
604 EventType::ContextSize,
605 EventData::ContextSize {
606 context_size: ctx_size as u64,
607 },
608 ))?;
609
610 let usage_ratio = current_tokens as f64 / ctx_size as f64;
611 if usage_ratio >= 0.3 {
612 crate::debug::debug_log().log(
613 "checkcompress",
614 &format!(
615 "usage={:.1}%, tokens={}, context={}, threshold={}%",
616 usage_ratio * 100.0,
617 current_tokens,
618 ctx_size,
619 self.config.compression_config().threshold * 100.0
620 ),
621 );
622 }
623 }
624
625 if should_compress(current_tokens, context_size, self.config.compression_config()) {
626 self.emit(AgentEvent::progress(crate::prompt::MSG_COMPRESSING_CONTEXT, None))?;
627
628 let original_tokens = current_tokens;
629
630 match compress_messages(
631 self.state.messages(),
632 CompressionStrategy::SlidingWindow,
633 self.config.compression_config(),
634 ) {
635 Ok(compressed) => {
636 let compressed_tokens = estimate_total_tokens(&compressed);
637 self.state.set_messages(compressed);
638 self.state.set_total_input_tokens(compressed_tokens as u64);
639 self.state.set_last_input_tokens(compressed_tokens as u64);
640
641 let ratio = compressed_tokens as f32 / original_tokens as f32;
642 crate::debug::debug_log().compression(
643 original_tokens,
644 compressed_tokens,
645 ratio,
646 );
647
648 self.emit(AgentEvent::with_data(
649 EventType::CompressionCompleted,
650 EventData::Compression {
651 original_tokens: original_tokens as u64,
652 compressed_tokens: compressed_tokens as u64,
653 ratio: compressed_tokens as f32 / original_tokens as f32,
654 },
655 ))?;
656 }
657 Err(e) => {
658 self.emit(AgentEvent::progress(
659 format!("{}{}", crate::prompt::MSG_COMPRESSION_FAILED, e),
660 None,
661 ))?;
662 }
663 }
664 }
665 }
666
667 if iterations >= MAX_ITERATIONS && should_continue {
669 self.emit(AgentEvent::error(
670 crate::prompt::MSG_MAX_ITERATIONS_REACHED
671 .replace("{max_iterations}", &MAX_ITERATIONS.to_string())
672 .replace("{iterations}", &iterations.to_string()),
673 Some("MAX_ITERATIONS_REACHED".to_string()),
674 Some("agent/run.rs".to_string()),
675 ))?;
676 }
677
678 self.emit(AgentEvent::usage_with_cache(
679 self.state.total_input_tokens(),
680 self.state.total_output_tokens(),
681 0,
682 0,
683 ))?;
684
685 self.emit(AgentEvent::session_ended())?;
686
687 Ok(Vec::new())
688 }
689
690 pub fn set_messages(&mut self, messages: Vec<Message>) {
692 self.state.set_messages(messages);
693 }
694
695 pub fn get_messages(&self) -> &[Message] {
697 self.messages()
698 }
699
700 pub fn get_tools(&self) -> &[Arc<dyn Tool>] {
702 &self.tools
703 }
704
705 pub fn get_system_prompt(&self) -> &str {
707 self.system_prompt()
708 }
709
710 pub fn get_token_counts(&self) -> (u64, u64) {
712 (
713 self.state.total_input_tokens(),
714 self.state.total_output_tokens(),
715 )
716 }
717
718 pub fn clear_history(&mut self) {
720 self.messages_mut().clear();
721 self.state.set_total_input_tokens(0);
722 self.state.set_total_output_tokens(0);
723 self.state.set_last_input_tokens(0);
724 }
725
726 pub fn message_count(&self) -> usize {
728 self.messages().len()
729 }
730
731 pub fn preprocess_input(&self, user_input: &str) -> ProcessResult {
747 preprocess_with_skills(user_input, self.skills())
749 }
750
751 pub fn inject_skill_context(&self, skill_id: &str, skill_body: Option<&str>) -> String {
763 if let Some(body) = skill_body {
764 format!(
765 "<command-name>{}</command-name>\n\n{}\n\n**Important**: Follow the skill instructions above before responding to the user request below.",
766 skill_id,
767 body.trim_end()
768 )
769 } else {
770 format!(
771 "Skill '{}' was triggered but not auto-loaded. The model should call the `skill` tool with name '{}' to load its instructions.",
772 skill_id,
773 skill_id
774 )
775 }
776 }
777
778 pub async fn add_mcp_server(
792 &mut self,
793 name: &str,
794 config: crate::mcp::McpServerConfig,
795 ) -> Result<()> {
796 if let Some(registry) = &self.mcp_registry {
797 let mut reg = registry.write().await;
798 reg.add_server(name.to_string(), config);
799 log::info!("MCP server '{}' added to registry", name);
800 } else {
801 log::warn!("MCP registry not initialized, cannot add server '{}'", name);
802 }
803 Ok(())
804 }
805
806 pub async fn remove_mcp_server(&mut self, name: &str) -> Result<()> {
808 if let Some(registry) = &self.mcp_registry {
809 let mut reg = registry.write().await;
810 reg.remove_server(name).await?;
811 log::info!("MCP server '{}' removed from registry", name);
812 }
813 Ok(())
814 }
815
816 pub async fn mcp_server_status(&self) -> Vec<crate::mcp::ServerStatus> {
818 if let Some(registry) = &self.mcp_registry {
819 let reg = registry.read().await;
820 reg.server_status().await.values().cloned().collect()
821 } else {
822 Vec::new()
823 }
824 }
825
826 pub async fn start_mcp_server(
828 &self,
829 name: &str,
830 ) -> Result<Vec<Arc<crate::mcp::McpToolWrapper>>> {
831 if let Some(registry) = &self.mcp_registry {
832 let reg = registry.read().await;
833 if let Some(placeholder) = reg.get_server(name) {
834 let tools = placeholder.start().await?;
835 log::info!("MCP server '{}' started with {} tools", name, tools.len());
836 Ok(tools)
837 } else {
838 Err(anyhow::anyhow!(
839 "MCP server '{}' not found in registry",
840 name
841 ))
842 }
843 } else {
844 Err(anyhow::anyhow!("MCP registry not initialized"))
845 }
846 }
847}