Skip to main content

a3s_code_core/session/
manager.rs

1//! SessionManager — manages multiple concurrent sessions
2
3use super::{ContextUsage, Session, SessionConfig, SessionState};
4use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
5use crate::hitl::ConfirmationPolicy;
6use crate::llm::{self, LlmClient, LlmConfig, Message};
7use crate::mcp::McpManager;
8use crate::memory::AgentMemory;
9use crate::permissions::PermissionPolicy;
10use crate::prompts::SystemPromptSlots;
11use crate::skills::SkillRegistry;
12use crate::store::{FileSessionStore, LlmConfigData, SessionData, SessionStore};
13use crate::text::truncate_utf8;
14use crate::tools::ToolExecutor;
15use crate::PlanningMode;
16use a3s_memory::MemoryStore;
17use anyhow::{Context, Result};
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::{mpsc, RwLock};
21
/// Session manager handles multiple concurrent sessions.
///
/// Every field is wrapped in an `Arc`, so the derived `Clone` produces a
/// handle that shares the same underlying state as the value it was cloned
/// from.
#[derive(Clone)]
pub struct SessionManager {
    /// In-memory sessions keyed by session ID; each session has its own lock.
    pub(crate) sessions: Arc<RwLock<HashMap<String, Arc<RwLock<Session>>>>>,
    /// Default LLM client used when a session has no per-session client configured.
    /// Wrapped in RwLock so it can be hot-swapped at runtime without restart.
    pub(crate) llm_client: Arc<RwLock<Option<Arc<dyn LlmClient>>>>,
    /// Tool executor shared by all sessions; also the source of tool definitions
    /// handed to newly created or restored sessions.
    pub(crate) tool_executor: Arc<ToolExecutor>,
    /// Session stores by storage type
    pub(crate) stores: Arc<RwLock<HashMap<crate::config::StorageBackend, Arc<dyn SessionStore>>>>,
    /// Track which storage type each session uses.
    /// A session without an entry here is treated as memory-only when saving.
    pub(crate) session_storage_types: Arc<RwLock<HashMap<String, crate::config::StorageBackend>>>,
    /// LLM configurations for sessions (stored separately for persistence)
    pub(crate) llm_configs: Arc<RwLock<HashMap<String, LlmConfigData>>>,
    /// Ongoing operations (session_id -> CancellationToken)
    pub(crate) ongoing_operations:
        Arc<RwLock<HashMap<String, tokio_util::sync::CancellationToken>>>,
    /// Skill registry for runtime skill management
    pub(crate) skill_registry: Arc<RwLock<Option<Arc<SkillRegistry>>>>,
    /// Per-session skill registries; when present for a session they are used
    /// instead of the shared registry.
    pub(crate) session_skill_registries: Arc<RwLock<HashMap<String, Arc<SkillRegistry>>>>,
    /// Shared memory store for agent long-term memory.
    /// When set, each `generate`/`generate_streaming` call wraps this in
    /// `AgentMemory` and injects it into `AgentConfig.memory`.
    pub(crate) memory_store: Arc<RwLock<Option<Arc<dyn MemoryStore>>>>,
    /// Shared MCP manager. When set, MCP tools are registered into the
    /// `ToolExecutor` at startup so all sessions can use them.
    pub(crate) mcp_manager: Arc<RwLock<Option<Arc<McpManager>>>>,
}
51
52impl SessionManager {
53    fn compact_json_value(value: &serde_json::Value) -> String {
54        let raw = match value {
55            serde_json::Value::Null => String::new(),
56            serde_json::Value::String(s) => s.clone(),
57            _ => serde_json::to_string(value).unwrap_or_default(),
58        };
59        let compact = raw.split_whitespace().collect::<Vec<_>>().join(" ");
60        if compact.len() > 180 {
61            format!("{}...", truncate_utf8(&compact, 180))
62        } else {
63            compact
64        }
65    }
66
67    /// Create a new session manager without persistence
68    pub fn new(llm_client: Option<Arc<dyn LlmClient>>, tool_executor: Arc<ToolExecutor>) -> Self {
69        Self {
70            sessions: Arc::new(RwLock::new(HashMap::new())),
71            llm_client: Arc::new(RwLock::new(llm_client)),
72            tool_executor,
73            stores: Arc::new(RwLock::new(HashMap::new())),
74            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
75            llm_configs: Arc::new(RwLock::new(HashMap::new())),
76            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
77            skill_registry: Arc::new(RwLock::new(None)),
78            session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
79            memory_store: Arc::new(RwLock::new(None)),
80            mcp_manager: Arc::new(RwLock::new(None)),
81        }
82    }
83
84    /// Create a session manager with file-based persistence
85    ///
86    /// Sessions will be automatically saved to disk and restored on startup.
87    pub async fn with_persistence<P: AsRef<std::path::Path>>(
88        llm_client: Option<Arc<dyn LlmClient>>,
89        tool_executor: Arc<ToolExecutor>,
90        sessions_dir: P,
91    ) -> Result<Self> {
92        let store = FileSessionStore::new(sessions_dir).await?;
93        let mut stores = HashMap::new();
94        stores.insert(
95            crate::config::StorageBackend::File,
96            Arc::new(store) as Arc<dyn SessionStore>,
97        );
98
99        let manager = Self {
100            sessions: Arc::new(RwLock::new(HashMap::new())),
101            llm_client: Arc::new(RwLock::new(llm_client)),
102            tool_executor,
103            stores: Arc::new(RwLock::new(stores)),
104            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
105            llm_configs: Arc::new(RwLock::new(HashMap::new())),
106            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
107            skill_registry: Arc::new(RwLock::new(None)),
108            session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
109            memory_store: Arc::new(RwLock::new(None)),
110            mcp_manager: Arc::new(RwLock::new(None)),
111        };
112
113        Ok(manager)
114    }
115
116    /// Create a session manager with a custom store
117    ///
118    /// The `backend` parameter determines which `StorageBackend` key the store is registered under.
119    /// Sessions created with a matching `storage_type` will use this store.
120    pub fn with_store(
121        llm_client: Option<Arc<dyn LlmClient>>,
122        tool_executor: Arc<ToolExecutor>,
123        store: Arc<dyn SessionStore>,
124        backend: crate::config::StorageBackend,
125    ) -> Self {
126        let mut stores = HashMap::new();
127        stores.insert(backend, store);
128
129        Self {
130            sessions: Arc::new(RwLock::new(HashMap::new())),
131            llm_client: Arc::new(RwLock::new(llm_client)),
132            tool_executor,
133            stores: Arc::new(RwLock::new(stores)),
134            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
135            llm_configs: Arc::new(RwLock::new(HashMap::new())),
136            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
137            skill_registry: Arc::new(RwLock::new(None)),
138            session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
139            memory_store: Arc::new(RwLock::new(None)),
140            mcp_manager: Arc::new(RwLock::new(None)),
141        }
142    }
143
144    /// Hot-swap the default LLM client used by sessions without a per-session client.
145    ///
146    /// Called by SafeClaw when `PUT /api/agent/config` updates the model config.
147    /// All subsequent `generate` / `generate_streaming` calls will use the new client.
148    pub async fn set_default_llm(&self, client: Option<Arc<dyn LlmClient>>) {
149        *self.llm_client.write().await = client;
150    }
151
152    /// Set the skill registry for runtime skill management.
153    ///
154    /// Skills registered here will be injected into the system prompt
155    /// for all subsequent `generate` / `generate_streaming` calls.
156    /// Also registers the `manage_skill` tool on the `ToolExecutor` so
157    /// the Agent can create, list, and remove skills at runtime.
158    pub async fn set_skill_registry(
159        &self,
160        registry: Arc<SkillRegistry>,
161        skills_dir: std::path::PathBuf,
162    ) {
163        // Register the manage_skill tool for self-bootstrap
164        let manage_tool = crate::skills::ManageSkillTool::new(registry.clone(), skills_dir);
165        self.tool_executor
166            .register_dynamic_tool(Arc::new(manage_tool));
167
168        *self.skill_registry.write().await = Some(registry);
169    }
170
171    /// Get the current skill registry, if any.
172    pub async fn skill_registry(&self) -> Option<Arc<SkillRegistry>> {
173        self.skill_registry.read().await.clone()
174    }
175
176    /// Override the active skill registry for a single session.
177    pub async fn set_session_skill_registry(
178        &self,
179        session_id: impl Into<String>,
180        registry: Arc<SkillRegistry>,
181    ) {
182        self.session_skill_registries
183            .write()
184            .await
185            .insert(session_id.into(), registry);
186    }
187
188    /// Get the active skill registry for a single session, if one was set.
189    pub async fn session_skill_registry(&self, session_id: &str) -> Option<Arc<SkillRegistry>> {
190        self.session_skill_registries
191            .read()
192            .await
193            .get(session_id)
194            .cloned()
195    }
196
197    /// Set the shared memory store for agent long-term memory.
198    ///
199    /// When set, every `generate`/`generate_streaming` call wraps this store
200    /// in an `AgentMemory` and injects it into `AgentConfig.memory`, enabling
201    /// automatic `recall_similar` before prompts and `remember_success`/
202    /// `remember_failure` after tool execution.
203    pub async fn set_memory_store(&self, store: Arc<dyn MemoryStore>) {
204        *self.memory_store.write().await = Some(store);
205    }
206
207    /// Get the current memory store, if any.
208    pub async fn memory_store(&self) -> Option<Arc<dyn MemoryStore>> {
209        self.memory_store.read().await.clone()
210    }
211
212    /// Set the shared MCP manager.
213    ///
214    /// When set, all tools from connected MCP servers are registered into the
215    /// `ToolExecutor` so they are available in every session.
216    pub async fn set_mcp_manager(&self, manager: Arc<McpManager>) {
217        let all_tools = manager.get_all_tools().await;
218        let mut by_server: HashMap<String, Vec<crate::mcp::McpTool>> = HashMap::new();
219        for (server, tool) in all_tools {
220            by_server.entry(server).or_default().push(tool);
221        }
222        for (server_name, tools) in by_server {
223            for tool in
224                crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&manager))
225            {
226                self.tool_executor.register_dynamic_tool(tool);
227            }
228        }
229        *self.mcp_manager.write().await = Some(manager);
230    }
231
232    /// Dynamically add and connect an MCP server to the shared manager.
233    ///
234    /// Registers the server config, connects it, and registers its tools into
235    /// the `ToolExecutor` so all subsequent sessions can use them.
236    pub async fn add_mcp_server(&self, config: crate::mcp::McpServerConfig) -> Result<()> {
237        let manager = {
238            let guard = self.mcp_manager.read().await;
239            match guard.clone() {
240                Some(m) => m,
241                None => {
242                    drop(guard);
243                    let m = Arc::new(McpManager::new());
244                    *self.mcp_manager.write().await = Some(Arc::clone(&m));
245                    m
246                }
247            }
248        };
249        let name = config.name.clone();
250        manager.register_server(config).await;
251        manager.connect(&name).await?;
252        let tools = manager.get_server_tools(&name).await;
253        for tool in crate::mcp::tools::create_mcp_tools(&name, tools, Arc::clone(&manager)) {
254            self.tool_executor.register_dynamic_tool(tool);
255        }
256        Ok(())
257    }
258
259    /// Disconnect and remove an MCP server from the shared manager.
260    ///
261    /// Also unregisters its tools from the `ToolExecutor`.
262    pub async fn remove_mcp_server(&self, name: &str) -> Result<()> {
263        let guard = self.mcp_manager.read().await;
264        if let Some(ref manager) = *guard {
265            manager.disconnect(name).await?;
266        }
267        self.tool_executor
268            .unregister_tools_by_prefix(&format!("mcp__{name}__"));
269        Ok(())
270    }
271
272    /// Get status of all connected MCP servers.
273    pub async fn mcp_status(
274        &self,
275    ) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
276        let guard = self.mcp_manager.read().await;
277        match guard.as_ref() {
278            Some(m) => {
279                let m = Arc::clone(m);
280                drop(guard);
281                m.get_status().await
282            }
283            None => std::collections::HashMap::new(),
284        }
285    }
286
287    /// Restore a single session by ID from the store
288    ///
289    /// Searches all registered stores for the given session ID and restores it
290    /// into the in-memory session map. Returns an error if not found.
291    pub async fn restore_session_by_id(&self, session_id: &str) -> Result<()> {
292        // Check if already loaded
293        {
294            let sessions = self.sessions.read().await;
295            if sessions.contains_key(session_id) {
296                return Ok(());
297            }
298        }
299
300        let stores = self.stores.read().await;
301        for (backend, store) in stores.iter() {
302            match store.load(session_id).await {
303                Ok(Some(data)) => {
304                    {
305                        let mut storage_types = self.session_storage_types.write().await;
306                        storage_types.insert(data.id.clone(), backend.clone());
307                    }
308                    self.restore_session(data).await?;
309                    return Ok(());
310                }
311                Ok(None) => continue,
312                Err(e) => {
313                    tracing::warn!(
314                        "Failed to load session {} from {:?}: {}",
315                        session_id,
316                        backend,
317                        e
318                    );
319                    continue;
320                }
321            }
322        }
323
324        Err(anyhow::anyhow!(
325            "Session {} not found in any store",
326            session_id
327        ))
328    }
329
330    /// Load all sessions from all registered stores
331    pub async fn load_all_sessions(&mut self) -> Result<usize> {
332        let stores = self.stores.read().await;
333        let mut loaded = 0;
334
335        for (backend, store) in stores.iter() {
336            let session_ids = match store.list().await {
337                Ok(ids) => ids,
338                Err(e) => {
339                    tracing::warn!("Failed to list sessions from {:?} store: {}", backend, e);
340                    continue;
341                }
342            };
343
344            for id in session_ids {
345                match store.load(&id).await {
346                    Ok(Some(data)) => {
347                        // Record the storage type for this session
348                        {
349                            let mut storage_types = self.session_storage_types.write().await;
350                            storage_types.insert(data.id.clone(), backend.clone());
351                        }
352
353                        if let Err(e) = self.restore_session(data).await {
354                            tracing::warn!("Failed to restore session {}: {}", id, e);
355                        } else {
356                            loaded += 1;
357                        }
358                    }
359                    Ok(None) => {
360                        tracing::warn!("Session {} not found in store", id);
361                    }
362                    Err(e) => {
363                        tracing::warn!("Failed to load session {}: {}", id, e);
364                    }
365                }
366            }
367        }
368
369        tracing::info!("Loaded {} sessions from store", loaded);
370        Ok(loaded)
371    }
372
373    /// Restore a session from SessionData
374    async fn restore_session(&self, data: SessionData) -> Result<()> {
375        let tools = self.tool_executor.definitions();
376        let mut session = Session::new(data.id.clone(), data.config.clone(), tools).await?;
377
378        // Restore serializable state
379        session.restore_from_data(&data);
380
381        // Restore LLM config if present (without API key - must be reconfigured)
382        if let Some(llm_config) = &data.llm_config {
383            let mut configs = self.llm_configs.write().await;
384            configs.insert(data.id.clone(), llm_config.clone());
385        }
386
387        let mut sessions = self.sessions.write().await;
388        sessions.insert(data.id.clone(), Arc::new(RwLock::new(session)));
389
390        tracing::info!("Restored session: {}", data.id);
391        Ok(())
392    }
393
394    /// Save a session to the store
395    async fn save_session(&self, session_id: &str) -> Result<()> {
396        // Get the storage type for this session
397        let storage_type = {
398            let storage_types = self.session_storage_types.read().await;
399            storage_types.get(session_id).cloned()
400        };
401
402        let Some(storage_type) = storage_type else {
403            // No storage type means memory-only session
404            return Ok(());
405        };
406
407        // Skip saving for memory storage
408        if storage_type == crate::config::StorageBackend::Memory {
409            return Ok(());
410        }
411
412        // Get the appropriate store
413        let stores = self.stores.read().await;
414        let Some(store) = stores.get(&storage_type) else {
415            tracing::warn!("No store available for storage type: {:?}", storage_type);
416            return Ok(());
417        };
418
419        let session_lock = self.get_session(session_id).await?;
420        let session = session_lock.read().await;
421
422        // Get LLM config if set
423        let llm_config = {
424            let configs = self.llm_configs.read().await;
425            configs.get(session_id).cloned()
426        };
427
428        let data = session.to_session_data(llm_config);
429        store.save(&data).await?;
430
431        tracing::debug!("Saved session: {}", session_id);
432        Ok(())
433    }
434
435    /// Persist a session to store, logging and emitting an event on failure.
436    /// This is a non-fatal wrapper around `save_session` — the operation
437    /// succeeds in memory even if persistence fails.
438    async fn persist_or_warn(&self, session_id: &str, operation: &str) {
439        if let Err(e) = self.save_session(session_id).await {
440            tracing::warn!(
441                "Failed to persist session {} after {}: {}",
442                session_id,
443                operation,
444                e
445            );
446            // Emit event so SDK clients can react
447            if let Ok(session_lock) = self.get_session(session_id).await {
448                let session = session_lock.read().await;
449                let _ = session.event_tx().send(AgentEvent::PersistenceFailed {
450                    session_id: session_id.to_string(),
451                    operation: operation.to_string(),
452                    error: e.to_string(),
453                });
454            }
455        }
456    }
457
458    /// Spawn persistence as a background task (non-blocking).
459    ///
460    /// All `SessionManager` fields are `Arc`-wrapped, so `clone()` is a
461    /// cheap pointer-copy. This keeps file I/O off the response path.
462    fn persist_in_background(&self, session_id: &str, operation: &str) {
463        let mgr = self.clone();
464        let sid = session_id.to_string();
465        let op = operation.to_string();
466        tokio::spawn(async move {
467            mgr.persist_or_warn(&sid, &op).await;
468        });
469    }
470
471    /// Create a new session
472    pub async fn create_session(&self, id: String, config: SessionConfig) -> Result<String> {
473        tracing::info!(name: "a3s.session.create", session_id = %id, "Creating session");
474
475        // Record the storage type for this session
476        {
477            let mut storage_types = self.session_storage_types.write().await;
478            storage_types.insert(id.clone(), config.storage_type.clone());
479        }
480
481        // Get tool definitions from the executor
482        let tools = self.tool_executor.definitions();
483        let mut session = Session::new(id.clone(), config, tools).await?;
484
485        // Start the command queue
486        session.start_queue().await?;
487
488        // Set max context length if provided
489        if session.config.max_context_length > 0 {
490            session.context_usage.max_tokens = session.config.max_context_length as usize;
491        }
492
493        {
494            let mut sessions = self.sessions.write().await;
495            sessions.insert(id.clone(), Arc::new(RwLock::new(session)));
496        }
497
498        // Persist to store
499        self.persist_in_background(&id, "create");
500
501        tracing::info!("Created session: {}", id);
502        Ok(id)
503    }
504
505    /// Destroy a session
506    pub async fn destroy_session(&self, id: &str) -> Result<()> {
507        tracing::info!(name: "a3s.session.destroy", session_id = %id, "Destroying session");
508
509        // Get the storage type before removing the session
510        let storage_type = {
511            let storage_types = self.session_storage_types.read().await;
512            storage_types.get(id).cloned()
513        };
514
515        {
516            let mut sessions = self.sessions.write().await;
517            sessions.remove(id);
518        }
519
520        // Remove LLM config
521        {
522            let mut configs = self.llm_configs.write().await;
523            configs.remove(id);
524        }
525
526        {
527            let mut registries = self.session_skill_registries.write().await;
528            registries.remove(id);
529        }
530
531        // Remove storage type tracking
532        {
533            let mut storage_types = self.session_storage_types.write().await;
534            storage_types.remove(id);
535        }
536
537        // Delete from store if applicable
538        if let Some(storage_type) = storage_type {
539            if storage_type != crate::config::StorageBackend::Memory {
540                let stores = self.stores.read().await;
541                if let Some(store) = stores.get(&storage_type) {
542                    if let Err(e) = store.delete(id).await {
543                        tracing::warn!("Failed to delete session {} from store: {}", id, e);
544                    }
545                }
546            }
547        }
548
549        tracing::info!("Destroyed session: {}", id);
550        Ok(())
551    }
552
553    /// Get a session by ID
554    pub async fn get_session(&self, id: &str) -> Result<Arc<RwLock<Session>>> {
555        let sessions = self.sessions.read().await;
556        sessions
557            .get(id)
558            .cloned()
559            .context(format!("Session not found: {}", id))
560    }
561
562    /// Create a child session for a subagent
563    ///
564    /// Child sessions inherit the parent's LLM client but have their own
565    /// permission policy and configuration.
566    pub async fn create_child_session(
567        &self,
568        parent_id: &str,
569        child_id: String,
570        mut config: SessionConfig,
571    ) -> Result<String> {
572        // Verify parent exists and inherit HITL policy
573        let parent_lock = self.get_session(parent_id).await?;
574        let parent_llm_client = {
575            let parent = parent_lock.read().await;
576
577            // Inherit parent's confirmation policy if child doesn't have one
578            if config.confirmation_policy.is_none() {
579                let parent_policy = parent.confirmation_manager.policy().await;
580                config.confirmation_policy = Some(parent_policy);
581            }
582
583            parent.llm_client.clone()
584        };
585
586        // Set parent_id in config
587        config.parent_id = Some(parent_id.to_string());
588
589        // Get tool definitions from the executor
590        let tools = self.tool_executor.definitions();
591        let mut session = Session::new(child_id.clone(), config, tools).await?;
592
593        // Inherit LLM client from parent if not set
594        if session.llm_client.is_none() {
595            let default_llm = self.llm_client.read().await.clone();
596            session.llm_client = parent_llm_client.or(default_llm);
597        }
598
599        // Start the command queue
600        session.start_queue().await?;
601
602        // Set max context length if provided
603        if session.config.max_context_length > 0 {
604            session.context_usage.max_tokens = session.config.max_context_length as usize;
605        }
606
607        {
608            let mut sessions = self.sessions.write().await;
609            sessions.insert(child_id.clone(), Arc::new(RwLock::new(session)));
610        }
611
612        // Persist to store
613        self.persist_in_background(&child_id, "create_child");
614
615        tracing::info!(
616            "Created child session: {} (parent: {})",
617            child_id,
618            parent_id
619        );
620        Ok(child_id)
621    }
622
623    /// Get all child sessions for a parent session
624    pub async fn get_child_sessions(&self, parent_id: &str) -> Vec<String> {
625        let sessions = self.sessions.read().await;
626        let mut children = Vec::new();
627
628        for (id, session_lock) in sessions.iter() {
629            let session = session_lock.read().await;
630            if session.parent_id.as_deref() == Some(parent_id) {
631                children.push(id.clone());
632            }
633        }
634
635        children
636    }
637
638    /// Check if a session is a child session
639    pub async fn is_child_session(&self, session_id: &str) -> Result<bool> {
640        let session_lock = self.get_session(session_id).await?;
641        let session = session_lock.read().await;
642        Ok(session.is_child_session())
643    }
644
    /// Generate a response for a prompt (non-streaming).
    ///
    /// Takes a snapshot of the session's state, builds an `AgentLoop` from it
    /// and runs the prompt to completion. On success the session's message
    /// history and usage are replaced/updated from the result, the session is
    /// persisted in the background, and auto-compaction is attempted.
    ///
    /// # Errors
    ///
    /// Fails if the session does not exist, if it is paused, if neither the
    /// session nor the manager has an LLM client, or if the agent run itself
    /// fails. A failed auto-compaction is only logged.
    pub async fn generate(&self, session_id: &str, prompt: &str) -> Result<AgentResult> {
        let session_lock = self.get_session(session_id).await?;

        // Check if session is paused
        {
            let session = session_lock.read().await;
            if session.state == SessionState::Paused {
                anyhow::bail!(
                    "Session {} is paused. Call Resume before generating.",
                    session_id
                );
            }
        }

        // Get session state and LLM client.
        // Everything is cloned out under a short read lock so the session is
        // not locked while the agent runs.
        let (
            history,
            system,
            tools,
            session_llm_client,
            permission_checker,
            confirmation_manager,
            context_providers,
            session_workspace,
            tool_metrics,
            hook_engine,
            planning_mode,
            goal_tracking,
        ) = {
            let session = session_lock.read().await;
            (
                session.messages.clone(),
                session.system().map(String::from),
                session.tools.clone(),
                session.llm_client.clone(),
                session.permission_checker.clone(),
                session.confirmation_manager.clone(),
                session.context_providers.clone(),
                session.config.workspace.clone(),
                session.tool_metrics.clone(),
                session.config.hook_engine.clone(),
                session.config.planning_mode,
                session.config.goal_tracking,
            )
        };

        // Use session's LLM client if configured, otherwise use default
        let llm_client = if let Some(client) = session_llm_client {
            client
        } else if let Some(client) = self.llm_client.read().await.clone() {
            client
        } else {
            anyhow::bail!(
                "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
                session_id
            );
        };

        // Construct per-session ToolContext from session workspace, falling back to server default
        let tool_context = if session_workspace.is_empty() {
            crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
                .with_session_id(session_id)
        } else {
            crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
                .with_session_id(session_id)
        };
        // Pass the executor's command environment on to the context when it has one.
        let tool_context = if let Some(command_env) = self.tool_executor.command_env() {
            tool_context.with_command_env(command_env)
        } else {
            tool_context
        };

        // Inject skill registry into system prompt and agent config.
        // A per-session registry takes precedence over the shared one.
        let skill_registry = match self.session_skill_registry(session_id).await {
            Some(registry) => Some(registry),
            None => self.skill_registry.read().await.clone(),
        };
        // Append the registry's prompt text to the system prompt (or use it as
        // the system prompt when the session has none); an empty text changes nothing.
        let system = if let Some(ref registry) = skill_registry {
            let skill_prompt = registry.to_system_prompt();
            if skill_prompt.is_empty() {
                system
            } else {
                match system {
                    Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
                    None => Some(skill_prompt),
                }
            }
        } else {
            system
        };

        // Inject matched skill content on-demand before skill_registry is moved into AgentConfig.
        // Matched content is placed in front of the user's prompt, separated by `---`.
        let effective_prompt = if let Some(ref registry) = skill_registry {
            let skill_content = registry.match_skills(prompt);
            if skill_content.is_empty() {
                prompt.to_string()
            } else {
                format!("{}\n\n---\n\n{}", skill_content, prompt)
            }
        } else {
            prompt.to_string()
        };

        // Build AgentMemory from shared store (if configured)
        let memory = self
            .memory_store
            .read()
            .await
            .as_ref()
            .map(|store| Arc::new(AgentMemory::new(store.clone())));

        // Create agent loop with permission policy, confirmation manager, and context providers.
        // Fields not listed here keep their `AgentConfig::default()` values.
        let config = AgentConfig {
            prompt_slots: match system {
                Some(s) => SystemPromptSlots::from_legacy(s),
                None => SystemPromptSlots::default(),
            },
            tools,
            max_tool_rounds: 50,
            permission_checker: Some(permission_checker),
            confirmation_manager: Some(confirmation_manager),
            context_providers,
            planning_mode,
            goal_tracking,
            hook_engine,
            skill_registry,
            memory,
            ..AgentConfig::default()
        };

        let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
            .with_tool_metrics(tool_metrics);

        // Execute with session context
        let result = agent
            .execute_with_session(&history, &effective_prompt, Some(session_id), None, None)
            .await?;

        // Update session: replace the history with the agent's messages and
        // fold the reported usage into the session.
        {
            let mut session = session_lock.write().await;
            session.messages = result.messages.clone();
            session.update_usage(&result.usage);
        }

        // Persist to store
        // NOTE(review): this save is spawned before auto-compaction below runs;
        // whether compaction persists its own result is not visible here.
        self.persist_in_background(session_id, "generate");

        // Auto-compact if context usage exceeds threshold
        if let Err(e) = self.maybe_auto_compact(session_id).await {
            tracing::warn!("Auto-compact failed for session {}: {}", session_id, e);
        }

        Ok(result)
    }
801
802    /// Generate response with streaming events
803    pub async fn generate_streaming(
804        &self,
805        session_id: &str,
806        prompt: &str,
807    ) -> Result<(
808        mpsc::Receiver<AgentEvent>,
809        tokio::task::JoinHandle<Result<AgentResult>>,
810        tokio_util::sync::CancellationToken,
811    )> {
812        let session_lock = self.get_session(session_id).await?;
813
814        // Check if session is paused
815        {
816            let session = session_lock.read().await;
817            if session.state == SessionState::Paused {
818                anyhow::bail!(
819                    "Session {} is paused. Call Resume before generating.",
820                    session_id
821                );
822            }
823        }
824
825        // Get session state and LLM client
826        let (
827            history,
828            system,
829            tools,
830            session_llm_client,
831            permission_checker,
832            confirmation_manager,
833            context_providers,
834            session_workspace,
835            tool_metrics,
836            hook_engine,
837            planning_mode,
838            goal_tracking,
839        ) = {
840            let session = session_lock.read().await;
841            (
842                session.messages.clone(),
843                session.system().map(String::from),
844                session.tools.clone(),
845                session.llm_client.clone(),
846                session.permission_checker.clone(),
847                session.confirmation_manager.clone(),
848                session.context_providers.clone(),
849                session.config.workspace.clone(),
850                session.tool_metrics.clone(),
851                session.config.hook_engine.clone(),
852                session.config.planning_mode,
853                session.config.goal_tracking,
854            )
855        };
856
857        // Use session's LLM client if configured, otherwise use default
858        let llm_client = if let Some(client) = session_llm_client {
859            client
860        } else if let Some(client) = self.llm_client.read().await.clone() {
861            client
862        } else {
863            anyhow::bail!(
864                "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
865                session_id
866            );
867        };
868
869        // Construct per-session ToolContext from session workspace, falling back to server default
870        let tool_context = if session_workspace.is_empty() {
871            crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
872                .with_session_id(session_id)
873        } else {
874            crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
875                .with_session_id(session_id)
876        };
877        let tool_context = if let Some(command_env) = self.tool_executor.command_env() {
878            tool_context.with_command_env(command_env)
879        } else {
880            tool_context
881        };
882
883        // Inject skill registry into system prompt and agent config
884        let skill_registry = match self.session_skill_registry(session_id).await {
885            Some(registry) => Some(registry),
886            None => self.skill_registry.read().await.clone(),
887        };
888        let system = if let Some(ref registry) = skill_registry {
889            let skill_prompt = registry.to_system_prompt();
890            if skill_prompt.is_empty() {
891                system
892            } else {
893                match system {
894                    Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
895                    None => Some(skill_prompt),
896                }
897            }
898        } else {
899            system
900        };
901
902        // Inject matched skill content on-demand before skill_registry is moved into AgentConfig
903        let effective_prompt = if let Some(ref registry) = skill_registry {
904            let skill_content = registry.match_skills(prompt);
905            if skill_content.is_empty() {
906                prompt.to_string()
907            } else {
908                format!("{}\n\n---\n\n{}", skill_content, prompt)
909            }
910        } else {
911            prompt.to_string()
912        };
913
914        // Build AgentMemory from shared store (if configured)
915        let memory = self
916            .memory_store
917            .read()
918            .await
919            .as_ref()
920            .map(|store| Arc::new(AgentMemory::new(store.clone())));
921
922        // Create agent loop with permission policy, confirmation manager, and context providers
923        let config = AgentConfig {
924            prompt_slots: match system {
925                Some(s) => SystemPromptSlots::from_legacy(s),
926                None => SystemPromptSlots::default(),
927            },
928            tools,
929            max_tool_rounds: 50,
930            permission_checker: Some(permission_checker),
931            confirmation_manager: Some(confirmation_manager),
932            context_providers,
933            planning_mode,
934            goal_tracking,
935            hook_engine,
936            skill_registry,
937            memory,
938            ..AgentConfig::default()
939        };
940
941        let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
942            .with_tool_metrics(tool_metrics);
943
944        // Execute with streaming
945        let (rx, handle, cancel_token) =
946            agent.execute_streaming(&history, &effective_prompt).await?;
947
948        // Store the cancellation token for cancellation support
949        let cancel_token_clone = cancel_token.clone();
950        {
951            let mut ops = self.ongoing_operations.write().await;
952            ops.insert(session_id.to_string(), cancel_token);
953        }
954
955        // Spawn task to update session after completion
956        let session_lock_clone = session_lock.clone();
957        let original_handle = handle;
958        let stores = self.stores.clone();
959        let session_storage_types = self.session_storage_types.clone();
960        let llm_configs = self.llm_configs.clone();
961        let session_id_owned = session_id.to_string();
962        let ongoing_operations = self.ongoing_operations.clone();
963        let session_manager = self.clone();
964
965        let wrapped_handle = tokio::spawn(async move {
966            let result = original_handle.await??;
967
968            // Remove from ongoing operations
969            {
970                let mut ops = ongoing_operations.write().await;
971                ops.remove(&session_id_owned);
972            }
973
974            // Update session
975            {
976                let mut session = session_lock_clone.write().await;
977                session.messages = result.messages.clone();
978                session.update_usage(&result.usage);
979            }
980
981            // Persist to store
982            let storage_type = {
983                let storage_types = session_storage_types.read().await;
984                storage_types.get(&session_id_owned).cloned()
985            };
986
987            if let Some(storage_type) = storage_type {
988                if storage_type != crate::config::StorageBackend::Memory {
989                    let stores_guard = stores.read().await;
990                    if let Some(store) = stores_guard.get(&storage_type) {
991                        let session = session_lock_clone.read().await;
992                        let llm_config = {
993                            let configs = llm_configs.read().await;
994                            configs.get(&session_id_owned).cloned()
995                        };
996                        let data = session.to_session_data(llm_config);
997                        if let Err(e) = store.save(&data).await {
998                            tracing::warn!(
999                                "Failed to persist session {} after streaming: {}",
1000                                session_id_owned,
1001                                e
1002                            );
1003                        }
1004                    }
1005                }
1006            }
1007
1008            // Auto-compact if context usage exceeds threshold
1009            if let Err(e) = session_manager.maybe_auto_compact(&session_id_owned).await {
1010                tracing::warn!(
1011                    "Auto-compact failed for session {}: {}",
1012                    session_id_owned,
1013                    e
1014                );
1015            }
1016
1017            Ok(result)
1018        });
1019
1020        Ok((rx, wrapped_handle, cancel_token_clone))
1021    }
1022
1023    /// Get context usage for a session
1024    pub async fn context_usage(&self, session_id: &str) -> Result<ContextUsage> {
1025        let session_lock = self.get_session(session_id).await?;
1026        let session = session_lock.read().await;
1027        Ok(session.context_usage.clone())
1028    }
1029
1030    /// Get conversation history for a session
1031    pub async fn history(&self, session_id: &str) -> Result<Vec<Message>> {
1032        let session_lock = self.get_session(session_id).await?;
1033        let session = session_lock.read().await;
1034        Ok(session.messages.clone())
1035    }
1036
1037    /// Clear session history
1038    pub async fn clear(&self, session_id: &str) -> Result<()> {
1039        {
1040            let session_lock = self.get_session(session_id).await?;
1041            let mut session = session_lock.write().await;
1042            session.clear();
1043        }
1044
1045        // Persist to store
1046        self.persist_in_background(session_id, "clear");
1047
1048        Ok(())
1049    }
1050
1051    /// Compact session context
1052    pub async fn compact(&self, session_id: &str) -> Result<()> {
1053        tracing::info!(name: "a3s.session.compact", session_id = %session_id, "Compacting session context");
1054
1055        {
1056            let session_lock = self.get_session(session_id).await?;
1057            let mut session = session_lock.write().await;
1058
1059            // Get LLM client for compaction (if available)
1060            let llm_client = if let Some(client) = &session.llm_client {
1061                client.clone()
1062            } else if let Some(client) = self.llm_client.read().await.clone() {
1063                client
1064            } else {
1065                // If no LLM client available, just do simple truncation
1066                tracing::warn!("No LLM client configured for compaction, using simple truncation");
1067                let keep_messages = 20;
1068                if session.messages.len() > keep_messages {
1069                    let len = session.messages.len();
1070                    session.messages = session.messages.split_off(len - keep_messages);
1071                }
1072                // Persist after truncation
1073                drop(session);
1074                self.persist_in_background(session_id, "compact");
1075                return Ok(());
1076            };
1077
1078            session.compact(&llm_client).await?;
1079        }
1080
1081        // Persist to store
1082        self.persist_in_background(session_id, "compact");
1083
1084        Ok(())
1085    }
1086
1087    /// Check if auto-compaction should be triggered and perform it if needed.
1088    ///
1089    /// Called after `generate()` / `generate_streaming()` updates session usage.
1090    /// Triggers compaction when:
1091    /// - `auto_compact` is enabled in session config
1092    /// - `context_usage.percent` exceeds `auto_compact_threshold`
1093    pub async fn maybe_auto_compact(&self, session_id: &str) -> Result<bool> {
1094        let (should_compact, percent_before, messages_before) = {
1095            let session_lock = self.get_session(session_id).await?;
1096            let session = session_lock.read().await;
1097
1098            if !session.config.auto_compact {
1099                return Ok(false);
1100            }
1101
1102            let threshold = session.config.auto_compact_threshold;
1103            let percent = session.context_usage.percent;
1104            let msg_count = session.messages.len();
1105
1106            tracing::debug!(
1107                "Auto-compact check for session {}: percent={:.2}%, threshold={:.2}%, messages={}",
1108                session_id,
1109                percent * 100.0,
1110                threshold * 100.0,
1111                msg_count,
1112            );
1113
1114            (percent >= threshold, percent, msg_count)
1115        };
1116
1117        if !should_compact {
1118            return Ok(false);
1119        }
1120
1121        tracing::info!(
1122            name: "a3s.session.auto_compact",
1123            session_id = %session_id,
1124            percent_before = %format!("{:.1}%", percent_before * 100.0),
1125            messages_before = %messages_before,
1126            "Auto-compacting session due to high context usage"
1127        );
1128
1129        // Perform compaction (reuses existing compact logic)
1130        self.compact(session_id).await?;
1131
1132        // Get post-compaction message count
1133        let messages_after = {
1134            let session_lock = self.get_session(session_id).await?;
1135            let session = session_lock.read().await;
1136            session.messages.len()
1137        };
1138
1139        // Broadcast event to notify clients
1140        let event = AgentEvent::ContextCompacted {
1141            session_id: session_id.to_string(),
1142            before_messages: messages_before,
1143            after_messages: messages_after,
1144            percent_before,
1145        };
1146
1147        // Try to send via session's event broadcaster
1148        if let Ok(session_lock) = self.get_session(session_id).await {
1149            let session = session_lock.read().await;
1150            let _ = session.event_tx.send(event);
1151        }
1152
1153        tracing::info!(
1154            name: "a3s.session.auto_compact.done",
1155            session_id = %session_id,
1156            messages_before = %messages_before,
1157            messages_after = %messages_after,
1158            "Auto-compaction complete"
1159        );
1160
1161        Ok(true)
1162    }
1163
1164    /// Resolve the LLM client for a session (session-level -> default fallback)
1165    ///
1166    /// Returns `None` if no LLM client is configured at either level.
1167    pub async fn get_llm_for_session(
1168        &self,
1169        session_id: &str,
1170    ) -> Result<Option<Arc<dyn LlmClient>>> {
1171        let session_lock = self.get_session(session_id).await?;
1172        let session = session_lock.read().await;
1173
1174        if let Some(client) = &session.llm_client {
1175            return Ok(Some(client.clone()));
1176        }
1177
1178        Ok(self.llm_client.read().await.clone())
1179    }
1180
1181    /// Ask an ephemeral side question for an existing managed session.
1182    ///
1183    /// Uses a read-only snapshot of the session history plus current queue /
1184    /// confirmation / task state. Optional `runtime_context` lets hosts inject
1185    /// extra transient context such as browser-only UI state.
1186    pub async fn btw_with_context(
1187        &self,
1188        session_id: &str,
1189        question: &str,
1190        runtime_context: Option<&str>,
1191    ) -> Result<crate::agent_api::BtwResult> {
1192        let question = question.trim();
1193        if question.is_empty() {
1194            anyhow::bail!("btw: question cannot be empty");
1195        }
1196
1197        let session_lock = self.get_session(session_id).await?;
1198        let (history_snapshot, session_runtime) = {
1199            let session = session_lock.read().await;
1200            let mut sections = Vec::new();
1201
1202            let pending_confirmations = session.confirmation_manager.pending_confirmations().await;
1203            if !pending_confirmations.is_empty() {
1204                let mut lines = pending_confirmations
1205                    .into_iter()
1206                    .map(|pending| {
1207                        let arg_summary = Self::compact_json_value(&pending.args);
1208                        if arg_summary.is_empty() {
1209                            format!(
1210                                "- {} [{}] remaining={}ms",
1211                                pending.tool_name, pending.tool_id, pending.remaining_ms
1212                            )
1213                        } else {
1214                            format!(
1215                                "- {} [{}] remaining={}ms {}",
1216                                pending.tool_name,
1217                                pending.tool_id,
1218                                pending.remaining_ms,
1219                                arg_summary
1220                            )
1221                        }
1222                    })
1223                    .collect::<Vec<_>>();
1224                lines.sort();
1225                sections.push(format!("[pending confirmations]\n{}", lines.join("\n")));
1226            }
1227
1228            let stats = session.command_queue.stats().await;
1229            if stats.total_active > 0 || stats.total_pending > 0 || stats.external_pending > 0 {
1230                let mut lines = vec![format!(
1231                    "active={}, pending={}, external_pending={}",
1232                    stats.total_active, stats.total_pending, stats.external_pending
1233                )];
1234                let mut lanes = stats
1235                    .lanes
1236                    .into_values()
1237                    .filter(|lane| lane.active > 0 || lane.pending > 0)
1238                    .map(|lane| {
1239                        format!(
1240                            "- {:?}: active={}, pending={}, handler={:?}",
1241                            lane.lane, lane.active, lane.pending, lane.handler_mode
1242                        )
1243                    })
1244                    .collect::<Vec<_>>();
1245                lanes.sort();
1246                lines.extend(lanes);
1247                sections.push(format!("[session queue]\n{}", lines.join("\n")));
1248            }
1249
1250            let external_tasks = session.command_queue.pending_external_tasks().await;
1251            if !external_tasks.is_empty() {
1252                let mut lines = external_tasks
1253                    .into_iter()
1254                    .take(6)
1255                    .map(|task| {
1256                        let payload_summary = Self::compact_json_value(&task.payload);
1257                        if payload_summary.is_empty() {
1258                            format!(
1259                                "- {} {:?} remaining={}ms",
1260                                task.command_type,
1261                                task.lane,
1262                                task.remaining_ms()
1263                            )
1264                        } else {
1265                            format!(
1266                                "- {} {:?} remaining={}ms {}",
1267                                task.command_type,
1268                                task.lane,
1269                                task.remaining_ms(),
1270                                payload_summary
1271                            )
1272                        }
1273                    })
1274                    .collect::<Vec<_>>();
1275                lines.sort();
1276                sections.push(format!("[pending external tasks]\n{}", lines.join("\n")));
1277            }
1278
1279            let active_tasks = session
1280                .tasks
1281                .iter()
1282                .filter(|task| task.status.is_active())
1283                .take(6)
1284                .map(|task| match &task.tool {
1285                    Some(tool) if !tool.is_empty() => {
1286                        format!("- [{}] {} ({})", task.status, task.content, tool)
1287                    }
1288                    _ => format!("- [{}] {}", task.status, task.content),
1289                })
1290                .collect::<Vec<_>>();
1291            if !active_tasks.is_empty() {
1292                sections.push(format!("[tracked tasks]\n{}", active_tasks.join("\n")));
1293            }
1294
1295            sections.push(format!(
1296                "[session state]\n{}",
1297                match session.state {
1298                    SessionState::Active => "active",
1299                    SessionState::Paused => "paused",
1300                    SessionState::Completed => "completed",
1301                    SessionState::Error => "error",
1302                    SessionState::Unknown => "unknown",
1303                }
1304            ));
1305
1306            (session.messages.clone(), sections.join("\n\n"))
1307        };
1308
1309        let llm_client = self
1310            .get_llm_for_session(session_id)
1311            .await?
1312            .context("btw failed: no model configured for session")?;
1313
1314        let mut messages = history_snapshot;
1315        let mut injected_sections = Vec::new();
1316        if !session_runtime.is_empty() {
1317            injected_sections.push(format!("[session runtime context]\n{}", session_runtime));
1318        }
1319        if let Some(extra) = runtime_context.map(str::trim).filter(|ctx| !ctx.is_empty()) {
1320            injected_sections.push(format!("[host runtime context]\n{}", extra));
1321        }
1322        if !injected_sections.is_empty() {
1323            let injected_context = format!(
1324                "Use the following runtime context only as background for the next side question. Do not treat it as a new user request.\n\n{}",
1325                injected_sections.join("\n\n")
1326            );
1327            messages.push(Message::user(&injected_context));
1328        }
1329        messages.push(Message::user(question));
1330
1331        let response = llm_client
1332            .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
1333            .await
1334            .map_err(|e| anyhow::anyhow!("btw: ephemeral LLM call failed: {e}"))?;
1335
1336        Ok(crate::agent_api::BtwResult {
1337            question: question.to_string(),
1338            answer: response.text(),
1339            usage: response.usage,
1340        })
1341    }
1342
1343    /// Configure session
1344    pub async fn configure(
1345        &self,
1346        session_id: &str,
1347        thinking: Option<bool>,
1348        budget: Option<usize>,
1349        model_config: Option<LlmConfig>,
1350    ) -> Result<()> {
1351        {
1352            let session_lock = self.get_session(session_id).await?;
1353            let mut session = session_lock.write().await;
1354
1355            if let Some(t) = thinking {
1356                session.thinking_enabled = t;
1357            }
1358            if let Some(b) = budget {
1359                session.thinking_budget = Some(b);
1360            }
1361            if let Some(ref config) = model_config {
1362                tracing::info!(
1363                    "Configuring session {} with LLM: provider={}, model={}",
1364                    session_id,
1365                    config.provider,
1366                    config.model
1367                );
1368                session.model_name = Some(config.model.clone());
1369                session.llm_client = Some(llm::create_client_with_config(config.clone()));
1370            }
1371        }
1372
1373        // Store LLM config for persistence (without API key)
1374        if let Some(config) = model_config {
1375            let llm_config_data = LlmConfigData {
1376                provider: config.provider,
1377                model: config.model,
1378                api_key: None, // Don't persist API key
1379                base_url: config.base_url,
1380            };
1381            let mut configs = self.llm_configs.write().await;
1382            configs.insert(session_id.to_string(), llm_config_data);
1383        }
1384
1385        // Persist to store
1386        self.persist_in_background(session_id, "configure");
1387
1388        Ok(())
1389    }
1390
1391    /// Update a session's system prompt in place.
1392    pub async fn set_system_prompt(
1393        &self,
1394        session_id: &str,
1395        system_prompt: Option<String>,
1396    ) -> Result<()> {
1397        {
1398            let session_lock = self.get_session(session_id).await?;
1399            let mut session = session_lock.write().await;
1400            session.config.system_prompt = system_prompt;
1401        }
1402
1403        self.persist_in_background(session_id, "set_system_prompt");
1404        Ok(())
1405    }
1406
1407    /// Get session count
1408    pub async fn session_count(&self) -> usize {
1409        let sessions = self.sessions.read().await;
1410        sessions.len()
1411    }
1412
1413    /// Check health of all registered stores
1414    pub async fn store_health(&self) -> Vec<(String, Result<()>)> {
1415        let stores = self.stores.read().await;
1416        let mut results = Vec::new();
1417        for (_, store) in stores.iter() {
1418            let name = store.backend_name().to_string();
1419            let result = store.health_check().await;
1420            results.push((name, result));
1421        }
1422        results
1423    }
1424
1425    /// List all loaded tools (built-in tools)
1426    pub fn list_tools(&self) -> Vec<crate::llm::ToolDefinition> {
1427        self.tool_executor.definitions()
1428    }
1429
1430    /// Pause a session
1431    pub async fn pause_session(&self, session_id: &str) -> Result<bool> {
1432        let paused = {
1433            let session_lock = self.get_session(session_id).await?;
1434            let mut session = session_lock.write().await;
1435            session.pause()
1436        };
1437
1438        if paused {
1439            self.persist_in_background(session_id, "pause");
1440        }
1441
1442        Ok(paused)
1443    }
1444
1445    /// Resume a session
1446    pub async fn resume_session(&self, session_id: &str) -> Result<bool> {
1447        let resumed = {
1448            let session_lock = self.get_session(session_id).await?;
1449            let mut session = session_lock.write().await;
1450            session.resume()
1451        };
1452
1453        if resumed {
1454            self.persist_in_background(session_id, "resume");
1455        }
1456
1457        Ok(resumed)
1458    }
1459
1460    /// Cancel an ongoing operation for a session
1461    ///
1462    /// Returns true if an operation was cancelled, false if no operation was running.
1463    pub async fn cancel_operation(&self, session_id: &str) -> Result<bool> {
1464        // First, cancel any pending HITL confirmations
1465        let session_lock = self.get_session(session_id).await?;
1466        let cancelled_confirmations = {
1467            let session = session_lock.read().await;
1468            session.confirmation_manager.cancel_all().await
1469        };
1470
1471        if cancelled_confirmations > 0 {
1472            tracing::info!(
1473                "Cancelled {} pending confirmations for session {}",
1474                cancelled_confirmations,
1475                session_id
1476            );
1477        }
1478
1479        // Then, cancel the ongoing operation if any
1480        let cancel_token = {
1481            let mut ops = self.ongoing_operations.write().await;
1482            ops.remove(session_id)
1483        };
1484
1485        if let Some(token) = cancel_token {
1486            token.cancel();
1487            tracing::info!("Cancelled ongoing operation for session {}", session_id);
1488            Ok(true)
1489        } else if cancelled_confirmations > 0 {
1490            // We cancelled confirmations but no main operation
1491            Ok(true)
1492        } else {
1493            tracing::debug!("No ongoing operation to cancel for session {}", session_id);
1494            Ok(false)
1495        }
1496    }
1497
1498    /// Get all sessions (returns session locks for iteration)
1499    pub async fn get_all_sessions(&self) -> Vec<Arc<RwLock<Session>>> {
1500        let sessions = self.sessions.read().await;
1501        sessions.values().cloned().collect()
1502    }
1503
1504    /// Get tool executor reference
1505    pub fn tool_executor(&self) -> &Arc<ToolExecutor> {
1506        &self.tool_executor
1507    }
1508
1509    /// Confirm a tool execution (HITL)
1510    pub async fn confirm_tool(
1511        &self,
1512        session_id: &str,
1513        tool_id: &str,
1514        approved: bool,
1515        reason: Option<String>,
1516    ) -> Result<bool> {
1517        let session_lock = self.get_session(session_id).await?;
1518        let session = session_lock.read().await;
1519        session
1520            .confirmation_manager
1521            .confirm(tool_id, approved, reason)
1522            .await
1523            .map_err(|e| anyhow::anyhow!(e))
1524    }
1525
1526    /// Set confirmation policy for a session (HITL)
1527    pub async fn set_confirmation_policy(
1528        &self,
1529        session_id: &str,
1530        policy: ConfirmationPolicy,
1531    ) -> Result<ConfirmationPolicy> {
1532        {
1533            let session_lock = self.get_session(session_id).await?;
1534            let session = session_lock.read().await;
1535            session.set_confirmation_policy(policy.clone()).await;
1536        }
1537
1538        // Update config for persistence
1539        {
1540            let session_lock = self.get_session(session_id).await?;
1541            let mut session = session_lock.write().await;
1542            session.config.confirmation_policy = Some(policy.clone());
1543        }
1544
1545        // Persist to store
1546        self.persist_in_background(session_id, "set_confirmation_policy");
1547
1548        Ok(policy)
1549    }
1550
1551    /// Set permission policy for a session.
1552    pub async fn set_permission_policy(
1553        &self,
1554        session_id: &str,
1555        policy: PermissionPolicy,
1556    ) -> Result<PermissionPolicy> {
1557        {
1558            let session_lock = self.get_session(session_id).await?;
1559            let mut session = session_lock.write().await;
1560            session.set_permission_policy(policy.clone());
1561        }
1562
1563        self.persist_in_background(session_id, "set_permission_policy");
1564
1565        Ok(policy)
1566    }
1567
1568    /// Get confirmation policy for a session (HITL)
1569    pub async fn get_confirmation_policy(&self, session_id: &str) -> Result<ConfirmationPolicy> {
1570        let session_lock = self.get_session(session_id).await?;
1571        let session = session_lock.read().await;
1572        Ok(session.confirmation_policy().await)
1573    }
1574
1575    /// Set planning mode for a session.
1576    ///
1577    /// When set to `Enabled`, the next generation will use the planning phase.
1578    /// When set to `Disabled`, planning is skipped.
1579    /// When set to `Auto` (default), planning is determined by intent detection.
1580    pub async fn set_planning_mode(
1581        &self,
1582        session_id: &str,
1583        mode: PlanningMode,
1584    ) -> Result<PlanningMode> {
1585        {
1586            let session_lock = self.get_session(session_id).await?;
1587            let mut session = session_lock.write().await;
1588            session.config.planning_mode = mode;
1589        }
1590
1591        // Persist to store
1592        self.persist_in_background(session_id, "set_planning_mode");
1593
1594        Ok(mode)
1595    }
1596}