Skip to main content

a3s_code_core/session/
manager.rs

1//! SessionManager — manages multiple concurrent sessions
2
3use super::{ContextUsage, Session, SessionConfig, SessionState};
4use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
5use crate::hitl::ConfirmationPolicy;
6use crate::llm::{self, LlmClient, LlmConfig, Message};
7use crate::mcp::McpManager;
8use crate::memory::AgentMemory;
9use crate::permissions::PermissionPolicy;
10use crate::prompts::SystemPromptSlots;
11use crate::skills::SkillRegistry;
12use crate::store::{FileSessionStore, LlmConfigData, SessionData, SessionStore};
13use crate::tools::ToolExecutor;
14use crate::DocumentParserRegistry;
15use a3s_memory::MemoryStore;
16use anyhow::{Context, Result};
17use std::collections::HashMap;
18use std::sync::Arc;
19use tokio::sync::{mpsc, RwLock};
20
21/// Session manager handles multiple concurrent sessions
22#[derive(Clone)]
23pub struct SessionManager {
24    pub(crate) sessions: Arc<RwLock<HashMap<String, Arc<RwLock<Session>>>>>,
25    /// Default LLM client used when a session has no per-session client configured.
26    /// Wrapped in RwLock so it can be hot-swapped at runtime without restart.
27    pub(crate) llm_client: Arc<RwLock<Option<Arc<dyn LlmClient>>>>,
28    pub(crate) tool_executor: Arc<ToolExecutor>,
29    /// Session stores by storage type
30    pub(crate) stores: Arc<RwLock<HashMap<crate::config::StorageBackend, Arc<dyn SessionStore>>>>,
31    /// Track which storage type each session uses
32    pub(crate) session_storage_types: Arc<RwLock<HashMap<String, crate::config::StorageBackend>>>,
33    /// LLM configurations for sessions (stored separately for persistence)
34    pub(crate) llm_configs: Arc<RwLock<HashMap<String, LlmConfigData>>>,
35    /// Ongoing operations (session_id -> CancellationToken)
36    pub(crate) ongoing_operations:
37        Arc<RwLock<HashMap<String, tokio_util::sync::CancellationToken>>>,
38    /// Skill registry for runtime skill management
39    pub(crate) skill_registry: Arc<RwLock<Option<Arc<SkillRegistry>>>>,
40    /// Per-session skill registries layered on top of the shared registry.
41    pub(crate) session_skill_registries: Arc<RwLock<HashMap<String, Arc<SkillRegistry>>>>,
42    /// Shared memory store for agent long-term memory.
43    /// When set, each `generate`/`generate_streaming` call wraps this in
44    /// `AgentMemory` and injects it into `AgentConfig.memory`.
45    pub(crate) memory_store: Arc<RwLock<Option<Arc<dyn MemoryStore>>>>,
46    /// Shared MCP manager. When set, MCP tools are registered into the
47    /// `ToolExecutor` at startup so all sessions can use them.
48    pub(crate) mcp_manager: Arc<RwLock<Option<Arc<McpManager>>>>,
49    /// Shared document parser registry for document-aware tools.
50    pub(crate) document_parser_registry: Arc<RwLock<Option<Arc<DocumentParserRegistry>>>>,
51}
52
53impl SessionManager {
54    fn compact_json_value(value: &serde_json::Value) -> String {
55        let raw = match value {
56            serde_json::Value::Null => String::new(),
57            serde_json::Value::String(s) => s.clone(),
58            _ => serde_json::to_string(value).unwrap_or_default(),
59        };
60        let compact = raw.split_whitespace().collect::<Vec<_>>().join(" ");
61        if compact.len() > 180 {
62            format!("{}...", &compact[..180])
63        } else {
64            compact
65        }
66    }
67
68    /// Create a new session manager without persistence
69    pub fn new(llm_client: Option<Arc<dyn LlmClient>>, tool_executor: Arc<ToolExecutor>) -> Self {
70        Self {
71            sessions: Arc::new(RwLock::new(HashMap::new())),
72            llm_client: Arc::new(RwLock::new(llm_client)),
73            tool_executor,
74            stores: Arc::new(RwLock::new(HashMap::new())),
75            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
76            llm_configs: Arc::new(RwLock::new(HashMap::new())),
77            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
78            skill_registry: Arc::new(RwLock::new(None)),
79            session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
80            memory_store: Arc::new(RwLock::new(None)),
81            mcp_manager: Arc::new(RwLock::new(None)),
82            document_parser_registry: Arc::new(RwLock::new(None)),
83        }
84    }
85
86    /// Create a session manager with file-based persistence
87    ///
88    /// Sessions will be automatically saved to disk and restored on startup.
89    pub async fn with_persistence<P: AsRef<std::path::Path>>(
90        llm_client: Option<Arc<dyn LlmClient>>,
91        tool_executor: Arc<ToolExecutor>,
92        sessions_dir: P,
93    ) -> Result<Self> {
94        let store = FileSessionStore::new(sessions_dir).await?;
95        let mut stores = HashMap::new();
96        stores.insert(
97            crate::config::StorageBackend::File,
98            Arc::new(store) as Arc<dyn SessionStore>,
99        );
100
101        let manager = Self {
102            sessions: Arc::new(RwLock::new(HashMap::new())),
103            llm_client: Arc::new(RwLock::new(llm_client)),
104            tool_executor,
105            stores: Arc::new(RwLock::new(stores)),
106            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
107            llm_configs: Arc::new(RwLock::new(HashMap::new())),
108            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
109            skill_registry: Arc::new(RwLock::new(None)),
110            session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
111            memory_store: Arc::new(RwLock::new(None)),
112            mcp_manager: Arc::new(RwLock::new(None)),
113            document_parser_registry: Arc::new(RwLock::new(None)),
114        };
115
116        Ok(manager)
117    }
118
119    /// Create a session manager with a custom store
120    ///
121    /// The `backend` parameter determines which `StorageBackend` key the store is registered under.
122    /// Sessions created with a matching `storage_type` will use this store.
123    pub fn with_store(
124        llm_client: Option<Arc<dyn LlmClient>>,
125        tool_executor: Arc<ToolExecutor>,
126        store: Arc<dyn SessionStore>,
127        backend: crate::config::StorageBackend,
128    ) -> Self {
129        let mut stores = HashMap::new();
130        stores.insert(backend, store);
131
132        Self {
133            sessions: Arc::new(RwLock::new(HashMap::new())),
134            llm_client: Arc::new(RwLock::new(llm_client)),
135            tool_executor,
136            stores: Arc::new(RwLock::new(stores)),
137            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
138            llm_configs: Arc::new(RwLock::new(HashMap::new())),
139            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
140            skill_registry: Arc::new(RwLock::new(None)),
141            session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
142            memory_store: Arc::new(RwLock::new(None)),
143            mcp_manager: Arc::new(RwLock::new(None)),
144            document_parser_registry: Arc::new(RwLock::new(None)),
145        }
146    }
147
148    /// Hot-swap the default LLM client used by sessions without a per-session client.
149    ///
150    /// Called by SafeClaw when `PUT /api/agent/config` updates the model config.
151    /// All subsequent `generate` / `generate_streaming` calls will use the new client.
152    pub async fn set_default_llm(&self, client: Option<Arc<dyn LlmClient>>) {
153        *self.llm_client.write().await = client;
154    }
155
156    /// Set the skill registry for runtime skill management.
157    ///
158    /// Skills registered here will be injected into the system prompt
159    /// for all subsequent `generate` / `generate_streaming` calls.
160    /// Also registers the `manage_skill` tool on the `ToolExecutor` so
161    /// the Agent can create, list, and remove skills at runtime.
162    pub async fn set_skill_registry(
163        &self,
164        registry: Arc<SkillRegistry>,
165        skills_dir: std::path::PathBuf,
166    ) {
167        // Register the manage_skill tool for self-bootstrap
168        let manage_tool = crate::skills::ManageSkillTool::new(registry.clone(), skills_dir);
169        self.tool_executor
170            .register_dynamic_tool(Arc::new(manage_tool));
171
172        *self.skill_registry.write().await = Some(registry);
173    }
174
175    /// Get the current skill registry, if any.
176    pub async fn skill_registry(&self) -> Option<Arc<SkillRegistry>> {
177        self.skill_registry.read().await.clone()
178    }
179
180    /// Override the active skill registry for a single session.
181    pub async fn set_session_skill_registry(
182        &self,
183        session_id: impl Into<String>,
184        registry: Arc<SkillRegistry>,
185    ) {
186        self.session_skill_registries
187            .write()
188            .await
189            .insert(session_id.into(), registry);
190    }
191
192    /// Get the active skill registry for a single session, if one was set.
193    pub async fn session_skill_registry(&self, session_id: &str) -> Option<Arc<SkillRegistry>> {
194        self.session_skill_registries
195            .read()
196            .await
197            .get(session_id)
198            .cloned()
199    }
200
201    /// Set the shared memory store for agent long-term memory.
202    ///
203    /// When set, every `generate`/`generate_streaming` call wraps this store
204    /// in an `AgentMemory` and injects it into `AgentConfig.memory`, enabling
205    /// automatic `recall_similar` before prompts and `remember_success`/
206    /// `remember_failure` after tool execution.
207    pub async fn set_memory_store(&self, store: Arc<dyn MemoryStore>) {
208        *self.memory_store.write().await = Some(store);
209    }
210
211    /// Get the current memory store, if any.
212    pub async fn memory_store(&self) -> Option<Arc<dyn MemoryStore>> {
213        self.memory_store.read().await.clone()
214    }
215
216    /// Set the shared MCP manager.
217    ///
218    /// When set, all tools from connected MCP servers are registered into the
219    /// `ToolExecutor` so they are available in every session.
220    pub async fn set_mcp_manager(&self, manager: Arc<McpManager>) {
221        let all_tools = manager.get_all_tools().await;
222        let mut by_server: HashMap<String, Vec<crate::mcp::McpTool>> = HashMap::new();
223        for (server, tool) in all_tools {
224            by_server.entry(server).or_default().push(tool);
225        }
226        for (server_name, tools) in by_server {
227            for tool in
228                crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&manager))
229            {
230                self.tool_executor.register_dynamic_tool(tool);
231            }
232        }
233        *self.mcp_manager.write().await = Some(manager);
234    }
235
236    /// Dynamically add and connect an MCP server to the shared manager.
237    ///
238    /// Registers the server config, connects it, and registers its tools into
239    /// the `ToolExecutor` so all subsequent sessions can use them.
240    pub async fn add_mcp_server(&self, config: crate::mcp::McpServerConfig) -> Result<()> {
241        let manager = {
242            let guard = self.mcp_manager.read().await;
243            match guard.clone() {
244                Some(m) => m,
245                None => {
246                    drop(guard);
247                    let m = Arc::new(McpManager::new());
248                    *self.mcp_manager.write().await = Some(Arc::clone(&m));
249                    m
250                }
251            }
252        };
253        let name = config.name.clone();
254        manager.register_server(config).await;
255        manager.connect(&name).await?;
256        let tools = manager.get_server_tools(&name).await;
257        for tool in crate::mcp::tools::create_mcp_tools(&name, tools, Arc::clone(&manager)) {
258            self.tool_executor.register_dynamic_tool(tool);
259        }
260        Ok(())
261    }
262
263    /// Disconnect and remove an MCP server from the shared manager.
264    ///
265    /// Also unregisters its tools from the `ToolExecutor`.
266    pub async fn remove_mcp_server(&self, name: &str) -> Result<()> {
267        let guard = self.mcp_manager.read().await;
268        if let Some(ref manager) = *guard {
269            manager.disconnect(name).await?;
270        }
271        self.tool_executor
272            .unregister_tools_by_prefix(&format!("mcp__{name}__"));
273        Ok(())
274    }
275
276    /// Get status of all connected MCP servers.
277    pub async fn mcp_status(
278        &self,
279    ) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
280        let guard = self.mcp_manager.read().await;
281        match guard.as_ref() {
282            Some(m) => {
283                let m = Arc::clone(m);
284                drop(guard);
285                m.get_status().await
286            }
287            None => std::collections::HashMap::new(),
288        }
289    }
290
291    /// Set the shared document parser registry for document-aware tools.
292    pub async fn set_document_parser_registry(&self, registry: Arc<DocumentParserRegistry>) {
293        *self.document_parser_registry.write().await = Some(registry);
294    }
295
296    /// Get the current shared document parser registry, if any.
297    pub async fn document_parser_registry(&self) -> Option<Arc<DocumentParserRegistry>> {
298        self.document_parser_registry.read().await.clone()
299    }
300
301    /// Restore a single session by ID from the store
302    ///
303    /// Searches all registered stores for the given session ID and restores it
304    /// into the in-memory session map. Returns an error if not found.
305    pub async fn restore_session_by_id(&self, session_id: &str) -> Result<()> {
306        // Check if already loaded
307        {
308            let sessions = self.sessions.read().await;
309            if sessions.contains_key(session_id) {
310                return Ok(());
311            }
312        }
313
314        let stores = self.stores.read().await;
315        for (backend, store) in stores.iter() {
316            match store.load(session_id).await {
317                Ok(Some(data)) => {
318                    {
319                        let mut storage_types = self.session_storage_types.write().await;
320                        storage_types.insert(data.id.clone(), backend.clone());
321                    }
322                    self.restore_session(data).await?;
323                    return Ok(());
324                }
325                Ok(None) => continue,
326                Err(e) => {
327                    tracing::warn!(
328                        "Failed to load session {} from {:?}: {}",
329                        session_id,
330                        backend,
331                        e
332                    );
333                    continue;
334                }
335            }
336        }
337
338        Err(anyhow::anyhow!(
339            "Session {} not found in any store",
340            session_id
341        ))
342    }
343
344    /// Load all sessions from all registered stores
345    pub async fn load_all_sessions(&mut self) -> Result<usize> {
346        let stores = self.stores.read().await;
347        let mut loaded = 0;
348
349        for (backend, store) in stores.iter() {
350            let session_ids = match store.list().await {
351                Ok(ids) => ids,
352                Err(e) => {
353                    tracing::warn!("Failed to list sessions from {:?} store: {}", backend, e);
354                    continue;
355                }
356            };
357
358            for id in session_ids {
359                match store.load(&id).await {
360                    Ok(Some(data)) => {
361                        // Record the storage type for this session
362                        {
363                            let mut storage_types = self.session_storage_types.write().await;
364                            storage_types.insert(data.id.clone(), backend.clone());
365                        }
366
367                        if let Err(e) = self.restore_session(data).await {
368                            tracing::warn!("Failed to restore session {}: {}", id, e);
369                        } else {
370                            loaded += 1;
371                        }
372                    }
373                    Ok(None) => {
374                        tracing::warn!("Session {} not found in store", id);
375                    }
376                    Err(e) => {
377                        tracing::warn!("Failed to load session {}: {}", id, e);
378                    }
379                }
380            }
381        }
382
383        tracing::info!("Loaded {} sessions from store", loaded);
384        Ok(loaded)
385    }
386
387    /// Restore a session from SessionData
388    async fn restore_session(&self, data: SessionData) -> Result<()> {
389        let tools = self.tool_executor.definitions();
390        let mut session = Session::new(data.id.clone(), data.config.clone(), tools).await?;
391
392        // Restore serializable state
393        session.restore_from_data(&data);
394
395        // Restore LLM config if present (without API key - must be reconfigured)
396        if let Some(llm_config) = &data.llm_config {
397            let mut configs = self.llm_configs.write().await;
398            configs.insert(data.id.clone(), llm_config.clone());
399        }
400
401        let mut sessions = self.sessions.write().await;
402        sessions.insert(data.id.clone(), Arc::new(RwLock::new(session)));
403
404        tracing::info!("Restored session: {}", data.id);
405        Ok(())
406    }
407
408    /// Save a session to the store
409    async fn save_session(&self, session_id: &str) -> Result<()> {
410        // Get the storage type for this session
411        let storage_type = {
412            let storage_types = self.session_storage_types.read().await;
413            storage_types.get(session_id).cloned()
414        };
415
416        let Some(storage_type) = storage_type else {
417            // No storage type means memory-only session
418            return Ok(());
419        };
420
421        // Skip saving for memory storage
422        if storage_type == crate::config::StorageBackend::Memory {
423            return Ok(());
424        }
425
426        // Get the appropriate store
427        let stores = self.stores.read().await;
428        let Some(store) = stores.get(&storage_type) else {
429            tracing::warn!("No store available for storage type: {:?}", storage_type);
430            return Ok(());
431        };
432
433        let session_lock = self.get_session(session_id).await?;
434        let session = session_lock.read().await;
435
436        // Get LLM config if set
437        let llm_config = {
438            let configs = self.llm_configs.read().await;
439            configs.get(session_id).cloned()
440        };
441
442        let data = session.to_session_data(llm_config);
443        store.save(&data).await?;
444
445        tracing::debug!("Saved session: {}", session_id);
446        Ok(())
447    }
448
449    /// Persist a session to store, logging and emitting an event on failure.
450    /// This is a non-fatal wrapper around `save_session` — the operation
451    /// succeeds in memory even if persistence fails.
452    async fn persist_or_warn(&self, session_id: &str, operation: &str) {
453        if let Err(e) = self.save_session(session_id).await {
454            tracing::warn!(
455                "Failed to persist session {} after {}: {}",
456                session_id,
457                operation,
458                e
459            );
460            // Emit event so SDK clients can react
461            if let Ok(session_lock) = self.get_session(session_id).await {
462                let session = session_lock.read().await;
463                let _ = session.event_tx().send(AgentEvent::PersistenceFailed {
464                    session_id: session_id.to_string(),
465                    operation: operation.to_string(),
466                    error: e.to_string(),
467                });
468            }
469        }
470    }
471
472    /// Spawn persistence as a background task (non-blocking).
473    ///
474    /// All `SessionManager` fields are `Arc`-wrapped, so `clone()` is a
475    /// cheap pointer-copy. This keeps file I/O off the response path.
476    fn persist_in_background(&self, session_id: &str, operation: &str) {
477        let mgr = self.clone();
478        let sid = session_id.to_string();
479        let op = operation.to_string();
480        tokio::spawn(async move {
481            mgr.persist_or_warn(&sid, &op).await;
482        });
483    }
484
485    /// Create a new session
486    pub async fn create_session(&self, id: String, config: SessionConfig) -> Result<String> {
487        tracing::info!(name: "a3s.session.create", session_id = %id, "Creating session");
488
489        // Record the storage type for this session
490        {
491            let mut storage_types = self.session_storage_types.write().await;
492            storage_types.insert(id.clone(), config.storage_type.clone());
493        }
494
495        // Get tool definitions from the executor
496        let tools = self.tool_executor.definitions();
497        let mut session = Session::new(id.clone(), config, tools).await?;
498
499        // Start the command queue
500        session.start_queue().await?;
501
502        // Set max context length if provided
503        if session.config.max_context_length > 0 {
504            session.context_usage.max_tokens = session.config.max_context_length as usize;
505        }
506
507        {
508            let mut sessions = self.sessions.write().await;
509            sessions.insert(id.clone(), Arc::new(RwLock::new(session)));
510        }
511
512        // Persist to store
513        self.persist_in_background(&id, "create");
514
515        tracing::info!("Created session: {}", id);
516        Ok(id)
517    }
518
519    /// Destroy a session
520    pub async fn destroy_session(&self, id: &str) -> Result<()> {
521        tracing::info!(name: "a3s.session.destroy", session_id = %id, "Destroying session");
522
523        // Get the storage type before removing the session
524        let storage_type = {
525            let storage_types = self.session_storage_types.read().await;
526            storage_types.get(id).cloned()
527        };
528
529        {
530            let mut sessions = self.sessions.write().await;
531            sessions.remove(id);
532        }
533
534        // Remove LLM config
535        {
536            let mut configs = self.llm_configs.write().await;
537            configs.remove(id);
538        }
539
540        {
541            let mut registries = self.session_skill_registries.write().await;
542            registries.remove(id);
543        }
544
545        // Remove storage type tracking
546        {
547            let mut storage_types = self.session_storage_types.write().await;
548            storage_types.remove(id);
549        }
550
551        // Delete from store if applicable
552        if let Some(storage_type) = storage_type {
553            if storage_type != crate::config::StorageBackend::Memory {
554                let stores = self.stores.read().await;
555                if let Some(store) = stores.get(&storage_type) {
556                    if let Err(e) = store.delete(id).await {
557                        tracing::warn!("Failed to delete session {} from store: {}", id, e);
558                    }
559                }
560            }
561        }
562
563        tracing::info!("Destroyed session: {}", id);
564        Ok(())
565    }
566
567    /// Get a session by ID
568    pub async fn get_session(&self, id: &str) -> Result<Arc<RwLock<Session>>> {
569        let sessions = self.sessions.read().await;
570        sessions
571            .get(id)
572            .cloned()
573            .context(format!("Session not found: {}", id))
574    }
575
576    /// Create a child session for a subagent
577    ///
578    /// Child sessions inherit the parent's LLM client but have their own
579    /// permission policy and configuration.
580    pub async fn create_child_session(
581        &self,
582        parent_id: &str,
583        child_id: String,
584        mut config: SessionConfig,
585    ) -> Result<String> {
586        // Verify parent exists and inherit HITL policy
587        let parent_lock = self.get_session(parent_id).await?;
588        let parent_llm_client = {
589            let parent = parent_lock.read().await;
590
591            // Inherit parent's confirmation policy if child doesn't have one
592            if config.confirmation_policy.is_none() {
593                let parent_policy = parent.confirmation_manager.policy().await;
594                config.confirmation_policy = Some(parent_policy);
595            }
596
597            parent.llm_client.clone()
598        };
599
600        // Set parent_id in config
601        config.parent_id = Some(parent_id.to_string());
602
603        // Get tool definitions from the executor
604        let tools = self.tool_executor.definitions();
605        let mut session = Session::new(child_id.clone(), config, tools).await?;
606
607        // Inherit LLM client from parent if not set
608        if session.llm_client.is_none() {
609            let default_llm = self.llm_client.read().await.clone();
610            session.llm_client = parent_llm_client.or(default_llm);
611        }
612
613        // Start the command queue
614        session.start_queue().await?;
615
616        // Set max context length if provided
617        if session.config.max_context_length > 0 {
618            session.context_usage.max_tokens = session.config.max_context_length as usize;
619        }
620
621        {
622            let mut sessions = self.sessions.write().await;
623            sessions.insert(child_id.clone(), Arc::new(RwLock::new(session)));
624        }
625
626        // Persist to store
627        self.persist_in_background(&child_id, "create_child");
628
629        tracing::info!(
630            "Created child session: {} (parent: {})",
631            child_id,
632            parent_id
633        );
634        Ok(child_id)
635    }
636
637    /// Get all child sessions for a parent session
638    pub async fn get_child_sessions(&self, parent_id: &str) -> Vec<String> {
639        let sessions = self.sessions.read().await;
640        let mut children = Vec::new();
641
642        for (id, session_lock) in sessions.iter() {
643            let session = session_lock.read().await;
644            if session.parent_id.as_deref() == Some(parent_id) {
645                children.push(id.clone());
646            }
647        }
648
649        children
650    }
651
652    /// Check if a session is a child session
653    pub async fn is_child_session(&self, session_id: &str) -> Result<bool> {
654        let session_lock = self.get_session(session_id).await?;
655        let session = session_lock.read().await;
656        Ok(session.is_child_session())
657    }
658
659    /// Generate response for a prompt
660    pub async fn generate(&self, session_id: &str, prompt: &str) -> Result<AgentResult> {
661        let session_lock = self.get_session(session_id).await?;
662
663        // Check if session is paused
664        {
665            let session = session_lock.read().await;
666            if session.state == SessionState::Paused {
667                anyhow::bail!(
668                    "Session {} is paused. Call Resume before generating.",
669                    session_id
670                );
671            }
672        }
673
674        // Get session state and LLM client
675        let (
676            history,
677            system,
678            tools,
679            session_llm_client,
680            permission_checker,
681            confirmation_manager,
682            context_providers,
683            session_workspace,
684            tool_metrics,
685            hook_engine,
686            planning_enabled,
687            goal_tracking,
688        ) = {
689            let session = session_lock.read().await;
690            (
691                session.messages.clone(),
692                session.system().map(String::from),
693                session.tools.clone(),
694                session.llm_client.clone(),
695                session.permission_checker.clone(),
696                session.confirmation_manager.clone(),
697                session.context_providers.clone(),
698                session.config.workspace.clone(),
699                session.tool_metrics.clone(),
700                session.config.hook_engine.clone(),
701                session.config.planning_enabled,
702                session.config.goal_tracking,
703            )
704        };
705
706        // Use session's LLM client if configured, otherwise use default
707        let llm_client = if let Some(client) = session_llm_client {
708            client
709        } else if let Some(client) = self.llm_client.read().await.clone() {
710            client
711        } else {
712            anyhow::bail!(
713                "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
714                session_id
715            );
716        };
717
718        // Construct per-session ToolContext from session workspace, falling back to server default
719        let tool_context = if session_workspace.is_empty() {
720            crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
721                .with_session_id(session_id)
722        } else {
723            crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
724                .with_session_id(session_id)
725        };
726        let tool_context = if let Some(registry) = self.document_parser_registry().await {
727            tool_context.with_document_parsers(registry)
728        } else {
729            tool_context
730        };
731
732        // Inject skill registry into system prompt and agent config
733        let skill_registry = match self.session_skill_registry(session_id).await {
734            Some(registry) => Some(registry),
735            None => self.skill_registry.read().await.clone(),
736        };
737        let system = if let Some(ref registry) = skill_registry {
738            let skill_prompt = registry.to_system_prompt();
739            if skill_prompt.is_empty() {
740                system
741            } else {
742                match system {
743                    Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
744                    None => Some(skill_prompt),
745                }
746            }
747        } else {
748            system
749        };
750
751        // Inject matched skill content on-demand before skill_registry is moved into AgentConfig
752        let effective_prompt = if let Some(ref registry) = skill_registry {
753            let skill_content = registry.match_skills(prompt);
754            if skill_content.is_empty() {
755                prompt.to_string()
756            } else {
757                format!("{}\n\n---\n\n{}", skill_content, prompt)
758            }
759        } else {
760            prompt.to_string()
761        };
762
763        // Build AgentMemory from shared store (if configured)
764        let memory = self
765            .memory_store
766            .read()
767            .await
768            .as_ref()
769            .map(|store| Arc::new(AgentMemory::new(store.clone())));
770
771        // Create agent loop with permission policy, confirmation manager, and context providers
772        let config = AgentConfig {
773            prompt_slots: match system {
774                Some(s) => SystemPromptSlots::from_legacy(s),
775                None => SystemPromptSlots::default(),
776            },
777            tools,
778            max_tool_rounds: 50,
779            permission_checker: Some(permission_checker),
780            confirmation_manager: Some(confirmation_manager),
781            context_providers,
782            planning_enabled,
783            goal_tracking,
784            hook_engine,
785            skill_registry,
786            memory,
787            ..AgentConfig::default()
788        };
789
790        let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
791            .with_tool_metrics(tool_metrics);
792
793        // Execute with session context
794        let result = agent
795            .execute_with_session(&history, &effective_prompt, Some(session_id), None, None)
796            .await?;
797
798        // Update session
799        {
800            let mut session = session_lock.write().await;
801            session.messages = result.messages.clone();
802            session.update_usage(&result.usage);
803        }
804
805        // Persist to store
806        self.persist_in_background(session_id, "generate");
807
808        // Auto-compact if context usage exceeds threshold
809        if let Err(e) = self.maybe_auto_compact(session_id).await {
810            tracing::warn!("Auto-compact failed for session {}: {}", session_id, e);
811        }
812
813        Ok(result)
814    }
815
816    /// Generate response with streaming events
817    pub async fn generate_streaming(
818        &self,
819        session_id: &str,
820        prompt: &str,
821    ) -> Result<(
822        mpsc::Receiver<AgentEvent>,
823        tokio::task::JoinHandle<Result<AgentResult>>,
824        tokio_util::sync::CancellationToken,
825    )> {
826        let session_lock = self.get_session(session_id).await?;
827
828        // Check if session is paused
829        {
830            let session = session_lock.read().await;
831            if session.state == SessionState::Paused {
832                anyhow::bail!(
833                    "Session {} is paused. Call Resume before generating.",
834                    session_id
835                );
836            }
837        }
838
839        // Get session state and LLM client
840        let (
841            history,
842            system,
843            tools,
844            session_llm_client,
845            permission_checker,
846            confirmation_manager,
847            context_providers,
848            session_workspace,
849            tool_metrics,
850            hook_engine,
851            planning_enabled,
852            goal_tracking,
853        ) = {
854            let session = session_lock.read().await;
855            (
856                session.messages.clone(),
857                session.system().map(String::from),
858                session.tools.clone(),
859                session.llm_client.clone(),
860                session.permission_checker.clone(),
861                session.confirmation_manager.clone(),
862                session.context_providers.clone(),
863                session.config.workspace.clone(),
864                session.tool_metrics.clone(),
865                session.config.hook_engine.clone(),
866                session.config.planning_enabled,
867                session.config.goal_tracking,
868            )
869        };
870
871        // Use session's LLM client if configured, otherwise use default
872        let llm_client = if let Some(client) = session_llm_client {
873            client
874        } else if let Some(client) = self.llm_client.read().await.clone() {
875            client
876        } else {
877            anyhow::bail!(
878                "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
879                session_id
880            );
881        };
882
883        // Construct per-session ToolContext from session workspace, falling back to server default
884        let tool_context = if session_workspace.is_empty() {
885            crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
886                .with_session_id(session_id)
887        } else {
888            crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
889                .with_session_id(session_id)
890        };
891        let tool_context = if let Some(registry) = self.document_parser_registry().await {
892            tool_context.with_document_parsers(registry)
893        } else {
894            tool_context
895        };
896
897        // Inject skill registry into system prompt and agent config
898        let skill_registry = match self.session_skill_registry(session_id).await {
899            Some(registry) => Some(registry),
900            None => self.skill_registry.read().await.clone(),
901        };
902        let system = if let Some(ref registry) = skill_registry {
903            let skill_prompt = registry.to_system_prompt();
904            if skill_prompt.is_empty() {
905                system
906            } else {
907                match system {
908                    Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
909                    None => Some(skill_prompt),
910                }
911            }
912        } else {
913            system
914        };
915
916        // Inject matched skill content on-demand before skill_registry is moved into AgentConfig
917        let effective_prompt = if let Some(ref registry) = skill_registry {
918            let skill_content = registry.match_skills(prompt);
919            if skill_content.is_empty() {
920                prompt.to_string()
921            } else {
922                format!("{}\n\n---\n\n{}", skill_content, prompt)
923            }
924        } else {
925            prompt.to_string()
926        };
927
928        // Build AgentMemory from shared store (if configured)
929        let memory = self
930            .memory_store
931            .read()
932            .await
933            .as_ref()
934            .map(|store| Arc::new(AgentMemory::new(store.clone())));
935
936        // Create agent loop with permission policy, confirmation manager, and context providers
937        let config = AgentConfig {
938            prompt_slots: match system {
939                Some(s) => SystemPromptSlots::from_legacy(s),
940                None => SystemPromptSlots::default(),
941            },
942            tools,
943            max_tool_rounds: 50,
944            permission_checker: Some(permission_checker),
945            confirmation_manager: Some(confirmation_manager),
946            context_providers,
947            planning_enabled,
948            goal_tracking,
949            hook_engine,
950            skill_registry,
951            memory,
952            ..AgentConfig::default()
953        };
954
955        let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
956            .with_tool_metrics(tool_metrics);
957
958        // Execute with streaming
959        let (rx, handle, cancel_token) =
960            agent.execute_streaming(&history, &effective_prompt).await?;
961
962        // Store the cancellation token for cancellation support
963        let cancel_token_clone = cancel_token.clone();
964        {
965            let mut ops = self.ongoing_operations.write().await;
966            ops.insert(session_id.to_string(), cancel_token);
967        }
968
969        // Spawn task to update session after completion
970        let session_lock_clone = session_lock.clone();
971        let original_handle = handle;
972        let stores = self.stores.clone();
973        let session_storage_types = self.session_storage_types.clone();
974        let llm_configs = self.llm_configs.clone();
975        let session_id_owned = session_id.to_string();
976        let ongoing_operations = self.ongoing_operations.clone();
977        let session_manager = self.clone();
978
979        let wrapped_handle = tokio::spawn(async move {
980            let result = original_handle.await??;
981
982            // Remove from ongoing operations
983            {
984                let mut ops = ongoing_operations.write().await;
985                ops.remove(&session_id_owned);
986            }
987
988            // Update session
989            {
990                let mut session = session_lock_clone.write().await;
991                session.messages = result.messages.clone();
992                session.update_usage(&result.usage);
993            }
994
995            // Persist to store
996            let storage_type = {
997                let storage_types = session_storage_types.read().await;
998                storage_types.get(&session_id_owned).cloned()
999            };
1000
1001            if let Some(storage_type) = storage_type {
1002                if storage_type != crate::config::StorageBackend::Memory {
1003                    let stores_guard = stores.read().await;
1004                    if let Some(store) = stores_guard.get(&storage_type) {
1005                        let session = session_lock_clone.read().await;
1006                        let llm_config = {
1007                            let configs = llm_configs.read().await;
1008                            configs.get(&session_id_owned).cloned()
1009                        };
1010                        let data = session.to_session_data(llm_config);
1011                        if let Err(e) = store.save(&data).await {
1012                            tracing::warn!(
1013                                "Failed to persist session {} after streaming: {}",
1014                                session_id_owned,
1015                                e
1016                            );
1017                        }
1018                    }
1019                }
1020            }
1021
1022            // Auto-compact if context usage exceeds threshold
1023            if let Err(e) = session_manager.maybe_auto_compact(&session_id_owned).await {
1024                tracing::warn!(
1025                    "Auto-compact failed for session {}: {}",
1026                    session_id_owned,
1027                    e
1028                );
1029            }
1030
1031            Ok(result)
1032        });
1033
1034        Ok((rx, wrapped_handle, cancel_token_clone))
1035    }
1036
1037    /// Get context usage for a session
1038    pub async fn context_usage(&self, session_id: &str) -> Result<ContextUsage> {
1039        let session_lock = self.get_session(session_id).await?;
1040        let session = session_lock.read().await;
1041        Ok(session.context_usage.clone())
1042    }
1043
1044    /// Get conversation history for a session
1045    pub async fn history(&self, session_id: &str) -> Result<Vec<Message>> {
1046        let session_lock = self.get_session(session_id).await?;
1047        let session = session_lock.read().await;
1048        Ok(session.messages.clone())
1049    }
1050
1051    /// Clear session history
1052    pub async fn clear(&self, session_id: &str) -> Result<()> {
1053        {
1054            let session_lock = self.get_session(session_id).await?;
1055            let mut session = session_lock.write().await;
1056            session.clear();
1057        }
1058
1059        // Persist to store
1060        self.persist_in_background(session_id, "clear");
1061
1062        Ok(())
1063    }
1064
1065    /// Compact session context
1066    pub async fn compact(&self, session_id: &str) -> Result<()> {
1067        tracing::info!(name: "a3s.session.compact", session_id = %session_id, "Compacting session context");
1068
1069        {
1070            let session_lock = self.get_session(session_id).await?;
1071            let mut session = session_lock.write().await;
1072
1073            // Get LLM client for compaction (if available)
1074            let llm_client = if let Some(client) = &session.llm_client {
1075                client.clone()
1076            } else if let Some(client) = self.llm_client.read().await.clone() {
1077                client
1078            } else {
1079                // If no LLM client available, just do simple truncation
1080                tracing::warn!("No LLM client configured for compaction, using simple truncation");
1081                let keep_messages = 20;
1082                if session.messages.len() > keep_messages {
1083                    let len = session.messages.len();
1084                    session.messages = session.messages.split_off(len - keep_messages);
1085                }
1086                // Persist after truncation
1087                drop(session);
1088                self.persist_in_background(session_id, "compact");
1089                return Ok(());
1090            };
1091
1092            session.compact(&llm_client).await?;
1093        }
1094
1095        // Persist to store
1096        self.persist_in_background(session_id, "compact");
1097
1098        Ok(())
1099    }
1100
1101    /// Check if auto-compaction should be triggered and perform it if needed.
1102    ///
1103    /// Called after `generate()` / `generate_streaming()` updates session usage.
1104    /// Triggers compaction when:
1105    /// - `auto_compact` is enabled in session config
1106    /// - `context_usage.percent` exceeds `auto_compact_threshold`
1107    pub async fn maybe_auto_compact(&self, session_id: &str) -> Result<bool> {
1108        let (should_compact, percent_before, messages_before) = {
1109            let session_lock = self.get_session(session_id).await?;
1110            let session = session_lock.read().await;
1111
1112            if !session.config.auto_compact {
1113                return Ok(false);
1114            }
1115
1116            let threshold = session.config.auto_compact_threshold;
1117            let percent = session.context_usage.percent;
1118            let msg_count = session.messages.len();
1119
1120            tracing::debug!(
1121                "Auto-compact check for session {}: percent={:.2}%, threshold={:.2}%, messages={}",
1122                session_id,
1123                percent * 100.0,
1124                threshold * 100.0,
1125                msg_count,
1126            );
1127
1128            (percent >= threshold, percent, msg_count)
1129        };
1130
1131        if !should_compact {
1132            return Ok(false);
1133        }
1134
1135        tracing::info!(
1136            name: "a3s.session.auto_compact",
1137            session_id = %session_id,
1138            percent_before = %format!("{:.1}%", percent_before * 100.0),
1139            messages_before = %messages_before,
1140            "Auto-compacting session due to high context usage"
1141        );
1142
1143        // Perform compaction (reuses existing compact logic)
1144        self.compact(session_id).await?;
1145
1146        // Get post-compaction message count
1147        let messages_after = {
1148            let session_lock = self.get_session(session_id).await?;
1149            let session = session_lock.read().await;
1150            session.messages.len()
1151        };
1152
1153        // Broadcast event to notify clients
1154        let event = AgentEvent::ContextCompacted {
1155            session_id: session_id.to_string(),
1156            before_messages: messages_before,
1157            after_messages: messages_after,
1158            percent_before,
1159        };
1160
1161        // Try to send via session's event broadcaster
1162        if let Ok(session_lock) = self.get_session(session_id).await {
1163            let session = session_lock.read().await;
1164            let _ = session.event_tx.send(event);
1165        }
1166
1167        tracing::info!(
1168            name: "a3s.session.auto_compact.done",
1169            session_id = %session_id,
1170            messages_before = %messages_before,
1171            messages_after = %messages_after,
1172            "Auto-compaction complete"
1173        );
1174
1175        Ok(true)
1176    }
1177
1178    /// Resolve the LLM client for a session (session-level -> default fallback)
1179    ///
1180    /// Returns `None` if no LLM client is configured at either level.
1181    pub async fn get_llm_for_session(
1182        &self,
1183        session_id: &str,
1184    ) -> Result<Option<Arc<dyn LlmClient>>> {
1185        let session_lock = self.get_session(session_id).await?;
1186        let session = session_lock.read().await;
1187
1188        if let Some(client) = &session.llm_client {
1189            return Ok(Some(client.clone()));
1190        }
1191
1192        Ok(self.llm_client.read().await.clone())
1193    }
1194
1195    /// Ask an ephemeral side question for an existing managed session.
1196    ///
1197    /// Uses a read-only snapshot of the session history plus current queue /
1198    /// confirmation / task state. Optional `runtime_context` lets hosts inject
1199    /// extra transient context such as browser-only UI state.
1200    pub async fn btw_with_context(
1201        &self,
1202        session_id: &str,
1203        question: &str,
1204        runtime_context: Option<&str>,
1205    ) -> Result<crate::agent_api::BtwResult> {
1206        let question = question.trim();
1207        if question.is_empty() {
1208            anyhow::bail!("btw: question cannot be empty");
1209        }
1210
1211        let session_lock = self.get_session(session_id).await?;
1212        let (history_snapshot, session_runtime) = {
1213            let session = session_lock.read().await;
1214            let mut sections = Vec::new();
1215
1216            let pending_confirmations = session.confirmation_manager.pending_confirmations().await;
1217            if !pending_confirmations.is_empty() {
1218                let mut lines = pending_confirmations
1219                    .into_iter()
1220                    .map(|pending| {
1221                        let arg_summary = Self::compact_json_value(&pending.args);
1222                        if arg_summary.is_empty() {
1223                            format!(
1224                                "- {} [{}] remaining={}ms",
1225                                pending.tool_name, pending.tool_id, pending.remaining_ms
1226                            )
1227                        } else {
1228                            format!(
1229                                "- {} [{}] remaining={}ms {}",
1230                                pending.tool_name,
1231                                pending.tool_id,
1232                                pending.remaining_ms,
1233                                arg_summary
1234                            )
1235                        }
1236                    })
1237                    .collect::<Vec<_>>();
1238                lines.sort();
1239                sections.push(format!("[pending confirmations]\n{}", lines.join("\n")));
1240            }
1241
1242            let stats = session.command_queue.stats().await;
1243            if stats.total_active > 0 || stats.total_pending > 0 || stats.external_pending > 0 {
1244                let mut lines = vec![format!(
1245                    "active={}, pending={}, external_pending={}",
1246                    stats.total_active, stats.total_pending, stats.external_pending
1247                )];
1248                let mut lanes = stats
1249                    .lanes
1250                    .into_values()
1251                    .filter(|lane| lane.active > 0 || lane.pending > 0)
1252                    .map(|lane| {
1253                        format!(
1254                            "- {:?}: active={}, pending={}, handler={:?}",
1255                            lane.lane, lane.active, lane.pending, lane.handler_mode
1256                        )
1257                    })
1258                    .collect::<Vec<_>>();
1259                lanes.sort();
1260                lines.extend(lanes);
1261                sections.push(format!("[session queue]\n{}", lines.join("\n")));
1262            }
1263
1264            let external_tasks = session.command_queue.pending_external_tasks().await;
1265            if !external_tasks.is_empty() {
1266                let mut lines = external_tasks
1267                    .into_iter()
1268                    .take(6)
1269                    .map(|task| {
1270                        let payload_summary = Self::compact_json_value(&task.payload);
1271                        if payload_summary.is_empty() {
1272                            format!(
1273                                "- {} {:?} remaining={}ms",
1274                                task.command_type,
1275                                task.lane,
1276                                task.remaining_ms()
1277                            )
1278                        } else {
1279                            format!(
1280                                "- {} {:?} remaining={}ms {}",
1281                                task.command_type,
1282                                task.lane,
1283                                task.remaining_ms(),
1284                                payload_summary
1285                            )
1286                        }
1287                    })
1288                    .collect::<Vec<_>>();
1289                lines.sort();
1290                sections.push(format!("[pending external tasks]\n{}", lines.join("\n")));
1291            }
1292
1293            let active_tasks = session
1294                .tasks
1295                .iter()
1296                .filter(|task| task.status.is_active())
1297                .take(6)
1298                .map(|task| match &task.tool {
1299                    Some(tool) if !tool.is_empty() => {
1300                        format!("- [{}] {} ({})", task.status, task.content, tool)
1301                    }
1302                    _ => format!("- [{}] {}", task.status, task.content),
1303                })
1304                .collect::<Vec<_>>();
1305            if !active_tasks.is_empty() {
1306                sections.push(format!("[tracked tasks]\n{}", active_tasks.join("\n")));
1307            }
1308
1309            sections.push(format!(
1310                "[session state]\n{}",
1311                match session.state {
1312                    SessionState::Active => "active",
1313                    SessionState::Paused => "paused",
1314                    SessionState::Completed => "completed",
1315                    SessionState::Error => "error",
1316                    SessionState::Unknown => "unknown",
1317                }
1318            ));
1319
1320            (session.messages.clone(), sections.join("\n\n"))
1321        };
1322
1323        let llm_client = self
1324            .get_llm_for_session(session_id)
1325            .await?
1326            .context("btw failed: no model configured for session")?;
1327
1328        let mut messages = history_snapshot;
1329        let mut injected_sections = Vec::new();
1330        if !session_runtime.is_empty() {
1331            injected_sections.push(format!("[session runtime context]\n{}", session_runtime));
1332        }
1333        if let Some(extra) = runtime_context.map(str::trim).filter(|ctx| !ctx.is_empty()) {
1334            injected_sections.push(format!("[host runtime context]\n{}", extra));
1335        }
1336        if !injected_sections.is_empty() {
1337            let injected_context = format!(
1338                "Use the following runtime context only as background for the next side question. Do not treat it as a new user request.\n\n{}",
1339                injected_sections.join("\n\n")
1340            );
1341            messages.push(Message::user(&injected_context));
1342        }
1343        messages.push(Message::user(question));
1344
1345        let response = llm_client
1346            .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
1347            .await
1348            .map_err(|e| anyhow::anyhow!("btw: ephemeral LLM call failed: {e}"))?;
1349
1350        Ok(crate::agent_api::BtwResult {
1351            question: question.to_string(),
1352            answer: response.text(),
1353            usage: response.usage,
1354        })
1355    }
1356
1357    /// Configure session
1358    pub async fn configure(
1359        &self,
1360        session_id: &str,
1361        thinking: Option<bool>,
1362        budget: Option<usize>,
1363        model_config: Option<LlmConfig>,
1364    ) -> Result<()> {
1365        {
1366            let session_lock = self.get_session(session_id).await?;
1367            let mut session = session_lock.write().await;
1368
1369            if let Some(t) = thinking {
1370                session.thinking_enabled = t;
1371            }
1372            if let Some(b) = budget {
1373                session.thinking_budget = Some(b);
1374            }
1375            if let Some(ref config) = model_config {
1376                tracing::info!(
1377                    "Configuring session {} with LLM: provider={}, model={}",
1378                    session_id,
1379                    config.provider,
1380                    config.model
1381                );
1382                session.model_name = Some(config.model.clone());
1383                session.llm_client = Some(llm::create_client_with_config(config.clone()));
1384            }
1385        }
1386
1387        // Store LLM config for persistence (without API key)
1388        if let Some(config) = model_config {
1389            let llm_config_data = LlmConfigData {
1390                provider: config.provider,
1391                model: config.model,
1392                api_key: None, // Don't persist API key
1393                base_url: config.base_url,
1394            };
1395            let mut configs = self.llm_configs.write().await;
1396            configs.insert(session_id.to_string(), llm_config_data);
1397        }
1398
1399        // Persist to store
1400        self.persist_in_background(session_id, "configure");
1401
1402        Ok(())
1403    }
1404
1405    /// Update a session's system prompt in place.
1406    pub async fn set_system_prompt(
1407        &self,
1408        session_id: &str,
1409        system_prompt: Option<String>,
1410    ) -> Result<()> {
1411        {
1412            let session_lock = self.get_session(session_id).await?;
1413            let mut session = session_lock.write().await;
1414            session.config.system_prompt = system_prompt;
1415        }
1416
1417        self.persist_in_background(session_id, "set_system_prompt");
1418        Ok(())
1419    }
1420
1421    /// Get session count
1422    pub async fn session_count(&self) -> usize {
1423        let sessions = self.sessions.read().await;
1424        sessions.len()
1425    }
1426
1427    /// Check health of all registered stores
1428    pub async fn store_health(&self) -> Vec<(String, Result<()>)> {
1429        let stores = self.stores.read().await;
1430        let mut results = Vec::new();
1431        for (_, store) in stores.iter() {
1432            let name = store.backend_name().to_string();
1433            let result = store.health_check().await;
1434            results.push((name, result));
1435        }
1436        results
1437    }
1438
1439    /// List all loaded tools (built-in tools)
1440    pub fn list_tools(&self) -> Vec<crate::llm::ToolDefinition> {
1441        self.tool_executor.definitions()
1442    }
1443
1444    /// Pause a session
1445    pub async fn pause_session(&self, session_id: &str) -> Result<bool> {
1446        let paused = {
1447            let session_lock = self.get_session(session_id).await?;
1448            let mut session = session_lock.write().await;
1449            session.pause()
1450        };
1451
1452        if paused {
1453            self.persist_in_background(session_id, "pause");
1454        }
1455
1456        Ok(paused)
1457    }
1458
1459    /// Resume a session
1460    pub async fn resume_session(&self, session_id: &str) -> Result<bool> {
1461        let resumed = {
1462            let session_lock = self.get_session(session_id).await?;
1463            let mut session = session_lock.write().await;
1464            session.resume()
1465        };
1466
1467        if resumed {
1468            self.persist_in_background(session_id, "resume");
1469        }
1470
1471        Ok(resumed)
1472    }
1473
1474    /// Cancel an ongoing operation for a session
1475    ///
1476    /// Returns true if an operation was cancelled, false if no operation was running.
1477    pub async fn cancel_operation(&self, session_id: &str) -> Result<bool> {
1478        // First, cancel any pending HITL confirmations
1479        let session_lock = self.get_session(session_id).await?;
1480        let cancelled_confirmations = {
1481            let session = session_lock.read().await;
1482            session.confirmation_manager.cancel_all().await
1483        };
1484
1485        if cancelled_confirmations > 0 {
1486            tracing::info!(
1487                "Cancelled {} pending confirmations for session {}",
1488                cancelled_confirmations,
1489                session_id
1490            );
1491        }
1492
1493        // Then, cancel the ongoing operation if any
1494        let cancel_token = {
1495            let mut ops = self.ongoing_operations.write().await;
1496            ops.remove(session_id)
1497        };
1498
1499        if let Some(token) = cancel_token {
1500            token.cancel();
1501            tracing::info!("Cancelled ongoing operation for session {}", session_id);
1502            Ok(true)
1503        } else if cancelled_confirmations > 0 {
1504            // We cancelled confirmations but no main operation
1505            Ok(true)
1506        } else {
1507            tracing::debug!("No ongoing operation to cancel for session {}", session_id);
1508            Ok(false)
1509        }
1510    }
1511
1512    /// Get all sessions (returns session locks for iteration)
1513    pub async fn get_all_sessions(&self) -> Vec<Arc<RwLock<Session>>> {
1514        let sessions = self.sessions.read().await;
1515        sessions.values().cloned().collect()
1516    }
1517
1518    /// Get tool executor reference
1519    pub fn tool_executor(&self) -> &Arc<ToolExecutor> {
1520        &self.tool_executor
1521    }
1522
1523    /// Confirm a tool execution (HITL)
1524    pub async fn confirm_tool(
1525        &self,
1526        session_id: &str,
1527        tool_id: &str,
1528        approved: bool,
1529        reason: Option<String>,
1530    ) -> Result<bool> {
1531        let session_lock = self.get_session(session_id).await?;
1532        let session = session_lock.read().await;
1533        session
1534            .confirmation_manager
1535            .confirm(tool_id, approved, reason)
1536            .await
1537            .map_err(|e| anyhow::anyhow!(e))
1538    }
1539
1540    /// Set confirmation policy for a session (HITL)
1541    pub async fn set_confirmation_policy(
1542        &self,
1543        session_id: &str,
1544        policy: ConfirmationPolicy,
1545    ) -> Result<ConfirmationPolicy> {
1546        {
1547            let session_lock = self.get_session(session_id).await?;
1548            let session = session_lock.read().await;
1549            session.set_confirmation_policy(policy.clone()).await;
1550        }
1551
1552        // Update config for persistence
1553        {
1554            let session_lock = self.get_session(session_id).await?;
1555            let mut session = session_lock.write().await;
1556            session.config.confirmation_policy = Some(policy.clone());
1557        }
1558
1559        // Persist to store
1560        self.persist_in_background(session_id, "set_confirmation_policy");
1561
1562        Ok(policy)
1563    }
1564
1565    /// Set permission policy for a session.
1566    pub async fn set_permission_policy(
1567        &self,
1568        session_id: &str,
1569        policy: PermissionPolicy,
1570    ) -> Result<PermissionPolicy> {
1571        {
1572            let session_lock = self.get_session(session_id).await?;
1573            let mut session = session_lock.write().await;
1574            session.set_permission_policy(policy.clone());
1575        }
1576
1577        self.persist_in_background(session_id, "set_permission_policy");
1578
1579        Ok(policy)
1580    }
1581
1582    /// Get confirmation policy for a session (HITL)
1583    pub async fn get_confirmation_policy(&self, session_id: &str) -> Result<ConfirmationPolicy> {
1584        let session_lock = self.get_session(session_id).await?;
1585        let session = session_lock.read().await;
1586        Ok(session.confirmation_policy().await)
1587    }
1588}