1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use anyhow::{anyhow, Context, Result};
7use futures::stream::BoxStream;
8use futures::{stream, FutureExt, Stream, StreamExt, TryStreamExt};
9use uuid::Uuid;
10
11use super::final_output_tool::FinalOutputTool;
12use super::platform_tools;
13use super::tool_execution::{ToolCallResult, CHAT_MODE_TOOL_SKIPPED_RESPONSE, DECLINED_RESPONSE};
14use crate::action_required_manager::ActionRequiredManager;
15use crate::agents::error_handling::OverflowHandler;
16use crate::agents::extension::{ExtensionConfig, ExtensionResult, ToolInfo};
17use crate::agents::extension_manager::{get_parameter_names, ExtensionManager};
18use crate::agents::extension_manager_extension::MANAGE_EXTENSIONS_TOOL_NAME_COMPLETE;
19use crate::agents::final_output_tool::{FINAL_OUTPUT_CONTINUATION_MESSAGE, FINAL_OUTPUT_TOOL_NAME};
20use crate::agents::platform_tools::PLATFORM_MANAGE_SCHEDULE_TOOL_NAME;
21use crate::agents::prompt_manager::PromptManager;
22use crate::agents::retry::{RetryManager, RetryResult};
23use crate::agents::subagent_task_config::TaskConfig;
24use crate::agents::subagent_tool::{
25 create_subagent_tool, handle_subagent_tool, SUBAGENT_TOOL_NAME,
26};
27use crate::agents::types::SessionConfig;
28use crate::agents::types::{FrontendTool, SharedProvider, ToolResultReceiver};
29use crate::config::{get_enabled_extensions, AsterMode, Config};
30use crate::context_mgmt::{
31 check_if_compaction_needed, compact_messages, DEFAULT_COMPACTION_THRESHOLD,
32};
33use crate::conversation::message::{
34 ActionRequiredData, Message, MessageContent, ProviderMetadata, SystemNotificationType,
35 ToolRequest,
36};
37use crate::conversation::{debug_conversation_fix, fix_conversation, Conversation};
38use crate::mcp_utils::ToolResult;
39use crate::permission::permission_inspector::PermissionInspector;
40use crate::permission::permission_judge::PermissionCheckResult;
41use crate::permission::PermissionConfirmation;
42use crate::providers::base::Provider;
43use crate::providers::errors::ProviderError;
44use crate::recipe::{Author, Recipe, Response, Settings, SubRecipe};
45use crate::scheduler_trait::SchedulerTrait;
46use crate::security::security_inspector::SecurityInspector;
47use crate::session::extension_data::{EnabledExtensionsState, ExtensionState};
48use crate::session::{Session, SessionManager, SessionStore, SessionType};
49use crate::tool_inspection::ToolInspectionManager;
50use crate::tool_monitor::RepetitionInspector;
51use crate::tools::{
52 register_default_tools, SharedFileReadHistory, ToolRegistrationConfig, ToolRegistry,
53};
54use crate::utils::is_token_cancelled;
55use regex::Regex;
56use rmcp::model::{
57 CallToolRequestParam, CallToolResult, Content, ErrorCode, ErrorData, GetPromptResult, Prompt,
58 ServerNotification, Tool,
59};
60use serde_json::Value;
61use tokio::sync::{mpsc, Mutex, RwLock};
62use tokio_util::sync::CancellationToken;
63use tracing::{debug, error, info, instrument, warn};
64
/// Turn limit applied by the reply loop when the session config does not set `max_turns`.
const DEFAULT_MAX_TURNS: u32 = 1000;

/// Text of the "thinking" system notification emitted while auto-compaction is running.
const COMPACTION_THINKING_TEXT: &'static str = "aster is compacting the conversation...";
67
68pub struct ReplyContext {
70 pub conversation: Conversation,
71 pub tools: Vec<Tool>,
72 pub toolshim_tools: Vec<Tool>,
73 pub system_prompt: String,
74 pub aster_mode: AsterMode,
75 pub initial_messages: Vec<Message>,
76}
77
78pub struct ToolCategorizeResult {
79 pub frontend_requests: Vec<ToolRequest>,
80 pub remaining_requests: Vec<ToolRequest>,
81 pub filtered_response: Message,
82}
83
84pub struct Agent {
86 pub(super) provider: SharedProvider,
87
88 pub extension_manager: Arc<ExtensionManager>,
89 pub(super) sub_recipes: Mutex<HashMap<String, SubRecipe>>,
90 pub(super) final_output_tool: Arc<Mutex<Option<FinalOutputTool>>>,
91 pub(super) frontend_tools: Mutex<HashMap<String, FrontendTool>>,
92 pub(super) frontend_instructions: Mutex<Option<String>>,
93 pub(super) prompt_manager: Mutex<PromptManager>,
94 pub(super) confirmation_tx: mpsc::Sender<(String, PermissionConfirmation)>,
95 pub(super) confirmation_rx: Mutex<mpsc::Receiver<(String, PermissionConfirmation)>>,
96 pub(super) tool_result_tx: mpsc::Sender<(String, ToolResult<CallToolResult>)>,
97 pub(super) tool_result_rx: ToolResultReceiver,
98
99 pub(super) scheduler_service: Mutex<Option<Arc<dyn SchedulerTrait>>>,
100 pub(super) retry_manager: RetryManager,
101 pub(super) tool_inspection_manager: ToolInspectionManager,
102
103 pub(super) tool_registry: Arc<RwLock<ToolRegistry>>,
105 pub(super) file_read_history: SharedFileReadHistory,
107
108 pub(super) session_store: Option<Arc<dyn SessionStore>>,
113}
114
115#[derive(Clone, Debug)]
116pub enum AgentEvent {
117 Message(Message),
118 McpNotification((String, ServerNotification)),
119 ModelChange { model: String, mode: String },
120 HistoryReplaced(Conversation),
121}
122
123impl Default for Agent {
124 fn default() -> Self {
125 Self::new()
126 }
127}
128
129pub enum ToolStreamItem<T> {
130 Message(ServerNotification),
131 Result(T),
132}
133
134pub type ToolStream =
135 Pin<Box<dyn Stream<Item = ToolStreamItem<ToolResult<CallToolResult>>> + Send>>;
136
137pub fn tool_stream<S, F>(rx: S, done: F) -> ToolStream
142where
143 S: Stream<Item = ServerNotification> + Send + Unpin + 'static,
144 F: Future<Output = ToolResult<CallToolResult>> + Send + 'static,
145{
146 Box::pin(async_stream::stream! {
147 tokio::pin!(done);
148 let mut rx = rx;
149
150 loop {
151 tokio::select! {
152 Some(msg) = rx.next() => {
153 yield ToolStreamItem::Message(msg);
154 }
155 r = &mut done => {
156 yield ToolStreamItem::Result(r);
157 break;
158 }
159 }
160 }
161 })
162}
163
164impl Agent {
165 pub fn new() -> Self {
166 let (confirm_tx, confirm_rx) = mpsc::channel(32);
168 let (tool_tx, tool_rx) = mpsc::channel(32);
169 let provider = Arc::new(Mutex::new(None));
170
171 let mut tool_registry = ToolRegistry::new();
173 let (file_read_history, _hook_manager) = register_default_tools(&mut tool_registry);
174
175 Self {
176 provider: provider.clone(),
177 extension_manager: Arc::new(ExtensionManager::new(provider.clone())),
178 sub_recipes: Mutex::new(HashMap::new()),
179 final_output_tool: Arc::new(Mutex::new(None)),
180 frontend_tools: Mutex::new(HashMap::new()),
181 frontend_instructions: Mutex::new(None),
182 prompt_manager: Mutex::new(PromptManager::new()),
183 confirmation_tx: confirm_tx,
184 confirmation_rx: Mutex::new(confirm_rx),
185 tool_result_tx: tool_tx,
186 tool_result_rx: Arc::new(Mutex::new(tool_rx)),
187 scheduler_service: Mutex::new(None),
188 retry_manager: RetryManager::new(),
189 tool_inspection_manager: Self::create_default_tool_inspection_manager(),
190 tool_registry: Arc::new(RwLock::new(tool_registry)),
191 file_read_history,
192 session_store: None, }
194 }
195
196 pub fn with_session_store(mut self, store: Arc<dyn SessionStore>) -> Self {
207 self.session_store = Some(store);
208 self
209 }
210
211 pub fn session_store(&self) -> Option<&Arc<dyn SessionStore>> {
213 self.session_store.as_ref()
214 }
215
216 pub fn with_identity(self, identity: super::identity::AgentIdentity) -> Self {
235 if let Ok(mut pm) = self.prompt_manager.try_lock() {
237 pm.set_identity(identity);
238 } else {
239 tracing::warn!("[Agent] with_identity: 无法获取锁,身份设置被跳过");
241 }
242 self
243 }
244
245 pub async fn set_identity(&self, identity: super::identity::AgentIdentity) {
250 let mut pm = self.prompt_manager.lock().await;
251 pm.set_identity(identity);
252 }
253
254 pub fn with_tool_config(config: ToolRegistrationConfig) -> Self {
263 let (confirm_tx, confirm_rx) = mpsc::channel(32);
264 let (tool_tx, tool_rx) = mpsc::channel(32);
265 let provider = Arc::new(Mutex::new(None));
266
267 let mut tool_registry = ToolRegistry::new();
269 let (file_read_history, _hook_manager) =
270 crate::tools::register_all_tools(&mut tool_registry, config);
271
272 Self {
273 provider: provider.clone(),
274 extension_manager: Arc::new(ExtensionManager::new(provider.clone())),
275 sub_recipes: Mutex::new(HashMap::new()),
276 final_output_tool: Arc::new(Mutex::new(None)),
277 frontend_tools: Mutex::new(HashMap::new()),
278 frontend_instructions: Mutex::new(None),
279 prompt_manager: Mutex::new(PromptManager::new()),
280 confirmation_tx: confirm_tx,
281 confirmation_rx: Mutex::new(confirm_rx),
282 tool_result_tx: tool_tx,
283 tool_result_rx: Arc::new(Mutex::new(tool_rx)),
284 scheduler_service: Mutex::new(None),
285 retry_manager: RetryManager::new(),
286 tool_inspection_manager: Self::create_default_tool_inspection_manager(),
287 tool_registry: Arc::new(RwLock::new(tool_registry)),
288 file_read_history,
289 session_store: None,
290 }
291 }
292
293 pub fn tool_registry(&self) -> &Arc<RwLock<ToolRegistry>> {
297 &self.tool_registry
298 }
299
300 pub fn file_read_history(&self) -> &SharedFileReadHistory {
304 &self.file_read_history
305 }
306
307 pub async fn register_mcp_tool(
321 &self,
322 name: String,
323 description: String,
324 input_schema: serde_json::Value,
325 server_name: String,
326 ) {
327 let wrapper =
328 crate::tools::McpToolWrapper::new(name.clone(), description, input_schema, server_name);
329 let mut registry = self.tool_registry.write().await;
330 registry.register_mcp(name, wrapper);
331 }
332
333 fn create_default_tool_inspection_manager() -> ToolInspectionManager {
335 let mut tool_inspection_manager = ToolInspectionManager::new();
336
337 tool_inspection_manager.add_inspector(Box::new(SecurityInspector::new()));
339
340 tool_inspection_manager.add_inspector(Box::new(PermissionInspector::new(
343 AsterMode::SmartApprove,
344 std::collections::HashSet::new(), std::collections::HashSet::new(), )));
347
348 tool_inspection_manager.add_inspector(Box::new(RepetitionInspector::new(None)));
350
351 tool_inspection_manager
352 }
353
354 pub(crate) async fn store_add_message(
359 &self,
360 session_id: &str,
361 message: &Message,
362 ) -> Result<()> {
363 if let Some(store) = &self.session_store {
364 store.add_message(session_id, message).await
365 } else {
366 SessionManager::add_message(session_id, message).await
367 }
368 }
369
370 pub(crate) async fn store_get_session(
372 &self,
373 session_id: &str,
374 include_messages: bool,
375 ) -> Result<Session> {
376 if let Some(store) = &self.session_store {
377 store.get_session(session_id, include_messages).await
378 } else {
379 SessionManager::get_session(session_id, include_messages).await
380 }
381 }
382
383 pub(crate) async fn store_replace_conversation(
385 &self,
386 session_id: &str,
387 conversation: &Conversation,
388 ) -> Result<()> {
389 if let Some(store) = &self.session_store {
390 store.replace_conversation(session_id, conversation).await
391 } else {
392 SessionManager::replace_conversation(session_id, conversation).await
393 }
394 }
395
396 async fn store_update_extension_data(
398 &self,
399 session_id: &str,
400 extension_data: crate::session::ExtensionData,
401 ) -> Result<()> {
402 if let Some(store) = &self.session_store {
403 store
404 .update_extension_data(session_id, extension_data)
405 .await
406 } else {
407 SessionManager::update_session(session_id)
408 .extension_data(extension_data)
409 .apply()
410 .await
411 }
412 }
413
414 async fn store_update_provider_config(
416 &self,
417 session_id: &str,
418 provider_name: String,
419 model_config: crate::model::ModelConfig,
420 ) -> Result<()> {
421 if let Some(store) = &self.session_store {
422 store
423 .update_provider_config(session_id, Some(provider_name), Some(model_config))
424 .await
425 } else {
426 SessionManager::update_session(session_id)
427 .provider_name(provider_name)
428 .model_config(model_config)
429 .apply()
430 .await
431 }
432 }
433
434 pub async fn reset_retry_attempts(&self) {
438 self.retry_manager.reset_attempts().await;
439 }
440
441 pub async fn increment_retry_attempts(&self) -> u32 {
443 self.retry_manager.increment_attempts().await
444 }
445
446 pub async fn get_retry_attempts(&self) -> u32 {
448 self.retry_manager.get_attempts().await
449 }
450
451 async fn handle_retry_logic(
452 &self,
453 messages: &mut Conversation,
454 session_config: &SessionConfig,
455 initial_messages: &[Message],
456 ) -> Result<bool> {
457 let result = self
458 .retry_manager
459 .handle_retry_logic(
460 messages,
461 session_config,
462 initial_messages,
463 &self.final_output_tool,
464 )
465 .await?;
466
467 match result {
468 RetryResult::Retried => Ok(true),
469 RetryResult::Skipped
470 | RetryResult::MaxAttemptsReached
471 | RetryResult::SuccessChecksPassed => Ok(false),
472 }
473 }
474
475 async fn drain_elicitation_messages(&self, session_id: &str) -> Vec<Message> {
477 let mut messages = Vec::new();
478 let mut elicitation_rx = ActionRequiredManager::global().request_rx.lock().await;
479 while let Ok(elicitation_message) = elicitation_rx.try_recv() {
480 if let Err(e) = self
481 .store_add_message(session_id, &elicitation_message)
482 .await
483 {
484 warn!("Failed to save elicitation message to session: {}", e);
485 }
486 messages.push(elicitation_message);
487 }
488 messages
489 }
490
491 async fn prepare_reply_context(
492 &self,
493 unfixed_conversation: Conversation,
494 working_dir: &std::path::Path,
495 session_config: &SessionConfig,
496 ) -> Result<ReplyContext> {
497 let unfixed_messages = unfixed_conversation.messages().clone();
498 let (conversation, issues) = fix_conversation(unfixed_conversation.clone());
499 if !issues.is_empty() {
500 debug!(
501 "Conversation issue fixed: {}",
502 debug_conversation_fix(
503 unfixed_messages.as_slice(),
504 conversation.messages(),
505 &issues
506 )
507 );
508 }
509 let initial_messages = conversation.messages().clone();
510 let config = Config::global();
511
512 let session_prompt = session_config.system_prompt.as_deref();
513 let (tools, toolshim_tools, system_prompt) = self
514 .prepare_tools_and_prompt(working_dir, session_prompt)
515 .await?;
516 let aster_mode = config.get_aster_mode().unwrap_or(AsterMode::Auto);
517
518 self.tool_inspection_manager
519 .update_permission_inspector_mode(aster_mode)
520 .await;
521
522 Ok(ReplyContext {
523 conversation,
524 tools,
525 toolshim_tools,
526 system_prompt,
527 aster_mode,
528 initial_messages,
529 })
530 }
531
532 async fn categorize_tools(
533 &self,
534 response: &Message,
535 tools: &[rmcp::model::Tool],
536 ) -> ToolCategorizeResult {
537 let (frontend_requests, remaining_requests, filtered_response) =
539 self.categorize_tool_requests(response, tools).await;
540
541 ToolCategorizeResult {
542 frontend_requests,
543 remaining_requests,
544 filtered_response,
545 }
546 }
547
548 async fn handle_approved_and_denied_tools(
549 &self,
550 permission_check_result: &PermissionCheckResult,
551 request_to_response_map: &HashMap<String, Arc<Mutex<Message>>>,
552 cancel_token: Option<tokio_util::sync::CancellationToken>,
553 session: &Session,
554 ) -> Result<Vec<(String, ToolStream)>> {
555 let mut tool_futures: Vec<(String, ToolStream)> = Vec::new();
556
557 for request in &permission_check_result.approved {
559 if let Ok(tool_call) = request.tool_call.clone() {
560 let (req_id, tool_result) = self
561 .dispatch_tool_call(
562 tool_call,
563 request.id.clone(),
564 cancel_token.clone(),
565 session,
566 )
567 .await;
568
569 tool_futures.push((
570 req_id,
571 match tool_result {
572 Ok(result) => tool_stream(
573 result
574 .notification_stream
575 .unwrap_or_else(|| Box::new(stream::empty())),
576 result.result,
577 ),
578 Err(e) => {
579 tool_stream(Box::new(stream::empty()), futures::future::ready(Err(e)))
580 }
581 },
582 ));
583 }
584 }
585
586 Self::handle_denied_tools(permission_check_result, request_to_response_map).await;
587 Ok(tool_futures)
588 }
589
590 async fn handle_denied_tools(
591 permission_check_result: &PermissionCheckResult,
592 request_to_response_map: &HashMap<String, Arc<Mutex<Message>>>,
593 ) {
594 for request in &permission_check_result.denied {
595 if let Some(response_msg) = request_to_response_map.get(&request.id) {
596 let mut response = response_msg.lock().await;
597 *response = response.clone().with_tool_response_with_metadata(
598 request.id.clone(),
599 Ok(CallToolResult {
600 content: vec![rmcp::model::Content::text(DECLINED_RESPONSE)],
601 structured_content: None,
602 is_error: Some(true),
603 meta: None,
604 }),
605 request.metadata.as_ref(),
606 );
607 }
608 }
609 }
610
611 pub async fn set_scheduler(&self, scheduler: Arc<dyn SchedulerTrait>) {
612 let mut scheduler_service = self.scheduler_service.lock().await;
613 *scheduler_service = Some(scheduler);
614 }
615
616 pub async fn provider(&self) -> Result<Arc<dyn Provider>, anyhow::Error> {
618 match &*self.provider.lock().await {
619 Some(provider) => Ok(Arc::clone(provider)),
620 None => Err(anyhow!("Provider not set")),
621 }
622 }
623
624 pub async fn is_frontend_tool(&self, name: &str) -> bool {
626 self.frontend_tools.lock().await.contains_key(name)
627 }
628
629 pub async fn get_frontend_tool(&self, name: &str) -> Option<FrontendTool> {
631 self.frontend_tools.lock().await.get(name).cloned()
632 }
633
634 pub async fn add_final_output_tool(&self, response: Response) {
635 let mut final_output_tool = self.final_output_tool.lock().await;
636 let created_final_output_tool = FinalOutputTool::new(response);
637 let final_output_system_prompt = created_final_output_tool.system_prompt();
638 *final_output_tool = Some(created_final_output_tool);
639 self.extend_system_prompt(final_output_system_prompt).await;
640 }
641
642 pub async fn add_sub_recipes(&self, sub_recipes_to_add: Vec<SubRecipe>) {
643 let mut sub_recipes = self.sub_recipes.lock().await;
644 for sr in sub_recipes_to_add {
645 sub_recipes.insert(sr.name.clone(), sr);
646 }
647 }
648
649 pub async fn apply_recipe_components(
650 &self,
651 sub_recipes: Option<Vec<SubRecipe>>,
652 response: Option<Response>,
653 include_final_output: bool,
654 ) {
655 if let Some(sub_recipes) = sub_recipes {
656 self.add_sub_recipes(sub_recipes).await;
657 }
658
659 if include_final_output {
660 if let Some(response) = response {
661 self.add_final_output_tool(response).await;
662 }
663 }
664 }
665
666 #[instrument(skip(self, tool_call, request_id), fields(input, output))]
668 pub async fn dispatch_tool_call(
669 &self,
670 tool_call: CallToolRequestParam,
671 request_id: String,
672 cancellation_token: Option<CancellationToken>,
673 session: &Session,
674 ) -> (String, Result<ToolCallResult, ErrorData>) {
675 if session.session_type == SessionType::SubAgent && tool_call.name == SUBAGENT_TOOL_NAME {
677 return (
678 request_id,
679 Err(ErrorData::new(
680 ErrorCode::INVALID_REQUEST,
681 "Subagents cannot create other subagents".to_string(),
682 None,
683 )),
684 );
685 }
686
687 if tool_call.name == PLATFORM_MANAGE_SCHEDULE_TOOL_NAME {
688 let arguments = tool_call
689 .arguments
690 .map(Value::Object)
691 .unwrap_or(Value::Object(serde_json::Map::new()));
692 let result = self
693 .handle_schedule_management(arguments, request_id.clone())
694 .await;
695 let wrapped_result = result.map(|content| CallToolResult {
696 content,
697 structured_content: None,
698 is_error: Some(false),
699 meta: None,
700 });
701 return (request_id, Ok(ToolCallResult::from(wrapped_result)));
702 }
703
704 if tool_call.name == FINAL_OUTPUT_TOOL_NAME {
705 return if let Some(final_output_tool) = self.final_output_tool.lock().await.as_mut() {
706 let result = final_output_tool.execute_tool_call(tool_call.clone()).await;
707 (request_id, Ok(result))
708 } else {
709 (
710 request_id,
711 Err(ErrorData::new(
712 ErrorCode::INTERNAL_ERROR,
713 "Final output tool not defined".to_string(),
714 None,
715 )),
716 )
717 };
718 }
719
720 debug!("WAITING_TOOL_START: {}", tool_call.name);
721 let result: ToolCallResult = if tool_call.name == SUBAGENT_TOOL_NAME {
722 let provider = match self.provider().await {
723 Ok(p) => p,
724 Err(_) => {
725 return (
726 request_id,
727 Err(ErrorData::new(
728 ErrorCode::INTERNAL_ERROR,
729 "Provider is required".to_string(),
730 None,
731 )),
732 );
733 }
734 };
735
736 let extensions = self.get_extension_configs().await;
737 let task_config =
738 TaskConfig::new(provider, &session.id, &session.working_dir, extensions);
739 let sub_recipes = self.sub_recipes.lock().await.clone();
740
741 let arguments = tool_call
742 .arguments
743 .clone()
744 .map(Value::Object)
745 .unwrap_or(Value::Object(serde_json::Map::new()));
746
747 handle_subagent_tool(
748 arguments,
749 task_config,
750 sub_recipes,
751 session.working_dir.clone(),
752 cancellation_token,
753 )
754 } else if self.is_frontend_tool(&tool_call.name).await {
755 ToolCallResult::from(Err(ErrorData::new(
757 ErrorCode::INTERNAL_ERROR,
758 "Frontend tool execution required".to_string(),
759 None,
760 )))
761 } else {
762 let is_native = self.tool_registry.read().await.contains(&tool_call.name);
765
766 if is_native {
767 let tool_name = tool_call.name.clone();
769 let params = tool_call
770 .arguments
771 .clone()
772 .map(Value::Object)
773 .unwrap_or(Value::Object(serde_json::Map::new()));
774 let context = crate::tools::context::ToolContext::new(session.working_dir.clone())
775 .with_session_id(session.id.clone());
776
777 let registry = self.tool_registry.read().await;
778 let execute_result = registry.execute(&tool_name, params, &context, None).await;
779 drop(registry);
780
781 match execute_result {
782 Ok(result) => {
783 let text = result.output.unwrap_or_default();
784 ToolCallResult::from(Ok(CallToolResult::success(vec![Content::text(text)])))
785 }
786 Err(e) => ToolCallResult::from(Err(ErrorData::new(
787 ErrorCode::INTERNAL_ERROR,
788 e.to_string(),
789 None,
790 ))),
791 }
792 } else {
793 let result = self
795 .extension_manager
796 .dispatch_tool_call(tool_call.clone(), cancellation_token.unwrap_or_default())
797 .await;
798 result.unwrap_or_else(|e| {
799 crate::posthog::emit_error(
800 "tool_execution_failed",
801 &format!("{}: {}", tool_call.name, e),
802 );
803 ToolCallResult::from(Err(ErrorData::new(
804 ErrorCode::INTERNAL_ERROR,
805 e.to_string(),
806 None,
807 )))
808 })
809 }
810 };
811
812 debug!("WAITING_TOOL_END: {}", tool_call.name);
813
814 (
815 request_id,
816 Ok(ToolCallResult {
817 notification_stream: result.notification_stream,
818 result: Box::new(
819 result
820 .result
821 .map(super::large_response_handler::process_tool_response),
822 ),
823 }),
824 )
825 }
826
827 pub async fn save_extension_state(&self, session: &SessionConfig) -> Result<()> {
830 let extension_configs = self.extension_manager.get_extension_configs().await;
831
832 let extensions_state = EnabledExtensionsState::new(extension_configs);
833
834 let mut session_data = self.store_get_session(&session.id, false).await?;
835
836 if let Err(e) = extensions_state.to_extension_data(&mut session_data.extension_data) {
837 warn!("Failed to serialize extension state: {}", e);
838 return Err(anyhow!("Extension state serialization failed: {}", e));
839 }
840
841 self.store_update_extension_data(&session.id, session_data.extension_data)
842 .await?;
843
844 Ok(())
845 }
846
847 pub async fn add_extension(&self, extension: ExtensionConfig) -> ExtensionResult<()> {
848 match &extension {
849 ExtensionConfig::Frontend {
850 tools,
851 instructions,
852 ..
853 } => {
854 let mut frontend_tools = self.frontend_tools.lock().await;
856 for tool in tools {
857 let frontend_tool = FrontendTool {
858 name: tool.name.to_string(),
859 tool: tool.clone(),
860 };
861 frontend_tools.insert(tool.name.to_string(), frontend_tool);
862 }
863 let mut frontend_instructions = self.frontend_instructions.lock().await;
865 if let Some(instructions) = instructions {
866 *frontend_instructions = Some(instructions.clone());
867 } else {
868 *frontend_instructions = Some(
870 "The following tools are provided directly by the frontend and will be executed by the frontend when called.".to_string(),
871 );
872 }
873 }
874 _ => {
875 self.extension_manager
876 .add_extension(extension.clone())
877 .await?;
878 }
879 }
880
881 Ok(())
882 }
883
884 pub async fn subagents_enabled(&self) -> bool {
885 let config = crate::config::Config::global();
886 let is_autonomous = config.get_aster_mode().unwrap_or(AsterMode::Auto) == AsterMode::Auto;
887 if !is_autonomous {
888 return false;
889 }
890 if self
891 .provider()
892 .await
893 .map(|provider| provider.get_active_model_name().starts_with("gemini"))
894 .unwrap_or(false)
895 {
896 return false;
897 }
898 if let Some(ref session_id) = self.extension_manager.get_context().await.session_id {
899 if matches!(
900 self.store_get_session(session_id, false)
901 .await
902 .ok()
903 .map(|session| session.session_type),
904 Some(SessionType::SubAgent)
905 ) {
906 return false;
907 }
908 }
909 !self
910 .extension_manager
911 .list_extensions()
912 .await
913 .map(|ext| ext.is_empty())
914 .unwrap_or(true)
915 }
916
917 pub async fn list_tools(&self, extension_name: Option<String>) -> Vec<Tool> {
918 let mut prefixed_tools = self
919 .extension_manager
920 .get_prefixed_tools(extension_name.clone())
921 .await
922 .unwrap_or_default();
923
924 let subagents_enabled = self.subagents_enabled().await;
925 if (extension_name.is_none() || extension_name.as_deref() == Some("platform"))
927 && self.scheduler_service.lock().await.is_some()
928 {
929 prefixed_tools.push(platform_tools::manage_schedule_tool());
930 }
931
932 if extension_name.is_none() {
933 if let Some(final_output_tool) = self.final_output_tool.lock().await.as_ref() {
934 prefixed_tools.push(final_output_tool.tool());
935 }
936
937 if subagents_enabled {
938 let sub_recipes = self.sub_recipes.lock().await;
939 let sub_recipes_vec: Vec<_> = sub_recipes.values().cloned().collect();
940 prefixed_tools.push(create_subagent_tool(&sub_recipes_vec));
941 }
942
943 let registry = self.tool_registry.read().await;
945 for tool_def in registry.get_definitions() {
946 let tool = Tool::new(
947 tool_def.name,
948 tool_def.description,
949 tool_def
950 .input_schema
951 .as_object()
952 .cloned()
953 .unwrap_or_default(),
954 );
955 prefixed_tools.push(tool);
956 }
957 }
958
959 prefixed_tools
960 }
961
962 pub async fn remove_extension(&self, name: &str) -> Result<()> {
963 self.extension_manager.remove_extension(name).await?;
964 Ok(())
965 }
966
967 pub async fn list_extensions(&self) -> Vec<String> {
968 self.extension_manager
969 .list_extensions()
970 .await
971 .expect("Failed to list extensions")
972 }
973
974 pub async fn get_extension_configs(&self) -> Vec<ExtensionConfig> {
975 self.extension_manager.get_extension_configs().await
976 }
977
978 pub async fn handle_confirmation(
980 &self,
981 request_id: String,
982 confirmation: PermissionConfirmation,
983 ) {
984 if let Err(e) = self.confirmation_tx.send((request_id, confirmation)).await {
985 error!("Failed to send confirmation: {}", e);
986 }
987 }
988
989 #[instrument(skip(self, user_message, session_config), fields(user_message))]
990 pub async fn reply(
991 &self,
992 user_message: Message,
993 session_config: SessionConfig,
994 cancel_token: Option<CancellationToken>,
995 ) -> Result<BoxStream<'_, Result<AgentEvent>>> {
996 for content in &user_message.content {
997 if let MessageContent::ActionRequired(action_required) = content {
998 if let ActionRequiredData::ElicitationResponse { id, user_data } =
999 &action_required.data
1000 {
1001 if let Err(e) = ActionRequiredManager::global()
1002 .submit_response(id.clone(), user_data.clone())
1003 .await
1004 {
1005 let error_text = format!("Failed to submit elicitation response: {}", e);
1006 error!(error_text);
1007 return Ok(Box::pin(stream::once(async {
1008 Ok(AgentEvent::Message(
1009 Message::assistant().with_text(error_text),
1010 ))
1011 })));
1012 }
1013 self.store_add_message(&session_config.id, &user_message)
1014 .await?;
1015 return Ok(Box::pin(futures::stream::empty()));
1016 }
1017 }
1018 }
1019
1020 let message_text = user_message.as_concat_text();
1021
1022 if message_text.trim().starts_with('/') {
1024 let command = message_text.split_whitespace().next();
1025 if let Some(cmd) = command {
1026 if crate::slash_commands::get_recipe_for_command(cmd).is_some() {
1027 crate::posthog::emit_custom_slash_command_used();
1028 }
1029 }
1030 }
1031
1032 let command_result = self
1033 .execute_command(&message_text, &session_config.id)
1034 .await;
1035
1036 match command_result {
1037 Err(e) => {
1038 let error_message = Message::assistant()
1039 .with_text(e.to_string())
1040 .with_visibility(true, false);
1041 return Ok(Box::pin(stream::once(async move {
1042 Ok(AgentEvent::Message(error_message))
1043 })));
1044 }
1045 Ok(Some(response)) if response.role == rmcp::model::Role::Assistant => {
1046 self.store_add_message(
1047 &session_config.id,
1048 &user_message.clone().with_visibility(true, false),
1049 )
1050 .await?;
1051 self.store_add_message(
1052 &session_config.id,
1053 &response.clone().with_visibility(true, false),
1054 )
1055 .await?;
1056
1057 let modifies_history = crate::agents::execute_commands::COMPACT_TRIGGERS
1059 .contains(&message_text.trim())
1060 || message_text.trim() == "/clear";
1061
1062 let session_store_clone = self.session_store.clone();
1064 let session_id_clone = session_config.id.clone();
1065
1066 return Ok(Box::pin(async_stream::try_stream! {
1067 yield AgentEvent::Message(user_message);
1068 yield AgentEvent::Message(response);
1069
1070 if modifies_history {
1072 let updated_session = if let Some(store) = &session_store_clone {
1073 store.get_session(&session_id_clone, true).await
1074 } else {
1075 SessionManager::get_session(&session_id_clone, true).await
1076 }
1077 .map_err(|e| anyhow!("Failed to fetch updated session: {}", e))?;
1078 let updated_conversation = updated_session
1079 .conversation
1080 .ok_or_else(|| anyhow!("Session has no conversation after history modification"))?;
1081 yield AgentEvent::HistoryReplaced(updated_conversation);
1082 }
1083 }));
1084 }
1085 Ok(Some(resolved_message)) => {
1086 self.store_add_message(
1087 &session_config.id,
1088 &user_message.clone().with_visibility(true, false),
1089 )
1090 .await?;
1091 self.store_add_message(
1092 &session_config.id,
1093 &resolved_message.clone().with_visibility(false, true),
1094 )
1095 .await?;
1096 }
1097 Ok(None) => {
1098 self.store_add_message(&session_config.id, &user_message)
1099 .await?;
1100 }
1101 }
1102 let session = self.store_get_session(&session_config.id, true).await?;
1103 let conversation = session
1104 .conversation
1105 .clone()
1106 .ok_or_else(|| anyhow::anyhow!("Session {} has no conversation", session_config.id))?;
1107
1108 let needs_auto_compact = check_if_compaction_needed(
1109 self.provider().await?.as_ref(),
1110 &conversation,
1111 None,
1112 &session,
1113 )
1114 .await?;
1115
1116 let conversation_to_compact = conversation.clone();
1117
1118 Ok(Box::pin(async_stream::try_stream! {
1119 let final_conversation = if !needs_auto_compact {
1120 conversation
1121 } else {
1122 let config = Config::global();
1123 let threshold = config
1124 .get_param::<f64>("ASTER_AUTO_COMPACT_THRESHOLD")
1125 .unwrap_or(DEFAULT_COMPACTION_THRESHOLD);
1126 let threshold_percentage = (threshold * 100.0) as u32;
1127
1128 let inline_msg = format!(
1129 "Exceeded auto-compact threshold of {}%. Performing auto-compaction...",
1130 threshold_percentage
1131 );
1132
1133 yield AgentEvent::Message(
1134 Message::assistant().with_system_notification(
1135 SystemNotificationType::InlineMessage,
1136 inline_msg,
1137 )
1138 );
1139
1140 yield AgentEvent::Message(
1141 Message::assistant().with_system_notification(
1142 SystemNotificationType::ThinkingMessage,
1143 COMPACTION_THINKING_TEXT,
1144 )
1145 );
1146
1147 match compact_messages(self.provider().await?.as_ref(), &conversation_to_compact, false).await {
1148 Ok((compacted_conversation, summarization_usage)) => {
1149 self.store_replace_conversation(&session_config.id, &compacted_conversation).await?;
1150 Self::update_session_metrics(&session_config, &summarization_usage, true, self.session_store.as_ref()).await?;
1151
1152 yield AgentEvent::HistoryReplaced(compacted_conversation.clone());
1153
1154 yield AgentEvent::Message(
1155 Message::assistant().with_system_notification(
1156 SystemNotificationType::InlineMessage,
1157 "Compaction complete",
1158 )
1159 );
1160
1161 compacted_conversation
1162 }
1163 Err(e) => {
1164 yield AgentEvent::Message(
1165 Message::assistant().with_text(
1166 format!("Ran into this error trying to compact: {e}.\n\nPlease try again or create a new session")
1167 )
1168 );
1169 return;
1170 }
1171 }
1172 };
1173
1174 let mut reply_stream = self.reply_internal(final_conversation, session_config, session, cancel_token).await?;
1175 while let Some(event) = reply_stream.next().await {
1176 yield event?;
1177 }
1178 }))
1179 }
1180
1181 async fn reply_internal(
1182 &self,
1183 conversation: Conversation,
1184 session_config: SessionConfig,
1185 session: Session,
1186 cancel_token: Option<CancellationToken>,
1187 ) -> Result<BoxStream<'_, Result<AgentEvent>>> {
1188 let context = self
1189 .prepare_reply_context(conversation, &session.working_dir, &session_config)
1190 .await?;
1191 let ReplyContext {
1192 mut conversation,
1193 mut tools,
1194 mut toolshim_tools,
1195 mut system_prompt,
1196 aster_mode,
1197 initial_messages,
1198 } = context;
1199 let reply_span = tracing::Span::current();
1200 self.reset_retry_attempts().await;
1201
1202 let provider = self.provider().await?;
1203 let session_id = session_config.id.clone();
1204 let working_dir = session.working_dir.clone();
1205 tokio::spawn(async move {
1206 if let Err(e) = SessionManager::maybe_update_name(&session_id, provider).await {
1207 warn!("Failed to generate session description: {}", e);
1208 }
1209 });
1210
1211 Ok(Box::pin(async_stream::try_stream! {
1212 let _ = reply_span.enter();
1213 let mut turns_taken = 0u32;
1214 let max_turns = session_config.max_turns.unwrap_or(DEFAULT_MAX_TURNS);
1215 let mut overflow_handler = OverflowHandler::new(2);
1216
1217 loop {
1218 if is_token_cancelled(&cancel_token) {
1219 break;
1220 }
1221
1222 if let Some(final_output_tool) = self.final_output_tool.lock().await.as_ref() {
1223 if final_output_tool.final_output.is_some() {
1224 let final_event = AgentEvent::Message(
1225 Message::assistant().with_text(final_output_tool.final_output.clone().unwrap())
1226 );
1227 yield final_event;
1228 break;
1229 }
1230 }
1231
1232 turns_taken += 1;
1233 if turns_taken > max_turns {
1234 yield AgentEvent::Message(
1235 Message::assistant().with_text(
1236 "I've reached the maximum number of actions I can do without user input. Would you like me to continue?"
1237 )
1238 );
1239 break;
1240 }
1241
1242 let conversation_with_moim = super::moim::inject_moim(
1243 conversation.clone(),
1244 &self.extension_manager,
1245 ).await;
1246
1247 let mut stream = Self::stream_response_from_provider(
1248 self.provider().await?,
1249 &system_prompt,
1250 conversation_with_moim.messages(),
1251 &tools,
1252 &toolshim_tools,
1253 ).await?;
1254
1255 let mut no_tools_called = true;
1256 let mut messages_to_add = Conversation::default();
1257 let mut tools_updated = false;
1258 let mut did_recovery_compact_this_iteration = false;
1259
1260 while let Some(next) = stream.next().await {
1261 if is_token_cancelled(&cancel_token) {
1262 break;
1263 }
1264
1265 match next {
1266 Ok((response, usage)) => {
1267 overflow_handler.reset();
1268
1269 let provider = self.provider().await?;
1271 if let Some(lead_worker) = provider.as_lead_worker() {
1272 if let Some(ref usage) = usage {
1273 let active_model = usage.model.clone();
1274 let (lead_model, worker_model) = lead_worker.get_model_info();
1275 let mode = if active_model == lead_model {
1276 "lead"
1277 } else if active_model == worker_model {
1278 "worker"
1279 } else {
1280 "unknown"
1281 };
1282
1283 yield AgentEvent::ModelChange {
1284 model: active_model,
1285 mode: mode.to_string(),
1286 };
1287 }
1288 }
1289
1290 if let Some(ref usage) = usage {
1291 Self::update_session_metrics(&session_config, usage, false, self.session_store.as_ref()).await?;
1292 }
1293
1294 if let Some(response) = response {
1295 let ToolCategorizeResult {
1296 frontend_requests,
1297 remaining_requests,
1298 filtered_response,
1299 } = self.categorize_tools(&response, &tools).await;
1300
1301 yield AgentEvent::Message(filtered_response.clone());
1302 tokio::task::yield_now().await;
1303
1304 let num_tool_requests = frontend_requests.len() + remaining_requests.len();
1305 if num_tool_requests == 0 {
1306 messages_to_add.push(response.clone());
1307 continue;
1308 }
1309
1310 let tool_response_messages: Vec<Arc<Mutex<Message>>> = (0..num_tool_requests)
1311 .map(|_| Arc::new(Mutex::new(Message::user().with_id(
1312 format!("msg_{}", Uuid::new_v4())
1313 ))))
1314 .collect();
1315
1316 let mut request_to_response_map = HashMap::new();
1317 let mut request_metadata: HashMap<String, Option<ProviderMetadata>> = HashMap::new();
1318 for (idx, request) in frontend_requests.iter().chain(remaining_requests.iter()).enumerate() {
1319 request_to_response_map.insert(request.id.clone(), tool_response_messages[idx].clone());
1320 request_metadata.insert(request.id.clone(), request.metadata.clone());
1321 }
1322
1323 for (idx, request) in frontend_requests.iter().enumerate() {
1324 let mut frontend_tool_stream = self.handle_frontend_tool_request(
1325 request,
1326 tool_response_messages[idx].clone(),
1327 );
1328
1329 while let Some(msg) = frontend_tool_stream.try_next().await? {
1330 yield AgentEvent::Message(msg);
1331 }
1332 }
1333 if aster_mode == AsterMode::Chat {
1334 for request in remaining_requests.iter() {
1336 if let Some(response_msg) = request_to_response_map.get(&request.id) {
1337 let mut response = response_msg.lock().await;
1338 *response = response.clone().with_tool_response_with_metadata(
1339 request.id.clone(),
1340 Ok(CallToolResult {
1341 content: vec![Content::text(CHAT_MODE_TOOL_SKIPPED_RESPONSE)],
1342 structured_content: None,
1343 is_error: Some(false),
1344 meta: None,
1345 }),
1346 request.metadata.as_ref(),
1347 );
1348 }
1349 }
1350 } else {
1351 let inspection_results = self.tool_inspection_manager
1353 .inspect_tools(
1354 &remaining_requests,
1355 conversation.messages(),
1356 )
1357 .await?;
1358
1359 let permission_check_result = self.tool_inspection_manager
1360 .process_inspection_results_with_permission_inspector(
1361 &remaining_requests,
1362 &inspection_results,
1363 )
1364 .unwrap_or_else(|| {
1365 let mut result = PermissionCheckResult {
1366 approved: vec![],
1367 needs_approval: vec![],
1368 denied: vec![],
1369 };
1370 result.needs_approval.extend(remaining_requests.iter().cloned());
1371 result
1372 });
1373
1374 let mut enable_extension_request_ids = vec![];
1376 for request in &remaining_requests {
1377 if let Ok(tool_call) = &request.tool_call {
1378 if tool_call.name == MANAGE_EXTENSIONS_TOOL_NAME_COMPLETE {
1379 enable_extension_request_ids.push(request.id.clone());
1380 }
1381 }
1382 }
1383
1384 let mut tool_futures = self.handle_approved_and_denied_tools(
1385 &permission_check_result,
1386 &request_to_response_map,
1387 cancel_token.clone(),
1388 &session,
1389 ).await?;
1390
1391 let tool_futures_arc = Arc::new(Mutex::new(tool_futures));
1392
1393 let mut tool_approval_stream = self.handle_approval_tool_requests(
1394 &permission_check_result.needs_approval,
1395 tool_futures_arc.clone(),
1396 &request_to_response_map,
1397 cancel_token.clone(),
1398 &session,
1399 &inspection_results,
1400 );
1401
1402 while let Some(msg) = tool_approval_stream.try_next().await? {
1403 yield AgentEvent::Message(msg);
1404 }
1405
1406 tool_futures = {
1407 let mut futures_lock = tool_futures_arc.lock().await;
1408 futures_lock.drain(..).collect::<Vec<_>>()
1409 };
1410
1411 let with_id = tool_futures
1412 .into_iter()
1413 .map(|(request_id, stream)| {
1414 stream.map(move |item| (request_id.clone(), item))
1415 })
1416 .collect::<Vec<_>>();
1417
1418 let mut combined = stream::select_all(with_id);
1419 let mut all_install_successful = true;
1420
1421 while let Some((request_id, item)) = combined.next().await {
1422 if is_token_cancelled(&cancel_token) {
1423 break;
1424 }
1425
1426 for msg in self.drain_elicitation_messages(&session_config.id).await {
1427 yield AgentEvent::Message(msg);
1428 }
1429
1430 match item {
1431 ToolStreamItem::Result(output) => {
1432 if enable_extension_request_ids.contains(&request_id)
1433 && output.is_err()
1434 {
1435 all_install_successful = false;
1436 }
1437 if let Some(response_msg) = request_to_response_map.get(&request_id) {
1438 let metadata = request_metadata.get(&request_id).and_then(|m| m.as_ref());
1439 let mut response = response_msg.lock().await;
1440 *response = response.clone().with_tool_response_with_metadata(request_id, output, metadata);
1441 }
1442 }
1443 ToolStreamItem::Message(msg) => {
1444 yield AgentEvent::McpNotification((request_id, msg));
1445 }
1446 }
1447 }
1448
1449 for msg in self.drain_elicitation_messages(&session_config.id).await {
1451 yield AgentEvent::Message(msg);
1452 }
1453
1454 if all_install_successful && !enable_extension_request_ids.is_empty() {
1455 if let Err(e) = self.save_extension_state(&session_config).await {
1456 warn!("Failed to save extension state after runtime changes: {}", e);
1457 }
1458 tools_updated = true;
1459 }
1460 }
1461
1462 let thinking_content: Vec<MessageContent> = response.content.iter()
1465 .filter(|c| matches!(c, MessageContent::Thinking(_)))
1466 .cloned()
1467 .collect();
1468 if !thinking_content.is_empty() {
1469 let thinking_msg = Message::new(
1470 response.role.clone(),
1471 response.created,
1472 thinking_content,
1473 ).with_id(format!("msg_{}", Uuid::new_v4()));
1474 messages_to_add.push(thinking_msg);
1475 }
1476
1477 for (idx, request) in frontend_requests.iter().chain(remaining_requests.iter()).enumerate() {
1478 if request.tool_call.is_ok() {
1479 let request_msg = Message::assistant()
1480 .with_id(format!("msg_{}", Uuid::new_v4()))
1481 .with_tool_request_with_metadata(
1482 request.id.clone(),
1483 request.tool_call.clone(),
1484 request.metadata.as_ref(),
1485 request.tool_meta.clone(),
1486 );
1487 messages_to_add.push(request_msg);
1488 let final_response = tool_response_messages[idx]
1489 .lock().await.clone();
1490 yield AgentEvent::Message(final_response.clone());
1491 messages_to_add.push(final_response);
1492 }
1493 }
1494
1495 no_tools_called = false;
1496 }
1497 }
1498 Err(ref provider_err @ ProviderError::ContextLengthExceeded(_)) => {
1499 crate::posthog::emit_error(provider_err.telemetry_type(), &provider_err.to_string());
1500
1501 if !overflow_handler.can_retry() {
1502 error!("Context limit exceeded after compaction - prompt too large");
1503 yield AgentEvent::Message(
1504 Message::assistant().with_system_notification(
1505 SystemNotificationType::InlineMessage,
1506 "Unable to continue: Context limit still exceeded after compaction. Try using a shorter message, a model with a larger context window, or start a new session."
1507 )
1508 );
1509 break;
1510 }
1511
1512 yield AgentEvent::Message(
1513 Message::assistant().with_system_notification(
1514 SystemNotificationType::InlineMessage,
1515 format!(
1516 "Context limit reached. Compacting to continue conversation... (attempt {}/{})",
1517 overflow_handler.compaction_attempts() + 1,
1518 2
1519 ),
1520 )
1521 );
1522 yield AgentEvent::Message(
1523 Message::assistant().with_system_notification(
1524 SystemNotificationType::ThinkingMessage,
1525 COMPACTION_THINKING_TEXT,
1526 )
1527 );
1528
1529 match overflow_handler.handle_overflow(self.provider().await?.as_ref(), &conversation, &session).await {
1530 Ok((compacted_conversation, usage, should_retry)) => {
1531 if should_retry {
1532 self.store_replace_conversation(&session_config.id, &compacted_conversation).await?;
1533 Self::update_session_metrics(&session_config, &usage, true, self.session_store.as_ref()).await?;
1534 conversation = compacted_conversation;
1535 did_recovery_compact_this_iteration = true;
1536 yield AgentEvent::HistoryReplaced(conversation.clone());
1537 }
1538 break;
1539 }
1540 Err(e) => {
1541 crate::posthog::emit_error("compaction_failed", &e.to_string());
1542 error!("Compaction failed: {}", e);
1543 yield AgentEvent::Message(
1544 Message::assistant().with_system_notification(
1545 SystemNotificationType::InlineMessage,
1546 format!("Compaction failed: {}", e),
1547 )
1548 );
1549 break;
1550 }
1551 }
1552 }
1553 Err(ref provider_err) => {
1554 crate::posthog::emit_error(provider_err.telemetry_type(), &provider_err.to_string());
1555 error!("Error: {}", provider_err);
1556 yield AgentEvent::Message(
1557 Message::assistant().with_text(
1558 format!("Ran into this error: {provider_err}.\n\nPlease retry if you think this is a transient or recoverable error.")
1559 )
1560 );
1561 break;
1562 }
1563 }
1564 }
1565 if tools_updated {
1566 let session_prompt = session_config.system_prompt.as_deref();
1567 (tools, toolshim_tools, system_prompt) =
1568 self.prepare_tools_and_prompt(&working_dir, session_prompt).await?;
1569 }
1570 let mut exit_chat = false;
1571 if no_tools_called {
1572 if let Some(final_output_tool) = self.final_output_tool.lock().await.as_ref() {
1573 if final_output_tool.final_output.is_none() {
1574 warn!("Final output tool has not been called yet. Continuing agent loop.");
1575 let message = Message::user().with_text(FINAL_OUTPUT_CONTINUATION_MESSAGE);
1576 messages_to_add.push(message.clone());
1577 yield AgentEvent::Message(message);
1578 } else {
1579 let message = Message::assistant().with_text(final_output_tool.final_output.clone().unwrap());
1580 messages_to_add.push(message.clone());
1581 yield AgentEvent::Message(message);
1582 exit_chat = true;
1583 }
1584 } else if did_recovery_compact_this_iteration {
1585 } else {
1587 match self.handle_retry_logic(&mut conversation, &session_config, &initial_messages).await {
1588 Ok(should_retry) => {
1589 if should_retry {
1590 info!("Retry logic triggered, restarting agent loop");
1591 } else {
1592 exit_chat = true;
1593 }
1594 }
1595 Err(e) => {
1596 error!("Retry logic failed: {}", e);
1597 yield AgentEvent::Message(
1598 Message::assistant().with_text(
1599 format!("Retry logic encountered an error: {}", e)
1600 )
1601 );
1602 exit_chat = true;
1603 }
1604 }
1605 }
1606 }
1607
1608 for msg in &messages_to_add {
1609 self.store_add_message(&session_config.id, msg).await?;
1610 }
1611 conversation.extend(messages_to_add);
1612 if exit_chat {
1613 break;
1614 }
1615
1616 tokio::task::yield_now().await;
1617 }
1618 }))
1619 }
1620
1621 pub async fn extend_system_prompt(&self, instruction: String) {
1622 let mut prompt_manager = self.prompt_manager.lock().await;
1623 prompt_manager.add_system_prompt_extra(instruction);
1624 }
1625
1626 pub async fn update_provider(
1627 &self,
1628 provider: Arc<dyn Provider>,
1629 session_id: &str,
1630 ) -> Result<()> {
1631 let mut current_provider = self.provider.lock().await;
1632 *current_provider = Some(provider.clone());
1633
1634 self.store_update_provider_config(
1635 session_id,
1636 provider.get_name().to_string(),
1637 provider.get_model_config(),
1638 )
1639 .await
1640 .context("Failed to persist provider config to session")
1641 }
1642
1643 pub async fn override_system_prompt(&self, template: String) {
1645 let mut prompt_manager = self.prompt_manager.lock().await;
1646 prompt_manager.set_system_prompt_override(template);
1647 }
1648
1649 pub async fn list_extension_prompts(&self) -> HashMap<String, Vec<Prompt>> {
1650 self.extension_manager
1651 .list_prompts(CancellationToken::default())
1652 .await
1653 .expect("Failed to list prompts")
1654 }
1655
1656 pub async fn get_prompt(&self, name: &str, arguments: Value) -> Result<GetPromptResult> {
1657 let prompts = self
1659 .extension_manager
1660 .list_prompts(CancellationToken::default())
1661 .await
1662 .map_err(|e| anyhow!("Failed to list prompts: {}", e))?;
1663
1664 if let Some(extension) = prompts
1665 .iter()
1666 .find(|(_, prompt_list)| prompt_list.iter().any(|p| p.name == name))
1667 .map(|(extension, _)| extension)
1668 {
1669 return self
1670 .extension_manager
1671 .get_prompt(extension, name, arguments, CancellationToken::default())
1672 .await
1673 .map_err(|e| anyhow!("Failed to get prompt: {}", e));
1674 }
1675
1676 Err(anyhow!("Prompt '{}' not found", name))
1677 }
1678
1679 pub async fn get_plan_prompt(&self) -> Result<String> {
1680 let tools = self.extension_manager.get_prefixed_tools(None).await?;
1681 let tools_info = tools
1682 .into_iter()
1683 .map(|tool| {
1684 ToolInfo::new(
1685 &tool.name,
1686 tool.description
1687 .as_ref()
1688 .map(|d| d.as_ref())
1689 .unwrap_or_default(),
1690 get_parameter_names(&tool),
1691 None,
1692 )
1693 })
1694 .collect();
1695
1696 let plan_prompt = self.extension_manager.get_planning_prompt(tools_info).await;
1697
1698 Ok(plan_prompt)
1699 }
1700
1701 pub async fn handle_tool_result(&self, id: String, result: ToolResult<CallToolResult>) {
1702 if let Err(e) = self.tool_result_tx.send((id, result)).await {
1703 error!("Failed to send tool result: {}", e);
1704 }
1705 }
1706
1707 pub async fn create_recipe(&self, mut messages: Conversation) -> Result<Recipe> {
1708 tracing::info!("Starting recipe creation with {} messages", messages.len());
1709
1710 let extensions_info = self.extension_manager.get_extensions_info().await;
1711 tracing::debug!("Retrieved {} extensions info", extensions_info.len());
1712 let (extension_count, tool_count) =
1713 self.extension_manager.get_extension_and_tool_counts().await;
1714
1715 let provider = self.provider().await.map_err(|e| {
1717 tracing::error!("Failed to get provider for recipe creation: {}", e);
1718 e
1719 })?;
1720 let model_config = provider.get_model_config();
1721 let model_name = &model_config.model_name;
1722 tracing::debug!("Using model: {}", model_name);
1723
1724 let prompt_manager = self.prompt_manager.lock().await;
1725 let system_prompt = prompt_manager
1726 .builder()
1727 .with_extensions(extensions_info.into_iter())
1728 .with_frontend_instructions(self.frontend_instructions.lock().await.clone())
1729 .with_extension_and_tool_counts(extension_count, tool_count)
1730 .build();
1731
1732 let recipe_prompt = prompt_manager.get_recipe_prompt().await;
1733 let tools = self
1734 .extension_manager
1735 .get_prefixed_tools(None)
1736 .await
1737 .map_err(|e| {
1738 tracing::error!("Failed to get tools for recipe creation: {}", e);
1739 e
1740 })?;
1741
1742 messages.push(Message::user().with_text(recipe_prompt));
1743
1744 let (messages, issues) = fix_conversation(messages);
1745 if !issues.is_empty() {
1746 issues
1747 .iter()
1748 .for_each(|issue| tracing::warn!(recipe.conversation.issue = issue));
1749 }
1750
1751 tracing::debug!(
1752 "Added recipe prompt to messages, total messages: {}",
1753 messages.len()
1754 );
1755
1756 tracing::info!("Calling provider to generate recipe content");
1757 let (result, _usage) = self
1758 .provider
1759 .lock()
1760 .await
1761 .as_ref()
1762 .ok_or_else(|| {
1763 let error = anyhow!("Provider not available during recipe creation");
1764 tracing::error!("{}", error);
1765 error
1766 })?
1767 .complete(&system_prompt, messages.messages(), &tools)
1768 .await
1769 .map_err(|e| {
1770 tracing::error!("Provider completion failed during recipe creation: {}", e);
1771 e
1772 })?;
1773
1774 let content = result.as_concat_text();
1775 tracing::debug!(
1776 "Provider returned content with {} characters",
1777 content.len()
1778 );
1779
1780 let re = Regex::new(r"(?s)```[^\n]*\n(.*?)\n```").unwrap();
1782 let clean_content = re
1783 .captures(&content)
1784 .and_then(|caps| caps.get(1).map(|m| m.as_str()))
1785 .unwrap_or(&content)
1786 .trim()
1787 .to_string();
1788
1789 let (instructions, activities) =
1790 if let Ok(json_content) = serde_json::from_str::<Value>(&clean_content) {
1791 let instructions = json_content
1792 .get("instructions")
1793 .ok_or_else(|| anyhow!("Missing 'instructions' in json response"))?
1794 .as_str()
1795 .ok_or_else(|| anyhow!("instructions' is not a string"))?
1796 .to_string();
1797
1798 let activities = json_content
1799 .get("activities")
1800 .ok_or_else(|| anyhow!("Missing 'activities' in json response"))?
1801 .as_array()
1802 .ok_or_else(|| anyhow!("'activities' is not an array'"))?
1803 .iter()
1804 .map(|act| {
1805 act.as_str()
1806 .map(|s| s.to_string())
1807 .ok_or(anyhow!("'activities' array element is not a string"))
1808 })
1809 .collect::<Result<_, _>>()?;
1810
1811 (instructions, activities)
1812 } else {
1813 tracing::warn!("Failed to parse JSON, falling back to string parsing");
1814 let after_instructions = content
1817 .split_once("instructions:")
1818 .map(|(_, rest)| rest)
1819 .unwrap_or(&content);
1820
1821 let (instructions_part, activities_text) = after_instructions
1823 .split_once("activities:")
1824 .unwrap_or((after_instructions, ""));
1825
1826 let instructions = instructions_part
1827 .trim_end_matches(|c: char| c.is_whitespace() || c == '#')
1828 .trim()
1829 .to_string();
1830 let activities_text = activities_text.trim();
1831
1832 let bullet_re = Regex::new(r"^[•\-*\d]+\.?\s*").expect("Invalid regex");
1834
1835 let activities: Vec<String> = activities_text
1837 .lines()
1838 .map(|line| bullet_re.replace(line, "").to_string())
1839 .map(|s| s.trim().to_string())
1840 .filter(|line| !line.is_empty())
1841 .collect();
1842
1843 (instructions, activities)
1844 };
1845
1846 let extension_configs = get_enabled_extensions();
1847
1848 let author = Author {
1849 contact: std::env::var("USER")
1850 .or_else(|_| std::env::var("USERNAME"))
1851 .ok(),
1852 metadata: None,
1853 };
1854
1855 let config = Config::global();
1858 let provider_name: String = config
1859 .get_aster_provider()
1860 .expect("No provider configured. Run 'aster configure' first");
1861
1862 let settings = Settings {
1863 aster_provider: Some(provider_name.clone()),
1864 aster_model: Some(model_name.clone()),
1865 temperature: Some(model_config.temperature.unwrap_or(0.0)),
1866 };
1867
1868 tracing::debug!(
1869 "Building recipe with {} activities and {} extensions",
1870 activities.len(),
1871 extension_configs.len()
1872 );
1873
1874 let (title, description) =
1875 if let Ok(json_content) = serde_json::from_str::<Value>(&clean_content) {
1876 let title = json_content
1877 .get("title")
1878 .and_then(|t| t.as_str())
1879 .unwrap_or("Custom recipe from chat")
1880 .to_string();
1881
1882 let description = json_content
1883 .get("description")
1884 .and_then(|d| d.as_str())
1885 .unwrap_or("a custom recipe instance from this chat session")
1886 .to_string();
1887
1888 (title, description)
1889 } else {
1890 (
1891 "Custom recipe from chat".to_string(),
1892 "a custom recipe instance from this chat session".to_string(),
1893 )
1894 };
1895
1896 let recipe = Recipe::builder()
1897 .title(title)
1898 .description(description)
1899 .instructions(instructions)
1900 .activities(activities)
1901 .extensions(extension_configs)
1902 .settings(settings)
1903 .author(author)
1904 .build()
1905 .map_err(|e| {
1906 tracing::error!("Failed to build recipe: {}", e);
1907 anyhow!("Recipe build failed: {}", e)
1908 })?;
1909
1910 tracing::info!("Recipe creation completed successfully");
1911 Ok(recipe)
1912 }
1913}
1914
#[cfg(test)]
mod tests {
    use super::*;
    use crate::recipe::Response;

    /// Adding a final output tool exposes it in the tool list and includes its
    /// system prompt in the built system prompt.
    #[tokio::test]
    async fn test_add_final_output_tool() -> Result<()> {
        let agent = Agent::new();

        let schema = serde_json::json!({
            "type": "object",
            "properties": {
                "result": {"type": "string"}
            }
        });
        agent
            .add_final_output_tool(Response {
                json_schema: Some(schema),
            })
            .await;

        let tools = agent.list_tools(None).await;
        assert!(
            tools.iter().any(|tool| tool.name == FINAL_OUTPUT_TOOL_NAME),
            "Final output tool should be present after adding"
        );

        let prompt_manager = agent.prompt_manager.lock().await;
        let system_prompt = prompt_manager.builder().build();

        let tool_guard = agent.final_output_tool.lock().await;
        let expected_fragment = tool_guard.as_ref().unwrap().system_prompt();
        assert!(system_prompt.contains(&expected_fragment));
        Ok(())
    }

    /// The inspection manager reports the repetition, permission and security inspectors.
    #[tokio::test]
    async fn test_tool_inspection_manager_has_all_inspectors() -> Result<()> {
        let agent = Agent::new();

        let inspector_names = agent.tool_inspection_manager.inspector_names();

        for &expected in &["repetition", "permission", "security"] {
            assert!(
                inspector_names.contains(&expected),
                "Tool inspection manager should contain {} inspector",
                expected
            );
        }

        Ok(())
    }

    /// A default agent registers the six core native tools.
    #[tokio::test]
    async fn test_agent_has_tool_registry() -> Result<()> {
        let agent = Agent::new();

        let registry = agent.tool_registry();
        let registry_guard = registry.read().await;

        for &tool_name in &["bash", "read", "write", "edit", "glob", "grep"] {
            assert!(
                registry_guard.contains(tool_name),
                "{} tool should be registered",
                tool_name
            );
        }

        assert!(
            registry_guard.native_tool_count() >= 6,
            "Should have at least 6 native tools"
        );

        Ok(())
    }

    /// An agent built from a custom tool config still registers `bash` and `read`.
    #[tokio::test]
    async fn test_agent_with_tool_config() -> Result<()> {
        let agent = Agent::with_tool_config(ToolRegistrationConfig::new().with_pdf_enabled(true));

        let registry = agent.tool_registry();
        let registry_guard = registry.read().await;

        for &tool_name in &["bash", "read"] {
            assert!(
                registry_guard.contains(tool_name),
                "{} tool should be registered",
                tool_name
            );
        }

        Ok(())
    }

    /// A tool registered through `register_mcp_tool` is visible as an MCP tool.
    #[tokio::test]
    async fn test_agent_register_mcp_tool() -> Result<()> {
        let agent = Agent::new();

        let tool_name = "test_mcp_tool".to_string();
        let description = "A test MCP tool".to_string();
        let input_schema = serde_json::json!({"type": "object"});
        let server_name = "test_server".to_string();
        agent
            .register_mcp_tool(tool_name, description, input_schema, server_name)
            .await;

        let registry = agent.tool_registry();
        let registry_guard = registry.read().await;
        assert!(
            registry_guard.contains("test_mcp_tool"),
            "MCP tool should be registered"
        );
        assert!(
            registry_guard.contains_mcp("test_mcp_tool"),
            "Should be registered as MCP tool"
        );

        Ok(())
    }

    /// A fresh agent starts with an empty file-read history.
    #[tokio::test]
    async fn test_agent_file_read_history() -> Result<()> {
        let agent = Agent::new();

        let history = agent.file_read_history();
        let history_is_empty = history.read().unwrap().is_empty();
        assert!(history_is_empty, "History should be empty initially");

        Ok(())
    }
}