// a3s_code_core/session/manager.rs

//! SessionManager — manages multiple concurrent sessions.
2
3use super::{ContextUsage, Session, SessionConfig, SessionState};
4use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
5use crate::hitl::ConfirmationPolicy;
6use crate::llm::{self, LlmClient, LlmConfig, Message};
7use crate::mcp::McpManager;
8use crate::memory::AgentMemory;
9use crate::permissions::PermissionPolicy;
10use crate::prompts::SystemPromptSlots;
11use crate::skills::SkillRegistry;
12use crate::store::{FileSessionStore, LlmConfigData, SessionData, SessionStore};
13use crate::text::truncate_utf8;
14use crate::tools::ToolExecutor;
15use a3s_memory::MemoryStore;
16use anyhow::{Context, Result};
17use std::collections::HashMap;
18use std::sync::Arc;
19use tokio::sync::{mpsc, RwLock};
20
21/// Session manager handles multiple concurrent sessions
22#[derive(Clone)]
23pub struct SessionManager {
24    pub(crate) sessions: Arc<RwLock<HashMap<String, Arc<RwLock<Session>>>>>,
25    /// Default LLM client used when a session has no per-session client configured.
26    /// Wrapped in RwLock so it can be hot-swapped at runtime without restart.
27    pub(crate) llm_client: Arc<RwLock<Option<Arc<dyn LlmClient>>>>,
28    pub(crate) tool_executor: Arc<ToolExecutor>,
29    /// Session stores by storage type
30    pub(crate) stores: Arc<RwLock<HashMap<crate::config::StorageBackend, Arc<dyn SessionStore>>>>,
31    /// Track which storage type each session uses
32    pub(crate) session_storage_types: Arc<RwLock<HashMap<String, crate::config::StorageBackend>>>,
33    /// LLM configurations for sessions (stored separately for persistence)
34    pub(crate) llm_configs: Arc<RwLock<HashMap<String, LlmConfigData>>>,
35    /// Ongoing operations (session_id -> CancellationToken)
36    pub(crate) ongoing_operations:
37        Arc<RwLock<HashMap<String, tokio_util::sync::CancellationToken>>>,
38    /// Skill registry for runtime skill management
39    pub(crate) skill_registry: Arc<RwLock<Option<Arc<SkillRegistry>>>>,
40    /// Per-session skill registries layered on top of the shared registry.
41    pub(crate) session_skill_registries: Arc<RwLock<HashMap<String, Arc<SkillRegistry>>>>,
42    /// Shared memory store for agent long-term memory.
43    /// When set, each `generate`/`generate_streaming` call wraps this in
44    /// `AgentMemory` and injects it into `AgentConfig.memory`.
45    pub(crate) memory_store: Arc<RwLock<Option<Arc<dyn MemoryStore>>>>,
46    /// Shared MCP manager. When set, MCP tools are registered into the
47    /// `ToolExecutor` at startup so all sessions can use them.
48    pub(crate) mcp_manager: Arc<RwLock<Option<Arc<McpManager>>>>,
49}
50
51impl SessionManager {
52    fn compact_json_value(value: &serde_json::Value) -> String {
53        let raw = match value {
54            serde_json::Value::Null => String::new(),
55            serde_json::Value::String(s) => s.clone(),
56            _ => serde_json::to_string(value).unwrap_or_default(),
57        };
58        let compact = raw.split_whitespace().collect::<Vec<_>>().join(" ");
59        if compact.len() > 180 {
60            format!("{}...", truncate_utf8(&compact, 180))
61        } else {
62            compact
63        }
64    }
65
66    /// Create a new session manager without persistence
67    pub fn new(llm_client: Option<Arc<dyn LlmClient>>, tool_executor: Arc<ToolExecutor>) -> Self {
68        Self {
69            sessions: Arc::new(RwLock::new(HashMap::new())),
70            llm_client: Arc::new(RwLock::new(llm_client)),
71            tool_executor,
72            stores: Arc::new(RwLock::new(HashMap::new())),
73            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
74            llm_configs: Arc::new(RwLock::new(HashMap::new())),
75            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
76            skill_registry: Arc::new(RwLock::new(None)),
77            session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
78            memory_store: Arc::new(RwLock::new(None)),
79            mcp_manager: Arc::new(RwLock::new(None)),
80        }
81    }
82
83    /// Create a session manager with file-based persistence
84    ///
85    /// Sessions will be automatically saved to disk and restored on startup.
86    pub async fn with_persistence<P: AsRef<std::path::Path>>(
87        llm_client: Option<Arc<dyn LlmClient>>,
88        tool_executor: Arc<ToolExecutor>,
89        sessions_dir: P,
90    ) -> Result<Self> {
91        let store = FileSessionStore::new(sessions_dir).await?;
92        let mut stores = HashMap::new();
93        stores.insert(
94            crate::config::StorageBackend::File,
95            Arc::new(store) as Arc<dyn SessionStore>,
96        );
97
98        let manager = Self {
99            sessions: Arc::new(RwLock::new(HashMap::new())),
100            llm_client: Arc::new(RwLock::new(llm_client)),
101            tool_executor,
102            stores: Arc::new(RwLock::new(stores)),
103            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
104            llm_configs: Arc::new(RwLock::new(HashMap::new())),
105            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
106            skill_registry: Arc::new(RwLock::new(None)),
107            session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
108            memory_store: Arc::new(RwLock::new(None)),
109            mcp_manager: Arc::new(RwLock::new(None)),
110        };
111
112        Ok(manager)
113    }
114
115    /// Create a session manager with a custom store
116    ///
117    /// The `backend` parameter determines which `StorageBackend` key the store is registered under.
118    /// Sessions created with a matching `storage_type` will use this store.
119    pub fn with_store(
120        llm_client: Option<Arc<dyn LlmClient>>,
121        tool_executor: Arc<ToolExecutor>,
122        store: Arc<dyn SessionStore>,
123        backend: crate::config::StorageBackend,
124    ) -> Self {
125        let mut stores = HashMap::new();
126        stores.insert(backend, store);
127
128        Self {
129            sessions: Arc::new(RwLock::new(HashMap::new())),
130            llm_client: Arc::new(RwLock::new(llm_client)),
131            tool_executor,
132            stores: Arc::new(RwLock::new(stores)),
133            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
134            llm_configs: Arc::new(RwLock::new(HashMap::new())),
135            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
136            skill_registry: Arc::new(RwLock::new(None)),
137            session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
138            memory_store: Arc::new(RwLock::new(None)),
139            mcp_manager: Arc::new(RwLock::new(None)),
140        }
141    }
142
143    /// Hot-swap the default LLM client used by sessions without a per-session client.
144    ///
145    /// Called by SafeClaw when `PUT /api/agent/config` updates the model config.
146    /// All subsequent `generate` / `generate_streaming` calls will use the new client.
147    pub async fn set_default_llm(&self, client: Option<Arc<dyn LlmClient>>) {
148        *self.llm_client.write().await = client;
149    }
150
151    /// Set the skill registry for runtime skill management.
152    ///
153    /// Skills registered here will be injected into the system prompt
154    /// for all subsequent `generate` / `generate_streaming` calls.
155    /// Also registers the `manage_skill` tool on the `ToolExecutor` so
156    /// the Agent can create, list, and remove skills at runtime.
157    pub async fn set_skill_registry(
158        &self,
159        registry: Arc<SkillRegistry>,
160        skills_dir: std::path::PathBuf,
161    ) {
162        // Register the manage_skill tool for self-bootstrap
163        let manage_tool = crate::skills::ManageSkillTool::new(registry.clone(), skills_dir);
164        self.tool_executor
165            .register_dynamic_tool(Arc::new(manage_tool));
166
167        *self.skill_registry.write().await = Some(registry);
168    }
169
170    /// Get the current skill registry, if any.
171    pub async fn skill_registry(&self) -> Option<Arc<SkillRegistry>> {
172        self.skill_registry.read().await.clone()
173    }
174
175    /// Override the active skill registry for a single session.
176    pub async fn set_session_skill_registry(
177        &self,
178        session_id: impl Into<String>,
179        registry: Arc<SkillRegistry>,
180    ) {
181        self.session_skill_registries
182            .write()
183            .await
184            .insert(session_id.into(), registry);
185    }
186
187    /// Get the active skill registry for a single session, if one was set.
188    pub async fn session_skill_registry(&self, session_id: &str) -> Option<Arc<SkillRegistry>> {
189        self.session_skill_registries
190            .read()
191            .await
192            .get(session_id)
193            .cloned()
194    }
195
196    /// Set the shared memory store for agent long-term memory.
197    ///
198    /// When set, every `generate`/`generate_streaming` call wraps this store
199    /// in an `AgentMemory` and injects it into `AgentConfig.memory`, enabling
200    /// automatic `recall_similar` before prompts and `remember_success`/
201    /// `remember_failure` after tool execution.
202    pub async fn set_memory_store(&self, store: Arc<dyn MemoryStore>) {
203        *self.memory_store.write().await = Some(store);
204    }
205
206    /// Get the current memory store, if any.
207    pub async fn memory_store(&self) -> Option<Arc<dyn MemoryStore>> {
208        self.memory_store.read().await.clone()
209    }
210
211    /// Set the shared MCP manager.
212    ///
213    /// When set, all tools from connected MCP servers are registered into the
214    /// `ToolExecutor` so they are available in every session.
215    pub async fn set_mcp_manager(&self, manager: Arc<McpManager>) {
216        let all_tools = manager.get_all_tools().await;
217        let mut by_server: HashMap<String, Vec<crate::mcp::McpTool>> = HashMap::new();
218        for (server, tool) in all_tools {
219            by_server.entry(server).or_default().push(tool);
220        }
221        for (server_name, tools) in by_server {
222            for tool in
223                crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&manager))
224            {
225                self.tool_executor.register_dynamic_tool(tool);
226            }
227        }
228        *self.mcp_manager.write().await = Some(manager);
229    }
230
231    /// Dynamically add and connect an MCP server to the shared manager.
232    ///
233    /// Registers the server config, connects it, and registers its tools into
234    /// the `ToolExecutor` so all subsequent sessions can use them.
235    pub async fn add_mcp_server(&self, config: crate::mcp::McpServerConfig) -> Result<()> {
236        let manager = {
237            let guard = self.mcp_manager.read().await;
238            match guard.clone() {
239                Some(m) => m,
240                None => {
241                    drop(guard);
242                    let m = Arc::new(McpManager::new());
243                    *self.mcp_manager.write().await = Some(Arc::clone(&m));
244                    m
245                }
246            }
247        };
248        let name = config.name.clone();
249        manager.register_server(config).await;
250        manager.connect(&name).await?;
251        let tools = manager.get_server_tools(&name).await;
252        for tool in crate::mcp::tools::create_mcp_tools(&name, tools, Arc::clone(&manager)) {
253            self.tool_executor.register_dynamic_tool(tool);
254        }
255        Ok(())
256    }
257
258    /// Disconnect and remove an MCP server from the shared manager.
259    ///
260    /// Also unregisters its tools from the `ToolExecutor`.
261    pub async fn remove_mcp_server(&self, name: &str) -> Result<()> {
262        let guard = self.mcp_manager.read().await;
263        if let Some(ref manager) = *guard {
264            manager.disconnect(name).await?;
265        }
266        self.tool_executor
267            .unregister_tools_by_prefix(&format!("mcp__{name}__"));
268        Ok(())
269    }
270
271    /// Get status of all connected MCP servers.
272    pub async fn mcp_status(
273        &self,
274    ) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
275        let guard = self.mcp_manager.read().await;
276        match guard.as_ref() {
277            Some(m) => {
278                let m = Arc::clone(m);
279                drop(guard);
280                m.get_status().await
281            }
282            None => std::collections::HashMap::new(),
283        }
284    }
285
286    /// Restore a single session by ID from the store
287    ///
288    /// Searches all registered stores for the given session ID and restores it
289    /// into the in-memory session map. Returns an error if not found.
290    pub async fn restore_session_by_id(&self, session_id: &str) -> Result<()> {
291        // Check if already loaded
292        {
293            let sessions = self.sessions.read().await;
294            if sessions.contains_key(session_id) {
295                return Ok(());
296            }
297        }
298
299        let stores = self.stores.read().await;
300        for (backend, store) in stores.iter() {
301            match store.load(session_id).await {
302                Ok(Some(data)) => {
303                    {
304                        let mut storage_types = self.session_storage_types.write().await;
305                        storage_types.insert(data.id.clone(), backend.clone());
306                    }
307                    self.restore_session(data).await?;
308                    return Ok(());
309                }
310                Ok(None) => continue,
311                Err(e) => {
312                    tracing::warn!(
313                        "Failed to load session {} from {:?}: {}",
314                        session_id,
315                        backend,
316                        e
317                    );
318                    continue;
319                }
320            }
321        }
322
323        Err(anyhow::anyhow!(
324            "Session {} not found in any store",
325            session_id
326        ))
327    }
328
329    /// Load all sessions from all registered stores
330    pub async fn load_all_sessions(&mut self) -> Result<usize> {
331        let stores = self.stores.read().await;
332        let mut loaded = 0;
333
334        for (backend, store) in stores.iter() {
335            let session_ids = match store.list().await {
336                Ok(ids) => ids,
337                Err(e) => {
338                    tracing::warn!("Failed to list sessions from {:?} store: {}", backend, e);
339                    continue;
340                }
341            };
342
343            for id in session_ids {
344                match store.load(&id).await {
345                    Ok(Some(data)) => {
346                        // Record the storage type for this session
347                        {
348                            let mut storage_types = self.session_storage_types.write().await;
349                            storage_types.insert(data.id.clone(), backend.clone());
350                        }
351
352                        if let Err(e) = self.restore_session(data).await {
353                            tracing::warn!("Failed to restore session {}: {}", id, e);
354                        } else {
355                            loaded += 1;
356                        }
357                    }
358                    Ok(None) => {
359                        tracing::warn!("Session {} not found in store", id);
360                    }
361                    Err(e) => {
362                        tracing::warn!("Failed to load session {}: {}", id, e);
363                    }
364                }
365            }
366        }
367
368        tracing::info!("Loaded {} sessions from store", loaded);
369        Ok(loaded)
370    }
371
372    /// Restore a session from SessionData
373    async fn restore_session(&self, data: SessionData) -> Result<()> {
374        let tools = self.tool_executor.definitions();
375        let mut session = Session::new(data.id.clone(), data.config.clone(), tools).await?;
376
377        // Restore serializable state
378        session.restore_from_data(&data);
379
380        // Restore LLM config if present (without API key - must be reconfigured)
381        if let Some(llm_config) = &data.llm_config {
382            let mut configs = self.llm_configs.write().await;
383            configs.insert(data.id.clone(), llm_config.clone());
384        }
385
386        let mut sessions = self.sessions.write().await;
387        sessions.insert(data.id.clone(), Arc::new(RwLock::new(session)));
388
389        tracing::info!("Restored session: {}", data.id);
390        Ok(())
391    }
392
393    /// Save a session to the store
394    async fn save_session(&self, session_id: &str) -> Result<()> {
395        // Get the storage type for this session
396        let storage_type = {
397            let storage_types = self.session_storage_types.read().await;
398            storage_types.get(session_id).cloned()
399        };
400
401        let Some(storage_type) = storage_type else {
402            // No storage type means memory-only session
403            return Ok(());
404        };
405
406        // Skip saving for memory storage
407        if storage_type == crate::config::StorageBackend::Memory {
408            return Ok(());
409        }
410
411        // Get the appropriate store
412        let stores = self.stores.read().await;
413        let Some(store) = stores.get(&storage_type) else {
414            tracing::warn!("No store available for storage type: {:?}", storage_type);
415            return Ok(());
416        };
417
418        let session_lock = self.get_session(session_id).await?;
419        let session = session_lock.read().await;
420
421        // Get LLM config if set
422        let llm_config = {
423            let configs = self.llm_configs.read().await;
424            configs.get(session_id).cloned()
425        };
426
427        let data = session.to_session_data(llm_config);
428        store.save(&data).await?;
429
430        tracing::debug!("Saved session: {}", session_id);
431        Ok(())
432    }
433
434    /// Persist a session to store, logging and emitting an event on failure.
435    /// This is a non-fatal wrapper around `save_session` — the operation
436    /// succeeds in memory even if persistence fails.
437    async fn persist_or_warn(&self, session_id: &str, operation: &str) {
438        if let Err(e) = self.save_session(session_id).await {
439            tracing::warn!(
440                "Failed to persist session {} after {}: {}",
441                session_id,
442                operation,
443                e
444            );
445            // Emit event so SDK clients can react
446            if let Ok(session_lock) = self.get_session(session_id).await {
447                let session = session_lock.read().await;
448                let _ = session.event_tx().send(AgentEvent::PersistenceFailed {
449                    session_id: session_id.to_string(),
450                    operation: operation.to_string(),
451                    error: e.to_string(),
452                });
453            }
454        }
455    }
456
457    /// Spawn persistence as a background task (non-blocking).
458    ///
459    /// All `SessionManager` fields are `Arc`-wrapped, so `clone()` is a
460    /// cheap pointer-copy. This keeps file I/O off the response path.
461    fn persist_in_background(&self, session_id: &str, operation: &str) {
462        let mgr = self.clone();
463        let sid = session_id.to_string();
464        let op = operation.to_string();
465        tokio::spawn(async move {
466            mgr.persist_or_warn(&sid, &op).await;
467        });
468    }
469
470    /// Create a new session
471    pub async fn create_session(&self, id: String, config: SessionConfig) -> Result<String> {
472        tracing::info!(name: "a3s.session.create", session_id = %id, "Creating session");
473
474        // Record the storage type for this session
475        {
476            let mut storage_types = self.session_storage_types.write().await;
477            storage_types.insert(id.clone(), config.storage_type.clone());
478        }
479
480        // Get tool definitions from the executor
481        let tools = self.tool_executor.definitions();
482        let mut session = Session::new(id.clone(), config, tools).await?;
483
484        // Start the command queue
485        session.start_queue().await?;
486
487        // Set max context length if provided
488        if session.config.max_context_length > 0 {
489            session.context_usage.max_tokens = session.config.max_context_length as usize;
490        }
491
492        {
493            let mut sessions = self.sessions.write().await;
494            sessions.insert(id.clone(), Arc::new(RwLock::new(session)));
495        }
496
497        // Persist to store
498        self.persist_in_background(&id, "create");
499
500        tracing::info!("Created session: {}", id);
501        Ok(id)
502    }
503
504    /// Destroy a session
505    pub async fn destroy_session(&self, id: &str) -> Result<()> {
506        tracing::info!(name: "a3s.session.destroy", session_id = %id, "Destroying session");
507
508        // Get the storage type before removing the session
509        let storage_type = {
510            let storage_types = self.session_storage_types.read().await;
511            storage_types.get(id).cloned()
512        };
513
514        {
515            let mut sessions = self.sessions.write().await;
516            sessions.remove(id);
517        }
518
519        // Remove LLM config
520        {
521            let mut configs = self.llm_configs.write().await;
522            configs.remove(id);
523        }
524
525        {
526            let mut registries = self.session_skill_registries.write().await;
527            registries.remove(id);
528        }
529
530        // Remove storage type tracking
531        {
532            let mut storage_types = self.session_storage_types.write().await;
533            storage_types.remove(id);
534        }
535
536        // Delete from store if applicable
537        if let Some(storage_type) = storage_type {
538            if storage_type != crate::config::StorageBackend::Memory {
539                let stores = self.stores.read().await;
540                if let Some(store) = stores.get(&storage_type) {
541                    if let Err(e) = store.delete(id).await {
542                        tracing::warn!("Failed to delete session {} from store: {}", id, e);
543                    }
544                }
545            }
546        }
547
548        tracing::info!("Destroyed session: {}", id);
549        Ok(())
550    }
551
552    /// Get a session by ID
553    pub async fn get_session(&self, id: &str) -> Result<Arc<RwLock<Session>>> {
554        let sessions = self.sessions.read().await;
555        sessions
556            .get(id)
557            .cloned()
558            .context(format!("Session not found: {}", id))
559    }
560
561    /// Create a child session for a subagent
562    ///
563    /// Child sessions inherit the parent's LLM client but have their own
564    /// permission policy and configuration.
565    pub async fn create_child_session(
566        &self,
567        parent_id: &str,
568        child_id: String,
569        mut config: SessionConfig,
570    ) -> Result<String> {
571        // Verify parent exists and inherit HITL policy
572        let parent_lock = self.get_session(parent_id).await?;
573        let parent_llm_client = {
574            let parent = parent_lock.read().await;
575
576            // Inherit parent's confirmation policy if child doesn't have one
577            if config.confirmation_policy.is_none() {
578                let parent_policy = parent.confirmation_manager.policy().await;
579                config.confirmation_policy = Some(parent_policy);
580            }
581
582            parent.llm_client.clone()
583        };
584
585        // Set parent_id in config
586        config.parent_id = Some(parent_id.to_string());
587
588        // Get tool definitions from the executor
589        let tools = self.tool_executor.definitions();
590        let mut session = Session::new(child_id.clone(), config, tools).await?;
591
592        // Inherit LLM client from parent if not set
593        if session.llm_client.is_none() {
594            let default_llm = self.llm_client.read().await.clone();
595            session.llm_client = parent_llm_client.or(default_llm);
596        }
597
598        // Start the command queue
599        session.start_queue().await?;
600
601        // Set max context length if provided
602        if session.config.max_context_length > 0 {
603            session.context_usage.max_tokens = session.config.max_context_length as usize;
604        }
605
606        {
607            let mut sessions = self.sessions.write().await;
608            sessions.insert(child_id.clone(), Arc::new(RwLock::new(session)));
609        }
610
611        // Persist to store
612        self.persist_in_background(&child_id, "create_child");
613
614        tracing::info!(
615            "Created child session: {} (parent: {})",
616            child_id,
617            parent_id
618        );
619        Ok(child_id)
620    }
621
622    /// Get all child sessions for a parent session
623    pub async fn get_child_sessions(&self, parent_id: &str) -> Vec<String> {
624        let sessions = self.sessions.read().await;
625        let mut children = Vec::new();
626
627        for (id, session_lock) in sessions.iter() {
628            let session = session_lock.read().await;
629            if session.parent_id.as_deref() == Some(parent_id) {
630                children.push(id.clone());
631            }
632        }
633
634        children
635    }
636
637    /// Check if a session is a child session
638    pub async fn is_child_session(&self, session_id: &str) -> Result<bool> {
639        let session_lock = self.get_session(session_id).await?;
640        let session = session_lock.read().await;
641        Ok(session.is_child_session())
642    }
643
644    /// Generate response for a prompt
645    pub async fn generate(&self, session_id: &str, prompt: &str) -> Result<AgentResult> {
646        let session_lock = self.get_session(session_id).await?;
647
648        // Check if session is paused
649        {
650            let session = session_lock.read().await;
651            if session.state == SessionState::Paused {
652                anyhow::bail!(
653                    "Session {} is paused. Call Resume before generating.",
654                    session_id
655                );
656            }
657        }
658
659        // Get session state and LLM client
660        let (
661            history,
662            system,
663            tools,
664            session_llm_client,
665            permission_checker,
666            confirmation_manager,
667            context_providers,
668            session_workspace,
669            tool_metrics,
670            hook_engine,
671            planning_mode,
672            goal_tracking,
673        ) = {
674            let session = session_lock.read().await;
675            (
676                session.messages.clone(),
677                session.system().map(String::from),
678                session.tools.clone(),
679                session.llm_client.clone(),
680                session.permission_checker.clone(),
681                session.confirmation_manager.clone(),
682                session.context_providers.clone(),
683                session.config.workspace.clone(),
684                session.tool_metrics.clone(),
685                session.config.hook_engine.clone(),
686                session.config.planning_mode,
687                session.config.goal_tracking,
688            )
689        };
690
691        // Use session's LLM client if configured, otherwise use default
692        let llm_client = if let Some(client) = session_llm_client {
693            client
694        } else if let Some(client) = self.llm_client.read().await.clone() {
695            client
696        } else {
697            anyhow::bail!(
698                "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
699                session_id
700            );
701        };
702
703        // Construct per-session ToolContext from session workspace, falling back to server default
704        let tool_context = if session_workspace.is_empty() {
705            crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
706                .with_session_id(session_id)
707        } else {
708            crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
709                .with_session_id(session_id)
710        };
711        let tool_context = if let Some(command_env) = self.tool_executor.command_env() {
712            tool_context.with_command_env(command_env)
713        } else {
714            tool_context
715        };
716
717        // Inject skill registry into system prompt and agent config
718        let skill_registry = match self.session_skill_registry(session_id).await {
719            Some(registry) => Some(registry),
720            None => self.skill_registry.read().await.clone(),
721        };
722        let system = if let Some(ref registry) = skill_registry {
723            let skill_prompt = registry.to_system_prompt();
724            if skill_prompt.is_empty() {
725                system
726            } else {
727                match system {
728                    Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
729                    None => Some(skill_prompt),
730                }
731            }
732        } else {
733            system
734        };
735
736        // Inject matched skill content on-demand before skill_registry is moved into AgentConfig
737        let effective_prompt = if let Some(ref registry) = skill_registry {
738            let skill_content = registry.match_skills(prompt);
739            if skill_content.is_empty() {
740                prompt.to_string()
741            } else {
742                format!("{}\n\n---\n\n{}", skill_content, prompt)
743            }
744        } else {
745            prompt.to_string()
746        };
747
748        // Build AgentMemory from shared store (if configured)
749        let memory = self
750            .memory_store
751            .read()
752            .await
753            .as_ref()
754            .map(|store| Arc::new(AgentMemory::new(store.clone())));
755
756        // Create agent loop with permission policy, confirmation manager, and context providers
757        let config = AgentConfig {
758            prompt_slots: match system {
759                Some(s) => SystemPromptSlots::from_legacy(s),
760                None => SystemPromptSlots::default(),
761            },
762            tools,
763            max_tool_rounds: 50,
764            permission_checker: Some(permission_checker),
765            confirmation_manager: Some(confirmation_manager),
766            context_providers,
767            planning_mode,
768            goal_tracking,
769            hook_engine,
770            skill_registry,
771            memory,
772            ..AgentConfig::default()
773        };
774
775        let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
776            .with_tool_metrics(tool_metrics);
777
778        // Execute with session context
779        let result = agent
780            .execute_with_session(&history, &effective_prompt, Some(session_id), None, None)
781            .await?;
782
783        // Update session
784        {
785            let mut session = session_lock.write().await;
786            session.messages = result.messages.clone();
787            session.update_usage(&result.usage);
788        }
789
790        // Persist to store
791        self.persist_in_background(session_id, "generate");
792
793        // Auto-compact if context usage exceeds threshold
794        if let Err(e) = self.maybe_auto_compact(session_id).await {
795            tracing::warn!("Auto-compact failed for session {}: {}", session_id, e);
796        }
797
798        Ok(result)
799    }
800
801    /// Generate response with streaming events
802    pub async fn generate_streaming(
803        &self,
804        session_id: &str,
805        prompt: &str,
806    ) -> Result<(
807        mpsc::Receiver<AgentEvent>,
808        tokio::task::JoinHandle<Result<AgentResult>>,
809        tokio_util::sync::CancellationToken,
810    )> {
811        let session_lock = self.get_session(session_id).await?;
812
813        // Check if session is paused
814        {
815            let session = session_lock.read().await;
816            if session.state == SessionState::Paused {
817                anyhow::bail!(
818                    "Session {} is paused. Call Resume before generating.",
819                    session_id
820                );
821            }
822        }
823
824        // Get session state and LLM client
825        let (
826            history,
827            system,
828            tools,
829            session_llm_client,
830            permission_checker,
831            confirmation_manager,
832            context_providers,
833            session_workspace,
834            tool_metrics,
835            hook_engine,
836            planning_mode,
837            goal_tracking,
838        ) = {
839            let session = session_lock.read().await;
840            (
841                session.messages.clone(),
842                session.system().map(String::from),
843                session.tools.clone(),
844                session.llm_client.clone(),
845                session.permission_checker.clone(),
846                session.confirmation_manager.clone(),
847                session.context_providers.clone(),
848                session.config.workspace.clone(),
849                session.tool_metrics.clone(),
850                session.config.hook_engine.clone(),
851                session.config.planning_mode,
852                session.config.goal_tracking,
853            )
854        };
855
856        // Use session's LLM client if configured, otherwise use default
857        let llm_client = if let Some(client) = session_llm_client {
858            client
859        } else if let Some(client) = self.llm_client.read().await.clone() {
860            client
861        } else {
862            anyhow::bail!(
863                "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
864                session_id
865            );
866        };
867
868        // Construct per-session ToolContext from session workspace, falling back to server default
869        let tool_context = if session_workspace.is_empty() {
870            crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
871                .with_session_id(session_id)
872        } else {
873            crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
874                .with_session_id(session_id)
875        };
876        let tool_context = if let Some(command_env) = self.tool_executor.command_env() {
877            tool_context.with_command_env(command_env)
878        } else {
879            tool_context
880        };
881
882        // Inject skill registry into system prompt and agent config
883        let skill_registry = match self.session_skill_registry(session_id).await {
884            Some(registry) => Some(registry),
885            None => self.skill_registry.read().await.clone(),
886        };
887        let system = if let Some(ref registry) = skill_registry {
888            let skill_prompt = registry.to_system_prompt();
889            if skill_prompt.is_empty() {
890                system
891            } else {
892                match system {
893                    Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
894                    None => Some(skill_prompt),
895                }
896            }
897        } else {
898            system
899        };
900
901        // Inject matched skill content on-demand before skill_registry is moved into AgentConfig
902        let effective_prompt = if let Some(ref registry) = skill_registry {
903            let skill_content = registry.match_skills(prompt);
904            if skill_content.is_empty() {
905                prompt.to_string()
906            } else {
907                format!("{}\n\n---\n\n{}", skill_content, prompt)
908            }
909        } else {
910            prompt.to_string()
911        };
912
913        // Build AgentMemory from shared store (if configured)
914        let memory = self
915            .memory_store
916            .read()
917            .await
918            .as_ref()
919            .map(|store| Arc::new(AgentMemory::new(store.clone())));
920
921        // Create agent loop with permission policy, confirmation manager, and context providers
922        let config = AgentConfig {
923            prompt_slots: match system {
924                Some(s) => SystemPromptSlots::from_legacy(s),
925                None => SystemPromptSlots::default(),
926            },
927            tools,
928            max_tool_rounds: 50,
929            permission_checker: Some(permission_checker),
930            confirmation_manager: Some(confirmation_manager),
931            context_providers,
932            planning_mode,
933            goal_tracking,
934            hook_engine,
935            skill_registry,
936            memory,
937            ..AgentConfig::default()
938        };
939
940        let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
941            .with_tool_metrics(tool_metrics);
942
943        // Execute with streaming
944        let (rx, handle, cancel_token) =
945            agent.execute_streaming(&history, &effective_prompt).await?;
946
947        // Store the cancellation token for cancellation support
948        let cancel_token_clone = cancel_token.clone();
949        {
950            let mut ops = self.ongoing_operations.write().await;
951            ops.insert(session_id.to_string(), cancel_token);
952        }
953
954        // Spawn task to update session after completion
955        let session_lock_clone = session_lock.clone();
956        let original_handle = handle;
957        let stores = self.stores.clone();
958        let session_storage_types = self.session_storage_types.clone();
959        let llm_configs = self.llm_configs.clone();
960        let session_id_owned = session_id.to_string();
961        let ongoing_operations = self.ongoing_operations.clone();
962        let session_manager = self.clone();
963
964        let wrapped_handle = tokio::spawn(async move {
965            let result = original_handle.await??;
966
967            // Remove from ongoing operations
968            {
969                let mut ops = ongoing_operations.write().await;
970                ops.remove(&session_id_owned);
971            }
972
973            // Update session
974            {
975                let mut session = session_lock_clone.write().await;
976                session.messages = result.messages.clone();
977                session.update_usage(&result.usage);
978            }
979
980            // Persist to store
981            let storage_type = {
982                let storage_types = session_storage_types.read().await;
983                storage_types.get(&session_id_owned).cloned()
984            };
985
986            if let Some(storage_type) = storage_type {
987                if storage_type != crate::config::StorageBackend::Memory {
988                    let stores_guard = stores.read().await;
989                    if let Some(store) = stores_guard.get(&storage_type) {
990                        let session = session_lock_clone.read().await;
991                        let llm_config = {
992                            let configs = llm_configs.read().await;
993                            configs.get(&session_id_owned).cloned()
994                        };
995                        let data = session.to_session_data(llm_config);
996                        if let Err(e) = store.save(&data).await {
997                            tracing::warn!(
998                                "Failed to persist session {} after streaming: {}",
999                                session_id_owned,
1000                                e
1001                            );
1002                        }
1003                    }
1004                }
1005            }
1006
1007            // Auto-compact if context usage exceeds threshold
1008            if let Err(e) = session_manager.maybe_auto_compact(&session_id_owned).await {
1009                tracing::warn!(
1010                    "Auto-compact failed for session {}: {}",
1011                    session_id_owned,
1012                    e
1013                );
1014            }
1015
1016            Ok(result)
1017        });
1018
1019        Ok((rx, wrapped_handle, cancel_token_clone))
1020    }
1021
1022    /// Get context usage for a session
1023    pub async fn context_usage(&self, session_id: &str) -> Result<ContextUsage> {
1024        let session_lock = self.get_session(session_id).await?;
1025        let session = session_lock.read().await;
1026        Ok(session.context_usage.clone())
1027    }
1028
1029    /// Get conversation history for a session
1030    pub async fn history(&self, session_id: &str) -> Result<Vec<Message>> {
1031        let session_lock = self.get_session(session_id).await?;
1032        let session = session_lock.read().await;
1033        Ok(session.messages.clone())
1034    }
1035
1036    /// Clear session history
1037    pub async fn clear(&self, session_id: &str) -> Result<()> {
1038        {
1039            let session_lock = self.get_session(session_id).await?;
1040            let mut session = session_lock.write().await;
1041            session.clear();
1042        }
1043
1044        // Persist to store
1045        self.persist_in_background(session_id, "clear");
1046
1047        Ok(())
1048    }
1049
1050    /// Compact session context
1051    pub async fn compact(&self, session_id: &str) -> Result<()> {
1052        tracing::info!(name: "a3s.session.compact", session_id = %session_id, "Compacting session context");
1053
1054        {
1055            let session_lock = self.get_session(session_id).await?;
1056            let mut session = session_lock.write().await;
1057
1058            // Get LLM client for compaction (if available)
1059            let llm_client = if let Some(client) = &session.llm_client {
1060                client.clone()
1061            } else if let Some(client) = self.llm_client.read().await.clone() {
1062                client
1063            } else {
1064                // If no LLM client available, just do simple truncation
1065                tracing::warn!("No LLM client configured for compaction, using simple truncation");
1066                let keep_messages = 20;
1067                if session.messages.len() > keep_messages {
1068                    let len = session.messages.len();
1069                    session.messages = session.messages.split_off(len - keep_messages);
1070                }
1071                // Persist after truncation
1072                drop(session);
1073                self.persist_in_background(session_id, "compact");
1074                return Ok(());
1075            };
1076
1077            session.compact(&llm_client).await?;
1078        }
1079
1080        // Persist to store
1081        self.persist_in_background(session_id, "compact");
1082
1083        Ok(())
1084    }
1085
1086    /// Check if auto-compaction should be triggered and perform it if needed.
1087    ///
1088    /// Called after `generate()` / `generate_streaming()` updates session usage.
1089    /// Triggers compaction when:
1090    /// - `auto_compact` is enabled in session config
1091    /// - `context_usage.percent` exceeds `auto_compact_threshold`
1092    pub async fn maybe_auto_compact(&self, session_id: &str) -> Result<bool> {
1093        let (should_compact, percent_before, messages_before) = {
1094            let session_lock = self.get_session(session_id).await?;
1095            let session = session_lock.read().await;
1096
1097            if !session.config.auto_compact {
1098                return Ok(false);
1099            }
1100
1101            let threshold = session.config.auto_compact_threshold;
1102            let percent = session.context_usage.percent;
1103            let msg_count = session.messages.len();
1104
1105            tracing::debug!(
1106                "Auto-compact check for session {}: percent={:.2}%, threshold={:.2}%, messages={}",
1107                session_id,
1108                percent * 100.0,
1109                threshold * 100.0,
1110                msg_count,
1111            );
1112
1113            (percent >= threshold, percent, msg_count)
1114        };
1115
1116        if !should_compact {
1117            return Ok(false);
1118        }
1119
1120        tracing::info!(
1121            name: "a3s.session.auto_compact",
1122            session_id = %session_id,
1123            percent_before = %format!("{:.1}%", percent_before * 100.0),
1124            messages_before = %messages_before,
1125            "Auto-compacting session due to high context usage"
1126        );
1127
1128        // Perform compaction (reuses existing compact logic)
1129        self.compact(session_id).await?;
1130
1131        // Get post-compaction message count
1132        let messages_after = {
1133            let session_lock = self.get_session(session_id).await?;
1134            let session = session_lock.read().await;
1135            session.messages.len()
1136        };
1137
1138        // Broadcast event to notify clients
1139        let event = AgentEvent::ContextCompacted {
1140            session_id: session_id.to_string(),
1141            before_messages: messages_before,
1142            after_messages: messages_after,
1143            percent_before,
1144        };
1145
1146        // Try to send via session's event broadcaster
1147        if let Ok(session_lock) = self.get_session(session_id).await {
1148            let session = session_lock.read().await;
1149            let _ = session.event_tx.send(event);
1150        }
1151
1152        tracing::info!(
1153            name: "a3s.session.auto_compact.done",
1154            session_id = %session_id,
1155            messages_before = %messages_before,
1156            messages_after = %messages_after,
1157            "Auto-compaction complete"
1158        );
1159
1160        Ok(true)
1161    }
1162
1163    /// Resolve the LLM client for a session (session-level -> default fallback)
1164    ///
1165    /// Returns `None` if no LLM client is configured at either level.
1166    pub async fn get_llm_for_session(
1167        &self,
1168        session_id: &str,
1169    ) -> Result<Option<Arc<dyn LlmClient>>> {
1170        let session_lock = self.get_session(session_id).await?;
1171        let session = session_lock.read().await;
1172
1173        if let Some(client) = &session.llm_client {
1174            return Ok(Some(client.clone()));
1175        }
1176
1177        Ok(self.llm_client.read().await.clone())
1178    }
1179
1180    /// Ask an ephemeral side question for an existing managed session.
1181    ///
1182    /// Uses a read-only snapshot of the session history plus current queue /
1183    /// confirmation / task state. Optional `runtime_context` lets hosts inject
1184    /// extra transient context such as browser-only UI state.
1185    pub async fn btw_with_context(
1186        &self,
1187        session_id: &str,
1188        question: &str,
1189        runtime_context: Option<&str>,
1190    ) -> Result<crate::agent_api::BtwResult> {
1191        let question = question.trim();
1192        if question.is_empty() {
1193            anyhow::bail!("btw: question cannot be empty");
1194        }
1195
1196        let session_lock = self.get_session(session_id).await?;
1197        let (history_snapshot, session_runtime) = {
1198            let session = session_lock.read().await;
1199            let mut sections = Vec::new();
1200
1201            let pending_confirmations = session.confirmation_manager.pending_confirmations().await;
1202            if !pending_confirmations.is_empty() {
1203                let mut lines = pending_confirmations
1204                    .into_iter()
1205                    .map(|pending| {
1206                        let arg_summary = Self::compact_json_value(&pending.args);
1207                        if arg_summary.is_empty() {
1208                            format!(
1209                                "- {} [{}] remaining={}ms",
1210                                pending.tool_name, pending.tool_id, pending.remaining_ms
1211                            )
1212                        } else {
1213                            format!(
1214                                "- {} [{}] remaining={}ms {}",
1215                                pending.tool_name,
1216                                pending.tool_id,
1217                                pending.remaining_ms,
1218                                arg_summary
1219                            )
1220                        }
1221                    })
1222                    .collect::<Vec<_>>();
1223                lines.sort();
1224                sections.push(format!("[pending confirmations]\n{}", lines.join("\n")));
1225            }
1226
1227            let stats = session.command_queue.stats().await;
1228            if stats.total_active > 0 || stats.total_pending > 0 || stats.external_pending > 0 {
1229                let mut lines = vec![format!(
1230                    "active={}, pending={}, external_pending={}",
1231                    stats.total_active, stats.total_pending, stats.external_pending
1232                )];
1233                let mut lanes = stats
1234                    .lanes
1235                    .into_values()
1236                    .filter(|lane| lane.active > 0 || lane.pending > 0)
1237                    .map(|lane| {
1238                        format!(
1239                            "- {:?}: active={}, pending={}, handler={:?}",
1240                            lane.lane, lane.active, lane.pending, lane.handler_mode
1241                        )
1242                    })
1243                    .collect::<Vec<_>>();
1244                lanes.sort();
1245                lines.extend(lanes);
1246                sections.push(format!("[session queue]\n{}", lines.join("\n")));
1247            }
1248
1249            let external_tasks = session.command_queue.pending_external_tasks().await;
1250            if !external_tasks.is_empty() {
1251                let mut lines = external_tasks
1252                    .into_iter()
1253                    .take(6)
1254                    .map(|task| {
1255                        let payload_summary = Self::compact_json_value(&task.payload);
1256                        if payload_summary.is_empty() {
1257                            format!(
1258                                "- {} {:?} remaining={}ms",
1259                                task.command_type,
1260                                task.lane,
1261                                task.remaining_ms()
1262                            )
1263                        } else {
1264                            format!(
1265                                "- {} {:?} remaining={}ms {}",
1266                                task.command_type,
1267                                task.lane,
1268                                task.remaining_ms(),
1269                                payload_summary
1270                            )
1271                        }
1272                    })
1273                    .collect::<Vec<_>>();
1274                lines.sort();
1275                sections.push(format!("[pending external tasks]\n{}", lines.join("\n")));
1276            }
1277
1278            let active_tasks = session
1279                .tasks
1280                .iter()
1281                .filter(|task| task.status.is_active())
1282                .take(6)
1283                .map(|task| match &task.tool {
1284                    Some(tool) if !tool.is_empty() => {
1285                        format!("- [{}] {} ({})", task.status, task.content, tool)
1286                    }
1287                    _ => format!("- [{}] {}", task.status, task.content),
1288                })
1289                .collect::<Vec<_>>();
1290            if !active_tasks.is_empty() {
1291                sections.push(format!("[tracked tasks]\n{}", active_tasks.join("\n")));
1292            }
1293
1294            sections.push(format!(
1295                "[session state]\n{}",
1296                match session.state {
1297                    SessionState::Active => "active",
1298                    SessionState::Paused => "paused",
1299                    SessionState::Completed => "completed",
1300                    SessionState::Error => "error",
1301                    SessionState::Unknown => "unknown",
1302                }
1303            ));
1304
1305            (session.messages.clone(), sections.join("\n\n"))
1306        };
1307
1308        let llm_client = self
1309            .get_llm_for_session(session_id)
1310            .await?
1311            .context("btw failed: no model configured for session")?;
1312
1313        let mut messages = history_snapshot;
1314        let mut injected_sections = Vec::new();
1315        if !session_runtime.is_empty() {
1316            injected_sections.push(format!("[session runtime context]\n{}", session_runtime));
1317        }
1318        if let Some(extra) = runtime_context.map(str::trim).filter(|ctx| !ctx.is_empty()) {
1319            injected_sections.push(format!("[host runtime context]\n{}", extra));
1320        }
1321        if !injected_sections.is_empty() {
1322            let injected_context = format!(
1323                "Use the following runtime context only as background for the next side question. Do not treat it as a new user request.\n\n{}",
1324                injected_sections.join("\n\n")
1325            );
1326            messages.push(Message::user(&injected_context));
1327        }
1328        messages.push(Message::user(question));
1329
1330        let response = llm_client
1331            .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
1332            .await
1333            .map_err(|e| anyhow::anyhow!("btw: ephemeral LLM call failed: {e}"))?;
1334
1335        Ok(crate::agent_api::BtwResult {
1336            question: question.to_string(),
1337            answer: response.text(),
1338            usage: response.usage,
1339        })
1340    }
1341
1342    /// Configure session
1343    pub async fn configure(
1344        &self,
1345        session_id: &str,
1346        thinking: Option<bool>,
1347        budget: Option<usize>,
1348        model_config: Option<LlmConfig>,
1349    ) -> Result<()> {
1350        {
1351            let session_lock = self.get_session(session_id).await?;
1352            let mut session = session_lock.write().await;
1353
1354            if let Some(t) = thinking {
1355                session.thinking_enabled = t;
1356            }
1357            if let Some(b) = budget {
1358                session.thinking_budget = Some(b);
1359            }
1360            if let Some(ref config) = model_config {
1361                tracing::info!(
1362                    "Configuring session {} with LLM: provider={}, model={}",
1363                    session_id,
1364                    config.provider,
1365                    config.model
1366                );
1367                session.model_name = Some(config.model.clone());
1368                session.llm_client = Some(llm::create_client_with_config(config.clone()));
1369            }
1370        }
1371
1372        // Store LLM config for persistence (without API key)
1373        if let Some(config) = model_config {
1374            let llm_config_data = LlmConfigData {
1375                provider: config.provider,
1376                model: config.model,
1377                api_key: None, // Don't persist API key
1378                base_url: config.base_url,
1379            };
1380            let mut configs = self.llm_configs.write().await;
1381            configs.insert(session_id.to_string(), llm_config_data);
1382        }
1383
1384        // Persist to store
1385        self.persist_in_background(session_id, "configure");
1386
1387        Ok(())
1388    }
1389
1390    /// Update a session's system prompt in place.
1391    pub async fn set_system_prompt(
1392        &self,
1393        session_id: &str,
1394        system_prompt: Option<String>,
1395    ) -> Result<()> {
1396        {
1397            let session_lock = self.get_session(session_id).await?;
1398            let mut session = session_lock.write().await;
1399            session.config.system_prompt = system_prompt;
1400        }
1401
1402        self.persist_in_background(session_id, "set_system_prompt");
1403        Ok(())
1404    }
1405
1406    /// Get session count
1407    pub async fn session_count(&self) -> usize {
1408        let sessions = self.sessions.read().await;
1409        sessions.len()
1410    }
1411
1412    /// Check health of all registered stores
1413    pub async fn store_health(&self) -> Vec<(String, Result<()>)> {
1414        let stores = self.stores.read().await;
1415        let mut results = Vec::new();
1416        for (_, store) in stores.iter() {
1417            let name = store.backend_name().to_string();
1418            let result = store.health_check().await;
1419            results.push((name, result));
1420        }
1421        results
1422    }
1423
1424    /// List all loaded tools (built-in tools)
1425    pub fn list_tools(&self) -> Vec<crate::llm::ToolDefinition> {
1426        self.tool_executor.definitions()
1427    }
1428
1429    /// Pause a session
1430    pub async fn pause_session(&self, session_id: &str) -> Result<bool> {
1431        let paused = {
1432            let session_lock = self.get_session(session_id).await?;
1433            let mut session = session_lock.write().await;
1434            session.pause()
1435        };
1436
1437        if paused {
1438            self.persist_in_background(session_id, "pause");
1439        }
1440
1441        Ok(paused)
1442    }
1443
1444    /// Resume a session
1445    pub async fn resume_session(&self, session_id: &str) -> Result<bool> {
1446        let resumed = {
1447            let session_lock = self.get_session(session_id).await?;
1448            let mut session = session_lock.write().await;
1449            session.resume()
1450        };
1451
1452        if resumed {
1453            self.persist_in_background(session_id, "resume");
1454        }
1455
1456        Ok(resumed)
1457    }
1458
1459    /// Cancel an ongoing operation for a session
1460    ///
1461    /// Returns true if an operation was cancelled, false if no operation was running.
1462    pub async fn cancel_operation(&self, session_id: &str) -> Result<bool> {
1463        // First, cancel any pending HITL confirmations
1464        let session_lock = self.get_session(session_id).await?;
1465        let cancelled_confirmations = {
1466            let session = session_lock.read().await;
1467            session.confirmation_manager.cancel_all().await
1468        };
1469
1470        if cancelled_confirmations > 0 {
1471            tracing::info!(
1472                "Cancelled {} pending confirmations for session {}",
1473                cancelled_confirmations,
1474                session_id
1475            );
1476        }
1477
1478        // Then, cancel the ongoing operation if any
1479        let cancel_token = {
1480            let mut ops = self.ongoing_operations.write().await;
1481            ops.remove(session_id)
1482        };
1483
1484        if let Some(token) = cancel_token {
1485            token.cancel();
1486            tracing::info!("Cancelled ongoing operation for session {}", session_id);
1487            Ok(true)
1488        } else if cancelled_confirmations > 0 {
1489            // We cancelled confirmations but no main operation
1490            Ok(true)
1491        } else {
1492            tracing::debug!("No ongoing operation to cancel for session {}", session_id);
1493            Ok(false)
1494        }
1495    }
1496
1497    /// Get all sessions (returns session locks for iteration)
1498    pub async fn get_all_sessions(&self) -> Vec<Arc<RwLock<Session>>> {
1499        let sessions = self.sessions.read().await;
1500        sessions.values().cloned().collect()
1501    }
1502
1503    /// Get tool executor reference
1504    pub fn tool_executor(&self) -> &Arc<ToolExecutor> {
1505        &self.tool_executor
1506    }
1507
1508    /// Confirm a tool execution (HITL)
1509    pub async fn confirm_tool(
1510        &self,
1511        session_id: &str,
1512        tool_id: &str,
1513        approved: bool,
1514        reason: Option<String>,
1515    ) -> Result<bool> {
1516        let session_lock = self.get_session(session_id).await?;
1517        let session = session_lock.read().await;
1518        session
1519            .confirmation_manager
1520            .confirm(tool_id, approved, reason)
1521            .await
1522            .map_err(|e| anyhow::anyhow!(e))
1523    }
1524
1525    /// Set confirmation policy for a session (HITL)
1526    pub async fn set_confirmation_policy(
1527        &self,
1528        session_id: &str,
1529        policy: ConfirmationPolicy,
1530    ) -> Result<ConfirmationPolicy> {
1531        {
1532            let session_lock = self.get_session(session_id).await?;
1533            let session = session_lock.read().await;
1534            session.set_confirmation_policy(policy.clone()).await;
1535        }
1536
1537        // Update config for persistence
1538        {
1539            let session_lock = self.get_session(session_id).await?;
1540            let mut session = session_lock.write().await;
1541            session.config.confirmation_policy = Some(policy.clone());
1542        }
1543
1544        // Persist to store
1545        self.persist_in_background(session_id, "set_confirmation_policy");
1546
1547        Ok(policy)
1548    }
1549
1550    /// Set permission policy for a session.
1551    pub async fn set_permission_policy(
1552        &self,
1553        session_id: &str,
1554        policy: PermissionPolicy,
1555    ) -> Result<PermissionPolicy> {
1556        {
1557            let session_lock = self.get_session(session_id).await?;
1558            let mut session = session_lock.write().await;
1559            session.set_permission_policy(policy.clone());
1560        }
1561
1562        self.persist_in_background(session_id, "set_permission_policy");
1563
1564        Ok(policy)
1565    }
1566
1567    /// Get confirmation policy for a session (HITL)
1568    pub async fn get_confirmation_policy(&self, session_id: &str) -> Result<ConfirmationPolicy> {
1569        let session_lock = self.get_session(session_id).await?;
1570        let session = session_lock.read().await;
1571        Ok(session.confirmation_policy().await)
1572    }
1573}