Skip to main content

a3s_code_core/session/
manager.rs

1//! SessionManager — manages multiple concurrent sessions
2
3use super::{ContextUsage, Session, SessionConfig, SessionState};
4use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
5use crate::hitl::ConfirmationPolicy;
6use crate::llm::{self, LlmClient, LlmConfig, Message};
7use crate::memory::AgentMemory;
8use crate::prompts::SystemPromptSlots;
9use crate::skills::SkillRegistry;
10use crate::store::{FileSessionStore, LlmConfigData, SessionData, SessionStore};
11use crate::mcp::McpManager;
12use crate::tools::ToolExecutor;
13use a3s_memory::MemoryStore;
14use anyhow::{Context, Result};
15use std::collections::HashMap;
16use std::sync::Arc;
17use tokio::sync::{mpsc, RwLock};
18
19/// Session manager handles multiple concurrent sessions
20#[derive(Clone)]
21pub struct SessionManager {
22    pub(crate) sessions: Arc<RwLock<HashMap<String, Arc<RwLock<Session>>>>>,
23    /// Default LLM client used when a session has no per-session client configured.
24    /// Wrapped in RwLock so it can be hot-swapped at runtime without restart.
25    pub(crate) llm_client: Arc<RwLock<Option<Arc<dyn LlmClient>>>>,
26    pub(crate) tool_executor: Arc<ToolExecutor>,
27    /// Session stores by storage type
28    pub(crate) stores: Arc<RwLock<HashMap<crate::config::StorageBackend, Arc<dyn SessionStore>>>>,
29    /// Track which storage type each session uses
30    pub(crate) session_storage_types: Arc<RwLock<HashMap<String, crate::config::StorageBackend>>>,
31    /// LLM configurations for sessions (stored separately for persistence)
32    pub(crate) llm_configs: Arc<RwLock<HashMap<String, LlmConfigData>>>,
33    /// Ongoing operations (session_id -> JoinHandle)
34    pub(crate) ongoing_operations: Arc<RwLock<HashMap<String, tokio::task::AbortHandle>>>,
35    /// Skill registry for runtime skill management
36    pub(crate) skill_registry: Arc<RwLock<Option<Arc<SkillRegistry>>>>,
37    /// Shared memory store for agent long-term memory.
38    /// When set, each `generate`/`generate_streaming` call wraps this in
39    /// `AgentMemory` and injects it into `AgentConfig.memory`.
40    pub(crate) memory_store: Arc<RwLock<Option<Arc<dyn MemoryStore>>>>,
41    /// Shared MCP manager. When set, MCP tools are registered into the
42    /// `ToolExecutor` at startup so all sessions can use them.
43    pub(crate) mcp_manager: Arc<RwLock<Option<Arc<McpManager>>>>,
44}
45
46impl SessionManager {
47    /// Create a new session manager without persistence
48    pub fn new(llm_client: Option<Arc<dyn LlmClient>>, tool_executor: Arc<ToolExecutor>) -> Self {
49        Self {
50            sessions: Arc::new(RwLock::new(HashMap::new())),
51            llm_client: Arc::new(RwLock::new(llm_client)),
52            tool_executor,
53            stores: Arc::new(RwLock::new(HashMap::new())),
54            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
55            llm_configs: Arc::new(RwLock::new(HashMap::new())),
56            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
57            skill_registry: Arc::new(RwLock::new(None)),
58            memory_store: Arc::new(RwLock::new(None)),
59            mcp_manager: Arc::new(RwLock::new(None)),
60        }
61    }
62
63    /// Create a session manager with file-based persistence
64    ///
65    /// Sessions will be automatically saved to disk and restored on startup.
66    pub async fn with_persistence<P: AsRef<std::path::Path>>(
67        llm_client: Option<Arc<dyn LlmClient>>,
68        tool_executor: Arc<ToolExecutor>,
69        sessions_dir: P,
70    ) -> Result<Self> {
71        let store = FileSessionStore::new(sessions_dir).await?;
72        let mut stores = HashMap::new();
73        stores.insert(
74            crate::config::StorageBackend::File,
75            Arc::new(store) as Arc<dyn SessionStore>,
76        );
77
78        let manager = Self {
79            sessions: Arc::new(RwLock::new(HashMap::new())),
80            llm_client: Arc::new(RwLock::new(llm_client)),
81            tool_executor,
82            stores: Arc::new(RwLock::new(stores)),
83            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
84            llm_configs: Arc::new(RwLock::new(HashMap::new())),
85            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
86            skill_registry: Arc::new(RwLock::new(None)),
87            memory_store: Arc::new(RwLock::new(None)),
88            mcp_manager: Arc::new(RwLock::new(None)),
89        };
90
91        Ok(manager)
92    }
93
94    /// Create a session manager with a custom store
95    ///
96    /// The `backend` parameter determines which `StorageBackend` key the store is registered under.
97    /// Sessions created with a matching `storage_type` will use this store.
98    pub fn with_store(
99        llm_client: Option<Arc<dyn LlmClient>>,
100        tool_executor: Arc<ToolExecutor>,
101        store: Arc<dyn SessionStore>,
102        backend: crate::config::StorageBackend,
103    ) -> Self {
104        let mut stores = HashMap::new();
105        stores.insert(backend, store);
106
107        Self {
108            sessions: Arc::new(RwLock::new(HashMap::new())),
109            llm_client: Arc::new(RwLock::new(llm_client)),
110            tool_executor,
111            stores: Arc::new(RwLock::new(stores)),
112            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
113            llm_configs: Arc::new(RwLock::new(HashMap::new())),
114            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
115            skill_registry: Arc::new(RwLock::new(None)),
116            memory_store: Arc::new(RwLock::new(None)),
117            mcp_manager: Arc::new(RwLock::new(None)),
118        }
119    }
120
121    /// Hot-swap the default LLM client used by sessions without a per-session client.
122    ///
123    /// Called by SafeClaw when `PUT /api/agent/config` updates the model config.
124    /// All subsequent `generate` / `generate_streaming` calls will use the new client.
125    pub async fn set_default_llm(&self, client: Option<Arc<dyn LlmClient>>) {
126        *self.llm_client.write().await = client;
127    }
128
129    /// Set the skill registry for runtime skill management.
130    ///
131    /// Skills registered here will be injected into the system prompt
132    /// for all subsequent `generate` / `generate_streaming` calls.
133    /// Also registers the `manage_skill` tool on the `ToolExecutor` so
134    /// the Agent can create, list, and remove skills at runtime.
135    pub async fn set_skill_registry(
136        &self,
137        registry: Arc<SkillRegistry>,
138        skills_dir: std::path::PathBuf,
139    ) {
140        // Register the manage_skill tool for self-bootstrap
141        let manage_tool = crate::skills::ManageSkillTool::new(registry.clone(), skills_dir);
142        self.tool_executor
143            .register_dynamic_tool(Arc::new(manage_tool));
144
145        *self.skill_registry.write().await = Some(registry);
146    }
147
148    /// Get the current skill registry, if any.
149    pub async fn skill_registry(&self) -> Option<Arc<SkillRegistry>> {
150        self.skill_registry.read().await.clone()
151    }
152
153    /// Set the shared memory store for agent long-term memory.
154    ///
155    /// When set, every `generate`/`generate_streaming` call wraps this store
156    /// in an `AgentMemory` and injects it into `AgentConfig.memory`, enabling
157    /// automatic `recall_similar` before prompts and `remember_success`/
158    /// `remember_failure` after tool execution.
159    pub async fn set_memory_store(&self, store: Arc<dyn MemoryStore>) {
160        *self.memory_store.write().await = Some(store);
161    }
162
163    /// Get the current memory store, if any.
164    pub async fn memory_store(&self) -> Option<Arc<dyn MemoryStore>> {
165        self.memory_store.read().await.clone()
166    }
167
168    /// Set the shared MCP manager.
169    ///
170    /// When set, all tools from connected MCP servers are registered into the
171    /// `ToolExecutor` so they are available in every session.
172    pub async fn set_mcp_manager(&self, manager: Arc<McpManager>) {
173        let all_tools = manager.get_all_tools().await;
174        let mut by_server: HashMap<String, Vec<crate::mcp::McpTool>> = HashMap::new();
175        for (server, tool) in all_tools {
176            by_server.entry(server).or_default().push(tool);
177        }
178        for (server_name, tools) in by_server {
179            for tool in crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&manager)) {
180                self.tool_executor.register_dynamic_tool(tool);
181            }
182        }
183        *self.mcp_manager.write().await = Some(manager);
184    }
185
186    /// Dynamically add and connect an MCP server to the shared manager.
187    ///
188    /// Registers the server config, connects it, and registers its tools into
189    /// the `ToolExecutor` so all subsequent sessions can use them.
190    pub async fn add_mcp_server(&self, config: crate::mcp::McpServerConfig) -> Result<()> {
191        let manager = {
192            let guard = self.mcp_manager.read().await;
193            match guard.clone() {
194                Some(m) => m,
195                None => {
196                    drop(guard);
197                    let m = Arc::new(McpManager::new());
198                    *self.mcp_manager.write().await = Some(Arc::clone(&m));
199                    m
200                }
201            }
202        };
203        let name = config.name.clone();
204        manager.register_server(config).await;
205        manager.connect(&name).await?;
206        let tools = manager.get_server_tools(&name).await;
207        for tool in crate::mcp::tools::create_mcp_tools(&name, tools, Arc::clone(&manager)) {
208            self.tool_executor.register_dynamic_tool(tool);
209        }
210        Ok(())
211    }
212
213    /// Disconnect and remove an MCP server from the shared manager.
214    ///
215    /// Also unregisters its tools from the `ToolExecutor`.
216    pub async fn remove_mcp_server(&self, name: &str) -> Result<()> {
217        let guard = self.mcp_manager.read().await;
218        if let Some(ref manager) = *guard {
219            manager.disconnect(name).await?;
220        }
221        self.tool_executor.unregister_tools_by_prefix(&format!("mcp__{name}__"));
222        Ok(())
223    }
224
225    /// Get status of all connected MCP servers.
226    pub async fn mcp_status(&self) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
227        let guard = self.mcp_manager.read().await;
228        match guard.as_ref() {
229            Some(m) => {
230                let m = Arc::clone(m);
231                drop(guard);
232                m.get_status().await
233            }
234            None => std::collections::HashMap::new(),
235        }
236    }
237
238    /// Restore a single session by ID from the store
239    ///
240    /// Searches all registered stores for the given session ID and restores it
241    /// into the in-memory session map. Returns an error if not found.
242    pub async fn restore_session_by_id(&self, session_id: &str) -> Result<()> {
243        // Check if already loaded
244        {
245            let sessions = self.sessions.read().await;
246            if sessions.contains_key(session_id) {
247                return Ok(());
248            }
249        }
250
251        let stores = self.stores.read().await;
252        for (backend, store) in stores.iter() {
253            match store.load(session_id).await {
254                Ok(Some(data)) => {
255                    {
256                        let mut storage_types = self.session_storage_types.write().await;
257                        storage_types.insert(data.id.clone(), backend.clone());
258                    }
259                    self.restore_session(data).await?;
260                    return Ok(());
261                }
262                Ok(None) => continue,
263                Err(e) => {
264                    tracing::warn!(
265                        "Failed to load session {} from {:?}: {}",
266                        session_id,
267                        backend,
268                        e
269                    );
270                    continue;
271                }
272            }
273        }
274
275        Err(anyhow::anyhow!(
276            "Session {} not found in any store",
277            session_id
278        ))
279    }
280
281    /// Load all sessions from all registered stores
282    pub async fn load_all_sessions(&mut self) -> Result<usize> {
283        let stores = self.stores.read().await;
284        let mut loaded = 0;
285
286        for (backend, store) in stores.iter() {
287            let session_ids = match store.list().await {
288                Ok(ids) => ids,
289                Err(e) => {
290                    tracing::warn!("Failed to list sessions from {:?} store: {}", backend, e);
291                    continue;
292                }
293            };
294
295            for id in session_ids {
296                match store.load(&id).await {
297                    Ok(Some(data)) => {
298                        // Record the storage type for this session
299                        {
300                            let mut storage_types = self.session_storage_types.write().await;
301                            storage_types.insert(data.id.clone(), backend.clone());
302                        }
303
304                        if let Err(e) = self.restore_session(data).await {
305                            tracing::warn!("Failed to restore session {}: {}", id, e);
306                        } else {
307                            loaded += 1;
308                        }
309                    }
310                    Ok(None) => {
311                        tracing::warn!("Session {} not found in store", id);
312                    }
313                    Err(e) => {
314                        tracing::warn!("Failed to load session {}: {}", id, e);
315                    }
316                }
317            }
318        }
319
320        tracing::info!("Loaded {} sessions from store", loaded);
321        Ok(loaded)
322    }
323
324    /// Restore a session from SessionData
325    async fn restore_session(&self, data: SessionData) -> Result<()> {
326        let tools = self.tool_executor.definitions();
327        let mut session = Session::new(data.id.clone(), data.config.clone(), tools).await?;
328
329        // Restore serializable state
330        session.restore_from_data(&data);
331
332        // Restore LLM config if present (without API key - must be reconfigured)
333        if let Some(llm_config) = &data.llm_config {
334            let mut configs = self.llm_configs.write().await;
335            configs.insert(data.id.clone(), llm_config.clone());
336        }
337
338        let mut sessions = self.sessions.write().await;
339        sessions.insert(data.id.clone(), Arc::new(RwLock::new(session)));
340
341        tracing::info!("Restored session: {}", data.id);
342        Ok(())
343    }
344
345    /// Save a session to the store
346    async fn save_session(&self, session_id: &str) -> Result<()> {
347        // Get the storage type for this session
348        let storage_type = {
349            let storage_types = self.session_storage_types.read().await;
350            storage_types.get(session_id).cloned()
351        };
352
353        let Some(storage_type) = storage_type else {
354            // No storage type means memory-only session
355            return Ok(());
356        };
357
358        // Skip saving for memory storage
359        if storage_type == crate::config::StorageBackend::Memory {
360            return Ok(());
361        }
362
363        // Get the appropriate store
364        let stores = self.stores.read().await;
365        let Some(store) = stores.get(&storage_type) else {
366            tracing::warn!("No store available for storage type: {:?}", storage_type);
367            return Ok(());
368        };
369
370        let session_lock = self.get_session(session_id).await?;
371        let session = session_lock.read().await;
372
373        // Get LLM config if set
374        let llm_config = {
375            let configs = self.llm_configs.read().await;
376            configs.get(session_id).cloned()
377        };
378
379        let data = session.to_session_data(llm_config);
380        store.save(&data).await?;
381
382        tracing::debug!("Saved session: {}", session_id);
383        Ok(())
384    }
385
386    /// Persist a session to store, logging and emitting an event on failure.
387    /// This is a non-fatal wrapper around `save_session` — the operation
388    /// succeeds in memory even if persistence fails.
389    async fn persist_or_warn(&self, session_id: &str, operation: &str) {
390        if let Err(e) = self.save_session(session_id).await {
391            tracing::warn!(
392                "Failed to persist session {} after {}: {}",
393                session_id,
394                operation,
395                e
396            );
397            // Emit event so SDK clients can react
398            if let Ok(session_lock) = self.get_session(session_id).await {
399                let session = session_lock.read().await;
400                let _ = session.event_tx().send(AgentEvent::PersistenceFailed {
401                    session_id: session_id.to_string(),
402                    operation: operation.to_string(),
403                    error: e.to_string(),
404                });
405            }
406        }
407    }
408
409    /// Spawn persistence as a background task (non-blocking).
410    ///
411    /// All `SessionManager` fields are `Arc`-wrapped, so `clone()` is a
412    /// cheap pointer-copy. This keeps file I/O off the response path.
413    fn persist_in_background(&self, session_id: &str, operation: &str) {
414        let mgr = self.clone();
415        let sid = session_id.to_string();
416        let op = operation.to_string();
417        tokio::spawn(async move {
418            mgr.persist_or_warn(&sid, &op).await;
419        });
420    }
421
422    /// Create a new session
423    pub async fn create_session(&self, id: String, config: SessionConfig) -> Result<String> {
424        tracing::info!(name: "a3s.session.create", session_id = %id, "Creating session");
425
426        // Record the storage type for this session
427        {
428            let mut storage_types = self.session_storage_types.write().await;
429            storage_types.insert(id.clone(), config.storage_type.clone());
430        }
431
432        // Get tool definitions from the executor
433        let tools = self.tool_executor.definitions();
434        let mut session = Session::new(id.clone(), config, tools).await?;
435
436        // Start the command queue
437        session.start_queue().await?;
438
439        // Set max context length if provided
440        if session.config.max_context_length > 0 {
441            session.context_usage.max_tokens = session.config.max_context_length as usize;
442        }
443
444        {
445            let mut sessions = self.sessions.write().await;
446            sessions.insert(id.clone(), Arc::new(RwLock::new(session)));
447        }
448
449        // Persist to store
450        self.persist_in_background(&id, "create");
451
452        tracing::info!("Created session: {}", id);
453        Ok(id)
454    }
455
456    /// Destroy a session
457    pub async fn destroy_session(&self, id: &str) -> Result<()> {
458        tracing::info!(name: "a3s.session.destroy", session_id = %id, "Destroying session");
459
460        // Get the storage type before removing the session
461        let storage_type = {
462            let storage_types = self.session_storage_types.read().await;
463            storage_types.get(id).cloned()
464        };
465
466        {
467            let mut sessions = self.sessions.write().await;
468            sessions.remove(id);
469        }
470
471        // Remove LLM config
472        {
473            let mut configs = self.llm_configs.write().await;
474            configs.remove(id);
475        }
476
477        // Remove storage type tracking
478        {
479            let mut storage_types = self.session_storage_types.write().await;
480            storage_types.remove(id);
481        }
482
483        // Delete from store if applicable
484        if let Some(storage_type) = storage_type {
485            if storage_type != crate::config::StorageBackend::Memory {
486                let stores = self.stores.read().await;
487                if let Some(store) = stores.get(&storage_type) {
488                    if let Err(e) = store.delete(id).await {
489                        tracing::warn!("Failed to delete session {} from store: {}", id, e);
490                    }
491                }
492            }
493        }
494
495        tracing::info!("Destroyed session: {}", id);
496        Ok(())
497    }
498
499    /// Get a session by ID
500    pub async fn get_session(&self, id: &str) -> Result<Arc<RwLock<Session>>> {
501        let sessions = self.sessions.read().await;
502        sessions
503            .get(id)
504            .cloned()
505            .context(format!("Session not found: {}", id))
506    }
507
508    /// Create a child session for a subagent
509    ///
510    /// Child sessions inherit the parent's LLM client but have their own
511    /// permission policy and configuration.
512    pub async fn create_child_session(
513        &self,
514        parent_id: &str,
515        child_id: String,
516        mut config: SessionConfig,
517    ) -> Result<String> {
518        // Verify parent exists and inherit HITL policy
519        let parent_lock = self.get_session(parent_id).await?;
520        let parent_llm_client = {
521            let parent = parent_lock.read().await;
522
523            // Inherit parent's confirmation policy if child doesn't have one
524            if config.confirmation_policy.is_none() {
525                let parent_policy = parent.confirmation_manager.policy().await;
526                config.confirmation_policy = Some(parent_policy);
527            }
528
529            parent.llm_client.clone()
530        };
531
532        // Set parent_id in config
533        config.parent_id = Some(parent_id.to_string());
534
535        // Get tool definitions from the executor
536        let tools = self.tool_executor.definitions();
537        let mut session = Session::new(child_id.clone(), config, tools).await?;
538
539        // Inherit LLM client from parent if not set
540        if session.llm_client.is_none() {
541            let default_llm = self.llm_client.read().await.clone();
542            session.llm_client = parent_llm_client.or(default_llm);
543        }
544
545        // Start the command queue
546        session.start_queue().await?;
547
548        // Set max context length if provided
549        if session.config.max_context_length > 0 {
550            session.context_usage.max_tokens = session.config.max_context_length as usize;
551        }
552
553        {
554            let mut sessions = self.sessions.write().await;
555            sessions.insert(child_id.clone(), Arc::new(RwLock::new(session)));
556        }
557
558        // Persist to store
559        self.persist_in_background(&child_id, "create_child");
560
561        tracing::info!(
562            "Created child session: {} (parent: {})",
563            child_id,
564            parent_id
565        );
566        Ok(child_id)
567    }
568
569    /// Get all child sessions for a parent session
570    pub async fn get_child_sessions(&self, parent_id: &str) -> Vec<String> {
571        let sessions = self.sessions.read().await;
572        let mut children = Vec::new();
573
574        for (id, session_lock) in sessions.iter() {
575            let session = session_lock.read().await;
576            if session.parent_id.as_deref() == Some(parent_id) {
577                children.push(id.clone());
578            }
579        }
580
581        children
582    }
583
584    /// Check if a session is a child session
585    pub async fn is_child_session(&self, session_id: &str) -> Result<bool> {
586        let session_lock = self.get_session(session_id).await?;
587        let session = session_lock.read().await;
588        Ok(session.is_child_session())
589    }
590
591    /// Generate response for a prompt
592    pub async fn generate(&self, session_id: &str, prompt: &str) -> Result<AgentResult> {
593        let session_lock = self.get_session(session_id).await?;
594
595        // Check if session is paused
596        {
597            let session = session_lock.read().await;
598            if session.state == SessionState::Paused {
599                anyhow::bail!(
600                    "Session {} is paused. Call Resume before generating.",
601                    session_id
602                );
603            }
604        }
605
606        // Get session state and LLM client
607        let (
608            history,
609            system,
610            tools,
611            session_llm_client,
612            permission_checker,
613            confirmation_manager,
614            context_providers,
615            session_workspace,
616            tool_metrics,
617            hook_engine,
618            planning_enabled,
619            goal_tracking,
620        ) = {
621            let session = session_lock.read().await;
622            (
623                session.messages.clone(),
624                session.system().map(String::from),
625                session.tools.clone(),
626                session.llm_client.clone(),
627                session.permission_checker.clone(),
628                session.confirmation_manager.clone(),
629                session.context_providers.clone(),
630                session.config.workspace.clone(),
631                session.tool_metrics.clone(),
632                session.config.hook_engine.clone(),
633                session.config.planning_enabled,
634                session.config.goal_tracking,
635            )
636        };
637
638        // Use session's LLM client if configured, otherwise use default
639        let llm_client = if let Some(client) = session_llm_client {
640            client
641        } else if let Some(client) = self.llm_client.read().await.clone() {
642            client
643        } else {
644            anyhow::bail!(
645                "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
646                session_id
647            );
648        };
649
650        // Construct per-session ToolContext from session workspace, falling back to server default
651        let tool_context = if session_workspace.is_empty() {
652            crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
653                .with_session_id(session_id)
654        } else {
655            crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
656                .with_session_id(session_id)
657        };
658
659        // Inject skill registry into system prompt and agent config
660        let skill_registry = self.skill_registry.read().await.clone();
661        let system = if let Some(ref registry) = skill_registry {
662            let skill_prompt = registry.to_system_prompt();
663            if skill_prompt.is_empty() {
664                system
665            } else {
666                match system {
667                    Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
668                    None => Some(skill_prompt),
669                }
670            }
671        } else {
672            system
673        };
674
675        // Inject matched skill content on-demand before skill_registry is moved into AgentConfig
676        let effective_prompt = if let Some(ref registry) = skill_registry {
677            let skill_content = registry.match_skills(prompt);
678            if skill_content.is_empty() {
679                prompt.to_string()
680            } else {
681                format!("{}\n\n---\n\n{}", skill_content, prompt)
682            }
683        } else {
684            prompt.to_string()
685        };
686
687        // Build AgentMemory from shared store (if configured)
688        let memory = self
689            .memory_store
690            .read()
691            .await
692            .as_ref()
693            .map(|store| Arc::new(AgentMemory::new(store.clone())));
694
695        // Create agent loop with permission policy, confirmation manager, and context providers
696        let config = AgentConfig {
697            prompt_slots: match system {
698                Some(s) => SystemPromptSlots::from_legacy(s),
699                None => SystemPromptSlots::default(),
700            },
701            tools,
702            max_tool_rounds: 50,
703            permission_checker: Some(permission_checker),
704            confirmation_manager: Some(confirmation_manager),
705            context_providers,
706            planning_enabled,
707            goal_tracking,
708            hook_engine,
709            skill_registry,
710            memory,
711            ..AgentConfig::default()
712        };
713
714        let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
715            .with_tool_metrics(tool_metrics);
716
717        // Execute with session context
718        let result = agent
719            .execute_with_session(&history, &effective_prompt, Some(session_id), None)
720            .await?;
721
722        // Update session
723        {
724            let mut session = session_lock.write().await;
725            session.messages = result.messages.clone();
726            session.update_usage(&result.usage);
727        }
728
729        // Persist to store
730        self.persist_in_background(session_id, "generate");
731
732        // Auto-compact if context usage exceeds threshold
733        if let Err(e) = self.maybe_auto_compact(session_id).await {
734            tracing::warn!("Auto-compact failed for session {}: {}", session_id, e);
735        }
736
737        Ok(result)
738    }
739
740    /// Generate response with streaming events
741    pub async fn generate_streaming(
742        &self,
743        session_id: &str,
744        prompt: &str,
745    ) -> Result<(
746        mpsc::Receiver<AgentEvent>,
747        tokio::task::JoinHandle<Result<AgentResult>>,
748    )> {
749        let session_lock = self.get_session(session_id).await?;
750
751        // Check if session is paused
752        {
753            let session = session_lock.read().await;
754            if session.state == SessionState::Paused {
755                anyhow::bail!(
756                    "Session {} is paused. Call Resume before generating.",
757                    session_id
758                );
759            }
760        }
761
762        // Get session state and LLM client
763        let (
764            history,
765            system,
766            tools,
767            session_llm_client,
768            permission_checker,
769            confirmation_manager,
770            context_providers,
771            session_workspace,
772            tool_metrics,
773            hook_engine,
774            planning_enabled,
775            goal_tracking,
776        ) = {
777            let session = session_lock.read().await;
778            (
779                session.messages.clone(),
780                session.system().map(String::from),
781                session.tools.clone(),
782                session.llm_client.clone(),
783                session.permission_checker.clone(),
784                session.confirmation_manager.clone(),
785                session.context_providers.clone(),
786                session.config.workspace.clone(),
787                session.tool_metrics.clone(),
788                session.config.hook_engine.clone(),
789                session.config.planning_enabled,
790                session.config.goal_tracking,
791            )
792        };
793
794        // Use session's LLM client if configured, otherwise use default
795        let llm_client = if let Some(client) = session_llm_client {
796            client
797        } else if let Some(client) = self.llm_client.read().await.clone() {
798            client
799        } else {
800            anyhow::bail!(
801                "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
802                session_id
803            );
804        };
805
806        // Construct per-session ToolContext from session workspace, falling back to server default
807        let tool_context = if session_workspace.is_empty() {
808            crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
809                .with_session_id(session_id)
810        } else {
811            crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
812                .with_session_id(session_id)
813        };
814
815        // Inject skill registry into system prompt and agent config
816        let skill_registry = self.skill_registry.read().await.clone();
817        let system = if let Some(ref registry) = skill_registry {
818            let skill_prompt = registry.to_system_prompt();
819            if skill_prompt.is_empty() {
820                system
821            } else {
822                match system {
823                    Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
824                    None => Some(skill_prompt),
825                }
826            }
827        } else {
828            system
829        };
830
831        // Inject matched skill content on-demand before skill_registry is moved into AgentConfig
832        let effective_prompt = if let Some(ref registry) = skill_registry {
833            let skill_content = registry.match_skills(prompt);
834            if skill_content.is_empty() {
835                prompt.to_string()
836            } else {
837                format!("{}\n\n---\n\n{}", skill_content, prompt)
838            }
839        } else {
840            prompt.to_string()
841        };
842
843        // Build AgentMemory from shared store (if configured)
844        let memory = self
845            .memory_store
846            .read()
847            .await
848            .as_ref()
849            .map(|store| Arc::new(AgentMemory::new(store.clone())));
850
851        // Create agent loop with permission policy, confirmation manager, and context providers
852        let config = AgentConfig {
853            prompt_slots: match system {
854                Some(s) => SystemPromptSlots::from_legacy(s),
855                None => SystemPromptSlots::default(),
856            },
857            tools,
858            max_tool_rounds: 50,
859            permission_checker: Some(permission_checker),
860            confirmation_manager: Some(confirmation_manager),
861            context_providers,
862            planning_enabled,
863            goal_tracking,
864            hook_engine,
865            skill_registry,
866            memory,
867            ..AgentConfig::default()
868        };
869
870        let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
871            .with_tool_metrics(tool_metrics);
872
873        // Execute with streaming
874        let (rx, handle) = agent.execute_streaming(&history, &effective_prompt).await?;
875
876        // Store the abort handle for cancellation support
877        let abort_handle = handle.abort_handle();
878        {
879            let mut ops = self.ongoing_operations.write().await;
880            ops.insert(session_id.to_string(), abort_handle);
881        }
882
883        // Spawn task to update session after completion
884        let session_lock_clone = session_lock.clone();
885        let original_handle = handle;
886        let stores = self.stores.clone();
887        let session_storage_types = self.session_storage_types.clone();
888        let llm_configs = self.llm_configs.clone();
889        let session_id_owned = session_id.to_string();
890        let ongoing_operations = self.ongoing_operations.clone();
891        let session_manager = self.clone();
892
893        let wrapped_handle = tokio::spawn(async move {
894            let result = original_handle.await??;
895
896            // Remove from ongoing operations
897            {
898                let mut ops = ongoing_operations.write().await;
899                ops.remove(&session_id_owned);
900            }
901
902            // Update session
903            {
904                let mut session = session_lock_clone.write().await;
905                session.messages = result.messages.clone();
906                session.update_usage(&result.usage);
907            }
908
909            // Persist to store
910            let storage_type = {
911                let storage_types = session_storage_types.read().await;
912                storage_types.get(&session_id_owned).cloned()
913            };
914
915            if let Some(storage_type) = storage_type {
916                if storage_type != crate::config::StorageBackend::Memory {
917                    let stores_guard = stores.read().await;
918                    if let Some(store) = stores_guard.get(&storage_type) {
919                        let session = session_lock_clone.read().await;
920                        let llm_config = {
921                            let configs = llm_configs.read().await;
922                            configs.get(&session_id_owned).cloned()
923                        };
924                        let data = session.to_session_data(llm_config);
925                        if let Err(e) = store.save(&data).await {
926                            tracing::warn!(
927                                "Failed to persist session {} after streaming: {}",
928                                session_id_owned,
929                                e
930                            );
931                        }
932                    }
933                }
934            }
935
936            // Auto-compact if context usage exceeds threshold
937            if let Err(e) = session_manager.maybe_auto_compact(&session_id_owned).await {
938                tracing::warn!(
939                    "Auto-compact failed for session {}: {}",
940                    session_id_owned,
941                    e
942                );
943            }
944
945            Ok(result)
946        });
947
948        Ok((rx, wrapped_handle))
949    }
950
951    /// Get context usage for a session
952    pub async fn context_usage(&self, session_id: &str) -> Result<ContextUsage> {
953        let session_lock = self.get_session(session_id).await?;
954        let session = session_lock.read().await;
955        Ok(session.context_usage.clone())
956    }
957
958    /// Get conversation history for a session
959    pub async fn history(&self, session_id: &str) -> Result<Vec<Message>> {
960        let session_lock = self.get_session(session_id).await?;
961        let session = session_lock.read().await;
962        Ok(session.messages.clone())
963    }
964
965    /// Clear session history
966    pub async fn clear(&self, session_id: &str) -> Result<()> {
967        {
968            let session_lock = self.get_session(session_id).await?;
969            let mut session = session_lock.write().await;
970            session.clear();
971        }
972
973        // Persist to store
974        self.persist_in_background(session_id, "clear");
975
976        Ok(())
977    }
978
979    /// Compact session context
980    pub async fn compact(&self, session_id: &str) -> Result<()> {
981        tracing::info!(name: "a3s.session.compact", session_id = %session_id, "Compacting session context");
982
983        {
984            let session_lock = self.get_session(session_id).await?;
985            let mut session = session_lock.write().await;
986
987            // Get LLM client for compaction (if available)
988            let llm_client = if let Some(client) = &session.llm_client {
989                client.clone()
990            } else if let Some(client) = self.llm_client.read().await.clone() {
991                client
992            } else {
993                // If no LLM client available, just do simple truncation
994                tracing::warn!("No LLM client configured for compaction, using simple truncation");
995                let keep_messages = 20;
996                if session.messages.len() > keep_messages {
997                    let len = session.messages.len();
998                    session.messages = session.messages.split_off(len - keep_messages);
999                }
1000                // Persist after truncation
1001                drop(session);
1002                self.persist_in_background(session_id, "compact");
1003                return Ok(());
1004            };
1005
1006            session.compact(&llm_client).await?;
1007        }
1008
1009        // Persist to store
1010        self.persist_in_background(session_id, "compact");
1011
1012        Ok(())
1013    }
1014
1015    /// Check if auto-compaction should be triggered and perform it if needed.
1016    ///
1017    /// Called after `generate()` / `generate_streaming()` updates session usage.
1018    /// Triggers compaction when:
1019    /// - `auto_compact` is enabled in session config
1020    /// - `context_usage.percent` exceeds `auto_compact_threshold`
1021    pub async fn maybe_auto_compact(&self, session_id: &str) -> Result<bool> {
1022        let (should_compact, percent_before, messages_before) = {
1023            let session_lock = self.get_session(session_id).await?;
1024            let session = session_lock.read().await;
1025
1026            if !session.config.auto_compact {
1027                return Ok(false);
1028            }
1029
1030            let threshold = session.config.auto_compact_threshold;
1031            let percent = session.context_usage.percent;
1032            let msg_count = session.messages.len();
1033
1034            tracing::debug!(
1035                "Auto-compact check for session {}: percent={:.2}%, threshold={:.2}%, messages={}",
1036                session_id,
1037                percent * 100.0,
1038                threshold * 100.0,
1039                msg_count,
1040            );
1041
1042            (percent >= threshold, percent, msg_count)
1043        };
1044
1045        if !should_compact {
1046            return Ok(false);
1047        }
1048
1049        tracing::info!(
1050            name: "a3s.session.auto_compact",
1051            session_id = %session_id,
1052            percent_before = %format!("{:.1}%", percent_before * 100.0),
1053            messages_before = %messages_before,
1054            "Auto-compacting session due to high context usage"
1055        );
1056
1057        // Perform compaction (reuses existing compact logic)
1058        self.compact(session_id).await?;
1059
1060        // Get post-compaction message count
1061        let messages_after = {
1062            let session_lock = self.get_session(session_id).await?;
1063            let session = session_lock.read().await;
1064            session.messages.len()
1065        };
1066
1067        // Broadcast event to notify clients
1068        let event = AgentEvent::ContextCompacted {
1069            session_id: session_id.to_string(),
1070            before_messages: messages_before,
1071            after_messages: messages_after,
1072            percent_before,
1073        };
1074
1075        // Try to send via session's event broadcaster
1076        if let Ok(session_lock) = self.get_session(session_id).await {
1077            let session = session_lock.read().await;
1078            let _ = session.event_tx.send(event);
1079        }
1080
1081        tracing::info!(
1082            name: "a3s.session.auto_compact.done",
1083            session_id = %session_id,
1084            messages_before = %messages_before,
1085            messages_after = %messages_after,
1086            "Auto-compaction complete"
1087        );
1088
1089        Ok(true)
1090    }
1091
1092    /// Resolve the LLM client for a session (session-level -> default fallback)
1093    ///
1094    /// Returns `None` if no LLM client is configured at either level.
1095    pub async fn get_llm_for_session(
1096        &self,
1097        session_id: &str,
1098    ) -> Result<Option<Arc<dyn LlmClient>>> {
1099        let session_lock = self.get_session(session_id).await?;
1100        let session = session_lock.read().await;
1101
1102        if let Some(client) = &session.llm_client {
1103            return Ok(Some(client.clone()));
1104        }
1105
1106        Ok(self.llm_client.read().await.clone())
1107    }
1108
1109    /// Configure session
1110    pub async fn configure(
1111        &self,
1112        session_id: &str,
1113        thinking: Option<bool>,
1114        budget: Option<usize>,
1115        model_config: Option<LlmConfig>,
1116    ) -> Result<()> {
1117        {
1118            let session_lock = self.get_session(session_id).await?;
1119            let mut session = session_lock.write().await;
1120
1121            if let Some(t) = thinking {
1122                session.thinking_enabled = t;
1123            }
1124            if let Some(b) = budget {
1125                session.thinking_budget = Some(b);
1126            }
1127            if let Some(ref config) = model_config {
1128                tracing::info!(
1129                    "Configuring session {} with LLM: provider={}, model={}",
1130                    session_id,
1131                    config.provider,
1132                    config.model
1133                );
1134                session.model_name = Some(config.model.clone());
1135                session.llm_client = Some(llm::create_client_with_config(config.clone()));
1136            }
1137        }
1138
1139        // Store LLM config for persistence (without API key)
1140        if let Some(config) = model_config {
1141            let llm_config_data = LlmConfigData {
1142                provider: config.provider,
1143                model: config.model,
1144                api_key: None, // Don't persist API key
1145                base_url: config.base_url,
1146            };
1147            let mut configs = self.llm_configs.write().await;
1148            configs.insert(session_id.to_string(), llm_config_data);
1149        }
1150
1151        // Persist to store
1152        self.persist_in_background(session_id, "configure");
1153
1154        Ok(())
1155    }
1156
1157    /// Get session count
1158    pub async fn session_count(&self) -> usize {
1159        let sessions = self.sessions.read().await;
1160        sessions.len()
1161    }
1162
1163    /// Check health of all registered stores
1164    pub async fn store_health(&self) -> Vec<(String, Result<()>)> {
1165        let stores = self.stores.read().await;
1166        let mut results = Vec::new();
1167        for (_, store) in stores.iter() {
1168            let name = store.backend_name().to_string();
1169            let result = store.health_check().await;
1170            results.push((name, result));
1171        }
1172        results
1173    }
1174
1175    /// List all loaded tools (built-in tools)
1176    pub fn list_tools(&self) -> Vec<crate::llm::ToolDefinition> {
1177        self.tool_executor.definitions()
1178    }
1179
1180    /// Pause a session
1181    pub async fn pause_session(&self, session_id: &str) -> Result<bool> {
1182        let paused = {
1183            let session_lock = self.get_session(session_id).await?;
1184            let mut session = session_lock.write().await;
1185            session.pause()
1186        };
1187
1188        if paused {
1189            self.persist_in_background(session_id, "pause");
1190        }
1191
1192        Ok(paused)
1193    }
1194
1195    /// Resume a session
1196    pub async fn resume_session(&self, session_id: &str) -> Result<bool> {
1197        let resumed = {
1198            let session_lock = self.get_session(session_id).await?;
1199            let mut session = session_lock.write().await;
1200            session.resume()
1201        };
1202
1203        if resumed {
1204            self.persist_in_background(session_id, "resume");
1205        }
1206
1207        Ok(resumed)
1208    }
1209
1210    /// Cancel an ongoing operation for a session
1211    ///
1212    /// Returns true if an operation was cancelled, false if no operation was running.
1213    pub async fn cancel_operation(&self, session_id: &str) -> Result<bool> {
1214        // First, cancel any pending HITL confirmations
1215        let session_lock = self.get_session(session_id).await?;
1216        let cancelled_confirmations = {
1217            let session = session_lock.read().await;
1218            session.confirmation_manager.cancel_all().await
1219        };
1220
1221        if cancelled_confirmations > 0 {
1222            tracing::info!(
1223                "Cancelled {} pending confirmations for session {}",
1224                cancelled_confirmations,
1225                session_id
1226            );
1227        }
1228
1229        // Then, abort the ongoing operation if any
1230        let abort_handle = {
1231            let mut ops = self.ongoing_operations.write().await;
1232            ops.remove(session_id)
1233        };
1234
1235        if let Some(handle) = abort_handle {
1236            handle.abort();
1237            tracing::info!("Cancelled ongoing operation for session {}", session_id);
1238            Ok(true)
1239        } else if cancelled_confirmations > 0 {
1240            // We cancelled confirmations but no main operation
1241            Ok(true)
1242        } else {
1243            tracing::debug!("No ongoing operation to cancel for session {}", session_id);
1244            Ok(false)
1245        }
1246    }
1247
1248    /// Get all sessions (returns session locks for iteration)
1249    pub async fn get_all_sessions(&self) -> Vec<Arc<RwLock<Session>>> {
1250        let sessions = self.sessions.read().await;
1251        sessions.values().cloned().collect()
1252    }
1253
1254    /// Get tool executor reference
1255    pub fn tool_executor(&self) -> &Arc<ToolExecutor> {
1256        &self.tool_executor
1257    }
1258
1259    /// Confirm a tool execution (HITL)
1260    pub async fn confirm_tool(
1261        &self,
1262        session_id: &str,
1263        tool_id: &str,
1264        approved: bool,
1265        reason: Option<String>,
1266    ) -> Result<bool> {
1267        let session_lock = self.get_session(session_id).await?;
1268        let session = session_lock.read().await;
1269        session
1270            .confirmation_manager
1271            .confirm(tool_id, approved, reason)
1272            .await
1273            .map_err(|e| anyhow::anyhow!(e))
1274    }
1275
1276    /// Set confirmation policy for a session (HITL)
1277    pub async fn set_confirmation_policy(
1278        &self,
1279        session_id: &str,
1280        policy: ConfirmationPolicy,
1281    ) -> Result<ConfirmationPolicy> {
1282        {
1283            let session_lock = self.get_session(session_id).await?;
1284            let session = session_lock.read().await;
1285            session.set_confirmation_policy(policy.clone()).await;
1286        }
1287
1288        // Update config for persistence
1289        {
1290            let session_lock = self.get_session(session_id).await?;
1291            let mut session = session_lock.write().await;
1292            session.config.confirmation_policy = Some(policy.clone());
1293        }
1294
1295        // Persist to store
1296        self.persist_in_background(session_id, "set_confirmation_policy");
1297
1298        Ok(policy)
1299    }
1300
1301    /// Get confirmation policy for a session (HITL)
1302    pub async fn get_confirmation_policy(&self, session_id: &str) -> Result<ConfirmationPolicy> {
1303        let session_lock = self.get_session(session_id).await?;
1304        let session = session_lock.read().await;
1305        Ok(session.confirmation_policy().await)
1306    }
1307}