Skip to main content

a3s_code_core/session/
manager.rs

1//! SessionManager — manages multiple concurrent sessions
2
3use super::{ContextUsage, Session, SessionConfig, SessionState};
4use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
5use crate::hitl::ConfirmationPolicy;
6use crate::llm::{self, LlmClient, LlmConfig, Message};
7use crate::mcp::McpManager;
8use crate::memory::AgentMemory;
9use crate::permissions::PermissionPolicy;
10use crate::prompts::SystemPromptSlots;
11use crate::skills::SkillRegistry;
12use crate::store::{FileSessionStore, LlmConfigData, SessionData, SessionStore};
13use crate::text::truncate_utf8;
14use crate::tools::ToolExecutor;
15use crate::DocumentParserRegistry;
16use a3s_memory::MemoryStore;
17use anyhow::{Context, Result};
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::{mpsc, RwLock};
21
/// Session manager handles multiple concurrent sessions.
///
/// Every field is wrapped in an `Arc`, so the derived `Clone` yields a cheap
/// handle that shares all state with the original (background persistence
/// tasks run on such a clone).
#[derive(Clone)]
pub struct SessionManager {
    /// In-memory sessions keyed by session ID; each session sits behind its
    /// own `RwLock`.
    pub(crate) sessions: Arc<RwLock<HashMap<String, Arc<RwLock<Session>>>>>,
    /// Default LLM client used when a session has no per-session client configured.
    /// Wrapped in RwLock so it can be hot-swapped at runtime without restart.
    pub(crate) llm_client: Arc<RwLock<Option<Arc<dyn LlmClient>>>>,
    /// Tool executor shared by every session. New sessions take their tool
    /// definitions from it, and skill/MCP tools are registered into it at runtime.
    pub(crate) tool_executor: Arc<ToolExecutor>,
    /// Session stores keyed by storage backend.
    pub(crate) stores: Arc<RwLock<HashMap<crate::config::StorageBackend, Arc<dyn SessionStore>>>>,
    /// Storage backend each session uses (session_id -> backend). A session
    /// with no entry is treated as memory-only when saving.
    pub(crate) session_storage_types: Arc<RwLock<HashMap<String, crate::config::StorageBackend>>>,
    /// LLM configurations for sessions (stored separately for persistence)
    pub(crate) llm_configs: Arc<RwLock<HashMap<String, LlmConfigData>>>,
    /// Ongoing operations (session_id -> CancellationToken)
    pub(crate) ongoing_operations:
        Arc<RwLock<HashMap<String, tokio_util::sync::CancellationToken>>>,
    /// Skill registry for runtime skill management
    pub(crate) skill_registry: Arc<RwLock<Option<Arc<SkillRegistry>>>>,
    /// Per-session skill registries layered on top of the shared registry.
    pub(crate) session_skill_registries: Arc<RwLock<HashMap<String, Arc<SkillRegistry>>>>,
    /// Shared memory store for agent long-term memory.
    /// When set, each `generate`/`generate_streaming` call wraps this in
    /// `AgentMemory` and injects it into `AgentConfig.memory`.
    pub(crate) memory_store: Arc<RwLock<Option<Arc<dyn MemoryStore>>>>,
    /// Shared MCP manager. When set, MCP tools are registered into the
    /// `ToolExecutor` at startup so all sessions can use them.
    pub(crate) mcp_manager: Arc<RwLock<Option<Arc<McpManager>>>>,
    /// Shared document parser registry for document-aware tools.
    pub(crate) document_parser_registry: Arc<RwLock<Option<Arc<DocumentParserRegistry>>>>,
}
53
54impl SessionManager {
55    fn compact_json_value(value: &serde_json::Value) -> String {
56        let raw = match value {
57            serde_json::Value::Null => String::new(),
58            serde_json::Value::String(s) => s.clone(),
59            _ => serde_json::to_string(value).unwrap_or_default(),
60        };
61        let compact = raw.split_whitespace().collect::<Vec<_>>().join(" ");
62        if compact.len() > 180 {
63            format!("{}...", truncate_utf8(&compact, 180))
64        } else {
65            compact
66        }
67    }
68
69    /// Create a new session manager without persistence
70    pub fn new(llm_client: Option<Arc<dyn LlmClient>>, tool_executor: Arc<ToolExecutor>) -> Self {
71        Self {
72            sessions: Arc::new(RwLock::new(HashMap::new())),
73            llm_client: Arc::new(RwLock::new(llm_client)),
74            tool_executor,
75            stores: Arc::new(RwLock::new(HashMap::new())),
76            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
77            llm_configs: Arc::new(RwLock::new(HashMap::new())),
78            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
79            skill_registry: Arc::new(RwLock::new(None)),
80            session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
81            memory_store: Arc::new(RwLock::new(None)),
82            mcp_manager: Arc::new(RwLock::new(None)),
83            document_parser_registry: Arc::new(RwLock::new(Some(Arc::new(
84                DocumentParserRegistry::new(),
85            )))),
86        }
87    }
88
89    /// Create a session manager with file-based persistence
90    ///
91    /// Sessions will be automatically saved to disk and restored on startup.
92    pub async fn with_persistence<P: AsRef<std::path::Path>>(
93        llm_client: Option<Arc<dyn LlmClient>>,
94        tool_executor: Arc<ToolExecutor>,
95        sessions_dir: P,
96    ) -> Result<Self> {
97        let store = FileSessionStore::new(sessions_dir).await?;
98        let mut stores = HashMap::new();
99        stores.insert(
100            crate::config::StorageBackend::File,
101            Arc::new(store) as Arc<dyn SessionStore>,
102        );
103
104        let manager = Self {
105            sessions: Arc::new(RwLock::new(HashMap::new())),
106            llm_client: Arc::new(RwLock::new(llm_client)),
107            tool_executor,
108            stores: Arc::new(RwLock::new(stores)),
109            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
110            llm_configs: Arc::new(RwLock::new(HashMap::new())),
111            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
112            skill_registry: Arc::new(RwLock::new(None)),
113            session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
114            memory_store: Arc::new(RwLock::new(None)),
115            mcp_manager: Arc::new(RwLock::new(None)),
116            document_parser_registry: Arc::new(RwLock::new(Some(Arc::new(
117                DocumentParserRegistry::new(),
118            )))),
119        };
120
121        Ok(manager)
122    }
123
124    /// Create a session manager with a custom store
125    ///
126    /// The `backend` parameter determines which `StorageBackend` key the store is registered under.
127    /// Sessions created with a matching `storage_type` will use this store.
128    pub fn with_store(
129        llm_client: Option<Arc<dyn LlmClient>>,
130        tool_executor: Arc<ToolExecutor>,
131        store: Arc<dyn SessionStore>,
132        backend: crate::config::StorageBackend,
133    ) -> Self {
134        let mut stores = HashMap::new();
135        stores.insert(backend, store);
136
137        Self {
138            sessions: Arc::new(RwLock::new(HashMap::new())),
139            llm_client: Arc::new(RwLock::new(llm_client)),
140            tool_executor,
141            stores: Arc::new(RwLock::new(stores)),
142            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
143            llm_configs: Arc::new(RwLock::new(HashMap::new())),
144            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
145            skill_registry: Arc::new(RwLock::new(None)),
146            session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
147            memory_store: Arc::new(RwLock::new(None)),
148            mcp_manager: Arc::new(RwLock::new(None)),
149            document_parser_registry: Arc::new(RwLock::new(Some(Arc::new(
150                DocumentParserRegistry::new(),
151            )))),
152        }
153    }
154
155    /// Hot-swap the default LLM client used by sessions without a per-session client.
156    ///
157    /// Called by SafeClaw when `PUT /api/agent/config` updates the model config.
158    /// All subsequent `generate` / `generate_streaming` calls will use the new client.
159    pub async fn set_default_llm(&self, client: Option<Arc<dyn LlmClient>>) {
160        *self.llm_client.write().await = client;
161    }
162
163    /// Set the skill registry for runtime skill management.
164    ///
165    /// Skills registered here will be injected into the system prompt
166    /// for all subsequent `generate` / `generate_streaming` calls.
167    /// Also registers the `manage_skill` tool on the `ToolExecutor` so
168    /// the Agent can create, list, and remove skills at runtime.
169    pub async fn set_skill_registry(
170        &self,
171        registry: Arc<SkillRegistry>,
172        skills_dir: std::path::PathBuf,
173    ) {
174        // Register the manage_skill tool for self-bootstrap
175        let manage_tool = crate::skills::ManageSkillTool::new(registry.clone(), skills_dir);
176        self.tool_executor
177            .register_dynamic_tool(Arc::new(manage_tool));
178
179        *self.skill_registry.write().await = Some(registry);
180    }
181
182    /// Get the current skill registry, if any.
183    pub async fn skill_registry(&self) -> Option<Arc<SkillRegistry>> {
184        self.skill_registry.read().await.clone()
185    }
186
187    /// Override the active skill registry for a single session.
188    pub async fn set_session_skill_registry(
189        &self,
190        session_id: impl Into<String>,
191        registry: Arc<SkillRegistry>,
192    ) {
193        self.session_skill_registries
194            .write()
195            .await
196            .insert(session_id.into(), registry);
197    }
198
199    /// Get the active skill registry for a single session, if one was set.
200    pub async fn session_skill_registry(&self, session_id: &str) -> Option<Arc<SkillRegistry>> {
201        self.session_skill_registries
202            .read()
203            .await
204            .get(session_id)
205            .cloned()
206    }
207
208    /// Set the shared memory store for agent long-term memory.
209    ///
210    /// When set, every `generate`/`generate_streaming` call wraps this store
211    /// in an `AgentMemory` and injects it into `AgentConfig.memory`, enabling
212    /// automatic `recall_similar` before prompts and `remember_success`/
213    /// `remember_failure` after tool execution.
214    pub async fn set_memory_store(&self, store: Arc<dyn MemoryStore>) {
215        *self.memory_store.write().await = Some(store);
216    }
217
218    /// Get the current memory store, if any.
219    pub async fn memory_store(&self) -> Option<Arc<dyn MemoryStore>> {
220        self.memory_store.read().await.clone()
221    }
222
223    /// Set the shared MCP manager.
224    ///
225    /// When set, all tools from connected MCP servers are registered into the
226    /// `ToolExecutor` so they are available in every session.
227    pub async fn set_mcp_manager(&self, manager: Arc<McpManager>) {
228        let all_tools = manager.get_all_tools().await;
229        let mut by_server: HashMap<String, Vec<crate::mcp::McpTool>> = HashMap::new();
230        for (server, tool) in all_tools {
231            by_server.entry(server).or_default().push(tool);
232        }
233        for (server_name, tools) in by_server {
234            for tool in
235                crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&manager))
236            {
237                self.tool_executor.register_dynamic_tool(tool);
238            }
239        }
240        *self.mcp_manager.write().await = Some(manager);
241    }
242
243    /// Dynamically add and connect an MCP server to the shared manager.
244    ///
245    /// Registers the server config, connects it, and registers its tools into
246    /// the `ToolExecutor` so all subsequent sessions can use them.
247    pub async fn add_mcp_server(&self, config: crate::mcp::McpServerConfig) -> Result<()> {
248        let manager = {
249            let guard = self.mcp_manager.read().await;
250            match guard.clone() {
251                Some(m) => m,
252                None => {
253                    drop(guard);
254                    let m = Arc::new(McpManager::new());
255                    *self.mcp_manager.write().await = Some(Arc::clone(&m));
256                    m
257                }
258            }
259        };
260        let name = config.name.clone();
261        manager.register_server(config).await;
262        manager.connect(&name).await?;
263        let tools = manager.get_server_tools(&name).await;
264        for tool in crate::mcp::tools::create_mcp_tools(&name, tools, Arc::clone(&manager)) {
265            self.tool_executor.register_dynamic_tool(tool);
266        }
267        Ok(())
268    }
269
270    /// Disconnect and remove an MCP server from the shared manager.
271    ///
272    /// Also unregisters its tools from the `ToolExecutor`.
273    pub async fn remove_mcp_server(&self, name: &str) -> Result<()> {
274        let guard = self.mcp_manager.read().await;
275        if let Some(ref manager) = *guard {
276            manager.disconnect(name).await?;
277        }
278        self.tool_executor
279            .unregister_tools_by_prefix(&format!("mcp__{name}__"));
280        Ok(())
281    }
282
283    /// Get status of all connected MCP servers.
284    pub async fn mcp_status(
285        &self,
286    ) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
287        let guard = self.mcp_manager.read().await;
288        match guard.as_ref() {
289            Some(m) => {
290                let m = Arc::clone(m);
291                drop(guard);
292                m.get_status().await
293            }
294            None => std::collections::HashMap::new(),
295        }
296    }
297
298    /// Set the shared document parser registry for document-aware tools.
299    pub async fn set_document_parser_registry(&self, registry: Arc<DocumentParserRegistry>) {
300        *self.document_parser_registry.write().await = Some(registry);
301    }
302
303    /// Get the current shared document parser registry, if any.
304    pub async fn document_parser_registry(&self) -> Option<Arc<DocumentParserRegistry>> {
305        self.document_parser_registry.read().await.clone()
306    }
307
308    /// Restore a single session by ID from the store
309    ///
310    /// Searches all registered stores for the given session ID and restores it
311    /// into the in-memory session map. Returns an error if not found.
312    pub async fn restore_session_by_id(&self, session_id: &str) -> Result<()> {
313        // Check if already loaded
314        {
315            let sessions = self.sessions.read().await;
316            if sessions.contains_key(session_id) {
317                return Ok(());
318            }
319        }
320
321        let stores = self.stores.read().await;
322        for (backend, store) in stores.iter() {
323            match store.load(session_id).await {
324                Ok(Some(data)) => {
325                    {
326                        let mut storage_types = self.session_storage_types.write().await;
327                        storage_types.insert(data.id.clone(), backend.clone());
328                    }
329                    self.restore_session(data).await?;
330                    return Ok(());
331                }
332                Ok(None) => continue,
333                Err(e) => {
334                    tracing::warn!(
335                        "Failed to load session {} from {:?}: {}",
336                        session_id,
337                        backend,
338                        e
339                    );
340                    continue;
341                }
342            }
343        }
344
345        Err(anyhow::anyhow!(
346            "Session {} not found in any store",
347            session_id
348        ))
349    }
350
351    /// Load all sessions from all registered stores
352    pub async fn load_all_sessions(&mut self) -> Result<usize> {
353        let stores = self.stores.read().await;
354        let mut loaded = 0;
355
356        for (backend, store) in stores.iter() {
357            let session_ids = match store.list().await {
358                Ok(ids) => ids,
359                Err(e) => {
360                    tracing::warn!("Failed to list sessions from {:?} store: {}", backend, e);
361                    continue;
362                }
363            };
364
365            for id in session_ids {
366                match store.load(&id).await {
367                    Ok(Some(data)) => {
368                        // Record the storage type for this session
369                        {
370                            let mut storage_types = self.session_storage_types.write().await;
371                            storage_types.insert(data.id.clone(), backend.clone());
372                        }
373
374                        if let Err(e) = self.restore_session(data).await {
375                            tracing::warn!("Failed to restore session {}: {}", id, e);
376                        } else {
377                            loaded += 1;
378                        }
379                    }
380                    Ok(None) => {
381                        tracing::warn!("Session {} not found in store", id);
382                    }
383                    Err(e) => {
384                        tracing::warn!("Failed to load session {}: {}", id, e);
385                    }
386                }
387            }
388        }
389
390        tracing::info!("Loaded {} sessions from store", loaded);
391        Ok(loaded)
392    }
393
394    /// Restore a session from SessionData
395    async fn restore_session(&self, data: SessionData) -> Result<()> {
396        let tools = self.tool_executor.definitions();
397        let mut session = Session::new(data.id.clone(), data.config.clone(), tools).await?;
398
399        // Restore serializable state
400        session.restore_from_data(&data);
401
402        // Restore LLM config if present (without API key - must be reconfigured)
403        if let Some(llm_config) = &data.llm_config {
404            let mut configs = self.llm_configs.write().await;
405            configs.insert(data.id.clone(), llm_config.clone());
406        }
407
408        let mut sessions = self.sessions.write().await;
409        sessions.insert(data.id.clone(), Arc::new(RwLock::new(session)));
410
411        tracing::info!("Restored session: {}", data.id);
412        Ok(())
413    }
414
415    /// Save a session to the store
416    async fn save_session(&self, session_id: &str) -> Result<()> {
417        // Get the storage type for this session
418        let storage_type = {
419            let storage_types = self.session_storage_types.read().await;
420            storage_types.get(session_id).cloned()
421        };
422
423        let Some(storage_type) = storage_type else {
424            // No storage type means memory-only session
425            return Ok(());
426        };
427
428        // Skip saving for memory storage
429        if storage_type == crate::config::StorageBackend::Memory {
430            return Ok(());
431        }
432
433        // Get the appropriate store
434        let stores = self.stores.read().await;
435        let Some(store) = stores.get(&storage_type) else {
436            tracing::warn!("No store available for storage type: {:?}", storage_type);
437            return Ok(());
438        };
439
440        let session_lock = self.get_session(session_id).await?;
441        let session = session_lock.read().await;
442
443        // Get LLM config if set
444        let llm_config = {
445            let configs = self.llm_configs.read().await;
446            configs.get(session_id).cloned()
447        };
448
449        let data = session.to_session_data(llm_config);
450        store.save(&data).await?;
451
452        tracing::debug!("Saved session: {}", session_id);
453        Ok(())
454    }
455
456    /// Persist a session to store, logging and emitting an event on failure.
457    /// This is a non-fatal wrapper around `save_session` — the operation
458    /// succeeds in memory even if persistence fails.
459    async fn persist_or_warn(&self, session_id: &str, operation: &str) {
460        if let Err(e) = self.save_session(session_id).await {
461            tracing::warn!(
462                "Failed to persist session {} after {}: {}",
463                session_id,
464                operation,
465                e
466            );
467            // Emit event so SDK clients can react
468            if let Ok(session_lock) = self.get_session(session_id).await {
469                let session = session_lock.read().await;
470                let _ = session.event_tx().send(AgentEvent::PersistenceFailed {
471                    session_id: session_id.to_string(),
472                    operation: operation.to_string(),
473                    error: e.to_string(),
474                });
475            }
476        }
477    }
478
479    /// Spawn persistence as a background task (non-blocking).
480    ///
481    /// All `SessionManager` fields are `Arc`-wrapped, so `clone()` is a
482    /// cheap pointer-copy. This keeps file I/O off the response path.
483    fn persist_in_background(&self, session_id: &str, operation: &str) {
484        let mgr = self.clone();
485        let sid = session_id.to_string();
486        let op = operation.to_string();
487        tokio::spawn(async move {
488            mgr.persist_or_warn(&sid, &op).await;
489        });
490    }
491
492    /// Create a new session
493    pub async fn create_session(&self, id: String, config: SessionConfig) -> Result<String> {
494        tracing::info!(name: "a3s.session.create", session_id = %id, "Creating session");
495
496        // Record the storage type for this session
497        {
498            let mut storage_types = self.session_storage_types.write().await;
499            storage_types.insert(id.clone(), config.storage_type.clone());
500        }
501
502        // Get tool definitions from the executor
503        let tools = self.tool_executor.definitions();
504        let mut session = Session::new(id.clone(), config, tools).await?;
505
506        // Start the command queue
507        session.start_queue().await?;
508
509        // Set max context length if provided
510        if session.config.max_context_length > 0 {
511            session.context_usage.max_tokens = session.config.max_context_length as usize;
512        }
513
514        {
515            let mut sessions = self.sessions.write().await;
516            sessions.insert(id.clone(), Arc::new(RwLock::new(session)));
517        }
518
519        // Persist to store
520        self.persist_in_background(&id, "create");
521
522        tracing::info!("Created session: {}", id);
523        Ok(id)
524    }
525
526    /// Destroy a session
527    pub async fn destroy_session(&self, id: &str) -> Result<()> {
528        tracing::info!(name: "a3s.session.destroy", session_id = %id, "Destroying session");
529
530        // Get the storage type before removing the session
531        let storage_type = {
532            let storage_types = self.session_storage_types.read().await;
533            storage_types.get(id).cloned()
534        };
535
536        {
537            let mut sessions = self.sessions.write().await;
538            sessions.remove(id);
539        }
540
541        // Remove LLM config
542        {
543            let mut configs = self.llm_configs.write().await;
544            configs.remove(id);
545        }
546
547        {
548            let mut registries = self.session_skill_registries.write().await;
549            registries.remove(id);
550        }
551
552        // Remove storage type tracking
553        {
554            let mut storage_types = self.session_storage_types.write().await;
555            storage_types.remove(id);
556        }
557
558        // Delete from store if applicable
559        if let Some(storage_type) = storage_type {
560            if storage_type != crate::config::StorageBackend::Memory {
561                let stores = self.stores.read().await;
562                if let Some(store) = stores.get(&storage_type) {
563                    if let Err(e) = store.delete(id).await {
564                        tracing::warn!("Failed to delete session {} from store: {}", id, e);
565                    }
566                }
567            }
568        }
569
570        tracing::info!("Destroyed session: {}", id);
571        Ok(())
572    }
573
574    /// Get a session by ID
575    pub async fn get_session(&self, id: &str) -> Result<Arc<RwLock<Session>>> {
576        let sessions = self.sessions.read().await;
577        sessions
578            .get(id)
579            .cloned()
580            .context(format!("Session not found: {}", id))
581    }
582
583    /// Create a child session for a subagent
584    ///
585    /// Child sessions inherit the parent's LLM client but have their own
586    /// permission policy and configuration.
587    pub async fn create_child_session(
588        &self,
589        parent_id: &str,
590        child_id: String,
591        mut config: SessionConfig,
592    ) -> Result<String> {
593        // Verify parent exists and inherit HITL policy
594        let parent_lock = self.get_session(parent_id).await?;
595        let parent_llm_client = {
596            let parent = parent_lock.read().await;
597
598            // Inherit parent's confirmation policy if child doesn't have one
599            if config.confirmation_policy.is_none() {
600                let parent_policy = parent.confirmation_manager.policy().await;
601                config.confirmation_policy = Some(parent_policy);
602            }
603
604            parent.llm_client.clone()
605        };
606
607        // Set parent_id in config
608        config.parent_id = Some(parent_id.to_string());
609
610        // Get tool definitions from the executor
611        let tools = self.tool_executor.definitions();
612        let mut session = Session::new(child_id.clone(), config, tools).await?;
613
614        // Inherit LLM client from parent if not set
615        if session.llm_client.is_none() {
616            let default_llm = self.llm_client.read().await.clone();
617            session.llm_client = parent_llm_client.or(default_llm);
618        }
619
620        // Start the command queue
621        session.start_queue().await?;
622
623        // Set max context length if provided
624        if session.config.max_context_length > 0 {
625            session.context_usage.max_tokens = session.config.max_context_length as usize;
626        }
627
628        {
629            let mut sessions = self.sessions.write().await;
630            sessions.insert(child_id.clone(), Arc::new(RwLock::new(session)));
631        }
632
633        // Persist to store
634        self.persist_in_background(&child_id, "create_child");
635
636        tracing::info!(
637            "Created child session: {} (parent: {})",
638            child_id,
639            parent_id
640        );
641        Ok(child_id)
642    }
643
644    /// Get all child sessions for a parent session
645    pub async fn get_child_sessions(&self, parent_id: &str) -> Vec<String> {
646        let sessions = self.sessions.read().await;
647        let mut children = Vec::new();
648
649        for (id, session_lock) in sessions.iter() {
650            let session = session_lock.read().await;
651            if session.parent_id.as_deref() == Some(parent_id) {
652                children.push(id.clone());
653            }
654        }
655
656        children
657    }
658
659    /// Check if a session is a child session
660    pub async fn is_child_session(&self, session_id: &str) -> Result<bool> {
661        let session_lock = self.get_session(session_id).await?;
662        let session = session_lock.read().await;
663        Ok(session.is_child_session())
664    }
665
666    /// Generate response for a prompt
667    pub async fn generate(&self, session_id: &str, prompt: &str) -> Result<AgentResult> {
668        let session_lock = self.get_session(session_id).await?;
669
670        // Check if session is paused
671        {
672            let session = session_lock.read().await;
673            if session.state == SessionState::Paused {
674                anyhow::bail!(
675                    "Session {} is paused. Call Resume before generating.",
676                    session_id
677                );
678            }
679        }
680
681        // Get session state and LLM client
682        let (
683            history,
684            system,
685            tools,
686            session_llm_client,
687            permission_checker,
688            confirmation_manager,
689            context_providers,
690            session_workspace,
691            tool_metrics,
692            hook_engine,
693            planning_enabled,
694            goal_tracking,
695        ) = {
696            let session = session_lock.read().await;
697            (
698                session.messages.clone(),
699                session.system().map(String::from),
700                session.tools.clone(),
701                session.llm_client.clone(),
702                session.permission_checker.clone(),
703                session.confirmation_manager.clone(),
704                session.context_providers.clone(),
705                session.config.workspace.clone(),
706                session.tool_metrics.clone(),
707                session.config.hook_engine.clone(),
708                session.config.planning_enabled,
709                session.config.goal_tracking,
710            )
711        };
712
713        // Use session's LLM client if configured, otherwise use default
714        let llm_client = if let Some(client) = session_llm_client {
715            client
716        } else if let Some(client) = self.llm_client.read().await.clone() {
717            client
718        } else {
719            anyhow::bail!(
720                "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
721                session_id
722            );
723        };
724
725        // Construct per-session ToolContext from session workspace, falling back to server default
726        let tool_context = if session_workspace.is_empty() {
727            crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
728                .with_session_id(session_id)
729        } else {
730            crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
731                .with_session_id(session_id)
732        };
733        let tool_context = if let Some(registry) = self.document_parser_registry().await {
734            tool_context.with_document_parsers(registry)
735        } else {
736            tool_context
737        };
738        let tool_context = if let Some(command_env) = self.tool_executor.command_env() {
739            tool_context.with_command_env(command_env)
740        } else {
741            tool_context
742        };
743
744        // Inject skill registry into system prompt and agent config
745        let skill_registry = match self.session_skill_registry(session_id).await {
746            Some(registry) => Some(registry),
747            None => self.skill_registry.read().await.clone(),
748        };
749        let system = if let Some(ref registry) = skill_registry {
750            let skill_prompt = registry.to_system_prompt();
751            if skill_prompt.is_empty() {
752                system
753            } else {
754                match system {
755                    Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
756                    None => Some(skill_prompt),
757                }
758            }
759        } else {
760            system
761        };
762
763        // Inject matched skill content on-demand before skill_registry is moved into AgentConfig
764        let effective_prompt = if let Some(ref registry) = skill_registry {
765            let skill_content = registry.match_skills(prompt);
766            if skill_content.is_empty() {
767                prompt.to_string()
768            } else {
769                format!("{}\n\n---\n\n{}", skill_content, prompt)
770            }
771        } else {
772            prompt.to_string()
773        };
774
775        // Build AgentMemory from shared store (if configured)
776        let memory = self
777            .memory_store
778            .read()
779            .await
780            .as_ref()
781            .map(|store| Arc::new(AgentMemory::new(store.clone())));
782
783        // Create agent loop with permission policy, confirmation manager, and context providers
784        let config = AgentConfig {
785            prompt_slots: match system {
786                Some(s) => SystemPromptSlots::from_legacy(s),
787                None => SystemPromptSlots::default(),
788            },
789            tools,
790            max_tool_rounds: 50,
791            permission_checker: Some(permission_checker),
792            confirmation_manager: Some(confirmation_manager),
793            context_providers,
794            planning_enabled,
795            goal_tracking,
796            hook_engine,
797            skill_registry,
798            memory,
799            ..AgentConfig::default()
800        };
801
802        let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
803            .with_tool_metrics(tool_metrics);
804
805        // Execute with session context
806        let result = agent
807            .execute_with_session(&history, &effective_prompt, Some(session_id), None, None)
808            .await?;
809
810        // Update session
811        {
812            let mut session = session_lock.write().await;
813            session.messages = result.messages.clone();
814            session.update_usage(&result.usage);
815        }
816
817        // Persist to store
818        self.persist_in_background(session_id, "generate");
819
820        // Auto-compact if context usage exceeds threshold
821        if let Err(e) = self.maybe_auto_compact(session_id).await {
822            tracing::warn!("Auto-compact failed for session {}: {}", session_id, e);
823        }
824
825        Ok(result)
826    }
827
828    /// Generate response with streaming events
829    pub async fn generate_streaming(
830        &self,
831        session_id: &str,
832        prompt: &str,
833    ) -> Result<(
834        mpsc::Receiver<AgentEvent>,
835        tokio::task::JoinHandle<Result<AgentResult>>,
836        tokio_util::sync::CancellationToken,
837    )> {
838        let session_lock = self.get_session(session_id).await?;
839
840        // Check if session is paused
841        {
842            let session = session_lock.read().await;
843            if session.state == SessionState::Paused {
844                anyhow::bail!(
845                    "Session {} is paused. Call Resume before generating.",
846                    session_id
847                );
848            }
849        }
850
851        // Get session state and LLM client
852        let (
853            history,
854            system,
855            tools,
856            session_llm_client,
857            permission_checker,
858            confirmation_manager,
859            context_providers,
860            session_workspace,
861            tool_metrics,
862            hook_engine,
863            planning_enabled,
864            goal_tracking,
865        ) = {
866            let session = session_lock.read().await;
867            (
868                session.messages.clone(),
869                session.system().map(String::from),
870                session.tools.clone(),
871                session.llm_client.clone(),
872                session.permission_checker.clone(),
873                session.confirmation_manager.clone(),
874                session.context_providers.clone(),
875                session.config.workspace.clone(),
876                session.tool_metrics.clone(),
877                session.config.hook_engine.clone(),
878                session.config.planning_enabled,
879                session.config.goal_tracking,
880            )
881        };
882
883        // Use session's LLM client if configured, otherwise use default
884        let llm_client = if let Some(client) = session_llm_client {
885            client
886        } else if let Some(client) = self.llm_client.read().await.clone() {
887            client
888        } else {
889            anyhow::bail!(
890                "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
891                session_id
892            );
893        };
894
895        // Construct per-session ToolContext from session workspace, falling back to server default
896        let tool_context = if session_workspace.is_empty() {
897            crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
898                .with_session_id(session_id)
899        } else {
900            crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
901                .with_session_id(session_id)
902        };
903        let tool_context = if let Some(registry) = self.document_parser_registry().await {
904            tool_context.with_document_parsers(registry)
905        } else {
906            tool_context
907        };
908        let tool_context = if let Some(command_env) = self.tool_executor.command_env() {
909            tool_context.with_command_env(command_env)
910        } else {
911            tool_context
912        };
913
914        // Inject skill registry into system prompt and agent config
915        let skill_registry = match self.session_skill_registry(session_id).await {
916            Some(registry) => Some(registry),
917            None => self.skill_registry.read().await.clone(),
918        };
919        let system = if let Some(ref registry) = skill_registry {
920            let skill_prompt = registry.to_system_prompt();
921            if skill_prompt.is_empty() {
922                system
923            } else {
924                match system {
925                    Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
926                    None => Some(skill_prompt),
927                }
928            }
929        } else {
930            system
931        };
932
933        // Inject matched skill content on-demand before skill_registry is moved into AgentConfig
934        let effective_prompt = if let Some(ref registry) = skill_registry {
935            let skill_content = registry.match_skills(prompt);
936            if skill_content.is_empty() {
937                prompt.to_string()
938            } else {
939                format!("{}\n\n---\n\n{}", skill_content, prompt)
940            }
941        } else {
942            prompt.to_string()
943        };
944
945        // Build AgentMemory from shared store (if configured)
946        let memory = self
947            .memory_store
948            .read()
949            .await
950            .as_ref()
951            .map(|store| Arc::new(AgentMemory::new(store.clone())));
952
953        // Create agent loop with permission policy, confirmation manager, and context providers
954        let config = AgentConfig {
955            prompt_slots: match system {
956                Some(s) => SystemPromptSlots::from_legacy(s),
957                None => SystemPromptSlots::default(),
958            },
959            tools,
960            max_tool_rounds: 50,
961            permission_checker: Some(permission_checker),
962            confirmation_manager: Some(confirmation_manager),
963            context_providers,
964            planning_enabled,
965            goal_tracking,
966            hook_engine,
967            skill_registry,
968            memory,
969            ..AgentConfig::default()
970        };
971
972        let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
973            .with_tool_metrics(tool_metrics);
974
975        // Execute with streaming
976        let (rx, handle, cancel_token) =
977            agent.execute_streaming(&history, &effective_prompt).await?;
978
979        // Store the cancellation token for cancellation support
980        let cancel_token_clone = cancel_token.clone();
981        {
982            let mut ops = self.ongoing_operations.write().await;
983            ops.insert(session_id.to_string(), cancel_token);
984        }
985
986        // Spawn task to update session after completion
987        let session_lock_clone = session_lock.clone();
988        let original_handle = handle;
989        let stores = self.stores.clone();
990        let session_storage_types = self.session_storage_types.clone();
991        let llm_configs = self.llm_configs.clone();
992        let session_id_owned = session_id.to_string();
993        let ongoing_operations = self.ongoing_operations.clone();
994        let session_manager = self.clone();
995
996        let wrapped_handle = tokio::spawn(async move {
997            let result = original_handle.await??;
998
999            // Remove from ongoing operations
1000            {
1001                let mut ops = ongoing_operations.write().await;
1002                ops.remove(&session_id_owned);
1003            }
1004
1005            // Update session
1006            {
1007                let mut session = session_lock_clone.write().await;
1008                session.messages = result.messages.clone();
1009                session.update_usage(&result.usage);
1010            }
1011
1012            // Persist to store
1013            let storage_type = {
1014                let storage_types = session_storage_types.read().await;
1015                storage_types.get(&session_id_owned).cloned()
1016            };
1017
1018            if let Some(storage_type) = storage_type {
1019                if storage_type != crate::config::StorageBackend::Memory {
1020                    let stores_guard = stores.read().await;
1021                    if let Some(store) = stores_guard.get(&storage_type) {
1022                        let session = session_lock_clone.read().await;
1023                        let llm_config = {
1024                            let configs = llm_configs.read().await;
1025                            configs.get(&session_id_owned).cloned()
1026                        };
1027                        let data = session.to_session_data(llm_config);
1028                        if let Err(e) = store.save(&data).await {
1029                            tracing::warn!(
1030                                "Failed to persist session {} after streaming: {}",
1031                                session_id_owned,
1032                                e
1033                            );
1034                        }
1035                    }
1036                }
1037            }
1038
1039            // Auto-compact if context usage exceeds threshold
1040            if let Err(e) = session_manager.maybe_auto_compact(&session_id_owned).await {
1041                tracing::warn!(
1042                    "Auto-compact failed for session {}: {}",
1043                    session_id_owned,
1044                    e
1045                );
1046            }
1047
1048            Ok(result)
1049        });
1050
1051        Ok((rx, wrapped_handle, cancel_token_clone))
1052    }
1053
1054    /// Get context usage for a session
1055    pub async fn context_usage(&self, session_id: &str) -> Result<ContextUsage> {
1056        let session_lock = self.get_session(session_id).await?;
1057        let session = session_lock.read().await;
1058        Ok(session.context_usage.clone())
1059    }
1060
1061    /// Get conversation history for a session
1062    pub async fn history(&self, session_id: &str) -> Result<Vec<Message>> {
1063        let session_lock = self.get_session(session_id).await?;
1064        let session = session_lock.read().await;
1065        Ok(session.messages.clone())
1066    }
1067
1068    /// Clear session history
1069    pub async fn clear(&self, session_id: &str) -> Result<()> {
1070        {
1071            let session_lock = self.get_session(session_id).await?;
1072            let mut session = session_lock.write().await;
1073            session.clear();
1074        }
1075
1076        // Persist to store
1077        self.persist_in_background(session_id, "clear");
1078
1079        Ok(())
1080    }
1081
1082    /// Compact session context
1083    pub async fn compact(&self, session_id: &str) -> Result<()> {
1084        tracing::info!(name: "a3s.session.compact", session_id = %session_id, "Compacting session context");
1085
1086        {
1087            let session_lock = self.get_session(session_id).await?;
1088            let mut session = session_lock.write().await;
1089
1090            // Get LLM client for compaction (if available)
1091            let llm_client = if let Some(client) = &session.llm_client {
1092                client.clone()
1093            } else if let Some(client) = self.llm_client.read().await.clone() {
1094                client
1095            } else {
1096                // If no LLM client available, just do simple truncation
1097                tracing::warn!("No LLM client configured for compaction, using simple truncation");
1098                let keep_messages = 20;
1099                if session.messages.len() > keep_messages {
1100                    let len = session.messages.len();
1101                    session.messages = session.messages.split_off(len - keep_messages);
1102                }
1103                // Persist after truncation
1104                drop(session);
1105                self.persist_in_background(session_id, "compact");
1106                return Ok(());
1107            };
1108
1109            session.compact(&llm_client).await?;
1110        }
1111
1112        // Persist to store
1113        self.persist_in_background(session_id, "compact");
1114
1115        Ok(())
1116    }
1117
1118    /// Check if auto-compaction should be triggered and perform it if needed.
1119    ///
1120    /// Called after `generate()` / `generate_streaming()` updates session usage.
1121    /// Triggers compaction when:
1122    /// - `auto_compact` is enabled in session config
1123    /// - `context_usage.percent` exceeds `auto_compact_threshold`
1124    pub async fn maybe_auto_compact(&self, session_id: &str) -> Result<bool> {
1125        let (should_compact, percent_before, messages_before) = {
1126            let session_lock = self.get_session(session_id).await?;
1127            let session = session_lock.read().await;
1128
1129            if !session.config.auto_compact {
1130                return Ok(false);
1131            }
1132
1133            let threshold = session.config.auto_compact_threshold;
1134            let percent = session.context_usage.percent;
1135            let msg_count = session.messages.len();
1136
1137            tracing::debug!(
1138                "Auto-compact check for session {}: percent={:.2}%, threshold={:.2}%, messages={}",
1139                session_id,
1140                percent * 100.0,
1141                threshold * 100.0,
1142                msg_count,
1143            );
1144
1145            (percent >= threshold, percent, msg_count)
1146        };
1147
1148        if !should_compact {
1149            return Ok(false);
1150        }
1151
1152        tracing::info!(
1153            name: "a3s.session.auto_compact",
1154            session_id = %session_id,
1155            percent_before = %format!("{:.1}%", percent_before * 100.0),
1156            messages_before = %messages_before,
1157            "Auto-compacting session due to high context usage"
1158        );
1159
1160        // Perform compaction (reuses existing compact logic)
1161        self.compact(session_id).await?;
1162
1163        // Get post-compaction message count
1164        let messages_after = {
1165            let session_lock = self.get_session(session_id).await?;
1166            let session = session_lock.read().await;
1167            session.messages.len()
1168        };
1169
1170        // Broadcast event to notify clients
1171        let event = AgentEvent::ContextCompacted {
1172            session_id: session_id.to_string(),
1173            before_messages: messages_before,
1174            after_messages: messages_after,
1175            percent_before,
1176        };
1177
1178        // Try to send via session's event broadcaster
1179        if let Ok(session_lock) = self.get_session(session_id).await {
1180            let session = session_lock.read().await;
1181            let _ = session.event_tx.send(event);
1182        }
1183
1184        tracing::info!(
1185            name: "a3s.session.auto_compact.done",
1186            session_id = %session_id,
1187            messages_before = %messages_before,
1188            messages_after = %messages_after,
1189            "Auto-compaction complete"
1190        );
1191
1192        Ok(true)
1193    }
1194
1195    /// Resolve the LLM client for a session (session-level -> default fallback)
1196    ///
1197    /// Returns `None` if no LLM client is configured at either level.
1198    pub async fn get_llm_for_session(
1199        &self,
1200        session_id: &str,
1201    ) -> Result<Option<Arc<dyn LlmClient>>> {
1202        let session_lock = self.get_session(session_id).await?;
1203        let session = session_lock.read().await;
1204
1205        if let Some(client) = &session.llm_client {
1206            return Ok(Some(client.clone()));
1207        }
1208
1209        Ok(self.llm_client.read().await.clone())
1210    }
1211
    /// Ask an ephemeral side question for an existing managed session.
    ///
    /// Uses a read-only snapshot of the session history plus current queue /
    /// confirmation / task state. Optional `runtime_context` lets hosts inject
    /// extra transient context such as browser-only UI state.
    ///
    /// The session itself is never modified: the history is cloned, the extra
    /// context and the question are appended only to that copy, and the answer
    /// is returned to the caller rather than stored.
    ///
    /// # Errors
    ///
    /// Fails if `question` is empty after trimming, if the session cannot be
    /// found, if no LLM client is available for the session, or if the LLM
    /// call itself fails.
    pub async fn btw_with_context(
        &self,
        session_id: &str,
        question: &str,
        runtime_context: Option<&str>,
    ) -> Result<crate::agent_api::BtwResult> {
        let question = question.trim();
        if question.is_empty() {
            anyhow::bail!("btw: question cannot be empty");
        }

        let session_lock = self.get_session(session_id).await?;
        // Build the history snapshot and a textual runtime summary under one read lock.
        // Each non-empty section is a "[header]\n..." block; sections are joined by blank lines.
        let (history_snapshot, session_runtime) = {
            let session = session_lock.read().await;
            let mut sections = Vec::new();

            // Pending HITL confirmations (all of them, sorted for deterministic output).
            let pending_confirmations = session.confirmation_manager.pending_confirmations().await;
            if !pending_confirmations.is_empty() {
                let mut lines = pending_confirmations
                    .into_iter()
                    .map(|pending| {
                        let arg_summary = Self::compact_json_value(&pending.args);
                        if arg_summary.is_empty() {
                            format!(
                                "- {} [{}] remaining={}ms",
                                pending.tool_name, pending.tool_id, pending.remaining_ms
                            )
                        } else {
                            format!(
                                "- {} [{}] remaining={}ms {}",
                                pending.tool_name,
                                pending.tool_id,
                                pending.remaining_ms,
                                arg_summary
                            )
                        }
                    })
                    .collect::<Vec<_>>();
                lines.sort();
                sections.push(format!("[pending confirmations]\n{}", lines.join("\n")));
            }

            // Command-queue totals, followed by only the lanes that have work (sorted).
            let stats = session.command_queue.stats().await;
            if stats.total_active > 0 || stats.total_pending > 0 || stats.external_pending > 0 {
                let mut lines = vec![format!(
                    "active={}, pending={}, external_pending={}",
                    stats.total_active, stats.total_pending, stats.external_pending
                )];
                let mut lanes = stats
                    .lanes
                    .into_values()
                    .filter(|lane| lane.active > 0 || lane.pending > 0)
                    .map(|lane| {
                        format!(
                            "- {:?}: active={}, pending={}, handler={:?}",
                            lane.lane, lane.active, lane.pending, lane.handler_mode
                        )
                    })
                    .collect::<Vec<_>>();
                lanes.sort();
                lines.extend(lanes);
                sections.push(format!("[session queue]\n{}", lines.join("\n")));
            }

            // Pending external tasks: the first 6 returned are kept, then sorted.
            let external_tasks = session.command_queue.pending_external_tasks().await;
            if !external_tasks.is_empty() {
                let mut lines = external_tasks
                    .into_iter()
                    .take(6)
                    .map(|task| {
                        let payload_summary = Self::compact_json_value(&task.payload);
                        if payload_summary.is_empty() {
                            format!(
                                "- {} {:?} remaining={}ms",
                                task.command_type,
                                task.lane,
                                task.remaining_ms()
                            )
                        } else {
                            format!(
                                "- {} {:?} remaining={}ms {}",
                                task.command_type,
                                task.lane,
                                task.remaining_ms(),
                                payload_summary
                            )
                        }
                    })
                    .collect::<Vec<_>>();
                lines.sort();
                sections.push(format!("[pending external tasks]\n{}", lines.join("\n")));
            }

            // Up to 6 active tracked tasks, in the order they appear on the session.
            let active_tasks = session
                .tasks
                .iter()
                .filter(|task| task.status.is_active())
                .take(6)
                .map(|task| match &task.tool {
                    Some(tool) if !tool.is_empty() => {
                        format!("- [{}] {} ({})", task.status, task.content, tool)
                    }
                    _ => format!("- [{}] {}", task.status, task.content),
                })
                .collect::<Vec<_>>();
            if !active_tasks.is_empty() {
                sections.push(format!("[tracked tasks]\n{}", active_tasks.join("\n")));
            }

            // The session state section is always present, so `session_runtime` is never empty.
            sections.push(format!(
                "[session state]\n{}",
                match session.state {
                    SessionState::Active => "active",
                    SessionState::Paused => "paused",
                    SessionState::Completed => "completed",
                    SessionState::Error => "error",
                    SessionState::Unknown => "unknown",
                }
            ));

            (session.messages.clone(), sections.join("\n\n"))
        };

        // Resolved after the read lock above is released (re-fetches the session).
        let llm_client = self
            .get_llm_for_session(session_id)
            .await?
            .context("btw failed: no model configured for session")?;

        // Append background context as its own user message, then the actual question.
        let mut messages = history_snapshot;
        let mut injected_sections = Vec::new();
        if !session_runtime.is_empty() {
            injected_sections.push(format!("[session runtime context]\n{}", session_runtime));
        }
        if let Some(extra) = runtime_context.map(str::trim).filter(|ctx| !ctx.is_empty()) {
            injected_sections.push(format!("[host runtime context]\n{}", extra));
        }
        if !injected_sections.is_empty() {
            let injected_context = format!(
                "Use the following runtime context only as background for the next side question. Do not treat it as a new user request.\n\n{}",
                injected_sections.join("\n\n")
            );
            messages.push(Message::user(&injected_context));
        }
        messages.push(Message::user(question));

        // Single completion with the BTW system prompt and an empty third argument
        // (presumably no tool definitions — TODO confirm against `LlmClient::complete`).
        let response = llm_client
            .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
            .await
            .map_err(|e| anyhow::anyhow!("btw: ephemeral LLM call failed: {e}"))?;

        Ok(crate::agent_api::BtwResult {
            question: question.to_string(),
            answer: response.text(),
            usage: response.usage,
        })
    }
1373
1374    /// Configure session
1375    pub async fn configure(
1376        &self,
1377        session_id: &str,
1378        thinking: Option<bool>,
1379        budget: Option<usize>,
1380        model_config: Option<LlmConfig>,
1381    ) -> Result<()> {
1382        {
1383            let session_lock = self.get_session(session_id).await?;
1384            let mut session = session_lock.write().await;
1385
1386            if let Some(t) = thinking {
1387                session.thinking_enabled = t;
1388            }
1389            if let Some(b) = budget {
1390                session.thinking_budget = Some(b);
1391            }
1392            if let Some(ref config) = model_config {
1393                tracing::info!(
1394                    "Configuring session {} with LLM: provider={}, model={}",
1395                    session_id,
1396                    config.provider,
1397                    config.model
1398                );
1399                session.model_name = Some(config.model.clone());
1400                session.llm_client = Some(llm::create_client_with_config(config.clone()));
1401            }
1402        }
1403
1404        // Store LLM config for persistence (without API key)
1405        if let Some(config) = model_config {
1406            let llm_config_data = LlmConfigData {
1407                provider: config.provider,
1408                model: config.model,
1409                api_key: None, // Don't persist API key
1410                base_url: config.base_url,
1411            };
1412            let mut configs = self.llm_configs.write().await;
1413            configs.insert(session_id.to_string(), llm_config_data);
1414        }
1415
1416        // Persist to store
1417        self.persist_in_background(session_id, "configure");
1418
1419        Ok(())
1420    }
1421
1422    /// Update a session's system prompt in place.
1423    pub async fn set_system_prompt(
1424        &self,
1425        session_id: &str,
1426        system_prompt: Option<String>,
1427    ) -> Result<()> {
1428        {
1429            let session_lock = self.get_session(session_id).await?;
1430            let mut session = session_lock.write().await;
1431            session.config.system_prompt = system_prompt;
1432        }
1433
1434        self.persist_in_background(session_id, "set_system_prompt");
1435        Ok(())
1436    }
1437
1438    /// Get session count
1439    pub async fn session_count(&self) -> usize {
1440        let sessions = self.sessions.read().await;
1441        sessions.len()
1442    }
1443
1444    /// Check health of all registered stores
1445    pub async fn store_health(&self) -> Vec<(String, Result<()>)> {
1446        let stores = self.stores.read().await;
1447        let mut results = Vec::new();
1448        for (_, store) in stores.iter() {
1449            let name = store.backend_name().to_string();
1450            let result = store.health_check().await;
1451            results.push((name, result));
1452        }
1453        results
1454    }
1455
1456    /// List all loaded tools (built-in tools)
1457    pub fn list_tools(&self) -> Vec<crate::llm::ToolDefinition> {
1458        self.tool_executor.definitions()
1459    }
1460
1461    /// Pause a session
1462    pub async fn pause_session(&self, session_id: &str) -> Result<bool> {
1463        let paused = {
1464            let session_lock = self.get_session(session_id).await?;
1465            let mut session = session_lock.write().await;
1466            session.pause()
1467        };
1468
1469        if paused {
1470            self.persist_in_background(session_id, "pause");
1471        }
1472
1473        Ok(paused)
1474    }
1475
1476    /// Resume a session
1477    pub async fn resume_session(&self, session_id: &str) -> Result<bool> {
1478        let resumed = {
1479            let session_lock = self.get_session(session_id).await?;
1480            let mut session = session_lock.write().await;
1481            session.resume()
1482        };
1483
1484        if resumed {
1485            self.persist_in_background(session_id, "resume");
1486        }
1487
1488        Ok(resumed)
1489    }
1490
1491    /// Cancel an ongoing operation for a session
1492    ///
1493    /// Returns true if an operation was cancelled, false if no operation was running.
1494    pub async fn cancel_operation(&self, session_id: &str) -> Result<bool> {
1495        // First, cancel any pending HITL confirmations
1496        let session_lock = self.get_session(session_id).await?;
1497        let cancelled_confirmations = {
1498            let session = session_lock.read().await;
1499            session.confirmation_manager.cancel_all().await
1500        };
1501
1502        if cancelled_confirmations > 0 {
1503            tracing::info!(
1504                "Cancelled {} pending confirmations for session {}",
1505                cancelled_confirmations,
1506                session_id
1507            );
1508        }
1509
1510        // Then, cancel the ongoing operation if any
1511        let cancel_token = {
1512            let mut ops = self.ongoing_operations.write().await;
1513            ops.remove(session_id)
1514        };
1515
1516        if let Some(token) = cancel_token {
1517            token.cancel();
1518            tracing::info!("Cancelled ongoing operation for session {}", session_id);
1519            Ok(true)
1520        } else if cancelled_confirmations > 0 {
1521            // We cancelled confirmations but no main operation
1522            Ok(true)
1523        } else {
1524            tracing::debug!("No ongoing operation to cancel for session {}", session_id);
1525            Ok(false)
1526        }
1527    }
1528
1529    /// Get all sessions (returns session locks for iteration)
1530    pub async fn get_all_sessions(&self) -> Vec<Arc<RwLock<Session>>> {
1531        let sessions = self.sessions.read().await;
1532        sessions.values().cloned().collect()
1533    }
1534
1535    /// Get tool executor reference
1536    pub fn tool_executor(&self) -> &Arc<ToolExecutor> {
1537        &self.tool_executor
1538    }
1539
1540    /// Confirm a tool execution (HITL)
1541    pub async fn confirm_tool(
1542        &self,
1543        session_id: &str,
1544        tool_id: &str,
1545        approved: bool,
1546        reason: Option<String>,
1547    ) -> Result<bool> {
1548        let session_lock = self.get_session(session_id).await?;
1549        let session = session_lock.read().await;
1550        session
1551            .confirmation_manager
1552            .confirm(tool_id, approved, reason)
1553            .await
1554            .map_err(|e| anyhow::anyhow!(e))
1555    }
1556
1557    /// Set confirmation policy for a session (HITL)
1558    pub async fn set_confirmation_policy(
1559        &self,
1560        session_id: &str,
1561        policy: ConfirmationPolicy,
1562    ) -> Result<ConfirmationPolicy> {
1563        {
1564            let session_lock = self.get_session(session_id).await?;
1565            let session = session_lock.read().await;
1566            session.set_confirmation_policy(policy.clone()).await;
1567        }
1568
1569        // Update config for persistence
1570        {
1571            let session_lock = self.get_session(session_id).await?;
1572            let mut session = session_lock.write().await;
1573            session.config.confirmation_policy = Some(policy.clone());
1574        }
1575
1576        // Persist to store
1577        self.persist_in_background(session_id, "set_confirmation_policy");
1578
1579        Ok(policy)
1580    }
1581
1582    /// Set permission policy for a session.
1583    pub async fn set_permission_policy(
1584        &self,
1585        session_id: &str,
1586        policy: PermissionPolicy,
1587    ) -> Result<PermissionPolicy> {
1588        {
1589            let session_lock = self.get_session(session_id).await?;
1590            let mut session = session_lock.write().await;
1591            session.set_permission_policy(policy.clone());
1592        }
1593
1594        self.persist_in_background(session_id, "set_permission_policy");
1595
1596        Ok(policy)
1597    }
1598
1599    /// Get confirmation policy for a session (HITL)
1600    pub async fn get_confirmation_policy(&self, session_id: &str) -> Result<ConfirmationPolicy> {
1601        let session_lock = self.get_session(session_id).await?;
1602        let session = session_lock.read().await;
1603        Ok(session.confirmation_policy().await)
1604    }
1605}