Skip to main content

a3s_code_core/session/
manager.rs

1//! SessionManager — manages multiple concurrent sessions
2
3use super::{ContextUsage, Session, SessionConfig, SessionState};
4use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
5use crate::hitl::ConfirmationPolicy;
6use crate::llm::{self, LlmClient, LlmConfig, Message};
7use crate::mcp::McpManager;
8use crate::memory::AgentMemory;
9use crate::prompts::SystemPromptSlots;
10use crate::skills::SkillRegistry;
11use crate::store::{FileSessionStore, LlmConfigData, SessionData, SessionStore};
12use crate::tools::ToolExecutor;
13use a3s_memory::MemoryStore;
14use anyhow::{Context, Result};
15use std::collections::HashMap;
16use std::sync::Arc;
17use tokio::sync::{mpsc, RwLock};
18
/// Session manager handles multiple concurrent sessions
///
/// Every field is wrapped in an `Arc`, so the derived `Clone` produces a
/// handle that shares the same underlying state rather than a deep copy.
#[derive(Clone)]
pub struct SessionManager {
    /// Live sessions keyed by session ID; each session sits behind its own lock.
    pub(crate) sessions: Arc<RwLock<HashMap<String, Arc<RwLock<Session>>>>>,
    /// Default LLM client used when a session has no per-session client configured.
    /// Wrapped in RwLock so it can be hot-swapped at runtime without restart.
    pub(crate) llm_client: Arc<RwLock<Option<Arc<dyn LlmClient>>>>,
    /// Tool executor shared by all sessions; also the source of tool definitions
    /// handed to newly created sessions.
    pub(crate) tool_executor: Arc<ToolExecutor>,
    /// Session stores by storage type
    pub(crate) stores: Arc<RwLock<HashMap<crate::config::StorageBackend, Arc<dyn SessionStore>>>>,
    /// Track which storage type each session uses
    pub(crate) session_storage_types: Arc<RwLock<HashMap<String, crate::config::StorageBackend>>>,
    /// LLM configurations for sessions (stored separately for persistence)
    pub(crate) llm_configs: Arc<RwLock<HashMap<String, LlmConfigData>>>,
    /// Ongoing operations (session_id -> CancellationToken)
    pub(crate) ongoing_operations:
        Arc<RwLock<HashMap<String, tokio_util::sync::CancellationToken>>>,
    /// Skill registry for runtime skill management
    pub(crate) skill_registry: Arc<RwLock<Option<Arc<SkillRegistry>>>>,
    /// Per-session skill registries layered on top of the shared registry.
    /// When a session has an entry here it takes precedence over `skill_registry`.
    pub(crate) session_skill_registries: Arc<RwLock<HashMap<String, Arc<SkillRegistry>>>>,
    /// Shared memory store for agent long-term memory.
    /// When set, each `generate`/`generate_streaming` call wraps this in
    /// `AgentMemory` and injects it into `AgentConfig.memory`.
    pub(crate) memory_store: Arc<RwLock<Option<Arc<dyn MemoryStore>>>>,
    /// Shared MCP manager. When set, MCP tools are registered into the
    /// `ToolExecutor` at startup so all sessions can use them.
    pub(crate) mcp_manager: Arc<RwLock<Option<Arc<McpManager>>>>,
}
48
49impl SessionManager {
50    /// Create a new session manager without persistence
51    pub fn new(llm_client: Option<Arc<dyn LlmClient>>, tool_executor: Arc<ToolExecutor>) -> Self {
52        Self {
53            sessions: Arc::new(RwLock::new(HashMap::new())),
54            llm_client: Arc::new(RwLock::new(llm_client)),
55            tool_executor,
56            stores: Arc::new(RwLock::new(HashMap::new())),
57            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
58            llm_configs: Arc::new(RwLock::new(HashMap::new())),
59            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
60            skill_registry: Arc::new(RwLock::new(None)),
61            session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
62            memory_store: Arc::new(RwLock::new(None)),
63            mcp_manager: Arc::new(RwLock::new(None)),
64        }
65    }
66
67    /// Create a session manager with file-based persistence
68    ///
69    /// Sessions will be automatically saved to disk and restored on startup.
70    pub async fn with_persistence<P: AsRef<std::path::Path>>(
71        llm_client: Option<Arc<dyn LlmClient>>,
72        tool_executor: Arc<ToolExecutor>,
73        sessions_dir: P,
74    ) -> Result<Self> {
75        let store = FileSessionStore::new(sessions_dir).await?;
76        let mut stores = HashMap::new();
77        stores.insert(
78            crate::config::StorageBackend::File,
79            Arc::new(store) as Arc<dyn SessionStore>,
80        );
81
82        let manager = Self {
83            sessions: Arc::new(RwLock::new(HashMap::new())),
84            llm_client: Arc::new(RwLock::new(llm_client)),
85            tool_executor,
86            stores: Arc::new(RwLock::new(stores)),
87            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
88            llm_configs: Arc::new(RwLock::new(HashMap::new())),
89            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
90            skill_registry: Arc::new(RwLock::new(None)),
91            session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
92            memory_store: Arc::new(RwLock::new(None)),
93            mcp_manager: Arc::new(RwLock::new(None)),
94        };
95
96        Ok(manager)
97    }
98
99    /// Create a session manager with a custom store
100    ///
101    /// The `backend` parameter determines which `StorageBackend` key the store is registered under.
102    /// Sessions created with a matching `storage_type` will use this store.
103    pub fn with_store(
104        llm_client: Option<Arc<dyn LlmClient>>,
105        tool_executor: Arc<ToolExecutor>,
106        store: Arc<dyn SessionStore>,
107        backend: crate::config::StorageBackend,
108    ) -> Self {
109        let mut stores = HashMap::new();
110        stores.insert(backend, store);
111
112        Self {
113            sessions: Arc::new(RwLock::new(HashMap::new())),
114            llm_client: Arc::new(RwLock::new(llm_client)),
115            tool_executor,
116            stores: Arc::new(RwLock::new(stores)),
117            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
118            llm_configs: Arc::new(RwLock::new(HashMap::new())),
119            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
120            skill_registry: Arc::new(RwLock::new(None)),
121            session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
122            memory_store: Arc::new(RwLock::new(None)),
123            mcp_manager: Arc::new(RwLock::new(None)),
124        }
125    }
126
127    /// Hot-swap the default LLM client used by sessions without a per-session client.
128    ///
129    /// Called by SafeClaw when `PUT /api/agent/config` updates the model config.
130    /// All subsequent `generate` / `generate_streaming` calls will use the new client.
131    pub async fn set_default_llm(&self, client: Option<Arc<dyn LlmClient>>) {
132        *self.llm_client.write().await = client;
133    }
134
135    /// Set the skill registry for runtime skill management.
136    ///
137    /// Skills registered here will be injected into the system prompt
138    /// for all subsequent `generate` / `generate_streaming` calls.
139    /// Also registers the `manage_skill` tool on the `ToolExecutor` so
140    /// the Agent can create, list, and remove skills at runtime.
141    pub async fn set_skill_registry(
142        &self,
143        registry: Arc<SkillRegistry>,
144        skills_dir: std::path::PathBuf,
145    ) {
146        // Register the manage_skill tool for self-bootstrap
147        let manage_tool = crate::skills::ManageSkillTool::new(registry.clone(), skills_dir);
148        self.tool_executor
149            .register_dynamic_tool(Arc::new(manage_tool));
150
151        *self.skill_registry.write().await = Some(registry);
152    }
153
154    /// Get the current skill registry, if any.
155    pub async fn skill_registry(&self) -> Option<Arc<SkillRegistry>> {
156        self.skill_registry.read().await.clone()
157    }
158
159    /// Override the active skill registry for a single session.
160    pub async fn set_session_skill_registry(
161        &self,
162        session_id: impl Into<String>,
163        registry: Arc<SkillRegistry>,
164    ) {
165        self.session_skill_registries
166            .write()
167            .await
168            .insert(session_id.into(), registry);
169    }
170
171    /// Get the active skill registry for a single session, if one was set.
172    pub async fn session_skill_registry(&self, session_id: &str) -> Option<Arc<SkillRegistry>> {
173        self.session_skill_registries
174            .read()
175            .await
176            .get(session_id)
177            .cloned()
178    }
179
180    /// Set the shared memory store for agent long-term memory.
181    ///
182    /// When set, every `generate`/`generate_streaming` call wraps this store
183    /// in an `AgentMemory` and injects it into `AgentConfig.memory`, enabling
184    /// automatic `recall_similar` before prompts and `remember_success`/
185    /// `remember_failure` after tool execution.
186    pub async fn set_memory_store(&self, store: Arc<dyn MemoryStore>) {
187        *self.memory_store.write().await = Some(store);
188    }
189
190    /// Get the current memory store, if any.
191    pub async fn memory_store(&self) -> Option<Arc<dyn MemoryStore>> {
192        self.memory_store.read().await.clone()
193    }
194
195    /// Set the shared MCP manager.
196    ///
197    /// When set, all tools from connected MCP servers are registered into the
198    /// `ToolExecutor` so they are available in every session.
199    pub async fn set_mcp_manager(&self, manager: Arc<McpManager>) {
200        let all_tools = manager.get_all_tools().await;
201        let mut by_server: HashMap<String, Vec<crate::mcp::McpTool>> = HashMap::new();
202        for (server, tool) in all_tools {
203            by_server.entry(server).or_default().push(tool);
204        }
205        for (server_name, tools) in by_server {
206            for tool in
207                crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&manager))
208            {
209                self.tool_executor.register_dynamic_tool(tool);
210            }
211        }
212        *self.mcp_manager.write().await = Some(manager);
213    }
214
215    /// Dynamically add and connect an MCP server to the shared manager.
216    ///
217    /// Registers the server config, connects it, and registers its tools into
218    /// the `ToolExecutor` so all subsequent sessions can use them.
219    pub async fn add_mcp_server(&self, config: crate::mcp::McpServerConfig) -> Result<()> {
220        let manager = {
221            let guard = self.mcp_manager.read().await;
222            match guard.clone() {
223                Some(m) => m,
224                None => {
225                    drop(guard);
226                    let m = Arc::new(McpManager::new());
227                    *self.mcp_manager.write().await = Some(Arc::clone(&m));
228                    m
229                }
230            }
231        };
232        let name = config.name.clone();
233        manager.register_server(config).await;
234        manager.connect(&name).await?;
235        let tools = manager.get_server_tools(&name).await;
236        for tool in crate::mcp::tools::create_mcp_tools(&name, tools, Arc::clone(&manager)) {
237            self.tool_executor.register_dynamic_tool(tool);
238        }
239        Ok(())
240    }
241
242    /// Disconnect and remove an MCP server from the shared manager.
243    ///
244    /// Also unregisters its tools from the `ToolExecutor`.
245    pub async fn remove_mcp_server(&self, name: &str) -> Result<()> {
246        let guard = self.mcp_manager.read().await;
247        if let Some(ref manager) = *guard {
248            manager.disconnect(name).await?;
249        }
250        self.tool_executor
251            .unregister_tools_by_prefix(&format!("mcp__{name}__"));
252        Ok(())
253    }
254
255    /// Get status of all connected MCP servers.
256    pub async fn mcp_status(
257        &self,
258    ) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
259        let guard = self.mcp_manager.read().await;
260        match guard.as_ref() {
261            Some(m) => {
262                let m = Arc::clone(m);
263                drop(guard);
264                m.get_status().await
265            }
266            None => std::collections::HashMap::new(),
267        }
268    }
269
270    /// Restore a single session by ID from the store
271    ///
272    /// Searches all registered stores for the given session ID and restores it
273    /// into the in-memory session map. Returns an error if not found.
274    pub async fn restore_session_by_id(&self, session_id: &str) -> Result<()> {
275        // Check if already loaded
276        {
277            let sessions = self.sessions.read().await;
278            if sessions.contains_key(session_id) {
279                return Ok(());
280            }
281        }
282
283        let stores = self.stores.read().await;
284        for (backend, store) in stores.iter() {
285            match store.load(session_id).await {
286                Ok(Some(data)) => {
287                    {
288                        let mut storage_types = self.session_storage_types.write().await;
289                        storage_types.insert(data.id.clone(), backend.clone());
290                    }
291                    self.restore_session(data).await?;
292                    return Ok(());
293                }
294                Ok(None) => continue,
295                Err(e) => {
296                    tracing::warn!(
297                        "Failed to load session {} from {:?}: {}",
298                        session_id,
299                        backend,
300                        e
301                    );
302                    continue;
303                }
304            }
305        }
306
307        Err(anyhow::anyhow!(
308            "Session {} not found in any store",
309            session_id
310        ))
311    }
312
313    /// Load all sessions from all registered stores
314    pub async fn load_all_sessions(&mut self) -> Result<usize> {
315        let stores = self.stores.read().await;
316        let mut loaded = 0;
317
318        for (backend, store) in stores.iter() {
319            let session_ids = match store.list().await {
320                Ok(ids) => ids,
321                Err(e) => {
322                    tracing::warn!("Failed to list sessions from {:?} store: {}", backend, e);
323                    continue;
324                }
325            };
326
327            for id in session_ids {
328                match store.load(&id).await {
329                    Ok(Some(data)) => {
330                        // Record the storage type for this session
331                        {
332                            let mut storage_types = self.session_storage_types.write().await;
333                            storage_types.insert(data.id.clone(), backend.clone());
334                        }
335
336                        if let Err(e) = self.restore_session(data).await {
337                            tracing::warn!("Failed to restore session {}: {}", id, e);
338                        } else {
339                            loaded += 1;
340                        }
341                    }
342                    Ok(None) => {
343                        tracing::warn!("Session {} not found in store", id);
344                    }
345                    Err(e) => {
346                        tracing::warn!("Failed to load session {}: {}", id, e);
347                    }
348                }
349            }
350        }
351
352        tracing::info!("Loaded {} sessions from store", loaded);
353        Ok(loaded)
354    }
355
356    /// Restore a session from SessionData
357    async fn restore_session(&self, data: SessionData) -> Result<()> {
358        let tools = self.tool_executor.definitions();
359        let mut session = Session::new(data.id.clone(), data.config.clone(), tools).await?;
360
361        // Restore serializable state
362        session.restore_from_data(&data);
363
364        // Restore LLM config if present (without API key - must be reconfigured)
365        if let Some(llm_config) = &data.llm_config {
366            let mut configs = self.llm_configs.write().await;
367            configs.insert(data.id.clone(), llm_config.clone());
368        }
369
370        let mut sessions = self.sessions.write().await;
371        sessions.insert(data.id.clone(), Arc::new(RwLock::new(session)));
372
373        tracing::info!("Restored session: {}", data.id);
374        Ok(())
375    }
376
377    /// Save a session to the store
378    async fn save_session(&self, session_id: &str) -> Result<()> {
379        // Get the storage type for this session
380        let storage_type = {
381            let storage_types = self.session_storage_types.read().await;
382            storage_types.get(session_id).cloned()
383        };
384
385        let Some(storage_type) = storage_type else {
386            // No storage type means memory-only session
387            return Ok(());
388        };
389
390        // Skip saving for memory storage
391        if storage_type == crate::config::StorageBackend::Memory {
392            return Ok(());
393        }
394
395        // Get the appropriate store
396        let stores = self.stores.read().await;
397        let Some(store) = stores.get(&storage_type) else {
398            tracing::warn!("No store available for storage type: {:?}", storage_type);
399            return Ok(());
400        };
401
402        let session_lock = self.get_session(session_id).await?;
403        let session = session_lock.read().await;
404
405        // Get LLM config if set
406        let llm_config = {
407            let configs = self.llm_configs.read().await;
408            configs.get(session_id).cloned()
409        };
410
411        let data = session.to_session_data(llm_config);
412        store.save(&data).await?;
413
414        tracing::debug!("Saved session: {}", session_id);
415        Ok(())
416    }
417
418    /// Persist a session to store, logging and emitting an event on failure.
419    /// This is a non-fatal wrapper around `save_session` — the operation
420    /// succeeds in memory even if persistence fails.
421    async fn persist_or_warn(&self, session_id: &str, operation: &str) {
422        if let Err(e) = self.save_session(session_id).await {
423            tracing::warn!(
424                "Failed to persist session {} after {}: {}",
425                session_id,
426                operation,
427                e
428            );
429            // Emit event so SDK clients can react
430            if let Ok(session_lock) = self.get_session(session_id).await {
431                let session = session_lock.read().await;
432                let _ = session.event_tx().send(AgentEvent::PersistenceFailed {
433                    session_id: session_id.to_string(),
434                    operation: operation.to_string(),
435                    error: e.to_string(),
436                });
437            }
438        }
439    }
440
441    /// Spawn persistence as a background task (non-blocking).
442    ///
443    /// All `SessionManager` fields are `Arc`-wrapped, so `clone()` is a
444    /// cheap pointer-copy. This keeps file I/O off the response path.
445    fn persist_in_background(&self, session_id: &str, operation: &str) {
446        let mgr = self.clone();
447        let sid = session_id.to_string();
448        let op = operation.to_string();
449        tokio::spawn(async move {
450            mgr.persist_or_warn(&sid, &op).await;
451        });
452    }
453
454    /// Create a new session
455    pub async fn create_session(&self, id: String, config: SessionConfig) -> Result<String> {
456        tracing::info!(name: "a3s.session.create", session_id = %id, "Creating session");
457
458        // Record the storage type for this session
459        {
460            let mut storage_types = self.session_storage_types.write().await;
461            storage_types.insert(id.clone(), config.storage_type.clone());
462        }
463
464        // Get tool definitions from the executor
465        let tools = self.tool_executor.definitions();
466        let mut session = Session::new(id.clone(), config, tools).await?;
467
468        // Start the command queue
469        session.start_queue().await?;
470
471        // Set max context length if provided
472        if session.config.max_context_length > 0 {
473            session.context_usage.max_tokens = session.config.max_context_length as usize;
474        }
475
476        {
477            let mut sessions = self.sessions.write().await;
478            sessions.insert(id.clone(), Arc::new(RwLock::new(session)));
479        }
480
481        // Persist to store
482        self.persist_in_background(&id, "create");
483
484        tracing::info!("Created session: {}", id);
485        Ok(id)
486    }
487
488    /// Destroy a session
489    pub async fn destroy_session(&self, id: &str) -> Result<()> {
490        tracing::info!(name: "a3s.session.destroy", session_id = %id, "Destroying session");
491
492        // Get the storage type before removing the session
493        let storage_type = {
494            let storage_types = self.session_storage_types.read().await;
495            storage_types.get(id).cloned()
496        };
497
498        {
499            let mut sessions = self.sessions.write().await;
500            sessions.remove(id);
501        }
502
503        // Remove LLM config
504        {
505            let mut configs = self.llm_configs.write().await;
506            configs.remove(id);
507        }
508
509        {
510            let mut registries = self.session_skill_registries.write().await;
511            registries.remove(id);
512        }
513
514        // Remove storage type tracking
515        {
516            let mut storage_types = self.session_storage_types.write().await;
517            storage_types.remove(id);
518        }
519
520        // Delete from store if applicable
521        if let Some(storage_type) = storage_type {
522            if storage_type != crate::config::StorageBackend::Memory {
523                let stores = self.stores.read().await;
524                if let Some(store) = stores.get(&storage_type) {
525                    if let Err(e) = store.delete(id).await {
526                        tracing::warn!("Failed to delete session {} from store: {}", id, e);
527                    }
528                }
529            }
530        }
531
532        tracing::info!("Destroyed session: {}", id);
533        Ok(())
534    }
535
536    /// Get a session by ID
537    pub async fn get_session(&self, id: &str) -> Result<Arc<RwLock<Session>>> {
538        let sessions = self.sessions.read().await;
539        sessions
540            .get(id)
541            .cloned()
542            .context(format!("Session not found: {}", id))
543    }
544
545    /// Create a child session for a subagent
546    ///
547    /// Child sessions inherit the parent's LLM client but have their own
548    /// permission policy and configuration.
549    pub async fn create_child_session(
550        &self,
551        parent_id: &str,
552        child_id: String,
553        mut config: SessionConfig,
554    ) -> Result<String> {
555        // Verify parent exists and inherit HITL policy
556        let parent_lock = self.get_session(parent_id).await?;
557        let parent_llm_client = {
558            let parent = parent_lock.read().await;
559
560            // Inherit parent's confirmation policy if child doesn't have one
561            if config.confirmation_policy.is_none() {
562                let parent_policy = parent.confirmation_manager.policy().await;
563                config.confirmation_policy = Some(parent_policy);
564            }
565
566            parent.llm_client.clone()
567        };
568
569        // Set parent_id in config
570        config.parent_id = Some(parent_id.to_string());
571
572        // Get tool definitions from the executor
573        let tools = self.tool_executor.definitions();
574        let mut session = Session::new(child_id.clone(), config, tools).await?;
575
576        // Inherit LLM client from parent if not set
577        if session.llm_client.is_none() {
578            let default_llm = self.llm_client.read().await.clone();
579            session.llm_client = parent_llm_client.or(default_llm);
580        }
581
582        // Start the command queue
583        session.start_queue().await?;
584
585        // Set max context length if provided
586        if session.config.max_context_length > 0 {
587            session.context_usage.max_tokens = session.config.max_context_length as usize;
588        }
589
590        {
591            let mut sessions = self.sessions.write().await;
592            sessions.insert(child_id.clone(), Arc::new(RwLock::new(session)));
593        }
594
595        // Persist to store
596        self.persist_in_background(&child_id, "create_child");
597
598        tracing::info!(
599            "Created child session: {} (parent: {})",
600            child_id,
601            parent_id
602        );
603        Ok(child_id)
604    }
605
606    /// Get all child sessions for a parent session
607    pub async fn get_child_sessions(&self, parent_id: &str) -> Vec<String> {
608        let sessions = self.sessions.read().await;
609        let mut children = Vec::new();
610
611        for (id, session_lock) in sessions.iter() {
612            let session = session_lock.read().await;
613            if session.parent_id.as_deref() == Some(parent_id) {
614                children.push(id.clone());
615            }
616        }
617
618        children
619    }
620
621    /// Check if a session is a child session
622    pub async fn is_child_session(&self, session_id: &str) -> Result<bool> {
623        let session_lock = self.get_session(session_id).await?;
624        let session = session_lock.read().await;
625        Ok(session.is_child_session())
626    }
627
    /// Generate response for a prompt
    ///
    /// Runs one prompt through an `AgentLoop` on behalf of `session_id`.
    /// In order, this method:
    /// 1. looks up the session and refuses to run if it is paused;
    /// 2. snapshots the session state it needs under a read lock;
    /// 3. picks the session's LLM client, falling back to the manager default;
    /// 4. builds the tool context, skill-augmented system prompt and prompt,
    ///    and the optional `AgentMemory`;
    /// 5. executes the agent loop against the snapshotted history;
    /// 6. writes the resulting messages and usage back to the session;
    /// 7. schedules a background save and attempts auto-compaction.
    ///
    /// # Errors
    /// Fails if the session does not exist, is paused, has no LLM client
    /// available (neither per-session nor default), or if the agent loop
    /// returns an error. A failed auto-compaction is only logged.
    pub async fn generate(&self, session_id: &str, prompt: &str) -> Result<AgentResult> {
        let session_lock = self.get_session(session_id).await?;

        // Check if session is paused
        // NOTE(review): this check and the snapshot below take separate read
        // locks, so the session state may change between the two.
        {
            let session = session_lock.read().await;
            if session.state == SessionState::Paused {
                anyhow::bail!(
                    "Session {} is paused. Call Resume before generating.",
                    session_id
                );
            }
        }

        // Get session state and LLM client
        // Everything is cloned out so the read lock is released before the
        // agent loop runs.
        let (
            history,
            system,
            tools,
            session_llm_client,
            permission_checker,
            confirmation_manager,
            context_providers,
            session_workspace,
            tool_metrics,
            hook_engine,
            planning_enabled,
            goal_tracking,
        ) = {
            let session = session_lock.read().await;
            (
                session.messages.clone(),
                session.system().map(String::from),
                session.tools.clone(),
                session.llm_client.clone(),
                session.permission_checker.clone(),
                session.confirmation_manager.clone(),
                session.context_providers.clone(),
                session.config.workspace.clone(),
                session.tool_metrics.clone(),
                session.config.hook_engine.clone(),
                session.config.planning_enabled,
                session.config.goal_tracking,
            )
        };

        // Use session's LLM client if configured, otherwise use default
        let llm_client = if let Some(client) = session_llm_client {
            client
        } else if let Some(client) = self.llm_client.read().await.clone() {
            client
        } else {
            anyhow::bail!(
                "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
                session_id
            );
        };

        // Construct per-session ToolContext from session workspace, falling back to server default
        // An empty workspace string means "no per-session workspace".
        let tool_context = if session_workspace.is_empty() {
            crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
                .with_session_id(session_id)
        } else {
            crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
                .with_session_id(session_id)
        };

        // Inject skill registry into system prompt and agent config
        // A per-session registry, when present, replaces the shared one.
        let skill_registry = match self.session_skill_registry(session_id).await {
            Some(registry) => Some(registry),
            None => self.skill_registry.read().await.clone(),
        };
        // Append the registry's prompt text to the session's system prompt;
        // an empty skill prompt leaves the system prompt unchanged.
        let system = if let Some(ref registry) = skill_registry {
            let skill_prompt = registry.to_system_prompt();
            if skill_prompt.is_empty() {
                system
            } else {
                match system {
                    Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
                    None => Some(skill_prompt),
                }
            }
        } else {
            system
        };

        // Inject matched skill content on-demand before skill_registry is moved into AgentConfig
        // Matched content is placed ahead of the user's prompt, separated by `---`.
        let effective_prompt = if let Some(ref registry) = skill_registry {
            let skill_content = registry.match_skills(prompt);
            if skill_content.is_empty() {
                prompt.to_string()
            } else {
                format!("{}\n\n---\n\n{}", skill_content, prompt)
            }
        } else {
            prompt.to_string()
        };

        // Build AgentMemory from shared store (if configured)
        let memory = self
            .memory_store
            .read()
            .await
            .as_ref()
            .map(|store| Arc::new(AgentMemory::new(store.clone())));

        // Create agent loop with permission policy, confirmation manager, and context providers
        // Fields not listed here keep their `AgentConfig::default()` values.
        let config = AgentConfig {
            prompt_slots: match system {
                Some(s) => SystemPromptSlots::from_legacy(s),
                None => SystemPromptSlots::default(),
            },
            tools,
            // Fixed here rather than taken from the session config.
            max_tool_rounds: 50,
            permission_checker: Some(permission_checker),
            confirmation_manager: Some(confirmation_manager),
            context_providers,
            planning_enabled,
            goal_tracking,
            hook_engine,
            skill_registry,
            memory,
            ..AgentConfig::default()
        };

        let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
            .with_tool_metrics(tool_metrics);

        // Execute with session context
        // The session lock is not held while the agent runs.
        let result = agent
            .execute_with_session(&history, &effective_prompt, Some(session_id), None, None)
            .await?;

        // Update session
        // The stored history is replaced wholesale with the messages the agent returned.
        {
            let mut session = session_lock.write().await;
            session.messages = result.messages.clone();
            session.update_usage(&result.usage);
        }

        // Persist to store
        self.persist_in_background(session_id, "generate");

        // Auto-compact if context usage exceeds threshold
        // A compaction failure does not fail the call; the result is still returned.
        if let Err(e) = self.maybe_auto_compact(session_id).await {
            tracing::warn!("Auto-compact failed for session {}: {}", session_id, e);
        }

        Ok(result)
    }
779
780    /// Generate response with streaming events
781    pub async fn generate_streaming(
782        &self,
783        session_id: &str,
784        prompt: &str,
785    ) -> Result<(
786        mpsc::Receiver<AgentEvent>,
787        tokio::task::JoinHandle<Result<AgentResult>>,
788        tokio_util::sync::CancellationToken,
789    )> {
790        let session_lock = self.get_session(session_id).await?;
791
792        // Check if session is paused
793        {
794            let session = session_lock.read().await;
795            if session.state == SessionState::Paused {
796                anyhow::bail!(
797                    "Session {} is paused. Call Resume before generating.",
798                    session_id
799                );
800            }
801        }
802
803        // Get session state and LLM client
804        let (
805            history,
806            system,
807            tools,
808            session_llm_client,
809            permission_checker,
810            confirmation_manager,
811            context_providers,
812            session_workspace,
813            tool_metrics,
814            hook_engine,
815            planning_enabled,
816            goal_tracking,
817        ) = {
818            let session = session_lock.read().await;
819            (
820                session.messages.clone(),
821                session.system().map(String::from),
822                session.tools.clone(),
823                session.llm_client.clone(),
824                session.permission_checker.clone(),
825                session.confirmation_manager.clone(),
826                session.context_providers.clone(),
827                session.config.workspace.clone(),
828                session.tool_metrics.clone(),
829                session.config.hook_engine.clone(),
830                session.config.planning_enabled,
831                session.config.goal_tracking,
832            )
833        };
834
835        // Use session's LLM client if configured, otherwise use default
836        let llm_client = if let Some(client) = session_llm_client {
837            client
838        } else if let Some(client) = self.llm_client.read().await.clone() {
839            client
840        } else {
841            anyhow::bail!(
842                "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
843                session_id
844            );
845        };
846
847        // Construct per-session ToolContext from session workspace, falling back to server default
848        let tool_context = if session_workspace.is_empty() {
849            crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
850                .with_session_id(session_id)
851        } else {
852            crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
853                .with_session_id(session_id)
854        };
855
856        // Inject skill registry into system prompt and agent config
857        let skill_registry = match self.session_skill_registry(session_id).await {
858            Some(registry) => Some(registry),
859            None => self.skill_registry.read().await.clone(),
860        };
861        let system = if let Some(ref registry) = skill_registry {
862            let skill_prompt = registry.to_system_prompt();
863            if skill_prompt.is_empty() {
864                system
865            } else {
866                match system {
867                    Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
868                    None => Some(skill_prompt),
869                }
870            }
871        } else {
872            system
873        };
874
875        // Inject matched skill content on-demand before skill_registry is moved into AgentConfig
876        let effective_prompt = if let Some(ref registry) = skill_registry {
877            let skill_content = registry.match_skills(prompt);
878            if skill_content.is_empty() {
879                prompt.to_string()
880            } else {
881                format!("{}\n\n---\n\n{}", skill_content, prompt)
882            }
883        } else {
884            prompt.to_string()
885        };
886
887        // Build AgentMemory from shared store (if configured)
888        let memory = self
889            .memory_store
890            .read()
891            .await
892            .as_ref()
893            .map(|store| Arc::new(AgentMemory::new(store.clone())));
894
895        // Create agent loop with permission policy, confirmation manager, and context providers
896        let config = AgentConfig {
897            prompt_slots: match system {
898                Some(s) => SystemPromptSlots::from_legacy(s),
899                None => SystemPromptSlots::default(),
900            },
901            tools,
902            max_tool_rounds: 50,
903            permission_checker: Some(permission_checker),
904            confirmation_manager: Some(confirmation_manager),
905            context_providers,
906            planning_enabled,
907            goal_tracking,
908            hook_engine,
909            skill_registry,
910            memory,
911            ..AgentConfig::default()
912        };
913
914        let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
915            .with_tool_metrics(tool_metrics);
916
917        // Execute with streaming
918        let (rx, handle, cancel_token) =
919            agent.execute_streaming(&history, &effective_prompt).await?;
920
921        // Store the cancellation token for cancellation support
922        let cancel_token_clone = cancel_token.clone();
923        {
924            let mut ops = self.ongoing_operations.write().await;
925            ops.insert(session_id.to_string(), cancel_token);
926        }
927
928        // Spawn task to update session after completion
929        let session_lock_clone = session_lock.clone();
930        let original_handle = handle;
931        let stores = self.stores.clone();
932        let session_storage_types = self.session_storage_types.clone();
933        let llm_configs = self.llm_configs.clone();
934        let session_id_owned = session_id.to_string();
935        let ongoing_operations = self.ongoing_operations.clone();
936        let session_manager = self.clone();
937
938        let wrapped_handle = tokio::spawn(async move {
939            let result = original_handle.await??;
940
941            // Remove from ongoing operations
942            {
943                let mut ops = ongoing_operations.write().await;
944                ops.remove(&session_id_owned);
945            }
946
947            // Update session
948            {
949                let mut session = session_lock_clone.write().await;
950                session.messages = result.messages.clone();
951                session.update_usage(&result.usage);
952            }
953
954            // Persist to store
955            let storage_type = {
956                let storage_types = session_storage_types.read().await;
957                storage_types.get(&session_id_owned).cloned()
958            };
959
960            if let Some(storage_type) = storage_type {
961                if storage_type != crate::config::StorageBackend::Memory {
962                    let stores_guard = stores.read().await;
963                    if let Some(store) = stores_guard.get(&storage_type) {
964                        let session = session_lock_clone.read().await;
965                        let llm_config = {
966                            let configs = llm_configs.read().await;
967                            configs.get(&session_id_owned).cloned()
968                        };
969                        let data = session.to_session_data(llm_config);
970                        if let Err(e) = store.save(&data).await {
971                            tracing::warn!(
972                                "Failed to persist session {} after streaming: {}",
973                                session_id_owned,
974                                e
975                            );
976                        }
977                    }
978                }
979            }
980
981            // Auto-compact if context usage exceeds threshold
982            if let Err(e) = session_manager.maybe_auto_compact(&session_id_owned).await {
983                tracing::warn!(
984                    "Auto-compact failed for session {}: {}",
985                    session_id_owned,
986                    e
987                );
988            }
989
990            Ok(result)
991        });
992
993        Ok((rx, wrapped_handle, cancel_token_clone))
994    }
995
996    /// Get context usage for a session
997    pub async fn context_usage(&self, session_id: &str) -> Result<ContextUsage> {
998        let session_lock = self.get_session(session_id).await?;
999        let session = session_lock.read().await;
1000        Ok(session.context_usage.clone())
1001    }
1002
1003    /// Get conversation history for a session
1004    pub async fn history(&self, session_id: &str) -> Result<Vec<Message>> {
1005        let session_lock = self.get_session(session_id).await?;
1006        let session = session_lock.read().await;
1007        Ok(session.messages.clone())
1008    }
1009
1010    /// Clear session history
1011    pub async fn clear(&self, session_id: &str) -> Result<()> {
1012        {
1013            let session_lock = self.get_session(session_id).await?;
1014            let mut session = session_lock.write().await;
1015            session.clear();
1016        }
1017
1018        // Persist to store
1019        self.persist_in_background(session_id, "clear");
1020
1021        Ok(())
1022    }
1023
1024    /// Compact session context
1025    pub async fn compact(&self, session_id: &str) -> Result<()> {
1026        tracing::info!(name: "a3s.session.compact", session_id = %session_id, "Compacting session context");
1027
1028        {
1029            let session_lock = self.get_session(session_id).await?;
1030            let mut session = session_lock.write().await;
1031
1032            // Get LLM client for compaction (if available)
1033            let llm_client = if let Some(client) = &session.llm_client {
1034                client.clone()
1035            } else if let Some(client) = self.llm_client.read().await.clone() {
1036                client
1037            } else {
1038                // If no LLM client available, just do simple truncation
1039                tracing::warn!("No LLM client configured for compaction, using simple truncation");
1040                let keep_messages = 20;
1041                if session.messages.len() > keep_messages {
1042                    let len = session.messages.len();
1043                    session.messages = session.messages.split_off(len - keep_messages);
1044                }
1045                // Persist after truncation
1046                drop(session);
1047                self.persist_in_background(session_id, "compact");
1048                return Ok(());
1049            };
1050
1051            session.compact(&llm_client).await?;
1052        }
1053
1054        // Persist to store
1055        self.persist_in_background(session_id, "compact");
1056
1057        Ok(())
1058    }
1059
1060    /// Check if auto-compaction should be triggered and perform it if needed.
1061    ///
1062    /// Called after `generate()` / `generate_streaming()` updates session usage.
1063    /// Triggers compaction when:
1064    /// - `auto_compact` is enabled in session config
1065    /// - `context_usage.percent` exceeds `auto_compact_threshold`
1066    pub async fn maybe_auto_compact(&self, session_id: &str) -> Result<bool> {
1067        let (should_compact, percent_before, messages_before) = {
1068            let session_lock = self.get_session(session_id).await?;
1069            let session = session_lock.read().await;
1070
1071            if !session.config.auto_compact {
1072                return Ok(false);
1073            }
1074
1075            let threshold = session.config.auto_compact_threshold;
1076            let percent = session.context_usage.percent;
1077            let msg_count = session.messages.len();
1078
1079            tracing::debug!(
1080                "Auto-compact check for session {}: percent={:.2}%, threshold={:.2}%, messages={}",
1081                session_id,
1082                percent * 100.0,
1083                threshold * 100.0,
1084                msg_count,
1085            );
1086
1087            (percent >= threshold, percent, msg_count)
1088        };
1089
1090        if !should_compact {
1091            return Ok(false);
1092        }
1093
1094        tracing::info!(
1095            name: "a3s.session.auto_compact",
1096            session_id = %session_id,
1097            percent_before = %format!("{:.1}%", percent_before * 100.0),
1098            messages_before = %messages_before,
1099            "Auto-compacting session due to high context usage"
1100        );
1101
1102        // Perform compaction (reuses existing compact logic)
1103        self.compact(session_id).await?;
1104
1105        // Get post-compaction message count
1106        let messages_after = {
1107            let session_lock = self.get_session(session_id).await?;
1108            let session = session_lock.read().await;
1109            session.messages.len()
1110        };
1111
1112        // Broadcast event to notify clients
1113        let event = AgentEvent::ContextCompacted {
1114            session_id: session_id.to_string(),
1115            before_messages: messages_before,
1116            after_messages: messages_after,
1117            percent_before,
1118        };
1119
1120        // Try to send via session's event broadcaster
1121        if let Ok(session_lock) = self.get_session(session_id).await {
1122            let session = session_lock.read().await;
1123            let _ = session.event_tx.send(event);
1124        }
1125
1126        tracing::info!(
1127            name: "a3s.session.auto_compact.done",
1128            session_id = %session_id,
1129            messages_before = %messages_before,
1130            messages_after = %messages_after,
1131            "Auto-compaction complete"
1132        );
1133
1134        Ok(true)
1135    }
1136
1137    /// Resolve the LLM client for a session (session-level -> default fallback)
1138    ///
1139    /// Returns `None` if no LLM client is configured at either level.
1140    pub async fn get_llm_for_session(
1141        &self,
1142        session_id: &str,
1143    ) -> Result<Option<Arc<dyn LlmClient>>> {
1144        let session_lock = self.get_session(session_id).await?;
1145        let session = session_lock.read().await;
1146
1147        if let Some(client) = &session.llm_client {
1148            return Ok(Some(client.clone()));
1149        }
1150
1151        Ok(self.llm_client.read().await.clone())
1152    }
1153
1154    /// Configure session
1155    pub async fn configure(
1156        &self,
1157        session_id: &str,
1158        thinking: Option<bool>,
1159        budget: Option<usize>,
1160        model_config: Option<LlmConfig>,
1161    ) -> Result<()> {
1162        {
1163            let session_lock = self.get_session(session_id).await?;
1164            let mut session = session_lock.write().await;
1165
1166            if let Some(t) = thinking {
1167                session.thinking_enabled = t;
1168            }
1169            if let Some(b) = budget {
1170                session.thinking_budget = Some(b);
1171            }
1172            if let Some(ref config) = model_config {
1173                tracing::info!(
1174                    "Configuring session {} with LLM: provider={}, model={}",
1175                    session_id,
1176                    config.provider,
1177                    config.model
1178                );
1179                session.model_name = Some(config.model.clone());
1180                session.llm_client = Some(llm::create_client_with_config(config.clone()));
1181            }
1182        }
1183
1184        // Store LLM config for persistence (without API key)
1185        if let Some(config) = model_config {
1186            let llm_config_data = LlmConfigData {
1187                provider: config.provider,
1188                model: config.model,
1189                api_key: None, // Don't persist API key
1190                base_url: config.base_url,
1191            };
1192            let mut configs = self.llm_configs.write().await;
1193            configs.insert(session_id.to_string(), llm_config_data);
1194        }
1195
1196        // Persist to store
1197        self.persist_in_background(session_id, "configure");
1198
1199        Ok(())
1200    }
1201
1202    /// Update a session's system prompt in place.
1203    pub async fn set_system_prompt(
1204        &self,
1205        session_id: &str,
1206        system_prompt: Option<String>,
1207    ) -> Result<()> {
1208        {
1209            let session_lock = self.get_session(session_id).await?;
1210            let mut session = session_lock.write().await;
1211            session.config.system_prompt = system_prompt;
1212        }
1213
1214        self.persist_in_background(session_id, "set_system_prompt");
1215        Ok(())
1216    }
1217
1218    /// Get session count
1219    pub async fn session_count(&self) -> usize {
1220        let sessions = self.sessions.read().await;
1221        sessions.len()
1222    }
1223
1224    /// Check health of all registered stores
1225    pub async fn store_health(&self) -> Vec<(String, Result<()>)> {
1226        let stores = self.stores.read().await;
1227        let mut results = Vec::new();
1228        for (_, store) in stores.iter() {
1229            let name = store.backend_name().to_string();
1230            let result = store.health_check().await;
1231            results.push((name, result));
1232        }
1233        results
1234    }
1235
1236    /// List all loaded tools (built-in tools)
1237    pub fn list_tools(&self) -> Vec<crate::llm::ToolDefinition> {
1238        self.tool_executor.definitions()
1239    }
1240
1241    /// Pause a session
1242    pub async fn pause_session(&self, session_id: &str) -> Result<bool> {
1243        let paused = {
1244            let session_lock = self.get_session(session_id).await?;
1245            let mut session = session_lock.write().await;
1246            session.pause()
1247        };
1248
1249        if paused {
1250            self.persist_in_background(session_id, "pause");
1251        }
1252
1253        Ok(paused)
1254    }
1255
1256    /// Resume a session
1257    pub async fn resume_session(&self, session_id: &str) -> Result<bool> {
1258        let resumed = {
1259            let session_lock = self.get_session(session_id).await?;
1260            let mut session = session_lock.write().await;
1261            session.resume()
1262        };
1263
1264        if resumed {
1265            self.persist_in_background(session_id, "resume");
1266        }
1267
1268        Ok(resumed)
1269    }
1270
1271    /// Cancel an ongoing operation for a session
1272    ///
1273    /// Returns true if an operation was cancelled, false if no operation was running.
1274    pub async fn cancel_operation(&self, session_id: &str) -> Result<bool> {
1275        // First, cancel any pending HITL confirmations
1276        let session_lock = self.get_session(session_id).await?;
1277        let cancelled_confirmations = {
1278            let session = session_lock.read().await;
1279            session.confirmation_manager.cancel_all().await
1280        };
1281
1282        if cancelled_confirmations > 0 {
1283            tracing::info!(
1284                "Cancelled {} pending confirmations for session {}",
1285                cancelled_confirmations,
1286                session_id
1287            );
1288        }
1289
1290        // Then, cancel the ongoing operation if any
1291        let cancel_token = {
1292            let mut ops = self.ongoing_operations.write().await;
1293            ops.remove(session_id)
1294        };
1295
1296        if let Some(token) = cancel_token {
1297            token.cancel();
1298            tracing::info!("Cancelled ongoing operation for session {}", session_id);
1299            Ok(true)
1300        } else if cancelled_confirmations > 0 {
1301            // We cancelled confirmations but no main operation
1302            Ok(true)
1303        } else {
1304            tracing::debug!("No ongoing operation to cancel for session {}", session_id);
1305            Ok(false)
1306        }
1307    }
1308
1309    /// Get all sessions (returns session locks for iteration)
1310    pub async fn get_all_sessions(&self) -> Vec<Arc<RwLock<Session>>> {
1311        let sessions = self.sessions.read().await;
1312        sessions.values().cloned().collect()
1313    }
1314
1315    /// Get tool executor reference
1316    pub fn tool_executor(&self) -> &Arc<ToolExecutor> {
1317        &self.tool_executor
1318    }
1319
1320    /// Confirm a tool execution (HITL)
1321    pub async fn confirm_tool(
1322        &self,
1323        session_id: &str,
1324        tool_id: &str,
1325        approved: bool,
1326        reason: Option<String>,
1327    ) -> Result<bool> {
1328        let session_lock = self.get_session(session_id).await?;
1329        let session = session_lock.read().await;
1330        session
1331            .confirmation_manager
1332            .confirm(tool_id, approved, reason)
1333            .await
1334            .map_err(|e| anyhow::anyhow!(e))
1335    }
1336
1337    /// Set confirmation policy for a session (HITL)
1338    pub async fn set_confirmation_policy(
1339        &self,
1340        session_id: &str,
1341        policy: ConfirmationPolicy,
1342    ) -> Result<ConfirmationPolicy> {
1343        {
1344            let session_lock = self.get_session(session_id).await?;
1345            let session = session_lock.read().await;
1346            session.set_confirmation_policy(policy.clone()).await;
1347        }
1348
1349        // Update config for persistence
1350        {
1351            let session_lock = self.get_session(session_id).await?;
1352            let mut session = session_lock.write().await;
1353            session.config.confirmation_policy = Some(policy.clone());
1354        }
1355
1356        // Persist to store
1357        self.persist_in_background(session_id, "set_confirmation_policy");
1358
1359        Ok(policy)
1360    }
1361
1362    /// Get confirmation policy for a session (HITL)
1363    pub async fn get_confirmation_policy(&self, session_id: &str) -> Result<ConfirmationPolicy> {
1364        let session_lock = self.get_session(session_id).await?;
1365        let session = session_lock.read().await;
1366        Ok(session.confirmation_policy().await)
1367    }
1368}