Skip to main content

a3s_code_core/session/
manager.rs

1//! SessionManager — manages multiple concurrent sessions
2
3use super::{ContextUsage, Session, SessionConfig, SessionState};
4use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
5use crate::document_parser::DocumentParserRegistry;
6use crate::hitl::ConfirmationPolicy;
7use crate::llm::{self, LlmClient, LlmConfig, Message};
8use crate::mcp::McpManager;
9use crate::memory::AgentMemory;
10use crate::permissions::PermissionPolicy;
11use crate::prompts::SystemPromptSlots;
12use crate::skills::SkillRegistry;
13use crate::store::{FileSessionStore, LlmConfigData, SessionData, SessionStore};
14use crate::text::truncate_utf8;
15use crate::tools::ToolExecutor;
16use a3s_memory::MemoryStore;
17use anyhow::{Context, Result};
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::{mpsc, RwLock};
21
/// Session manager handles multiple concurrent sessions
///
/// Every field is an `Arc`, so the derived `Clone` yields a cheap handle that
/// shares all state with the original manager rather than copying it.
#[derive(Clone)]
pub struct SessionManager {
    /// In-memory sessions keyed by session ID, each behind its own lock.
    pub(crate) sessions: Arc<RwLock<HashMap<String, Arc<RwLock<Session>>>>>,
    /// Default LLM client used when a session has no per-session client configured.
    /// Wrapped in RwLock so it can be hot-swapped at runtime without restart.
    pub(crate) llm_client: Arc<RwLock<Option<Arc<dyn LlmClient>>>>,
    /// Tool executor shared by all sessions; new and restored sessions take
    /// their tool list from its `definitions()`.
    pub(crate) tool_executor: Arc<ToolExecutor>,
    /// Session stores by storage type
    pub(crate) stores: Arc<RwLock<HashMap<crate::config::StorageBackend, Arc<dyn SessionStore>>>>,
    /// Track which storage type each session uses. A session with no entry
    /// here is treated as memory-only and is never saved.
    pub(crate) session_storage_types: Arc<RwLock<HashMap<String, crate::config::StorageBackend>>>,
    /// LLM configurations for sessions (stored separately for persistence)
    pub(crate) llm_configs: Arc<RwLock<HashMap<String, LlmConfigData>>>,
    /// Ongoing operations (session_id -> CancellationToken)
    pub(crate) ongoing_operations:
        Arc<RwLock<HashMap<String, tokio_util::sync::CancellationToken>>>,
    /// Skill registry for runtime skill management
    pub(crate) skill_registry: Arc<RwLock<Option<Arc<SkillRegistry>>>>,
    /// Per-session skill registries layered on top of the shared registry.
    /// When a session has an entry here, `generate` uses it instead of the
    /// shared `skill_registry`.
    pub(crate) session_skill_registries: Arc<RwLock<HashMap<String, Arc<SkillRegistry>>>>,
    /// Shared memory store for agent long-term memory.
    /// When set, each `generate`/`generate_streaming` call wraps this in
    /// `AgentMemory` and injects it into `AgentConfig.memory`.
    pub(crate) memory_store: Arc<RwLock<Option<Arc<dyn MemoryStore>>>>,
    /// Shared MCP manager. When set, MCP tools are registered into the
    /// `ToolExecutor` at startup so all sessions can use them.
    pub(crate) mcp_manager: Arc<RwLock<Option<Arc<McpManager>>>>,
    /// Shared document parser registry for document context extraction.
    pub(crate) document_parser_registry: Arc<RwLock<Option<Arc<DocumentParserRegistry>>>>,
}
53
54impl SessionManager {
55    fn compact_json_value(value: &serde_json::Value) -> String {
56        let raw = match value {
57            serde_json::Value::Null => String::new(),
58            serde_json::Value::String(s) => s.clone(),
59            _ => serde_json::to_string(value).unwrap_or_default(),
60        };
61        let compact = raw.split_whitespace().collect::<Vec<_>>().join(" ");
62        if compact.len() > 180 {
63            format!("{}...", truncate_utf8(&compact, 180))
64        } else {
65            compact
66        }
67    }
68
69    /// Create a new session manager without persistence
70    pub fn new(llm_client: Option<Arc<dyn LlmClient>>, tool_executor: Arc<ToolExecutor>) -> Self {
71        Self {
72            sessions: Arc::new(RwLock::new(HashMap::new())),
73            llm_client: Arc::new(RwLock::new(llm_client)),
74            tool_executor,
75            stores: Arc::new(RwLock::new(HashMap::new())),
76            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
77            llm_configs: Arc::new(RwLock::new(HashMap::new())),
78            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
79            skill_registry: Arc::new(RwLock::new(None)),
80            session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
81            memory_store: Arc::new(RwLock::new(None)),
82            mcp_manager: Arc::new(RwLock::new(None)),
83            document_parser_registry: Arc::new(RwLock::new(Some(Arc::new(
84                crate::document_registry_factory::build_document_parser_registry(
85                    crate::config::DocumentParserConfig::default(),
86                    None,
87                ),
88            )))),
89        }
90    }
91
92    /// Create a session manager with file-based persistence
93    ///
94    /// Sessions will be automatically saved to disk and restored on startup.
95    pub async fn with_persistence<P: AsRef<std::path::Path>>(
96        llm_client: Option<Arc<dyn LlmClient>>,
97        tool_executor: Arc<ToolExecutor>,
98        sessions_dir: P,
99    ) -> Result<Self> {
100        let store = FileSessionStore::new(sessions_dir).await?;
101        let mut stores = HashMap::new();
102        stores.insert(
103            crate::config::StorageBackend::File,
104            Arc::new(store) as Arc<dyn SessionStore>,
105        );
106
107        let manager = Self {
108            sessions: Arc::new(RwLock::new(HashMap::new())),
109            llm_client: Arc::new(RwLock::new(llm_client)),
110            tool_executor,
111            stores: Arc::new(RwLock::new(stores)),
112            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
113            llm_configs: Arc::new(RwLock::new(HashMap::new())),
114            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
115            skill_registry: Arc::new(RwLock::new(None)),
116            session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
117            memory_store: Arc::new(RwLock::new(None)),
118            mcp_manager: Arc::new(RwLock::new(None)),
119            document_parser_registry: Arc::new(RwLock::new(Some(Arc::new(
120                crate::document_registry_factory::build_document_parser_registry(
121                    crate::config::DocumentParserConfig::default(),
122                    None,
123                ),
124            )))),
125        };
126
127        Ok(manager)
128    }
129
130    /// Create a session manager with a custom store
131    ///
132    /// The `backend` parameter determines which `StorageBackend` key the store is registered under.
133    /// Sessions created with a matching `storage_type` will use this store.
134    pub fn with_store(
135        llm_client: Option<Arc<dyn LlmClient>>,
136        tool_executor: Arc<ToolExecutor>,
137        store: Arc<dyn SessionStore>,
138        backend: crate::config::StorageBackend,
139    ) -> Self {
140        let mut stores = HashMap::new();
141        stores.insert(backend, store);
142
143        Self {
144            sessions: Arc::new(RwLock::new(HashMap::new())),
145            llm_client: Arc::new(RwLock::new(llm_client)),
146            tool_executor,
147            stores: Arc::new(RwLock::new(stores)),
148            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
149            llm_configs: Arc::new(RwLock::new(HashMap::new())),
150            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
151            skill_registry: Arc::new(RwLock::new(None)),
152            session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
153            memory_store: Arc::new(RwLock::new(None)),
154            mcp_manager: Arc::new(RwLock::new(None)),
155            document_parser_registry: Arc::new(RwLock::new(Some(Arc::new(
156                crate::document_registry_factory::build_document_parser_registry(
157                    crate::config::DocumentParserConfig::default(),
158                    None,
159                ),
160            )))),
161        }
162    }
163
164    /// Hot-swap the default LLM client used by sessions without a per-session client.
165    ///
166    /// Called by SafeClaw when `PUT /api/agent/config` updates the model config.
167    /// All subsequent `generate` / `generate_streaming` calls will use the new client.
168    pub async fn set_default_llm(&self, client: Option<Arc<dyn LlmClient>>) {
169        *self.llm_client.write().await = client;
170    }
171
172    /// Set the skill registry for runtime skill management.
173    ///
174    /// Skills registered here will be injected into the system prompt
175    /// for all subsequent `generate` / `generate_streaming` calls.
176    /// Also registers the `manage_skill` tool on the `ToolExecutor` so
177    /// the Agent can create, list, and remove skills at runtime.
178    pub async fn set_skill_registry(
179        &self,
180        registry: Arc<SkillRegistry>,
181        skills_dir: std::path::PathBuf,
182    ) {
183        // Register the manage_skill tool for self-bootstrap
184        let manage_tool = crate::skills::ManageSkillTool::new(registry.clone(), skills_dir);
185        self.tool_executor
186            .register_dynamic_tool(Arc::new(manage_tool));
187
188        *self.skill_registry.write().await = Some(registry);
189    }
190
191    /// Get the current skill registry, if any.
192    pub async fn skill_registry(&self) -> Option<Arc<SkillRegistry>> {
193        self.skill_registry.read().await.clone()
194    }
195
196    /// Override the active skill registry for a single session.
197    pub async fn set_session_skill_registry(
198        &self,
199        session_id: impl Into<String>,
200        registry: Arc<SkillRegistry>,
201    ) {
202        self.session_skill_registries
203            .write()
204            .await
205            .insert(session_id.into(), registry);
206    }
207
208    /// Get the active skill registry for a single session, if one was set.
209    pub async fn session_skill_registry(&self, session_id: &str) -> Option<Arc<SkillRegistry>> {
210        self.session_skill_registries
211            .read()
212            .await
213            .get(session_id)
214            .cloned()
215    }
216
217    /// Set the shared memory store for agent long-term memory.
218    ///
219    /// When set, every `generate`/`generate_streaming` call wraps this store
220    /// in an `AgentMemory` and injects it into `AgentConfig.memory`, enabling
221    /// automatic `recall_similar` before prompts and `remember_success`/
222    /// `remember_failure` after tool execution.
223    pub async fn set_memory_store(&self, store: Arc<dyn MemoryStore>) {
224        *self.memory_store.write().await = Some(store);
225    }
226
227    /// Get the current memory store, if any.
228    pub async fn memory_store(&self) -> Option<Arc<dyn MemoryStore>> {
229        self.memory_store.read().await.clone()
230    }
231
232    /// Set the shared MCP manager.
233    ///
234    /// When set, all tools from connected MCP servers are registered into the
235    /// `ToolExecutor` so they are available in every session.
236    pub async fn set_mcp_manager(&self, manager: Arc<McpManager>) {
237        let all_tools = manager.get_all_tools().await;
238        let mut by_server: HashMap<String, Vec<crate::mcp::McpTool>> = HashMap::new();
239        for (server, tool) in all_tools {
240            by_server.entry(server).or_default().push(tool);
241        }
242        for (server_name, tools) in by_server {
243            for tool in
244                crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&manager))
245            {
246                self.tool_executor.register_dynamic_tool(tool);
247            }
248        }
249        *self.mcp_manager.write().await = Some(manager);
250    }
251
252    /// Dynamically add and connect an MCP server to the shared manager.
253    ///
254    /// Registers the server config, connects it, and registers its tools into
255    /// the `ToolExecutor` so all subsequent sessions can use them.
256    pub async fn add_mcp_server(&self, config: crate::mcp::McpServerConfig) -> Result<()> {
257        let manager = {
258            let guard = self.mcp_manager.read().await;
259            match guard.clone() {
260                Some(m) => m,
261                None => {
262                    drop(guard);
263                    let m = Arc::new(McpManager::new());
264                    *self.mcp_manager.write().await = Some(Arc::clone(&m));
265                    m
266                }
267            }
268        };
269        let name = config.name.clone();
270        manager.register_server(config).await;
271        manager.connect(&name).await?;
272        let tools = manager.get_server_tools(&name).await;
273        for tool in crate::mcp::tools::create_mcp_tools(&name, tools, Arc::clone(&manager)) {
274            self.tool_executor.register_dynamic_tool(tool);
275        }
276        Ok(())
277    }
278
279    /// Disconnect and remove an MCP server from the shared manager.
280    ///
281    /// Also unregisters its tools from the `ToolExecutor`.
282    pub async fn remove_mcp_server(&self, name: &str) -> Result<()> {
283        let guard = self.mcp_manager.read().await;
284        if let Some(ref manager) = *guard {
285            manager.disconnect(name).await?;
286        }
287        self.tool_executor
288            .unregister_tools_by_prefix(&format!("mcp__{name}__"));
289        Ok(())
290    }
291
292    /// Get status of all connected MCP servers.
293    pub async fn mcp_status(
294        &self,
295    ) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
296        let guard = self.mcp_manager.read().await;
297        match guard.as_ref() {
298            Some(m) => {
299                let m = Arc::clone(m);
300                drop(guard);
301                m.get_status().await
302            }
303            None => std::collections::HashMap::new(),
304        }
305    }
306
307    /// Set the shared document parser registry for document context extraction.
308    pub async fn set_document_parser_registry(&self, registry: Arc<DocumentParserRegistry>) {
309        *self.document_parser_registry.write().await = Some(registry);
310    }
311
312    /// Get the current shared document parser registry, if any.
313    pub async fn document_parser_registry(&self) -> Option<Arc<DocumentParserRegistry>> {
314        self.document_parser_registry.read().await.clone()
315    }
316
317    /// Restore a single session by ID from the store
318    ///
319    /// Searches all registered stores for the given session ID and restores it
320    /// into the in-memory session map. Returns an error if not found.
321    pub async fn restore_session_by_id(&self, session_id: &str) -> Result<()> {
322        // Check if already loaded
323        {
324            let sessions = self.sessions.read().await;
325            if sessions.contains_key(session_id) {
326                return Ok(());
327            }
328        }
329
330        let stores = self.stores.read().await;
331        for (backend, store) in stores.iter() {
332            match store.load(session_id).await {
333                Ok(Some(data)) => {
334                    {
335                        let mut storage_types = self.session_storage_types.write().await;
336                        storage_types.insert(data.id.clone(), backend.clone());
337                    }
338                    self.restore_session(data).await?;
339                    return Ok(());
340                }
341                Ok(None) => continue,
342                Err(e) => {
343                    tracing::warn!(
344                        "Failed to load session {} from {:?}: {}",
345                        session_id,
346                        backend,
347                        e
348                    );
349                    continue;
350                }
351            }
352        }
353
354        Err(anyhow::anyhow!(
355            "Session {} not found in any store",
356            session_id
357        ))
358    }
359
360    /// Load all sessions from all registered stores
361    pub async fn load_all_sessions(&mut self) -> Result<usize> {
362        let stores = self.stores.read().await;
363        let mut loaded = 0;
364
365        for (backend, store) in stores.iter() {
366            let session_ids = match store.list().await {
367                Ok(ids) => ids,
368                Err(e) => {
369                    tracing::warn!("Failed to list sessions from {:?} store: {}", backend, e);
370                    continue;
371                }
372            };
373
374            for id in session_ids {
375                match store.load(&id).await {
376                    Ok(Some(data)) => {
377                        // Record the storage type for this session
378                        {
379                            let mut storage_types = self.session_storage_types.write().await;
380                            storage_types.insert(data.id.clone(), backend.clone());
381                        }
382
383                        if let Err(e) = self.restore_session(data).await {
384                            tracing::warn!("Failed to restore session {}: {}", id, e);
385                        } else {
386                            loaded += 1;
387                        }
388                    }
389                    Ok(None) => {
390                        tracing::warn!("Session {} not found in store", id);
391                    }
392                    Err(e) => {
393                        tracing::warn!("Failed to load session {}: {}", id, e);
394                    }
395                }
396            }
397        }
398
399        tracing::info!("Loaded {} sessions from store", loaded);
400        Ok(loaded)
401    }
402
403    /// Restore a session from SessionData
404    async fn restore_session(&self, data: SessionData) -> Result<()> {
405        let tools = self.tool_executor.definitions();
406        let mut session = Session::new(data.id.clone(), data.config.clone(), tools).await?;
407
408        // Restore serializable state
409        session.restore_from_data(&data);
410
411        // Restore LLM config if present (without API key - must be reconfigured)
412        if let Some(llm_config) = &data.llm_config {
413            let mut configs = self.llm_configs.write().await;
414            configs.insert(data.id.clone(), llm_config.clone());
415        }
416
417        let mut sessions = self.sessions.write().await;
418        sessions.insert(data.id.clone(), Arc::new(RwLock::new(session)));
419
420        tracing::info!("Restored session: {}", data.id);
421        Ok(())
422    }
423
424    /// Save a session to the store
425    async fn save_session(&self, session_id: &str) -> Result<()> {
426        // Get the storage type for this session
427        let storage_type = {
428            let storage_types = self.session_storage_types.read().await;
429            storage_types.get(session_id).cloned()
430        };
431
432        let Some(storage_type) = storage_type else {
433            // No storage type means memory-only session
434            return Ok(());
435        };
436
437        // Skip saving for memory storage
438        if storage_type == crate::config::StorageBackend::Memory {
439            return Ok(());
440        }
441
442        // Get the appropriate store
443        let stores = self.stores.read().await;
444        let Some(store) = stores.get(&storage_type) else {
445            tracing::warn!("No store available for storage type: {:?}", storage_type);
446            return Ok(());
447        };
448
449        let session_lock = self.get_session(session_id).await?;
450        let session = session_lock.read().await;
451
452        // Get LLM config if set
453        let llm_config = {
454            let configs = self.llm_configs.read().await;
455            configs.get(session_id).cloned()
456        };
457
458        let data = session.to_session_data(llm_config);
459        store.save(&data).await?;
460
461        tracing::debug!("Saved session: {}", session_id);
462        Ok(())
463    }
464
465    /// Persist a session to store, logging and emitting an event on failure.
466    /// This is a non-fatal wrapper around `save_session` — the operation
467    /// succeeds in memory even if persistence fails.
468    async fn persist_or_warn(&self, session_id: &str, operation: &str) {
469        if let Err(e) = self.save_session(session_id).await {
470            tracing::warn!(
471                "Failed to persist session {} after {}: {}",
472                session_id,
473                operation,
474                e
475            );
476            // Emit event so SDK clients can react
477            if let Ok(session_lock) = self.get_session(session_id).await {
478                let session = session_lock.read().await;
479                let _ = session.event_tx().send(AgentEvent::PersistenceFailed {
480                    session_id: session_id.to_string(),
481                    operation: operation.to_string(),
482                    error: e.to_string(),
483                });
484            }
485        }
486    }
487
488    /// Spawn persistence as a background task (non-blocking).
489    ///
490    /// All `SessionManager` fields are `Arc`-wrapped, so `clone()` is a
491    /// cheap pointer-copy. This keeps file I/O off the response path.
492    fn persist_in_background(&self, session_id: &str, operation: &str) {
493        let mgr = self.clone();
494        let sid = session_id.to_string();
495        let op = operation.to_string();
496        tokio::spawn(async move {
497            mgr.persist_or_warn(&sid, &op).await;
498        });
499    }
500
501    /// Create a new session
502    pub async fn create_session(&self, id: String, config: SessionConfig) -> Result<String> {
503        tracing::info!(name: "a3s.session.create", session_id = %id, "Creating session");
504
505        // Record the storage type for this session
506        {
507            let mut storage_types = self.session_storage_types.write().await;
508            storage_types.insert(id.clone(), config.storage_type.clone());
509        }
510
511        // Get tool definitions from the executor
512        let tools = self.tool_executor.definitions();
513        let mut session = Session::new(id.clone(), config, tools).await?;
514
515        // Start the command queue
516        session.start_queue().await?;
517
518        // Set max context length if provided
519        if session.config.max_context_length > 0 {
520            session.context_usage.max_tokens = session.config.max_context_length as usize;
521        }
522
523        {
524            let mut sessions = self.sessions.write().await;
525            sessions.insert(id.clone(), Arc::new(RwLock::new(session)));
526        }
527
528        // Persist to store
529        self.persist_in_background(&id, "create");
530
531        tracing::info!("Created session: {}", id);
532        Ok(id)
533    }
534
535    /// Destroy a session
536    pub async fn destroy_session(&self, id: &str) -> Result<()> {
537        tracing::info!(name: "a3s.session.destroy", session_id = %id, "Destroying session");
538
539        // Get the storage type before removing the session
540        let storage_type = {
541            let storage_types = self.session_storage_types.read().await;
542            storage_types.get(id).cloned()
543        };
544
545        {
546            let mut sessions = self.sessions.write().await;
547            sessions.remove(id);
548        }
549
550        // Remove LLM config
551        {
552            let mut configs = self.llm_configs.write().await;
553            configs.remove(id);
554        }
555
556        {
557            let mut registries = self.session_skill_registries.write().await;
558            registries.remove(id);
559        }
560
561        // Remove storage type tracking
562        {
563            let mut storage_types = self.session_storage_types.write().await;
564            storage_types.remove(id);
565        }
566
567        // Delete from store if applicable
568        if let Some(storage_type) = storage_type {
569            if storage_type != crate::config::StorageBackend::Memory {
570                let stores = self.stores.read().await;
571                if let Some(store) = stores.get(&storage_type) {
572                    if let Err(e) = store.delete(id).await {
573                        tracing::warn!("Failed to delete session {} from store: {}", id, e);
574                    }
575                }
576            }
577        }
578
579        tracing::info!("Destroyed session: {}", id);
580        Ok(())
581    }
582
583    /// Get a session by ID
584    pub async fn get_session(&self, id: &str) -> Result<Arc<RwLock<Session>>> {
585        let sessions = self.sessions.read().await;
586        sessions
587            .get(id)
588            .cloned()
589            .context(format!("Session not found: {}", id))
590    }
591
592    /// Create a child session for a subagent
593    ///
594    /// Child sessions inherit the parent's LLM client but have their own
595    /// permission policy and configuration.
596    pub async fn create_child_session(
597        &self,
598        parent_id: &str,
599        child_id: String,
600        mut config: SessionConfig,
601    ) -> Result<String> {
602        // Verify parent exists and inherit HITL policy
603        let parent_lock = self.get_session(parent_id).await?;
604        let parent_llm_client = {
605            let parent = parent_lock.read().await;
606
607            // Inherit parent's confirmation policy if child doesn't have one
608            if config.confirmation_policy.is_none() {
609                let parent_policy = parent.confirmation_manager.policy().await;
610                config.confirmation_policy = Some(parent_policy);
611            }
612
613            parent.llm_client.clone()
614        };
615
616        // Set parent_id in config
617        config.parent_id = Some(parent_id.to_string());
618
619        // Get tool definitions from the executor
620        let tools = self.tool_executor.definitions();
621        let mut session = Session::new(child_id.clone(), config, tools).await?;
622
623        // Inherit LLM client from parent if not set
624        if session.llm_client.is_none() {
625            let default_llm = self.llm_client.read().await.clone();
626            session.llm_client = parent_llm_client.or(default_llm);
627        }
628
629        // Start the command queue
630        session.start_queue().await?;
631
632        // Set max context length if provided
633        if session.config.max_context_length > 0 {
634            session.context_usage.max_tokens = session.config.max_context_length as usize;
635        }
636
637        {
638            let mut sessions = self.sessions.write().await;
639            sessions.insert(child_id.clone(), Arc::new(RwLock::new(session)));
640        }
641
642        // Persist to store
643        self.persist_in_background(&child_id, "create_child");
644
645        tracing::info!(
646            "Created child session: {} (parent: {})",
647            child_id,
648            parent_id
649        );
650        Ok(child_id)
651    }
652
653    /// Get all child sessions for a parent session
654    pub async fn get_child_sessions(&self, parent_id: &str) -> Vec<String> {
655        let sessions = self.sessions.read().await;
656        let mut children = Vec::new();
657
658        for (id, session_lock) in sessions.iter() {
659            let session = session_lock.read().await;
660            if session.parent_id.as_deref() == Some(parent_id) {
661                children.push(id.clone());
662            }
663        }
664
665        children
666    }
667
668    /// Check if a session is a child session
669    pub async fn is_child_session(&self, session_id: &str) -> Result<bool> {
670        let session_lock = self.get_session(session_id).await?;
671        let session = session_lock.read().await;
672        Ok(session.is_child_session())
673    }
674
675    /// Generate response for a prompt
676    pub async fn generate(&self, session_id: &str, prompt: &str) -> Result<AgentResult> {
677        let session_lock = self.get_session(session_id).await?;
678
679        // Check if session is paused
680        {
681            let session = session_lock.read().await;
682            if session.state == SessionState::Paused {
683                anyhow::bail!(
684                    "Session {} is paused. Call Resume before generating.",
685                    session_id
686                );
687            }
688        }
689
690        // Get session state and LLM client
691        let (
692            history,
693            system,
694            tools,
695            session_llm_client,
696            permission_checker,
697            confirmation_manager,
698            context_providers,
699            session_workspace,
700            tool_metrics,
701            hook_engine,
702            planning_mode,
703            goal_tracking,
704        ) = {
705            let session = session_lock.read().await;
706            (
707                session.messages.clone(),
708                session.system().map(String::from),
709                session.tools.clone(),
710                session.llm_client.clone(),
711                session.permission_checker.clone(),
712                session.confirmation_manager.clone(),
713                session.context_providers.clone(),
714                session.config.workspace.clone(),
715                session.tool_metrics.clone(),
716                session.config.hook_engine.clone(),
717                session.config.planning_mode,
718                session.config.goal_tracking,
719            )
720        };
721
722        // Use session's LLM client if configured, otherwise use default
723        let llm_client = if let Some(client) = session_llm_client {
724            client
725        } else if let Some(client) = self.llm_client.read().await.clone() {
726            client
727        } else {
728            anyhow::bail!(
729                "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
730                session_id
731            );
732        };
733
734        // Construct per-session ToolContext from session workspace, falling back to server default
735        let tool_context = if session_workspace.is_empty() {
736            crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
737                .with_session_id(session_id)
738        } else {
739            crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
740                .with_session_id(session_id)
741        };
742        let tool_context = if let Some(registry) = self.document_parser_registry().await {
743            tool_context.with_document_parsers(registry)
744        } else {
745            tool_context
746        };
747        let tool_context = tool_context.with_document_pipeline(Arc::new(
748            crate::document_pipeline_defaults::build_default_document_pipeline_registry(),
749        ));
750        let tool_context = if let Some(command_env) = self.tool_executor.command_env() {
751            tool_context.with_command_env(command_env)
752        } else {
753            tool_context
754        };
755
756        // Inject skill registry into system prompt and agent config
757        let skill_registry = match self.session_skill_registry(session_id).await {
758            Some(registry) => Some(registry),
759            None => self.skill_registry.read().await.clone(),
760        };
761        let system = if let Some(ref registry) = skill_registry {
762            let skill_prompt = registry.to_system_prompt();
763            if skill_prompt.is_empty() {
764                system
765            } else {
766                match system {
767                    Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
768                    None => Some(skill_prompt),
769                }
770            }
771        } else {
772            system
773        };
774
775        // Inject matched skill content on-demand before skill_registry is moved into AgentConfig
776        let effective_prompt = if let Some(ref registry) = skill_registry {
777            let skill_content = registry.match_skills(prompt);
778            if skill_content.is_empty() {
779                prompt.to_string()
780            } else {
781                format!("{}\n\n---\n\n{}", skill_content, prompt)
782            }
783        } else {
784            prompt.to_string()
785        };
786
787        // Build AgentMemory from shared store (if configured)
788        let memory = self
789            .memory_store
790            .read()
791            .await
792            .as_ref()
793            .map(|store| Arc::new(AgentMemory::new(store.clone())));
794
795        // Create agent loop with permission policy, confirmation manager, and context providers
796        let config = AgentConfig {
797            prompt_slots: match system {
798                Some(s) => SystemPromptSlots::from_legacy(s),
799                None => SystemPromptSlots::default(),
800            },
801            tools,
802            max_tool_rounds: 50,
803            permission_checker: Some(permission_checker),
804            confirmation_manager: Some(confirmation_manager),
805            context_providers,
806            planning_mode,
807            goal_tracking,
808            hook_engine,
809            skill_registry,
810            memory,
811            ..AgentConfig::default()
812        };
813
814        let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
815            .with_tool_metrics(tool_metrics);
816
817        // Execute with session context
818        let result = agent
819            .execute_with_session(&history, &effective_prompt, Some(session_id), None, None)
820            .await?;
821
822        // Update session
823        {
824            let mut session = session_lock.write().await;
825            session.messages = result.messages.clone();
826            session.update_usage(&result.usage);
827        }
828
829        // Persist to store
830        self.persist_in_background(session_id, "generate");
831
832        // Auto-compact if context usage exceeds threshold
833        if let Err(e) = self.maybe_auto_compact(session_id).await {
834            tracing::warn!("Auto-compact failed for session {}: {}", session_id, e);
835        }
836
837        Ok(result)
838    }
839
840    /// Generate response with streaming events
841    pub async fn generate_streaming(
842        &self,
843        session_id: &str,
844        prompt: &str,
845    ) -> Result<(
846        mpsc::Receiver<AgentEvent>,
847        tokio::task::JoinHandle<Result<AgentResult>>,
848        tokio_util::sync::CancellationToken,
849    )> {
850        let session_lock = self.get_session(session_id).await?;
851
852        // Check if session is paused
853        {
854            let session = session_lock.read().await;
855            if session.state == SessionState::Paused {
856                anyhow::bail!(
857                    "Session {} is paused. Call Resume before generating.",
858                    session_id
859                );
860            }
861        }
862
863        // Get session state and LLM client
864        let (
865            history,
866            system,
867            tools,
868            session_llm_client,
869            permission_checker,
870            confirmation_manager,
871            context_providers,
872            session_workspace,
873            tool_metrics,
874            hook_engine,
875            planning_mode,
876            goal_tracking,
877        ) = {
878            let session = session_lock.read().await;
879            (
880                session.messages.clone(),
881                session.system().map(String::from),
882                session.tools.clone(),
883                session.llm_client.clone(),
884                session.permission_checker.clone(),
885                session.confirmation_manager.clone(),
886                session.context_providers.clone(),
887                session.config.workspace.clone(),
888                session.tool_metrics.clone(),
889                session.config.hook_engine.clone(),
890                session.config.planning_mode,
891                session.config.goal_tracking,
892            )
893        };
894
895        // Use session's LLM client if configured, otherwise use default
896        let llm_client = if let Some(client) = session_llm_client {
897            client
898        } else if let Some(client) = self.llm_client.read().await.clone() {
899            client
900        } else {
901            anyhow::bail!(
902                "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
903                session_id
904            );
905        };
906
907        // Construct per-session ToolContext from session workspace, falling back to server default
908        let tool_context = if session_workspace.is_empty() {
909            crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
910                .with_session_id(session_id)
911        } else {
912            crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
913                .with_session_id(session_id)
914        };
915        let tool_context = if let Some(registry) = self.document_parser_registry().await {
916            tool_context.with_document_parsers(registry)
917        } else {
918            tool_context
919        };
920        let tool_context = if let Some(command_env) = self.tool_executor.command_env() {
921            tool_context.with_command_env(command_env)
922        } else {
923            tool_context
924        };
925
926        // Inject skill registry into system prompt and agent config
927        let skill_registry = match self.session_skill_registry(session_id).await {
928            Some(registry) => Some(registry),
929            None => self.skill_registry.read().await.clone(),
930        };
931        let system = if let Some(ref registry) = skill_registry {
932            let skill_prompt = registry.to_system_prompt();
933            if skill_prompt.is_empty() {
934                system
935            } else {
936                match system {
937                    Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
938                    None => Some(skill_prompt),
939                }
940            }
941        } else {
942            system
943        };
944
945        // Inject matched skill content on-demand before skill_registry is moved into AgentConfig
946        let effective_prompt = if let Some(ref registry) = skill_registry {
947            let skill_content = registry.match_skills(prompt);
948            if skill_content.is_empty() {
949                prompt.to_string()
950            } else {
951                format!("{}\n\n---\n\n{}", skill_content, prompt)
952            }
953        } else {
954            prompt.to_string()
955        };
956
957        // Build AgentMemory from shared store (if configured)
958        let memory = self
959            .memory_store
960            .read()
961            .await
962            .as_ref()
963            .map(|store| Arc::new(AgentMemory::new(store.clone())));
964
965        // Create agent loop with permission policy, confirmation manager, and context providers
966        let config = AgentConfig {
967            prompt_slots: match system {
968                Some(s) => SystemPromptSlots::from_legacy(s),
969                None => SystemPromptSlots::default(),
970            },
971            tools,
972            max_tool_rounds: 50,
973            permission_checker: Some(permission_checker),
974            confirmation_manager: Some(confirmation_manager),
975            context_providers,
976            planning_mode,
977            goal_tracking,
978            hook_engine,
979            skill_registry,
980            memory,
981            ..AgentConfig::default()
982        };
983
984        let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
985            .with_tool_metrics(tool_metrics);
986
987        // Execute with streaming
988        let (rx, handle, cancel_token) =
989            agent.execute_streaming(&history, &effective_prompt).await?;
990
991        // Store the cancellation token for cancellation support
992        let cancel_token_clone = cancel_token.clone();
993        {
994            let mut ops = self.ongoing_operations.write().await;
995            ops.insert(session_id.to_string(), cancel_token);
996        }
997
998        // Spawn task to update session after completion
999        let session_lock_clone = session_lock.clone();
1000        let original_handle = handle;
1001        let stores = self.stores.clone();
1002        let session_storage_types = self.session_storage_types.clone();
1003        let llm_configs = self.llm_configs.clone();
1004        let session_id_owned = session_id.to_string();
1005        let ongoing_operations = self.ongoing_operations.clone();
1006        let session_manager = self.clone();
1007
1008        let wrapped_handle = tokio::spawn(async move {
1009            let result = original_handle.await??;
1010
1011            // Remove from ongoing operations
1012            {
1013                let mut ops = ongoing_operations.write().await;
1014                ops.remove(&session_id_owned);
1015            }
1016
1017            // Update session
1018            {
1019                let mut session = session_lock_clone.write().await;
1020                session.messages = result.messages.clone();
1021                session.update_usage(&result.usage);
1022            }
1023
1024            // Persist to store
1025            let storage_type = {
1026                let storage_types = session_storage_types.read().await;
1027                storage_types.get(&session_id_owned).cloned()
1028            };
1029
1030            if let Some(storage_type) = storage_type {
1031                if storage_type != crate::config::StorageBackend::Memory {
1032                    let stores_guard = stores.read().await;
1033                    if let Some(store) = stores_guard.get(&storage_type) {
1034                        let session = session_lock_clone.read().await;
1035                        let llm_config = {
1036                            let configs = llm_configs.read().await;
1037                            configs.get(&session_id_owned).cloned()
1038                        };
1039                        let data = session.to_session_data(llm_config);
1040                        if let Err(e) = store.save(&data).await {
1041                            tracing::warn!(
1042                                "Failed to persist session {} after streaming: {}",
1043                                session_id_owned,
1044                                e
1045                            );
1046                        }
1047                    }
1048                }
1049            }
1050
1051            // Auto-compact if context usage exceeds threshold
1052            if let Err(e) = session_manager.maybe_auto_compact(&session_id_owned).await {
1053                tracing::warn!(
1054                    "Auto-compact failed for session {}: {}",
1055                    session_id_owned,
1056                    e
1057                );
1058            }
1059
1060            Ok(result)
1061        });
1062
1063        Ok((rx, wrapped_handle, cancel_token_clone))
1064    }
1065
1066    /// Get context usage for a session
1067    pub async fn context_usage(&self, session_id: &str) -> Result<ContextUsage> {
1068        let session_lock = self.get_session(session_id).await?;
1069        let session = session_lock.read().await;
1070        Ok(session.context_usage.clone())
1071    }
1072
1073    /// Get conversation history for a session
1074    pub async fn history(&self, session_id: &str) -> Result<Vec<Message>> {
1075        let session_lock = self.get_session(session_id).await?;
1076        let session = session_lock.read().await;
1077        Ok(session.messages.clone())
1078    }
1079
1080    /// Clear session history
1081    pub async fn clear(&self, session_id: &str) -> Result<()> {
1082        {
1083            let session_lock = self.get_session(session_id).await?;
1084            let mut session = session_lock.write().await;
1085            session.clear();
1086        }
1087
1088        // Persist to store
1089        self.persist_in_background(session_id, "clear");
1090
1091        Ok(())
1092    }
1093
1094    /// Compact session context
1095    pub async fn compact(&self, session_id: &str) -> Result<()> {
1096        tracing::info!(name: "a3s.session.compact", session_id = %session_id, "Compacting session context");
1097
1098        {
1099            let session_lock = self.get_session(session_id).await?;
1100            let mut session = session_lock.write().await;
1101
1102            // Get LLM client for compaction (if available)
1103            let llm_client = if let Some(client) = &session.llm_client {
1104                client.clone()
1105            } else if let Some(client) = self.llm_client.read().await.clone() {
1106                client
1107            } else {
1108                // If no LLM client available, just do simple truncation
1109                tracing::warn!("No LLM client configured for compaction, using simple truncation");
1110                let keep_messages = 20;
1111                if session.messages.len() > keep_messages {
1112                    let len = session.messages.len();
1113                    session.messages = session.messages.split_off(len - keep_messages);
1114                }
1115                // Persist after truncation
1116                drop(session);
1117                self.persist_in_background(session_id, "compact");
1118                return Ok(());
1119            };
1120
1121            session.compact(&llm_client).await?;
1122        }
1123
1124        // Persist to store
1125        self.persist_in_background(session_id, "compact");
1126
1127        Ok(())
1128    }
1129
1130    /// Check if auto-compaction should be triggered and perform it if needed.
1131    ///
1132    /// Called after `generate()` / `generate_streaming()` updates session usage.
1133    /// Triggers compaction when:
1134    /// - `auto_compact` is enabled in session config
1135    /// - `context_usage.percent` exceeds `auto_compact_threshold`
1136    pub async fn maybe_auto_compact(&self, session_id: &str) -> Result<bool> {
1137        let (should_compact, percent_before, messages_before) = {
1138            let session_lock = self.get_session(session_id).await?;
1139            let session = session_lock.read().await;
1140
1141            if !session.config.auto_compact {
1142                return Ok(false);
1143            }
1144
1145            let threshold = session.config.auto_compact_threshold;
1146            let percent = session.context_usage.percent;
1147            let msg_count = session.messages.len();
1148
1149            tracing::debug!(
1150                "Auto-compact check for session {}: percent={:.2}%, threshold={:.2}%, messages={}",
1151                session_id,
1152                percent * 100.0,
1153                threshold * 100.0,
1154                msg_count,
1155            );
1156
1157            (percent >= threshold, percent, msg_count)
1158        };
1159
1160        if !should_compact {
1161            return Ok(false);
1162        }
1163
1164        tracing::info!(
1165            name: "a3s.session.auto_compact",
1166            session_id = %session_id,
1167            percent_before = %format!("{:.1}%", percent_before * 100.0),
1168            messages_before = %messages_before,
1169            "Auto-compacting session due to high context usage"
1170        );
1171
1172        // Perform compaction (reuses existing compact logic)
1173        self.compact(session_id).await?;
1174
1175        // Get post-compaction message count
1176        let messages_after = {
1177            let session_lock = self.get_session(session_id).await?;
1178            let session = session_lock.read().await;
1179            session.messages.len()
1180        };
1181
1182        // Broadcast event to notify clients
1183        let event = AgentEvent::ContextCompacted {
1184            session_id: session_id.to_string(),
1185            before_messages: messages_before,
1186            after_messages: messages_after,
1187            percent_before,
1188        };
1189
1190        // Try to send via session's event broadcaster
1191        if let Ok(session_lock) = self.get_session(session_id).await {
1192            let session = session_lock.read().await;
1193            let _ = session.event_tx.send(event);
1194        }
1195
1196        tracing::info!(
1197            name: "a3s.session.auto_compact.done",
1198            session_id = %session_id,
1199            messages_before = %messages_before,
1200            messages_after = %messages_after,
1201            "Auto-compaction complete"
1202        );
1203
1204        Ok(true)
1205    }
1206
1207    /// Resolve the LLM client for a session (session-level -> default fallback)
1208    ///
1209    /// Returns `None` if no LLM client is configured at either level.
1210    pub async fn get_llm_for_session(
1211        &self,
1212        session_id: &str,
1213    ) -> Result<Option<Arc<dyn LlmClient>>> {
1214        let session_lock = self.get_session(session_id).await?;
1215        let session = session_lock.read().await;
1216
1217        if let Some(client) = &session.llm_client {
1218            return Ok(Some(client.clone()));
1219        }
1220
1221        Ok(self.llm_client.read().await.clone())
1222    }
1223
    /// Ask an ephemeral side question for an existing managed session.
    ///
    /// Uses a read-only snapshot of the session history plus current queue /
    /// confirmation / task state. Optional `runtime_context` lets hosts inject
    /// extra transient context such as browser-only UI state.
    ///
    /// The snapshot is copied into a local message list: an optional injected
    /// runtime-context user message, then `question` (trimmed) as a user message.
    /// That list goes to a single `complete` call with `BTW_SYSTEM` as the system
    /// prompt and an empty third argument (presumably no tool definitions — confirm
    /// against `LlmClient::complete`). This method does not write anything back to
    /// the session.
    ///
    /// # Errors
    /// Fails if the trimmed question is empty, the session does not exist, no LLM
    /// client is configured for the session, or the LLM call fails.
    pub async fn btw_with_context(
        &self,
        session_id: &str,
        question: &str,
        runtime_context: Option<&str>,
    ) -> Result<crate::agent_api::BtwResult> {
        let question = question.trim();
        if question.is_empty() {
            anyhow::bail!("btw: question cannot be empty");
        }

        let session_lock = self.get_session(session_id).await?;
        // Gather the history plus a textual summary of live session state while
        // holding a single read lock; each non-empty part becomes one "[...]" section.
        let (history_snapshot, session_runtime) = {
            let session = session_lock.read().await;
            let mut sections = Vec::new();

            // Pending HITL confirmations, one line each, sorted for stable output.
            let pending_confirmations = session.confirmation_manager.pending_confirmations().await;
            if !pending_confirmations.is_empty() {
                let mut lines = pending_confirmations
                    .into_iter()
                    .map(|pending| {
                        let arg_summary = Self::compact_json_value(&pending.args);
                        if arg_summary.is_empty() {
                            format!(
                                "- {} [{}] remaining={}ms",
                                pending.tool_name, pending.tool_id, pending.remaining_ms
                            )
                        } else {
                            format!(
                                "- {} [{}] remaining={}ms {}",
                                pending.tool_name,
                                pending.tool_id,
                                pending.remaining_ms,
                                arg_summary
                            )
                        }
                    })
                    .collect::<Vec<_>>();
                lines.sort();
                sections.push(format!("[pending confirmations]\n{}", lines.join("\n")));
            }

            // Command-queue totals, plus per-lane lines for lanes with work.
            // Emitted only when anything is active or pending.
            let stats = session.command_queue.stats().await;
            if stats.total_active > 0 || stats.total_pending > 0 || stats.external_pending > 0 {
                let mut lines = vec![format!(
                    "active={}, pending={}, external_pending={}",
                    stats.total_active, stats.total_pending, stats.external_pending
                )];
                let mut lanes = stats
                    .lanes
                    .into_values()
                    .filter(|lane| lane.active > 0 || lane.pending > 0)
                    .map(|lane| {
                        format!(
                            "- {:?}: active={}, pending={}, handler={:?}",
                            lane.lane, lane.active, lane.pending, lane.handler_mode
                        )
                    })
                    .collect::<Vec<_>>();
                lanes.sort();
                lines.extend(lanes);
                sections.push(format!("[session queue]\n{}", lines.join("\n")));
            }

            // At most 6 pending external tasks. Note: the first 6 are taken in the
            // order returned by the queue, then those lines are sorted.
            let external_tasks = session.command_queue.pending_external_tasks().await;
            if !external_tasks.is_empty() {
                let mut lines = external_tasks
                    .into_iter()
                    .take(6)
                    .map(|task| {
                        let payload_summary = Self::compact_json_value(&task.payload);
                        if payload_summary.is_empty() {
                            format!(
                                "- {} {:?} remaining={}ms",
                                task.command_type,
                                task.lane,
                                task.remaining_ms()
                            )
                        } else {
                            format!(
                                "- {} {:?} remaining={}ms {}",
                                task.command_type,
                                task.lane,
                                task.remaining_ms(),
                                payload_summary
                            )
                        }
                    })
                    .collect::<Vec<_>>();
                lines.sort();
                sections.push(format!("[pending external tasks]\n{}", lines.join("\n")));
            }

            // At most 6 active tracked tasks, in session order (not sorted).
            let active_tasks = session
                .tasks
                .iter()
                .filter(|task| task.status.is_active())
                .take(6)
                .map(|task| match &task.tool {
                    Some(tool) if !tool.is_empty() => {
                        format!("- [{}] {} ({})", task.status, task.content, tool)
                    }
                    _ => format!("- [{}] {}", task.status, task.content),
                })
                .collect::<Vec<_>>();
            if !active_tasks.is_empty() {
                sections.push(format!("[tracked tasks]\n{}", active_tasks.join("\n")));
            }

            // The session state section is always present, so `session_runtime`
            // is never empty when this block completes.
            sections.push(format!(
                "[session state]\n{}",
                match session.state {
                    SessionState::Active => "active",
                    SessionState::Paused => "paused",
                    SessionState::Completed => "completed",
                    SessionState::Error => "error",
                    SessionState::Unknown => "unknown",
                }
            ));

            (session.messages.clone(), sections.join("\n\n"))
        };

        let llm_client = self
            .get_llm_for_session(session_id)
            .await?
            .context("btw failed: no model configured for session")?;

        // Build the ephemeral conversation: history copy, then an optional context
        // message (session runtime + host-provided context), then the question.
        let mut messages = history_snapshot;
        let mut injected_sections = Vec::new();
        if !session_runtime.is_empty() {
            injected_sections.push(format!("[session runtime context]\n{}", session_runtime));
        }
        // Host context is ignored when absent or whitespace-only.
        if let Some(extra) = runtime_context.map(str::trim).filter(|ctx| !ctx.is_empty()) {
            injected_sections.push(format!("[host runtime context]\n{}", extra));
        }
        if !injected_sections.is_empty() {
            let injected_context = format!(
                "Use the following runtime context only as background for the next side question. Do not treat it as a new user request.\n\n{}",
                injected_sections.join("\n\n")
            );
            messages.push(Message::user(&injected_context));
        }
        messages.push(Message::user(question));

        let response = llm_client
            .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
            .await
            .map_err(|e| anyhow::anyhow!("btw: ephemeral LLM call failed: {e}"))?;

        Ok(crate::agent_api::BtwResult {
            question: question.to_string(),
            answer: response.text(),
            usage: response.usage,
        })
    }
1385
1386    /// Configure session
1387    pub async fn configure(
1388        &self,
1389        session_id: &str,
1390        thinking: Option<bool>,
1391        budget: Option<usize>,
1392        model_config: Option<LlmConfig>,
1393    ) -> Result<()> {
1394        {
1395            let session_lock = self.get_session(session_id).await?;
1396            let mut session = session_lock.write().await;
1397
1398            if let Some(t) = thinking {
1399                session.thinking_enabled = t;
1400            }
1401            if let Some(b) = budget {
1402                session.thinking_budget = Some(b);
1403            }
1404            if let Some(ref config) = model_config {
1405                tracing::info!(
1406                    "Configuring session {} with LLM: provider={}, model={}",
1407                    session_id,
1408                    config.provider,
1409                    config.model
1410                );
1411                session.model_name = Some(config.model.clone());
1412                session.llm_client = Some(llm::create_client_with_config(config.clone()));
1413            }
1414        }
1415
1416        // Store LLM config for persistence (without API key)
1417        if let Some(config) = model_config {
1418            let llm_config_data = LlmConfigData {
1419                provider: config.provider,
1420                model: config.model,
1421                api_key: None, // Don't persist API key
1422                base_url: config.base_url,
1423            };
1424            let mut configs = self.llm_configs.write().await;
1425            configs.insert(session_id.to_string(), llm_config_data);
1426        }
1427
1428        // Persist to store
1429        self.persist_in_background(session_id, "configure");
1430
1431        Ok(())
1432    }
1433
1434    /// Update a session's system prompt in place.
1435    pub async fn set_system_prompt(
1436        &self,
1437        session_id: &str,
1438        system_prompt: Option<String>,
1439    ) -> Result<()> {
1440        {
1441            let session_lock = self.get_session(session_id).await?;
1442            let mut session = session_lock.write().await;
1443            session.config.system_prompt = system_prompt;
1444        }
1445
1446        self.persist_in_background(session_id, "set_system_prompt");
1447        Ok(())
1448    }
1449
1450    /// Get session count
1451    pub async fn session_count(&self) -> usize {
1452        let sessions = self.sessions.read().await;
1453        sessions.len()
1454    }
1455
1456    /// Check health of all registered stores
1457    pub async fn store_health(&self) -> Vec<(String, Result<()>)> {
1458        let stores = self.stores.read().await;
1459        let mut results = Vec::new();
1460        for (_, store) in stores.iter() {
1461            let name = store.backend_name().to_string();
1462            let result = store.health_check().await;
1463            results.push((name, result));
1464        }
1465        results
1466    }
1467
    /// List all loaded tools (built-in tools)
    ///
    /// Delegates to the shared tool executor's `definitions()`; per-session tool
    /// lists (`Session::tools`) are not consulted here.
    pub fn list_tools(&self) -> Vec<crate::llm::ToolDefinition> {
        self.tool_executor.definitions()
    }
1472
1473    /// Pause a session
1474    pub async fn pause_session(&self, session_id: &str) -> Result<bool> {
1475        let paused = {
1476            let session_lock = self.get_session(session_id).await?;
1477            let mut session = session_lock.write().await;
1478            session.pause()
1479        };
1480
1481        if paused {
1482            self.persist_in_background(session_id, "pause");
1483        }
1484
1485        Ok(paused)
1486    }
1487
1488    /// Resume a session
1489    pub async fn resume_session(&self, session_id: &str) -> Result<bool> {
1490        let resumed = {
1491            let session_lock = self.get_session(session_id).await?;
1492            let mut session = session_lock.write().await;
1493            session.resume()
1494        };
1495
1496        if resumed {
1497            self.persist_in_background(session_id, "resume");
1498        }
1499
1500        Ok(resumed)
1501    }
1502
1503    /// Cancel an ongoing operation for a session
1504    ///
1505    /// Returns true if an operation was cancelled, false if no operation was running.
1506    pub async fn cancel_operation(&self, session_id: &str) -> Result<bool> {
1507        // First, cancel any pending HITL confirmations
1508        let session_lock = self.get_session(session_id).await?;
1509        let cancelled_confirmations = {
1510            let session = session_lock.read().await;
1511            session.confirmation_manager.cancel_all().await
1512        };
1513
1514        if cancelled_confirmations > 0 {
1515            tracing::info!(
1516                "Cancelled {} pending confirmations for session {}",
1517                cancelled_confirmations,
1518                session_id
1519            );
1520        }
1521
1522        // Then, cancel the ongoing operation if any
1523        let cancel_token = {
1524            let mut ops = self.ongoing_operations.write().await;
1525            ops.remove(session_id)
1526        };
1527
1528        if let Some(token) = cancel_token {
1529            token.cancel();
1530            tracing::info!("Cancelled ongoing operation for session {}", session_id);
1531            Ok(true)
1532        } else if cancelled_confirmations > 0 {
1533            // We cancelled confirmations but no main operation
1534            Ok(true)
1535        } else {
1536            tracing::debug!("No ongoing operation to cancel for session {}", session_id);
1537            Ok(false)
1538        }
1539    }
1540
1541    /// Get all sessions (returns session locks for iteration)
1542    pub async fn get_all_sessions(&self) -> Vec<Arc<RwLock<Session>>> {
1543        let sessions = self.sessions.read().await;
1544        sessions.values().cloned().collect()
1545    }
1546
    /// Get tool executor reference
    ///
    /// This is the same executor that `generate*` hands to each `AgentLoop` and
    /// that `list_tools` queries.
    pub fn tool_executor(&self) -> &Arc<ToolExecutor> {
        &self.tool_executor
    }
1551
1552    /// Confirm a tool execution (HITL)
1553    pub async fn confirm_tool(
1554        &self,
1555        session_id: &str,
1556        tool_id: &str,
1557        approved: bool,
1558        reason: Option<String>,
1559    ) -> Result<bool> {
1560        let session_lock = self.get_session(session_id).await?;
1561        let session = session_lock.read().await;
1562        session
1563            .confirmation_manager
1564            .confirm(tool_id, approved, reason)
1565            .await
1566            .map_err(|e| anyhow::anyhow!(e))
1567    }
1568
1569    /// Set confirmation policy for a session (HITL)
1570    pub async fn set_confirmation_policy(
1571        &self,
1572        session_id: &str,
1573        policy: ConfirmationPolicy,
1574    ) -> Result<ConfirmationPolicy> {
1575        {
1576            let session_lock = self.get_session(session_id).await?;
1577            let session = session_lock.read().await;
1578            session.set_confirmation_policy(policy.clone()).await;
1579        }
1580
1581        // Update config for persistence
1582        {
1583            let session_lock = self.get_session(session_id).await?;
1584            let mut session = session_lock.write().await;
1585            session.config.confirmation_policy = Some(policy.clone());
1586        }
1587
1588        // Persist to store
1589        self.persist_in_background(session_id, "set_confirmation_policy");
1590
1591        Ok(policy)
1592    }
1593
1594    /// Set permission policy for a session.
1595    pub async fn set_permission_policy(
1596        &self,
1597        session_id: &str,
1598        policy: PermissionPolicy,
1599    ) -> Result<PermissionPolicy> {
1600        {
1601            let session_lock = self.get_session(session_id).await?;
1602            let mut session = session_lock.write().await;
1603            session.set_permission_policy(policy.clone());
1604        }
1605
1606        self.persist_in_background(session_id, "set_permission_policy");
1607
1608        Ok(policy)
1609    }
1610
1611    /// Get confirmation policy for a session (HITL)
1612    pub async fn get_confirmation_policy(&self, session_id: &str) -> Result<ConfirmationPolicy> {
1613        let session_lock = self.get_session(session_id).await?;
1614        let session = session_lock.read().await;
1615        Ok(session.confirmation_policy().await)
1616    }
1617}