Skip to main content

aster/agents/
agent.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use anyhow::{anyhow, Context, Result};
7use futures::stream::BoxStream;
8use futures::{stream, FutureExt, Stream, StreamExt, TryStreamExt};
9use uuid::Uuid;
10
11use super::final_output_tool::FinalOutputTool;
12use super::platform_tools;
13use super::tool_execution::{ToolCallResult, CHAT_MODE_TOOL_SKIPPED_RESPONSE, DECLINED_RESPONSE};
14use crate::action_required_manager::ActionRequiredManager;
15use crate::agents::error_handling::OverflowHandler;
16use crate::agents::extension::{ExtensionConfig, ExtensionResult, ToolInfo};
17use crate::agents::extension_manager::{get_parameter_names, ExtensionManager};
18use crate::agents::extension_manager_extension::MANAGE_EXTENSIONS_TOOL_NAME_COMPLETE;
19use crate::agents::final_output_tool::{FINAL_OUTPUT_CONTINUATION_MESSAGE, FINAL_OUTPUT_TOOL_NAME};
20use crate::agents::platform_tools::PLATFORM_MANAGE_SCHEDULE_TOOL_NAME;
21use crate::agents::prompt_manager::PromptManager;
22use crate::agents::retry::{RetryManager, RetryResult};
23use crate::agents::subagent_task_config::TaskConfig;
24use crate::agents::subagent_tool::{
25    create_subagent_tool, handle_subagent_tool, SUBAGENT_TOOL_NAME,
26};
27use crate::agents::types::SessionConfig;
28use crate::agents::types::{FrontendTool, SharedProvider, ToolResultReceiver};
29use crate::config::{get_enabled_extensions, AsterMode, Config};
30use crate::context_mgmt::{
31    check_if_compaction_needed, compact_messages, DEFAULT_COMPACTION_THRESHOLD,
32};
33use crate::conversation::message::{
34    ActionRequiredData, Message, MessageContent, ProviderMetadata, SystemNotificationType,
35    ToolRequest,
36};
37use crate::conversation::{debug_conversation_fix, fix_conversation, Conversation};
38use crate::mcp_utils::ToolResult;
39use crate::permission::permission_inspector::PermissionInspector;
40use crate::permission::permission_judge::PermissionCheckResult;
41use crate::permission::PermissionConfirmation;
42use crate::providers::base::Provider;
43use crate::providers::errors::ProviderError;
44use crate::recipe::{Author, Recipe, Response, Settings, SubRecipe};
45use crate::scheduler_trait::SchedulerTrait;
46use crate::security::security_inspector::SecurityInspector;
47use crate::session::extension_data::{EnabledExtensionsState, ExtensionState};
48use crate::session::{Session, SessionManager, SessionStore, SessionType};
49use crate::tool_inspection::ToolInspectionManager;
50use crate::tool_monitor::RepetitionInspector;
51use crate::tools::{
52    register_default_tools, SharedFileReadHistory, ToolRegistrationConfig, ToolRegistry,
53};
54use crate::utils::is_token_cancelled;
55use regex::Regex;
56use rmcp::model::{
57    CallToolRequestParam, CallToolResult, Content, ErrorCode, ErrorData, GetPromptResult, Prompt,
58    ServerNotification, Tool,
59};
60use serde_json::Value;
61use tokio::sync::{mpsc, Mutex, RwLock};
62use tokio_util::sync::CancellationToken;
63use tracing::{debug, error, info, instrument, warn};
64
65const DEFAULT_MAX_TURNS: u32 = 1000;
66const COMPACTION_THINKING_TEXT: &str = "aster is compacting the conversation...";
67
/// Everything the reply loop needs for one turn, as assembled by
/// `Agent::prepare_reply_context`.
pub struct ReplyContext {
    /// The conversation after `fix_conversation` has been applied to it.
    pub conversation: Conversation,
    /// First tool list returned by `prepare_tools_and_prompt`.
    pub tools: Vec<Tool>,
    /// Second tool list returned by `prepare_tools_and_prompt`
    /// (presumably the tools exposed through the toolshim path — TODO confirm).
    pub toolshim_tools: Vec<Tool>,
    /// System prompt returned by `prepare_tools_and_prompt`.
    pub system_prompt: String,
    /// Mode read from the global config; `AsterMode::Auto` is used when the
    /// config lookup yields no value.
    pub aster_mode: AsterMode,
    /// Copy of the fixed conversation's messages, taken when the context was built.
    pub initial_messages: Vec<Message>,
}
77
/// Named form of the three values returned by `categorize_tool_requests`,
/// as packaged by `Agent::categorize_tools`.
pub struct ToolCategorizeResult {
    /// Tool requests classified as frontend requests
    /// (presumably those targeting tools registered as frontend tools — verify
    /// against `categorize_tool_requests`).
    pub frontend_requests: Vec<ToolRequest>,
    /// The tool requests that were not classified as frontend requests.
    pub remaining_requests: Vec<ToolRequest>,
    /// The response message as returned by the categorization step
    /// (NOTE(review): looks like the response with tool requests filtered out — confirm).
    pub filtered_response: Message,
}
83
/// The main aster Agent
pub struct Agent {
    /// Shared provider slot. It starts out as `None` and is shared with the
    /// `ExtensionManager` created alongside the agent.
    pub(super) provider: SharedProvider,

    /// Manager used to dispatch tool calls that are not handled natively.
    pub extension_manager: Arc<ExtensionManager>,
    /// Registered sub-recipes, keyed by sub-recipe name.
    pub(super) sub_recipes: Mutex<HashMap<String, SubRecipe>>,
    /// Final-output tool, present only after `add_final_output_tool` was called.
    pub(super) final_output_tool: Arc<Mutex<Option<FinalOutputTool>>>,
    /// Frontend tools, looked up by tool name.
    pub(super) frontend_tools: Mutex<HashMap<String, FrontendTool>>,
    /// Optional instructions associated with the frontend tools
    /// (presumably supplied by a frontend extension — TODO confirm).
    pub(super) frontend_instructions: Mutex<Option<String>>,
    /// Prompt manager; also holds the agent identity set via `set_identity`.
    pub(super) prompt_manager: Mutex<PromptManager>,
    /// Sending half of the bounded permission-confirmation channel
    /// (the `String` is presumably the request id — verify against callers).
    pub(super) confirmation_tx: mpsc::Sender<(String, PermissionConfirmation)>,
    /// Receiving half of the permission-confirmation channel.
    pub(super) confirmation_rx: Mutex<mpsc::Receiver<(String, PermissionConfirmation)>>,
    /// Sending half of the bounded tool-result channel
    /// (the `String` is presumably the request id — verify against callers).
    pub(super) tool_result_tx: mpsc::Sender<(String, ToolResult<CallToolResult>)>,
    /// Receiving half of the tool-result channel.
    pub(super) tool_result_rx: ToolResultReceiver,

    /// Optional scheduler, installed through `set_scheduler`.
    pub(super) scheduler_service: Mutex<Option<Arc<dyn SchedulerTrait>>>,
    /// Tracks retry attempts and drives the retry logic.
    pub(super) retry_manager: RetryManager,
    /// Holds the inspectors (security, permission, repetition by default)
    /// applied to tool calls.
    pub(super) tool_inspection_manager: ToolInspectionManager,

    /// Tool registry for native tools (Requirements: 11.3, 11.4, 11.5)
    pub(super) tool_registry: Arc<RwLock<ToolRegistry>>,
    /// Shared file read history for file tools
    pub(super) file_read_history: SharedFileReadHistory,

    /// Optional session store.
    ///
    /// When set, the agent uses this store to persist messages.
    /// When unset, it falls back to the global `SessionManager` (backward compatible).
    pub(super) session_store: Option<Arc<dyn SessionStore>>,
}
114
/// Events an agent can emit.
#[derive(Clone, Debug)]
pub enum AgentEvent {
    /// A conversation message.
    Message(Message),
    /// An MCP server notification paired with a string identifier
    /// (presumably the id of the originating request — TODO confirm).
    McpNotification((String, ServerNotification)),
    /// A change of model, carrying the model name and the mode.
    ModelChange { model: String, mode: String },
    /// The conversation history was replaced by the carried conversation
    /// (NOTE(review): likely emitted after compaction — confirm against emitters).
    HistoryReplaced(Conversation),
}
122
123impl Default for Agent {
124    fn default() -> Self {
125        Self::new()
126    }
127}
128
/// One item produced by a tool stream: either an intermediate notification
/// or the final result of the tool call.
pub enum ToolStreamItem<T> {
    /// A server notification received while the tool call was still running.
    Message(ServerNotification),
    /// The final result of the tool call.
    Result(T),
}
133
/// Boxed, pinned, `Send` stream of [`ToolStreamItem`]s whose result type is
/// `ToolResult<CallToolResult>`; this is what `tool_stream` returns.
pub type ToolStream =
    Pin<Box<dyn Stream<Item = ToolStreamItem<ToolResult<CallToolResult>>> + Send>>;
136
/// Combines a stream of `ServerNotification`s with a future representing the
/// final result of the tool call. MCP notifications are not request-scoped, but
/// this lets us capture all notifications emitted during the tool call for
/// simpler consumption.
///
/// # Arguments
/// * `rx` - stream of notifications to forward while the call is in flight
/// * `done` - future resolving to the final tool result
///
/// # Returns
/// A stream that yields `ToolStreamItem::Message` for each notification received
/// before `done` completes, then exactly one `ToolStreamItem::Result`, then ends.
/// Notifications that have not been received by the time `done` completes are
/// not forwarded.
pub fn tool_stream<S, F>(rx: S, done: F) -> ToolStream
where
    S: Stream<Item = ServerNotification> + Send + Unpin + 'static,
    F: Future<Output = ToolResult<CallToolResult>> + Send + 'static,
{
    Box::pin(async_stream::stream! {
        // Pin the future on the stack so it can be polled by reference in the loop.
        tokio::pin!(done);
        let mut rx = rx;

        loop {
            tokio::select! {
                // Once `rx` is exhausted (`next()` yields `None`) this pattern no
                // longer matches, leaving only the `done` branch to wait on.
                Some(msg) = rx.next() => {
                    yield ToolStreamItem::Message(msg);
                }
                // The tool call finished: emit the result and end the stream.
                r = &mut done => {
                    yield ToolStreamItem::Result(r);
                    break;
                }
            }
        }
    })
}
163
164impl Agent {
165    pub fn new() -> Self {
166        // Create channels with buffer size 32 (adjust if needed)
167        let (confirm_tx, confirm_rx) = mpsc::channel(32);
168        let (tool_tx, tool_rx) = mpsc::channel(32);
169        let provider = Arc::new(Mutex::new(None));
170
171        // Initialize ToolRegistry with all native tools (Requirements: 11.3, 11.4)
172        let mut tool_registry = ToolRegistry::new();
173        let (file_read_history, _hook_manager) = register_default_tools(&mut tool_registry);
174
175        Self {
176            provider: provider.clone(),
177            extension_manager: Arc::new(ExtensionManager::new(provider.clone())),
178            sub_recipes: Mutex::new(HashMap::new()),
179            final_output_tool: Arc::new(Mutex::new(None)),
180            frontend_tools: Mutex::new(HashMap::new()),
181            frontend_instructions: Mutex::new(None),
182            prompt_manager: Mutex::new(PromptManager::new()),
183            confirmation_tx: confirm_tx,
184            confirmation_rx: Mutex::new(confirm_rx),
185            tool_result_tx: tool_tx,
186            tool_result_rx: Arc::new(Mutex::new(tool_rx)),
187            scheduler_service: Mutex::new(None),
188            retry_manager: RetryManager::new(),
189            tool_inspection_manager: Self::create_default_tool_inspection_manager(),
190            tool_registry: Arc::new(RwLock::new(tool_registry)),
191            file_read_history,
192            session_store: None, // 默认使用全局 SessionManager
193        }
194    }
195
196    /// 设置自定义 session 存储
197    ///
198    /// 允许应用层注入自己的存储实现,而不是使用默认的 SQLite 存储。
199    /// 如果设置为 None,会回退到全局 SessionManager。
200    ///
201    /// # Example
202    /// ```ignore
203    /// let store = Arc::new(MyCustomStore::new());
204    /// let agent = Agent::new().with_session_store(store);
205    /// ```
206    pub fn with_session_store(mut self, store: Arc<dyn SessionStore>) -> Self {
207        self.session_store = Some(store);
208        self
209    }
210
211    /// 获取当前的 session 存储引用
212    pub fn session_store(&self) -> Option<&Arc<dyn SessionStore>> {
213        self.session_store.as_ref()
214    }
215
216    /// 设置 Agent 身份配置(Builder 模式)
217    ///
218    /// 允许应用层完全控制 Agent 的身份,包括名称、语言、描述等。
219    /// 这会替换默认的 "aster by Block" 身份。
220    ///
221    /// 注意:此方法使用 try_lock,如果锁被占用会静默失败。
222    /// 建议在 Agent 创建后立即调用,或使用异步版本 `set_identity()`。
223    ///
224    /// # Example
225    /// ```ignore
226    /// use aster::agents::{Agent, AgentIdentity};
227    ///
228    /// let identity = AgentIdentity::new("ProxyCast 助手")
229    ///     .with_language("Chinese")
230    ///     .with_description("一个专业的 AI 代理服务助手");
231    ///
232    /// let agent = Agent::new().with_identity(identity);
233    /// ```
234    pub fn with_identity(self, identity: super::identity::AgentIdentity) -> Self {
235        // 使用 try_lock 避免在异步运行时中阻塞
236        if let Ok(mut pm) = self.prompt_manager.try_lock() {
237            pm.set_identity(identity);
238        } else {
239            // 如果锁被占用,记录警告
240            tracing::warn!("[Agent] with_identity: 无法获取锁,身份设置被跳过");
241        }
242        self
243    }
244
245    /// 设置 Agent 身份(异步方法)
246    ///
247    /// 用于在 Agent 创建后动态修改身份配置。
248    /// 这是在异步上下文中设置身份的推荐方式。
249    pub async fn set_identity(&self, identity: super::identity::AgentIdentity) {
250        let mut pm = self.prompt_manager.lock().await;
251        pm.set_identity(identity);
252    }
253
254    /// Create a new Agent with custom tool registration configuration
255    ///
256    /// This allows customizing which tools are registered and their configuration.
257    ///
258    /// # Arguments
259    /// * `config` - Configuration for tool registration
260    ///
261    /// Requirements: 11.3, 11.4
262    pub fn with_tool_config(config: ToolRegistrationConfig) -> Self {
263        let (confirm_tx, confirm_rx) = mpsc::channel(32);
264        let (tool_tx, tool_rx) = mpsc::channel(32);
265        let provider = Arc::new(Mutex::new(None));
266
267        // Initialize ToolRegistry with configured tools
268        let mut tool_registry = ToolRegistry::new();
269        let (file_read_history, _hook_manager) =
270            crate::tools::register_all_tools(&mut tool_registry, config);
271
272        Self {
273            provider: provider.clone(),
274            extension_manager: Arc::new(ExtensionManager::new(provider.clone())),
275            sub_recipes: Mutex::new(HashMap::new()),
276            final_output_tool: Arc::new(Mutex::new(None)),
277            frontend_tools: Mutex::new(HashMap::new()),
278            frontend_instructions: Mutex::new(None),
279            prompt_manager: Mutex::new(PromptManager::new()),
280            confirmation_tx: confirm_tx,
281            confirmation_rx: Mutex::new(confirm_rx),
282            tool_result_tx: tool_tx,
283            tool_result_rx: Arc::new(Mutex::new(tool_rx)),
284            scheduler_service: Mutex::new(None),
285            retry_manager: RetryManager::new(),
286            tool_inspection_manager: Self::create_default_tool_inspection_manager(),
287            tool_registry: Arc::new(RwLock::new(tool_registry)),
288            file_read_history,
289            session_store: None,
290        }
291    }
292
293    /// Get a reference to the tool registry
294    ///
295    /// Requirements: 11.3
296    pub fn tool_registry(&self) -> &Arc<RwLock<ToolRegistry>> {
297        &self.tool_registry
298    }
299
300    /// Get a reference to the shared file read history
301    ///
302    /// This is useful for tools that need to track file reads.
303    pub fn file_read_history(&self) -> &SharedFileReadHistory {
304        &self.file_read_history
305    }
306
307    /// Register an MCP tool with the registry
308    ///
309    /// This method allows registering MCP tools from extensions into the
310    /// native tool registry. Native tools have priority over MCP tools
311    /// with the same name.
312    ///
313    /// # Arguments
314    /// * `name` - The tool name
315    /// * `description` - Tool description
316    /// * `input_schema` - JSON schema for tool input
317    /// * `server_name` - Name of the MCP server providing this tool
318    ///
319    /// Requirements: 11.4, 11.5
320    pub async fn register_mcp_tool(
321        &self,
322        name: String,
323        description: String,
324        input_schema: serde_json::Value,
325        server_name: String,
326    ) {
327        let wrapper =
328            crate::tools::McpToolWrapper::new(name.clone(), description, input_schema, server_name);
329        let mut registry = self.tool_registry.write().await;
330        registry.register_mcp(name, wrapper);
331    }
332
333    /// Create a tool inspection manager with default inspectors
334    fn create_default_tool_inspection_manager() -> ToolInspectionManager {
335        let mut tool_inspection_manager = ToolInspectionManager::new();
336
337        // Add security inspector (highest priority - runs first)
338        tool_inspection_manager.add_inspector(Box::new(SecurityInspector::new()));
339
340        // Add permission inspector (medium-high priority)
341        // Note: mode will be updated dynamically based on session config
342        tool_inspection_manager.add_inspector(Box::new(PermissionInspector::new(
343            AsterMode::SmartApprove,
344            std::collections::HashSet::new(), // readonly tools - will be populated from extension manager
345            std::collections::HashSet::new(), // regular tools - will be populated from extension manager
346        )));
347
348        // Add repetition inspector (lower priority - basic repetition checking)
349        tool_inspection_manager.add_inspector(Box::new(RepetitionInspector::new(None)));
350
351        tool_inspection_manager
352    }
353
    // ========== Session storage helpers ==========
    // These helpers prefer the injected session_store and fall back to the global SessionManager when none is set.
356
357    /// 添加消息到 session
358    pub(crate) async fn store_add_message(
359        &self,
360        session_id: &str,
361        message: &Message,
362    ) -> Result<()> {
363        if let Some(store) = &self.session_store {
364            store.add_message(session_id, message).await
365        } else {
366            SessionManager::add_message(session_id, message).await
367        }
368    }
369
370    /// 获取 session
371    pub(crate) async fn store_get_session(
372        &self,
373        session_id: &str,
374        include_messages: bool,
375    ) -> Result<Session> {
376        if let Some(store) = &self.session_store {
377            store.get_session(session_id, include_messages).await
378        } else {
379            SessionManager::get_session(session_id, include_messages).await
380        }
381    }
382
383    /// 替换整个对话历史
384    pub(crate) async fn store_replace_conversation(
385        &self,
386        session_id: &str,
387        conversation: &Conversation,
388    ) -> Result<()> {
389        if let Some(store) = &self.session_store {
390            store.replace_conversation(session_id, conversation).await
391        } else {
392            SessionManager::replace_conversation(session_id, conversation).await
393        }
394    }
395
396    /// 更新 session 扩展数据
397    async fn store_update_extension_data(
398        &self,
399        session_id: &str,
400        extension_data: crate::session::ExtensionData,
401    ) -> Result<()> {
402        if let Some(store) = &self.session_store {
403            store
404                .update_extension_data(session_id, extension_data)
405                .await
406        } else {
407            SessionManager::update_session(session_id)
408                .extension_data(extension_data)
409                .apply()
410                .await
411        }
412    }
413
414    /// 更新 session 的 provider 和 model 配置
415    async fn store_update_provider_config(
416        &self,
417        session_id: &str,
418        provider_name: String,
419        model_config: crate::model::ModelConfig,
420    ) -> Result<()> {
421        if let Some(store) = &self.session_store {
422            store
423                .update_provider_config(session_id, Some(provider_name), Some(model_config))
424                .await
425        } else {
426            SessionManager::update_session(session_id)
427                .provider_name(provider_name)
428                .model_config(model_config)
429                .apply()
430                .await
431        }
432    }
433
    // ========== End of session storage helpers ==========
435
436    /// Reset the retry attempts counter to 0
437    pub async fn reset_retry_attempts(&self) {
438        self.retry_manager.reset_attempts().await;
439    }
440
441    /// Increment the retry attempts counter and return the new value
442    pub async fn increment_retry_attempts(&self) -> u32 {
443        self.retry_manager.increment_attempts().await
444    }
445
446    /// Get the current retry attempts count
447    pub async fn get_retry_attempts(&self) -> u32 {
448        self.retry_manager.get_attempts().await
449    }
450
451    async fn handle_retry_logic(
452        &self,
453        messages: &mut Conversation,
454        session_config: &SessionConfig,
455        initial_messages: &[Message],
456    ) -> Result<bool> {
457        let result = self
458            .retry_manager
459            .handle_retry_logic(
460                messages,
461                session_config,
462                initial_messages,
463                &self.final_output_tool,
464            )
465            .await?;
466
467        match result {
468            RetryResult::Retried => Ok(true),
469            RetryResult::Skipped
470            | RetryResult::MaxAttemptsReached
471            | RetryResult::SuccessChecksPassed => Ok(false),
472        }
473    }
474
475    /// 排空 elicitation 消息队列并保存到 session
476    async fn drain_elicitation_messages(&self, session_id: &str) -> Vec<Message> {
477        let mut messages = Vec::new();
478        let mut elicitation_rx = ActionRequiredManager::global().request_rx.lock().await;
479        while let Ok(elicitation_message) = elicitation_rx.try_recv() {
480            if let Err(e) = self
481                .store_add_message(session_id, &elicitation_message)
482                .await
483            {
484                warn!("Failed to save elicitation message to session: {}", e);
485            }
486            messages.push(elicitation_message);
487        }
488        messages
489    }
490
491    async fn prepare_reply_context(
492        &self,
493        unfixed_conversation: Conversation,
494        working_dir: &std::path::Path,
495        session_config: &SessionConfig,
496    ) -> Result<ReplyContext> {
497        let unfixed_messages = unfixed_conversation.messages().clone();
498        let (conversation, issues) = fix_conversation(unfixed_conversation.clone());
499        if !issues.is_empty() {
500            debug!(
501                "Conversation issue fixed: {}",
502                debug_conversation_fix(
503                    unfixed_messages.as_slice(),
504                    conversation.messages(),
505                    &issues
506                )
507            );
508        }
509        let initial_messages = conversation.messages().clone();
510        let config = Config::global();
511
512        let session_prompt = session_config.system_prompt.as_deref();
513        let (tools, toolshim_tools, system_prompt) = self
514            .prepare_tools_and_prompt(working_dir, session_prompt)
515            .await?;
516        let aster_mode = config.get_aster_mode().unwrap_or(AsterMode::Auto);
517
518        self.tool_inspection_manager
519            .update_permission_inspector_mode(aster_mode)
520            .await;
521
522        Ok(ReplyContext {
523            conversation,
524            tools,
525            toolshim_tools,
526            system_prompt,
527            aster_mode,
528            initial_messages,
529        })
530    }
531
532    async fn categorize_tools(
533        &self,
534        response: &Message,
535        tools: &[rmcp::model::Tool],
536    ) -> ToolCategorizeResult {
537        // Categorize tool requests
538        let (frontend_requests, remaining_requests, filtered_response) =
539            self.categorize_tool_requests(response, tools).await;
540
541        ToolCategorizeResult {
542            frontend_requests,
543            remaining_requests,
544            filtered_response,
545        }
546    }
547
548    async fn handle_approved_and_denied_tools(
549        &self,
550        permission_check_result: &PermissionCheckResult,
551        request_to_response_map: &HashMap<String, Arc<Mutex<Message>>>,
552        cancel_token: Option<tokio_util::sync::CancellationToken>,
553        session: &Session,
554    ) -> Result<Vec<(String, ToolStream)>> {
555        let mut tool_futures: Vec<(String, ToolStream)> = Vec::new();
556
557        // Handle pre-approved and read-only tools
558        for request in &permission_check_result.approved {
559            if let Ok(tool_call) = request.tool_call.clone() {
560                let (req_id, tool_result) = self
561                    .dispatch_tool_call(
562                        tool_call,
563                        request.id.clone(),
564                        cancel_token.clone(),
565                        session,
566                    )
567                    .await;
568
569                tool_futures.push((
570                    req_id,
571                    match tool_result {
572                        Ok(result) => tool_stream(
573                            result
574                                .notification_stream
575                                .unwrap_or_else(|| Box::new(stream::empty())),
576                            result.result,
577                        ),
578                        Err(e) => {
579                            tool_stream(Box::new(stream::empty()), futures::future::ready(Err(e)))
580                        }
581                    },
582                ));
583            }
584        }
585
586        Self::handle_denied_tools(permission_check_result, request_to_response_map).await;
587        Ok(tool_futures)
588    }
589
590    async fn handle_denied_tools(
591        permission_check_result: &PermissionCheckResult,
592        request_to_response_map: &HashMap<String, Arc<Mutex<Message>>>,
593    ) {
594        for request in &permission_check_result.denied {
595            if let Some(response_msg) = request_to_response_map.get(&request.id) {
596                let mut response = response_msg.lock().await;
597                *response = response.clone().with_tool_response_with_metadata(
598                    request.id.clone(),
599                    Ok(CallToolResult {
600                        content: vec![rmcp::model::Content::text(DECLINED_RESPONSE)],
601                        structured_content: None,
602                        is_error: Some(true),
603                        meta: None,
604                    }),
605                    request.metadata.as_ref(),
606                );
607            }
608        }
609    }
610
611    pub async fn set_scheduler(&self, scheduler: Arc<dyn SchedulerTrait>) {
612        let mut scheduler_service = self.scheduler_service.lock().await;
613        *scheduler_service = Some(scheduler);
614    }
615
616    /// Get a reference count clone to the provider
617    pub async fn provider(&self) -> Result<Arc<dyn Provider>, anyhow::Error> {
618        match &*self.provider.lock().await {
619            Some(provider) => Ok(Arc::clone(provider)),
620            None => Err(anyhow!("Provider not set")),
621        }
622    }
623
624    /// Check if a tool is a frontend tool
625    pub async fn is_frontend_tool(&self, name: &str) -> bool {
626        self.frontend_tools.lock().await.contains_key(name)
627    }
628
629    /// Get a reference to a frontend tool
630    pub async fn get_frontend_tool(&self, name: &str) -> Option<FrontendTool> {
631        self.frontend_tools.lock().await.get(name).cloned()
632    }
633
634    pub async fn add_final_output_tool(&self, response: Response) {
635        let mut final_output_tool = self.final_output_tool.lock().await;
636        let created_final_output_tool = FinalOutputTool::new(response);
637        let final_output_system_prompt = created_final_output_tool.system_prompt();
638        *final_output_tool = Some(created_final_output_tool);
639        self.extend_system_prompt(final_output_system_prompt).await;
640    }
641
642    pub async fn add_sub_recipes(&self, sub_recipes_to_add: Vec<SubRecipe>) {
643        let mut sub_recipes = self.sub_recipes.lock().await;
644        for sr in sub_recipes_to_add {
645            sub_recipes.insert(sr.name.clone(), sr);
646        }
647    }
648
649    pub async fn apply_recipe_components(
650        &self,
651        sub_recipes: Option<Vec<SubRecipe>>,
652        response: Option<Response>,
653        include_final_output: bool,
654    ) {
655        if let Some(sub_recipes) = sub_recipes {
656            self.add_sub_recipes(sub_recipes).await;
657        }
658
659        if include_final_output {
660            if let Some(response) = response {
661                self.add_final_output_tool(response).await;
662            }
663        }
664    }
665
    /// Dispatch a single tool call to the appropriate handler.
    ///
    /// Routing order, as implemented below:
    /// 1. subagent-tool calls made from a `SessionType::SubAgent` session are rejected;
    /// 2. the schedule-management tool goes to `handle_schedule_management`;
    /// 3. the final-output tool goes to the configured `FinalOutputTool`
    ///    (an error if none is defined);
    /// 4. the subagent tool goes to `handle_subagent_tool`;
    /// 5. frontend tools produce an error result saying frontend execution is required;
    /// 6. tools contained in `tool_registry` are executed through the registry;
    /// 7. everything else goes to `extension_manager.dispatch_tool_call`.
    ///
    /// # Returns
    /// `request_id` unchanged, paired with either the tool call result or an
    /// `ErrorData` for calls rejected before dispatch. For routes 4-7 the result
    /// future is mapped through `large_response_handler::process_tool_response`.
    #[instrument(skip(self, tool_call, request_id), fields(input, output))]
    pub async fn dispatch_tool_call(
        &self,
        tool_call: CallToolRequestParam,
        request_id: String,
        cancellation_token: Option<CancellationToken>,
        session: &Session,
    ) -> (String, Result<ToolCallResult, ErrorData>) {
        // Prevent subagents from creating other subagents
        if session.session_type == SessionType::SubAgent && tool_call.name == SUBAGENT_TOOL_NAME {
            return (
                request_id,
                Err(ErrorData::new(
                    ErrorCode::INVALID_REQUEST,
                    "Subagents cannot create other subagents".to_string(),
                    None,
                )),
            );
        }

        // Schedule management is handled by the agent itself; missing arguments
        // are treated as an empty JSON object.
        if tool_call.name == PLATFORM_MANAGE_SCHEDULE_TOOL_NAME {
            let arguments = tool_call
                .arguments
                .map(Value::Object)
                .unwrap_or(Value::Object(serde_json::Map::new()));
            let result = self
                .handle_schedule_management(arguments, request_id.clone())
                .await;
            let wrapped_result = result.map(|content| CallToolResult {
                content,
                structured_content: None,
                is_error: Some(false),
                meta: None,
            });
            return (request_id, Ok(ToolCallResult::from(wrapped_result)));
        }

        // The final-output tool only exists when one was added to the agent.
        if tool_call.name == FINAL_OUTPUT_TOOL_NAME {
            return if let Some(final_output_tool) = self.final_output_tool.lock().await.as_mut() {
                let result = final_output_tool.execute_tool_call(tool_call.clone()).await;
                (request_id, Ok(result))
            } else {
                (
                    request_id,
                    Err(ErrorData::new(
                        ErrorCode::INTERNAL_ERROR,
                        "Final output tool not defined".to_string(),
                        None,
                    )),
                )
            };
        }

        debug!("WAITING_TOOL_START: {}", tool_call.name);
        let result: ToolCallResult = if tool_call.name == SUBAGENT_TOOL_NAME {
            // Subagents need a provider; without one the call is rejected.
            let provider = match self.provider().await {
                Ok(p) => p,
                Err(_) => {
                    return (
                        request_id,
                        Err(ErrorData::new(
                            ErrorCode::INTERNAL_ERROR,
                            "Provider is required".to_string(),
                            None,
                        )),
                    );
                }
            };

            // The subagent task is configured from the current provider, the
            // session id and working directory, and the extension configs.
            let extensions = self.get_extension_configs().await;
            let task_config =
                TaskConfig::new(provider, &session.id, &session.working_dir, extensions);
            let sub_recipes = self.sub_recipes.lock().await.clone();

            let arguments = tool_call
                .arguments
                .clone()
                .map(Value::Object)
                .unwrap_or(Value::Object(serde_json::Map::new()));

            handle_subagent_tool(
                arguments,
                task_config,
                sub_recipes,
                session.working_dir.clone(),
                cancellation_token,
            )
        } else if self.is_frontend_tool(&tool_call.name).await {
            // For frontend tools, return an error indicating we need frontend execution
            ToolCallResult::from(Err(ErrorData::new(
                ErrorCode::INTERNAL_ERROR,
                "Frontend tool execution required".to_string(),
                None,
            )))
        } else {
            // Following the claude-code-open architecture: check tool_registry for a native tool first.
            // Native tools run directly in-process and do not need an MCP subprocess.
            let is_native = self.tool_registry.read().await.contains(&tool_call.name);

            if is_native {
                // Native tool: execute directly through tool_registry (similar to claude-code-open's toolRegistry.execute)
                let tool_name = tool_call.name.clone();
                let params = tool_call
                    .arguments
                    .clone()
                    .map(Value::Object)
                    .unwrap_or(Value::Object(serde_json::Map::new()));
                let context = crate::tools::context::ToolContext::new(session.working_dir.clone())
                    .with_session_id(session.id.clone());

                // The read lock is held for the duration of the execution and
                // released explicitly right after.
                let registry = self.tool_registry.read().await;
                let execute_result = registry.execute(&tool_name, params, &context, None).await;
                drop(registry);

                match execute_result {
                    Ok(result) => {
                        // A missing output is reported as empty text.
                        let text = result.output.unwrap_or_default();
                        ToolCallResult::from(Ok(CallToolResult::success(vec![Content::text(text)])))
                    }
                    Err(e) => ToolCallResult::from(Err(ErrorData::new(
                        ErrorCode::INTERNAL_ERROR,
                        e.to_string(),
                        None,
                    ))),
                }
            } else {
                // MCP tool: dispatch through extension_manager
                let result = self
                    .extension_manager
                    .dispatch_tool_call(tool_call.clone(), cancellation_token.unwrap_or_default())
                    .await;
                // A dispatch failure is reported via posthog and converted into
                // an error result rather than propagated.
                result.unwrap_or_else(|e| {
                    crate::posthog::emit_error(
                        "tool_execution_failed",
                        &format!("{}: {}", tool_call.name, e),
                    );
                    ToolCallResult::from(Err(ErrorData::new(
                        ErrorCode::INTERNAL_ERROR,
                        e.to_string(),
                        None,
                    )))
                })
            }
        };

        debug!("WAITING_TOOL_END: {}", tool_call.name);

        // Re-wrap the result so the eventual tool response is passed through the
        // large-response handler; the notification stream is forwarded as-is.
        (
            request_id,
            Ok(ToolCallResult {
                notification_stream: result.notification_stream,
                result: Box::new(
                    result
                        .result
                        .map(super::large_response_handler::process_tool_response),
                ),
            }),
        )
    }
826
827    /// Save current extension state to session metadata
828    /// Should be called after any extension add/remove operation
829    pub async fn save_extension_state(&self, session: &SessionConfig) -> Result<()> {
830        let extension_configs = self.extension_manager.get_extension_configs().await;
831
832        let extensions_state = EnabledExtensionsState::new(extension_configs);
833
834        let mut session_data = self.store_get_session(&session.id, false).await?;
835
836        if let Err(e) = extensions_state.to_extension_data(&mut session_data.extension_data) {
837            warn!("Failed to serialize extension state: {}", e);
838            return Err(anyhow!("Extension state serialization failed: {}", e));
839        }
840
841        self.store_update_extension_data(&session.id, session_data.extension_data)
842            .await?;
843
844        Ok(())
845    }
846
847    pub async fn add_extension(&self, extension: ExtensionConfig) -> ExtensionResult<()> {
848        match &extension {
849            ExtensionConfig::Frontend {
850                tools,
851                instructions,
852                ..
853            } => {
854                // For frontend tools, just store them in the frontend_tools map
855                let mut frontend_tools = self.frontend_tools.lock().await;
856                for tool in tools {
857                    let frontend_tool = FrontendTool {
858                        name: tool.name.to_string(),
859                        tool: tool.clone(),
860                    };
861                    frontend_tools.insert(tool.name.to_string(), frontend_tool);
862                }
863                // Store instructions if provided, using "frontend" as the key
864                let mut frontend_instructions = self.frontend_instructions.lock().await;
865                if let Some(instructions) = instructions {
866                    *frontend_instructions = Some(instructions.clone());
867                } else {
868                    // Default frontend instructions if none provided
869                    *frontend_instructions = Some(
870                        "The following tools are provided directly by the frontend and will be executed by the frontend when called.".to_string(),
871                    );
872                }
873            }
874            _ => {
875                self.extension_manager
876                    .add_extension(extension.clone())
877                    .await?;
878            }
879        }
880
881        Ok(())
882    }
883
884    pub async fn subagents_enabled(&self) -> bool {
885        let config = crate::config::Config::global();
886        let is_autonomous = config.get_aster_mode().unwrap_or(AsterMode::Auto) == AsterMode::Auto;
887        if !is_autonomous {
888            return false;
889        }
890        if self
891            .provider()
892            .await
893            .map(|provider| provider.get_active_model_name().starts_with("gemini"))
894            .unwrap_or(false)
895        {
896            return false;
897        }
898        if let Some(ref session_id) = self.extension_manager.get_context().await.session_id {
899            if matches!(
900                self.store_get_session(session_id, false)
901                    .await
902                    .ok()
903                    .map(|session| session.session_type),
904                Some(SessionType::SubAgent)
905            ) {
906                return false;
907            }
908        }
909        !self
910            .extension_manager
911            .list_extensions()
912            .await
913            .map(|ext| ext.is_empty())
914            .unwrap_or(true)
915    }
916
917    pub async fn list_tools(&self, extension_name: Option<String>) -> Vec<Tool> {
918        let mut prefixed_tools = self
919            .extension_manager
920            .get_prefixed_tools(extension_name.clone())
921            .await
922            .unwrap_or_default();
923
924        let subagents_enabled = self.subagents_enabled().await;
925        // 只在 scheduler 服务可用时才暴露 schedule 工具
926        if (extension_name.is_none() || extension_name.as_deref() == Some("platform"))
927            && self.scheduler_service.lock().await.is_some()
928        {
929            prefixed_tools.push(platform_tools::manage_schedule_tool());
930        }
931
932        if extension_name.is_none() {
933            if let Some(final_output_tool) = self.final_output_tool.lock().await.as_ref() {
934                prefixed_tools.push(final_output_tool.tool());
935            }
936
937            if subagents_enabled {
938                let sub_recipes = self.sub_recipes.lock().await;
939                let sub_recipes_vec: Vec<_> = sub_recipes.values().cloned().collect();
940                prefixed_tools.push(create_subagent_tool(&sub_recipes_vec));
941            }
942
943            // 添加 tool_registry 中的原生工具(包括 SkillTool)
944            let registry = self.tool_registry.read().await;
945            for tool_def in registry.get_definitions() {
946                let tool = Tool::new(
947                    tool_def.name,
948                    tool_def.description,
949                    tool_def
950                        .input_schema
951                        .as_object()
952                        .cloned()
953                        .unwrap_or_default(),
954                );
955                prefixed_tools.push(tool);
956            }
957        }
958
959        prefixed_tools
960    }
961
962    pub async fn remove_extension(&self, name: &str) -> Result<()> {
963        self.extension_manager.remove_extension(name).await?;
964        Ok(())
965    }
966
967    pub async fn list_extensions(&self) -> Vec<String> {
968        self.extension_manager
969            .list_extensions()
970            .await
971            .expect("Failed to list extensions")
972    }
973
974    pub async fn get_extension_configs(&self) -> Vec<ExtensionConfig> {
975        self.extension_manager.get_extension_configs().await
976    }
977
978    /// Handle a confirmation response for a tool request
979    pub async fn handle_confirmation(
980        &self,
981        request_id: String,
982        confirmation: PermissionConfirmation,
983    ) {
984        if let Err(e) = self.confirmation_tx.send((request_id, confirmation)).await {
985            error!("Failed to send confirmation: {}", e);
986        }
987    }
988
989    #[instrument(skip(self, user_message, session_config), fields(user_message))]
990    pub async fn reply(
991        &self,
992        user_message: Message,
993        session_config: SessionConfig,
994        cancel_token: Option<CancellationToken>,
995    ) -> Result<BoxStream<'_, Result<AgentEvent>>> {
996        for content in &user_message.content {
997            if let MessageContent::ActionRequired(action_required) = content {
998                if let ActionRequiredData::ElicitationResponse { id, user_data } =
999                    &action_required.data
1000                {
1001                    if let Err(e) = ActionRequiredManager::global()
1002                        .submit_response(id.clone(), user_data.clone())
1003                        .await
1004                    {
1005                        let error_text = format!("Failed to submit elicitation response: {}", e);
1006                        error!(error_text);
1007                        return Ok(Box::pin(stream::once(async {
1008                            Ok(AgentEvent::Message(
1009                                Message::assistant().with_text(error_text),
1010                            ))
1011                        })));
1012                    }
1013                    self.store_add_message(&session_config.id, &user_message)
1014                        .await?;
1015                    return Ok(Box::pin(futures::stream::empty()));
1016                }
1017            }
1018        }
1019
1020        let message_text = user_message.as_concat_text();
1021
1022        // Track custom slash command usage (don't track command name for privacy)
1023        if message_text.trim().starts_with('/') {
1024            let command = message_text.split_whitespace().next();
1025            if let Some(cmd) = command {
1026                if crate::slash_commands::get_recipe_for_command(cmd).is_some() {
1027                    crate::posthog::emit_custom_slash_command_used();
1028                }
1029            }
1030        }
1031
1032        let command_result = self
1033            .execute_command(&message_text, &session_config.id)
1034            .await;
1035
1036        match command_result {
1037            Err(e) => {
1038                let error_message = Message::assistant()
1039                    .with_text(e.to_string())
1040                    .with_visibility(true, false);
1041                return Ok(Box::pin(stream::once(async move {
1042                    Ok(AgentEvent::Message(error_message))
1043                })));
1044            }
1045            Ok(Some(response)) if response.role == rmcp::model::Role::Assistant => {
1046                self.store_add_message(
1047                    &session_config.id,
1048                    &user_message.clone().with_visibility(true, false),
1049                )
1050                .await?;
1051                self.store_add_message(
1052                    &session_config.id,
1053                    &response.clone().with_visibility(true, false),
1054                )
1055                .await?;
1056
1057                // Check if this was a command that modifies conversation history
1058                let modifies_history = crate::agents::execute_commands::COMPACT_TRIGGERS
1059                    .contains(&message_text.trim())
1060                    || message_text.trim() == "/clear";
1061
1062                // 克隆 session_store 引用供 async_stream 宏内部使用
1063                let session_store_clone = self.session_store.clone();
1064                let session_id_clone = session_config.id.clone();
1065
1066                return Ok(Box::pin(async_stream::try_stream! {
1067                    yield AgentEvent::Message(user_message);
1068                    yield AgentEvent::Message(response);
1069
1070                    // After commands that modify history, notify UI that history was replaced
1071                    if modifies_history {
1072                        let updated_session = if let Some(store) = &session_store_clone {
1073                            store.get_session(&session_id_clone, true).await
1074                        } else {
1075                            SessionManager::get_session(&session_id_clone, true).await
1076                        }
1077                            .map_err(|e| anyhow!("Failed to fetch updated session: {}", e))?;
1078                        let updated_conversation = updated_session
1079                            .conversation
1080                            .ok_or_else(|| anyhow!("Session has no conversation after history modification"))?;
1081                        yield AgentEvent::HistoryReplaced(updated_conversation);
1082                    }
1083                }));
1084            }
1085            Ok(Some(resolved_message)) => {
1086                self.store_add_message(
1087                    &session_config.id,
1088                    &user_message.clone().with_visibility(true, false),
1089                )
1090                .await?;
1091                self.store_add_message(
1092                    &session_config.id,
1093                    &resolved_message.clone().with_visibility(false, true),
1094                )
1095                .await?;
1096            }
1097            Ok(None) => {
1098                self.store_add_message(&session_config.id, &user_message)
1099                    .await?;
1100            }
1101        }
1102        let session = self.store_get_session(&session_config.id, true).await?;
1103        let conversation = session
1104            .conversation
1105            .clone()
1106            .ok_or_else(|| anyhow::anyhow!("Session {} has no conversation", session_config.id))?;
1107
1108        let needs_auto_compact = check_if_compaction_needed(
1109            self.provider().await?.as_ref(),
1110            &conversation,
1111            None,
1112            &session,
1113        )
1114        .await?;
1115
1116        let conversation_to_compact = conversation.clone();
1117
1118        Ok(Box::pin(async_stream::try_stream! {
1119            let final_conversation = if !needs_auto_compact {
1120                conversation
1121            } else {
1122                let config = Config::global();
1123                let threshold = config
1124                    .get_param::<f64>("ASTER_AUTO_COMPACT_THRESHOLD")
1125                    .unwrap_or(DEFAULT_COMPACTION_THRESHOLD);
1126                let threshold_percentage = (threshold * 100.0) as u32;
1127
1128                let inline_msg = format!(
1129                    "Exceeded auto-compact threshold of {}%. Performing auto-compaction...",
1130                    threshold_percentage
1131                );
1132
1133                yield AgentEvent::Message(
1134                    Message::assistant().with_system_notification(
1135                        SystemNotificationType::InlineMessage,
1136                        inline_msg,
1137                    )
1138                );
1139
1140                yield AgentEvent::Message(
1141                    Message::assistant().with_system_notification(
1142                        SystemNotificationType::ThinkingMessage,
1143                        COMPACTION_THINKING_TEXT,
1144                    )
1145                );
1146
1147                match compact_messages(self.provider().await?.as_ref(), &conversation_to_compact, false).await {
1148                    Ok((compacted_conversation, summarization_usage)) => {
1149                        self.store_replace_conversation(&session_config.id, &compacted_conversation).await?;
1150                        Self::update_session_metrics(&session_config, &summarization_usage, true, self.session_store.as_ref()).await?;
1151
1152                        yield AgentEvent::HistoryReplaced(compacted_conversation.clone());
1153
1154                        yield AgentEvent::Message(
1155                            Message::assistant().with_system_notification(
1156                                SystemNotificationType::InlineMessage,
1157                                "Compaction complete",
1158                            )
1159                        );
1160
1161                        compacted_conversation
1162                    }
1163                    Err(e) => {
1164                        yield AgentEvent::Message(
1165                            Message::assistant().with_text(
1166                                format!("Ran into this error trying to compact: {e}.\n\nPlease try again or create a new session")
1167                            )
1168                        );
1169                        return;
1170                    }
1171                }
1172            };
1173
1174            let mut reply_stream = self.reply_internal(final_conversation, session_config, session, cancel_token).await?;
1175            while let Some(event) = reply_stream.next().await {
1176                yield event?;
1177            }
1178        }))
1179    }
1180
1181    async fn reply_internal(
1182        &self,
1183        conversation: Conversation,
1184        session_config: SessionConfig,
1185        session: Session,
1186        cancel_token: Option<CancellationToken>,
1187    ) -> Result<BoxStream<'_, Result<AgentEvent>>> {
1188        let context = self
1189            .prepare_reply_context(conversation, &session.working_dir, &session_config)
1190            .await?;
1191        let ReplyContext {
1192            mut conversation,
1193            mut tools,
1194            mut toolshim_tools,
1195            mut system_prompt,
1196            aster_mode,
1197            initial_messages,
1198        } = context;
1199        let reply_span = tracing::Span::current();
1200        self.reset_retry_attempts().await;
1201
1202        let provider = self.provider().await?;
1203        let session_id = session_config.id.clone();
1204        let working_dir = session.working_dir.clone();
1205        tokio::spawn(async move {
1206            if let Err(e) = SessionManager::maybe_update_name(&session_id, provider).await {
1207                warn!("Failed to generate session description: {}", e);
1208            }
1209        });
1210
1211        Ok(Box::pin(async_stream::try_stream! {
1212            let _ = reply_span.enter();
1213            let mut turns_taken = 0u32;
1214            let max_turns = session_config.max_turns.unwrap_or(DEFAULT_MAX_TURNS);
1215            let mut overflow_handler = OverflowHandler::new(2);
1216
1217            loop {
1218                if is_token_cancelled(&cancel_token) {
1219                    break;
1220                }
1221
1222                if let Some(final_output_tool) = self.final_output_tool.lock().await.as_ref() {
1223                    if final_output_tool.final_output.is_some() {
1224                        let final_event = AgentEvent::Message(
1225                            Message::assistant().with_text(final_output_tool.final_output.clone().unwrap())
1226                        );
1227                        yield final_event;
1228                        break;
1229                    }
1230                }
1231
1232                turns_taken += 1;
1233                if turns_taken > max_turns {
1234                    yield AgentEvent::Message(
1235                        Message::assistant().with_text(
1236                            "I've reached the maximum number of actions I can do without user input. Would you like me to continue?"
1237                        )
1238                    );
1239                    break;
1240                }
1241
1242                let conversation_with_moim = super::moim::inject_moim(
1243                    conversation.clone(),
1244                    &self.extension_manager,
1245                ).await;
1246
1247                let mut stream = Self::stream_response_from_provider(
1248                    self.provider().await?,
1249                    &system_prompt,
1250                    conversation_with_moim.messages(),
1251                    &tools,
1252                    &toolshim_tools,
1253                ).await?;
1254
1255                let mut no_tools_called = true;
1256                let mut messages_to_add = Conversation::default();
1257                let mut tools_updated = false;
1258                let mut did_recovery_compact_this_iteration = false;
1259
1260                while let Some(next) = stream.next().await {
1261                    if is_token_cancelled(&cancel_token) {
1262                        break;
1263                    }
1264
1265                    match next {
1266                        Ok((response, usage)) => {
1267                            overflow_handler.reset();
1268
1269                            // Emit model change event if provider is lead-worker
1270                            let provider = self.provider().await?;
1271                            if let Some(lead_worker) = provider.as_lead_worker() {
1272                                if let Some(ref usage) = usage {
1273                                    let active_model = usage.model.clone();
1274                                    let (lead_model, worker_model) = lead_worker.get_model_info();
1275                                    let mode = if active_model == lead_model {
1276                                        "lead"
1277                                    } else if active_model == worker_model {
1278                                        "worker"
1279                                    } else {
1280                                        "unknown"
1281                                    };
1282
1283                                    yield AgentEvent::ModelChange {
1284                                        model: active_model,
1285                                        mode: mode.to_string(),
1286                                    };
1287                                }
1288                            }
1289
1290                            if let Some(ref usage) = usage {
1291                                Self::update_session_metrics(&session_config, usage, false, self.session_store.as_ref()).await?;
1292                            }
1293
1294                            if let Some(response) = response {
1295                                let ToolCategorizeResult {
1296                                    frontend_requests,
1297                                    remaining_requests,
1298                                    filtered_response,
1299                                } = self.categorize_tools(&response, &tools).await;
1300
1301                                yield AgentEvent::Message(filtered_response.clone());
1302                                tokio::task::yield_now().await;
1303
1304                                let num_tool_requests = frontend_requests.len() + remaining_requests.len();
1305                                if num_tool_requests == 0 {
1306                                    messages_to_add.push(response.clone());
1307                                    continue;
1308                                }
1309
1310                                let tool_response_messages: Vec<Arc<Mutex<Message>>> = (0..num_tool_requests)
1311                                    .map(|_| Arc::new(Mutex::new(Message::user().with_id(
1312                                        format!("msg_{}", Uuid::new_v4())
1313                                    ))))
1314                                    .collect();
1315
1316                                let mut request_to_response_map = HashMap::new();
1317                                let mut request_metadata: HashMap<String, Option<ProviderMetadata>> = HashMap::new();
1318                                for (idx, request) in frontend_requests.iter().chain(remaining_requests.iter()).enumerate() {
1319                                    request_to_response_map.insert(request.id.clone(), tool_response_messages[idx].clone());
1320                                    request_metadata.insert(request.id.clone(), request.metadata.clone());
1321                                }
1322
1323                                for (idx, request) in frontend_requests.iter().enumerate() {
1324                                    let mut frontend_tool_stream = self.handle_frontend_tool_request(
1325                                        request,
1326                                        tool_response_messages[idx].clone(),
1327                                    );
1328
1329                                    while let Some(msg) = frontend_tool_stream.try_next().await? {
1330                                        yield AgentEvent::Message(msg);
1331                                    }
1332                                }
1333                                if aster_mode == AsterMode::Chat {
1334                                    // Skip all remaining tool calls in chat mode
1335                                    for request in remaining_requests.iter() {
1336                                        if let Some(response_msg) = request_to_response_map.get(&request.id) {
1337                                            let mut response = response_msg.lock().await;
1338                                            *response = response.clone().with_tool_response_with_metadata(
1339                                                request.id.clone(),
1340                                                Ok(CallToolResult {
1341                                                    content: vec![Content::text(CHAT_MODE_TOOL_SKIPPED_RESPONSE)],
1342                                                    structured_content: None,
1343                                                    is_error: Some(false),
1344                                                    meta: None,
1345                                                }),
1346                                                request.metadata.as_ref(),
1347                                            );
1348                                        }
1349                                    }
1350                                } else {
1351                                    // Run all tool inspectors
1352                                    let inspection_results = self.tool_inspection_manager
1353                                        .inspect_tools(
1354                                            &remaining_requests,
1355                                            conversation.messages(),
1356                                        )
1357                                        .await?;
1358
1359                                    let permission_check_result = self.tool_inspection_manager
1360                                        .process_inspection_results_with_permission_inspector(
1361                                            &remaining_requests,
1362                                            &inspection_results,
1363                                        )
1364                                        .unwrap_or_else(|| {
1365                                            let mut result = PermissionCheckResult {
1366                                                approved: vec![],
1367                                                needs_approval: vec![],
1368                                                denied: vec![],
1369                                            };
1370                                            result.needs_approval.extend(remaining_requests.iter().cloned());
1371                                            result
1372                                        });
1373
1374                                    // Track extension requests
1375                                    let mut enable_extension_request_ids = vec![];
1376                                    for request in &remaining_requests {
1377                                        if let Ok(tool_call) = &request.tool_call {
1378                                            if tool_call.name == MANAGE_EXTENSIONS_TOOL_NAME_COMPLETE {
1379                                                enable_extension_request_ids.push(request.id.clone());
1380                                            }
1381                                        }
1382                                    }
1383
1384                                    let mut tool_futures = self.handle_approved_and_denied_tools(
1385                                        &permission_check_result,
1386                                        &request_to_response_map,
1387                                        cancel_token.clone(),
1388                                        &session,
1389                                    ).await?;
1390
1391                                    let tool_futures_arc = Arc::new(Mutex::new(tool_futures));
1392
1393                                    let mut tool_approval_stream = self.handle_approval_tool_requests(
1394                                        &permission_check_result.needs_approval,
1395                                        tool_futures_arc.clone(),
1396                                        &request_to_response_map,
1397                                        cancel_token.clone(),
1398                                        &session,
1399                                        &inspection_results,
1400                                    );
1401
1402                                    while let Some(msg) = tool_approval_stream.try_next().await? {
1403                                        yield AgentEvent::Message(msg);
1404                                    }
1405
1406                                    tool_futures = {
1407                                        let mut futures_lock = tool_futures_arc.lock().await;
1408                                        futures_lock.drain(..).collect::<Vec<_>>()
1409                                    };
1410
1411                                    let with_id = tool_futures
1412                                        .into_iter()
1413                                        .map(|(request_id, stream)| {
1414                                            stream.map(move |item| (request_id.clone(), item))
1415                                        })
1416                                        .collect::<Vec<_>>();
1417
1418                                    let mut combined = stream::select_all(with_id);
1419                                    let mut all_install_successful = true;
1420
1421                                    while let Some((request_id, item)) = combined.next().await {
1422                                        if is_token_cancelled(&cancel_token) {
1423                                            break;
1424                                        }
1425
1426                                        for msg in self.drain_elicitation_messages(&session_config.id).await {
1427                                            yield AgentEvent::Message(msg);
1428                                        }
1429
1430                                        match item {
1431                                            ToolStreamItem::Result(output) => {
1432                                                if enable_extension_request_ids.contains(&request_id)
1433                                                    && output.is_err()
1434                                                {
1435                                                    all_install_successful = false;
1436                                                }
1437                                                if let Some(response_msg) = request_to_response_map.get(&request_id) {
1438                                                    let metadata = request_metadata.get(&request_id).and_then(|m| m.as_ref());
1439                                                    let mut response = response_msg.lock().await;
1440                                                    *response = response.clone().with_tool_response_with_metadata(request_id, output, metadata);
1441                                                }
1442                                            }
1443                                            ToolStreamItem::Message(msg) => {
1444                                                yield AgentEvent::McpNotification((request_id, msg));
1445                                            }
1446                                        }
1447                                    }
1448
1449                                    // check for remaining elicitation messages after all tools complete
1450                                    for msg in self.drain_elicitation_messages(&session_config.id).await {
1451                                        yield AgentEvent::Message(msg);
1452                                    }
1453
1454                                    if all_install_successful && !enable_extension_request_ids.is_empty() {
1455                                        if let Err(e) = self.save_extension_state(&session_config).await {
1456                                            warn!("Failed to save extension state after runtime changes: {}", e);
1457                                        }
1458                                        tools_updated = true;
1459                                    }
1460                                }
1461
1462                                // Preserve thinking content from the original response
1463                                // Gemini (and other thinking models) require thinking to be echoed back
1464                                let thinking_content: Vec<MessageContent> = response.content.iter()
1465                                    .filter(|c| matches!(c, MessageContent::Thinking(_)))
1466                                    .cloned()
1467                                    .collect();
1468                                if !thinking_content.is_empty() {
1469                                    let thinking_msg = Message::new(
1470                                        response.role.clone(),
1471                                        response.created,
1472                                        thinking_content,
1473                                    ).with_id(format!("msg_{}", Uuid::new_v4()));
1474                                    messages_to_add.push(thinking_msg);
1475                                }
1476
1477                                for (idx, request) in frontend_requests.iter().chain(remaining_requests.iter()).enumerate() {
1478                                    if request.tool_call.is_ok() {
1479                                        let request_msg = Message::assistant()
1480                                            .with_id(format!("msg_{}", Uuid::new_v4()))
1481                                            .with_tool_request_with_metadata(
1482                                                request.id.clone(),
1483                                                request.tool_call.clone(),
1484                                                request.metadata.as_ref(),
1485                                                request.tool_meta.clone(),
1486                                            );
1487                                        messages_to_add.push(request_msg);
1488                                        let final_response = tool_response_messages[idx]
1489                                                                .lock().await.clone();
1490                                        yield AgentEvent::Message(final_response.clone());
1491                                        messages_to_add.push(final_response);
1492                                    }
1493                                }
1494
1495                                no_tools_called = false;
1496                            }
1497                        }
1498                        Err(ref provider_err @ ProviderError::ContextLengthExceeded(_)) => {
1499                            crate::posthog::emit_error(provider_err.telemetry_type(), &provider_err.to_string());
1500
1501                            if !overflow_handler.can_retry() {
1502                                error!("Context limit exceeded after compaction - prompt too large");
1503                                yield AgentEvent::Message(
1504                                    Message::assistant().with_system_notification(
1505                                        SystemNotificationType::InlineMessage,
1506                                        "Unable to continue: Context limit still exceeded after compaction. Try using a shorter message, a model with a larger context window, or start a new session."
1507                                    )
1508                                );
1509                                break;
1510                            }
1511
1512                            yield AgentEvent::Message(
1513                                Message::assistant().with_system_notification(
1514                                    SystemNotificationType::InlineMessage,
1515                                    format!(
1516                                        "Context limit reached. Compacting to continue conversation... (attempt {}/{})",
1517                                        overflow_handler.compaction_attempts() + 1,
1518                                        2
1519                                    ),
1520                                )
1521                            );
1522                            yield AgentEvent::Message(
1523                                Message::assistant().with_system_notification(
1524                                    SystemNotificationType::ThinkingMessage,
1525                                    COMPACTION_THINKING_TEXT,
1526                                )
1527                            );
1528
1529                            match overflow_handler.handle_overflow(self.provider().await?.as_ref(), &conversation, &session).await {
1530                                Ok((compacted_conversation, usage, should_retry)) => {
1531                                    if should_retry {
1532                                        self.store_replace_conversation(&session_config.id, &compacted_conversation).await?;
1533                                        Self::update_session_metrics(&session_config, &usage, true, self.session_store.as_ref()).await?;
1534                                        conversation = compacted_conversation;
1535                                        did_recovery_compact_this_iteration = true;
1536                                        yield AgentEvent::HistoryReplaced(conversation.clone());
1537                                    }
1538                                    break;
1539                                }
1540                                Err(e) => {
1541                                    crate::posthog::emit_error("compaction_failed", &e.to_string());
1542                                    error!("Compaction failed: {}", e);
1543                                    yield AgentEvent::Message(
1544                                        Message::assistant().with_system_notification(
1545                                            SystemNotificationType::InlineMessage,
1546                                            format!("Compaction failed: {}", e),
1547                                        )
1548                                    );
1549                                    break;
1550                                }
1551                            }
1552                        }
1553                        Err(ref provider_err) => {
1554                            crate::posthog::emit_error(provider_err.telemetry_type(), &provider_err.to_string());
1555                            error!("Error: {}", provider_err);
1556                            yield AgentEvent::Message(
1557                                Message::assistant().with_text(
1558                                    format!("Ran into this error: {provider_err}.\n\nPlease retry if you think this is a transient or recoverable error.")
1559                                )
1560                            );
1561                            break;
1562                        }
1563                    }
1564                }
1565                if tools_updated {
1566                    let session_prompt = session_config.system_prompt.as_deref();
1567                    (tools, toolshim_tools, system_prompt) =
1568                        self.prepare_tools_and_prompt(&working_dir, session_prompt).await?;
1569                }
1570                let mut exit_chat = false;
1571                if no_tools_called {
1572                    if let Some(final_output_tool) = self.final_output_tool.lock().await.as_ref() {
1573                        if final_output_tool.final_output.is_none() {
1574                            warn!("Final output tool has not been called yet. Continuing agent loop.");
1575                            let message = Message::user().with_text(FINAL_OUTPUT_CONTINUATION_MESSAGE);
1576                            messages_to_add.push(message.clone());
1577                            yield AgentEvent::Message(message);
1578                        } else {
1579                            let message = Message::assistant().with_text(final_output_tool.final_output.clone().unwrap());
1580                            messages_to_add.push(message.clone());
1581                            yield AgentEvent::Message(message);
1582                            exit_chat = true;
1583                        }
1584                    } else if did_recovery_compact_this_iteration {
1585                        // Avoid setting exit_chat; continue from last user message in the conversation
1586                    } else {
1587                        match self.handle_retry_logic(&mut conversation, &session_config, &initial_messages).await {
1588                            Ok(should_retry) => {
1589                                if should_retry {
1590                                    info!("Retry logic triggered, restarting agent loop");
1591                                } else {
1592                                    exit_chat = true;
1593                                }
1594                            }
1595                            Err(e) => {
1596                                error!("Retry logic failed: {}", e);
1597                                yield AgentEvent::Message(
1598                                    Message::assistant().with_text(
1599                                        format!("Retry logic encountered an error: {}", e)
1600                                    )
1601                                );
1602                                exit_chat = true;
1603                            }
1604                        }
1605                    }
1606                }
1607
1608                for msg in &messages_to_add {
1609                    self.store_add_message(&session_config.id, msg).await?;
1610                }
1611                conversation.extend(messages_to_add);
1612                if exit_chat {
1613                    break;
1614                }
1615
1616                tokio::task::yield_now().await;
1617            }
1618        }))
1619    }
1620
1621    pub async fn extend_system_prompt(&self, instruction: String) {
1622        let mut prompt_manager = self.prompt_manager.lock().await;
1623        prompt_manager.add_system_prompt_extra(instruction);
1624    }
1625
1626    pub async fn update_provider(
1627        &self,
1628        provider: Arc<dyn Provider>,
1629        session_id: &str,
1630    ) -> Result<()> {
1631        let mut current_provider = self.provider.lock().await;
1632        *current_provider = Some(provider.clone());
1633
1634        self.store_update_provider_config(
1635            session_id,
1636            provider.get_name().to_string(),
1637            provider.get_model_config(),
1638        )
1639        .await
1640        .context("Failed to persist provider config to session")
1641    }
1642
1643    /// Override the system prompt with a custom template
1644    pub async fn override_system_prompt(&self, template: String) {
1645        let mut prompt_manager = self.prompt_manager.lock().await;
1646        prompt_manager.set_system_prompt_override(template);
1647    }
1648
1649    pub async fn list_extension_prompts(&self) -> HashMap<String, Vec<Prompt>> {
1650        self.extension_manager
1651            .list_prompts(CancellationToken::default())
1652            .await
1653            .expect("Failed to list prompts")
1654    }
1655
1656    pub async fn get_prompt(&self, name: &str, arguments: Value) -> Result<GetPromptResult> {
1657        // First find which extension has this prompt
1658        let prompts = self
1659            .extension_manager
1660            .list_prompts(CancellationToken::default())
1661            .await
1662            .map_err(|e| anyhow!("Failed to list prompts: {}", e))?;
1663
1664        if let Some(extension) = prompts
1665            .iter()
1666            .find(|(_, prompt_list)| prompt_list.iter().any(|p| p.name == name))
1667            .map(|(extension, _)| extension)
1668        {
1669            return self
1670                .extension_manager
1671                .get_prompt(extension, name, arguments, CancellationToken::default())
1672                .await
1673                .map_err(|e| anyhow!("Failed to get prompt: {}", e));
1674        }
1675
1676        Err(anyhow!("Prompt '{}' not found", name))
1677    }
1678
1679    pub async fn get_plan_prompt(&self) -> Result<String> {
1680        let tools = self.extension_manager.get_prefixed_tools(None).await?;
1681        let tools_info = tools
1682            .into_iter()
1683            .map(|tool| {
1684                ToolInfo::new(
1685                    &tool.name,
1686                    tool.description
1687                        .as_ref()
1688                        .map(|d| d.as_ref())
1689                        .unwrap_or_default(),
1690                    get_parameter_names(&tool),
1691                    None,
1692                )
1693            })
1694            .collect();
1695
1696        let plan_prompt = self.extension_manager.get_planning_prompt(tools_info).await;
1697
1698        Ok(plan_prompt)
1699    }
1700
1701    pub async fn handle_tool_result(&self, id: String, result: ToolResult<CallToolResult>) {
1702        if let Err(e) = self.tool_result_tx.send((id, result)).await {
1703            error!("Failed to send tool result: {}", e);
1704        }
1705    }
1706
1707    pub async fn create_recipe(&self, mut messages: Conversation) -> Result<Recipe> {
1708        tracing::info!("Starting recipe creation with {} messages", messages.len());
1709
1710        let extensions_info = self.extension_manager.get_extensions_info().await;
1711        tracing::debug!("Retrieved {} extensions info", extensions_info.len());
1712        let (extension_count, tool_count) =
1713            self.extension_manager.get_extension_and_tool_counts().await;
1714
1715        // Get model name from provider
1716        let provider = self.provider().await.map_err(|e| {
1717            tracing::error!("Failed to get provider for recipe creation: {}", e);
1718            e
1719        })?;
1720        let model_config = provider.get_model_config();
1721        let model_name = &model_config.model_name;
1722        tracing::debug!("Using model: {}", model_name);
1723
1724        let prompt_manager = self.prompt_manager.lock().await;
1725        let system_prompt = prompt_manager
1726            .builder()
1727            .with_extensions(extensions_info.into_iter())
1728            .with_frontend_instructions(self.frontend_instructions.lock().await.clone())
1729            .with_extension_and_tool_counts(extension_count, tool_count)
1730            .build();
1731
1732        let recipe_prompt = prompt_manager.get_recipe_prompt().await;
1733        let tools = self
1734            .extension_manager
1735            .get_prefixed_tools(None)
1736            .await
1737            .map_err(|e| {
1738                tracing::error!("Failed to get tools for recipe creation: {}", e);
1739                e
1740            })?;
1741
1742        messages.push(Message::user().with_text(recipe_prompt));
1743
1744        let (messages, issues) = fix_conversation(messages);
1745        if !issues.is_empty() {
1746            issues
1747                .iter()
1748                .for_each(|issue| tracing::warn!(recipe.conversation.issue = issue));
1749        }
1750
1751        tracing::debug!(
1752            "Added recipe prompt to messages, total messages: {}",
1753            messages.len()
1754        );
1755
1756        tracing::info!("Calling provider to generate recipe content");
1757        let (result, _usage) = self
1758            .provider
1759            .lock()
1760            .await
1761            .as_ref()
1762            .ok_or_else(|| {
1763                let error = anyhow!("Provider not available during recipe creation");
1764                tracing::error!("{}", error);
1765                error
1766            })?
1767            .complete(&system_prompt, messages.messages(), &tools)
1768            .await
1769            .map_err(|e| {
1770                tracing::error!("Provider completion failed during recipe creation: {}", e);
1771                e
1772            })?;
1773
1774        let content = result.as_concat_text();
1775        tracing::debug!(
1776            "Provider returned content with {} characters",
1777            content.len()
1778        );
1779
1780        // the response may be contained in ```json ```, strip that before parsing json
1781        let re = Regex::new(r"(?s)```[^\n]*\n(.*?)\n```").unwrap();
1782        let clean_content = re
1783            .captures(&content)
1784            .and_then(|caps| caps.get(1).map(|m| m.as_str()))
1785            .unwrap_or(&content)
1786            .trim()
1787            .to_string();
1788
1789        let (instructions, activities) =
1790            if let Ok(json_content) = serde_json::from_str::<Value>(&clean_content) {
1791                let instructions = json_content
1792                    .get("instructions")
1793                    .ok_or_else(|| anyhow!("Missing 'instructions' in json response"))?
1794                    .as_str()
1795                    .ok_or_else(|| anyhow!("instructions' is not a string"))?
1796                    .to_string();
1797
1798                let activities = json_content
1799                    .get("activities")
1800                    .ok_or_else(|| anyhow!("Missing 'activities' in json response"))?
1801                    .as_array()
1802                    .ok_or_else(|| anyhow!("'activities' is not an array'"))?
1803                    .iter()
1804                    .map(|act| {
1805                        act.as_str()
1806                            .map(|s| s.to_string())
1807                            .ok_or(anyhow!("'activities' array element is not a string"))
1808                    })
1809                    .collect::<Result<_, _>>()?;
1810
1811                (instructions, activities)
1812            } else {
1813                tracing::warn!("Failed to parse JSON, falling back to string parsing");
1814                // If we can't get valid JSON, try string parsing
1815                // Use split_once to get the content after "Instructions:".
1816                let after_instructions = content
1817                    .split_once("instructions:")
1818                    .map(|(_, rest)| rest)
1819                    .unwrap_or(&content);
1820
1821                // Split once more to separate instructions from activities.
1822                let (instructions_part, activities_text) = after_instructions
1823                    .split_once("activities:")
1824                    .unwrap_or((after_instructions, ""));
1825
1826                let instructions = instructions_part
1827                    .trim_end_matches(|c: char| c.is_whitespace() || c == '#')
1828                    .trim()
1829                    .to_string();
1830                let activities_text = activities_text.trim();
1831
1832                // Regex to remove bullet markers or numbers with an optional dot.
1833                let bullet_re = Regex::new(r"^[•\-*\d]+\.?\s*").expect("Invalid regex");
1834
1835                // Process each line in the activities section.
1836                let activities: Vec<String> = activities_text
1837                    .lines()
1838                    .map(|line| bullet_re.replace(line, "").to_string())
1839                    .map(|s| s.trim().to_string())
1840                    .filter(|line| !line.is_empty())
1841                    .collect();
1842
1843                (instructions, activities)
1844            };
1845
1846        let extension_configs = get_enabled_extensions();
1847
1848        let author = Author {
1849            contact: std::env::var("USER")
1850                .or_else(|_| std::env::var("USERNAME"))
1851                .ok(),
1852            metadata: None,
1853        };
1854
1855        // Ideally we'd get the name of the provider we are using from the provider itself,
1856        // but it doesn't know and the plumbing looks complicated.
1857        let config = Config::global();
1858        let provider_name: String = config
1859            .get_aster_provider()
1860            .expect("No provider configured. Run 'aster configure' first");
1861
1862        let settings = Settings {
1863            aster_provider: Some(provider_name.clone()),
1864            aster_model: Some(model_name.clone()),
1865            temperature: Some(model_config.temperature.unwrap_or(0.0)),
1866        };
1867
1868        tracing::debug!(
1869            "Building recipe with {} activities and {} extensions",
1870            activities.len(),
1871            extension_configs.len()
1872        );
1873
1874        let (title, description) =
1875            if let Ok(json_content) = serde_json::from_str::<Value>(&clean_content) {
1876                let title = json_content
1877                    .get("title")
1878                    .and_then(|t| t.as_str())
1879                    .unwrap_or("Custom recipe from chat")
1880                    .to_string();
1881
1882                let description = json_content
1883                    .get("description")
1884                    .and_then(|d| d.as_str())
1885                    .unwrap_or("a custom recipe instance from this chat session")
1886                    .to_string();
1887
1888                (title, description)
1889            } else {
1890                (
1891                    "Custom recipe from chat".to_string(),
1892                    "a custom recipe instance from this chat session".to_string(),
1893                )
1894            };
1895
1896        let recipe = Recipe::builder()
1897            .title(title)
1898            .description(description)
1899            .instructions(instructions)
1900            .activities(activities)
1901            .extensions(extension_configs)
1902            .settings(settings)
1903            .author(author)
1904            .build()
1905            .map_err(|e| {
1906                tracing::error!("Failed to build recipe: {}", e);
1907                anyhow!("Recipe build failed: {}", e)
1908            })?;
1909
1910        tracing::info!("Recipe creation completed successfully");
1911        Ok(recipe)
1912    }
1913}
1914
#[cfg(test)]
mod tests {
    use super::*;
    use crate::recipe::Response;

    /// Adding a final-output tool must expose it in the tool list and fold its
    /// system prompt into the agent's built system prompt.
    #[tokio::test]
    async fn test_add_final_output_tool() -> Result<()> {
        let agent = Agent::new();

        let schema = serde_json::json!({
            "type": "object",
            "properties": {
                "result": {"type": "string"}
            }
        });
        agent
            .add_final_output_tool(Response {
                json_schema: Some(schema),
            })
            .await;

        let tools = agent.list_tools(None).await;
        let has_final_output_tool = tools
            .iter()
            .any(|tool| tool.name == FINAL_OUTPUT_TOOL_NAME);
        assert!(
            has_final_output_tool,
            "Final output tool should be present after adding"
        );

        let prompt_manager = agent.prompt_manager.lock().await;
        let system_prompt = prompt_manager.builder().build();

        let final_output_guard = agent.final_output_tool.lock().await;
        let expected_fragment = final_output_guard.as_ref().unwrap().system_prompt();
        assert!(system_prompt.contains(&expected_fragment));
        Ok(())
    }

    /// The tool inspection manager must report the repetition, permission and
    /// security inspectors.
    #[tokio::test]
    async fn test_tool_inspection_manager_has_all_inspectors() -> Result<()> {
        let agent = Agent::new();
        let inspector_names = agent.tool_inspection_manager.inspector_names();

        for expected in ["repetition", "permission", "security"] {
            assert!(
                inspector_names.contains(&expected),
                "Tool inspection manager should contain {expected} inspector"
            );
        }

        Ok(())
    }

    /// A freshly created agent has the core native tools in its registry.
    #[tokio::test]
    async fn test_agent_has_tool_registry() -> Result<()> {
        let agent = Agent::new();

        let registry = agent.tool_registry();
        let registry_guard = registry.read().await;

        // Core native tools, checked in the same order as they are listed.
        for tool_name in ["bash", "read", "write", "edit", "glob", "grep"] {
            assert!(
                registry_guard.contains(tool_name),
                "{tool_name} tool should be registered"
            );
        }

        let native_count = registry_guard.native_tool_count();
        assert!(native_count >= 6, "Should have at least 6 native tools");

        Ok(())
    }

    /// An agent built from an explicit tool configuration still registers the
    /// core native tools.
    #[tokio::test]
    async fn test_agent_with_tool_config() -> Result<()> {
        let agent = Agent::with_tool_config(ToolRegistrationConfig::new().with_pdf_enabled(true));

        let registry = agent.tool_registry();
        let registry_guard = registry.read().await;

        for tool_name in ["bash", "read"] {
            assert!(
                registry_guard.contains(tool_name),
                "{tool_name} tool should be registered"
            );
        }

        Ok(())
    }

    /// Registering an MCP tool makes it visible both generally and as an MCP tool.
    #[tokio::test]
    async fn test_agent_register_mcp_tool() -> Result<()> {
        let agent = Agent::new();

        let tool_name = "test_mcp_tool".to_string();
        let description = "A test MCP tool".to_string();
        let input_schema = serde_json::json!({"type": "object"});
        let server_name = "test_server".to_string();
        agent
            .register_mcp_tool(tool_name, description, input_schema, server_name)
            .await;

        let registry = agent.tool_registry();
        let registry_guard = registry.read().await;
        let is_registered = registry_guard.contains("test_mcp_tool");
        assert!(is_registered, "MCP tool should be registered");
        let is_mcp = registry_guard.contains_mcp("test_mcp_tool");
        assert!(is_mcp, "Should be registered as MCP tool");

        Ok(())
    }

    /// The file read history of a new agent is accessible and starts out empty.
    #[tokio::test]
    async fn test_agent_file_read_history() -> Result<()> {
        let agent = Agent::new();

        let history = agent.file_read_history();
        let starts_empty = history.read().unwrap().is_empty();
        assert!(starts_empty, "History should be empty initially");

        Ok(())
    }
}