Skip to main content

a3s_code_core/session/
manager.rs

1//! SessionManager — manages multiple concurrent sessions
2
3use super::{ContextUsage, Session, SessionConfig, SessionState};
4use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
5use crate::hitl::ConfirmationPolicy;
6use crate::llm::{self, LlmClient, LlmConfig, Message};
7use crate::mcp::McpManager;
8use crate::memory::AgentMemory;
9use crate::prompts::SystemPromptSlots;
10use crate::skills::SkillRegistry;
11use crate::store::{FileSessionStore, LlmConfigData, SessionData, SessionStore};
12use crate::tools::ToolExecutor;
13use a3s_memory::MemoryStore;
14use anyhow::{Context, Result};
15use std::collections::HashMap;
16use std::sync::Arc;
17use tokio::sync::{mpsc, RwLock};
18
/// Session manager handles multiple concurrent sessions.
///
/// Every field is wrapped in `Arc` (most of them in `Arc<RwLock<_>>`), so the
/// derived `Clone` yields a handle that shares all state with the original
/// instead of copying it.
#[derive(Clone)]
pub struct SessionManager {
    /// Live sessions keyed by session ID; each session sits behind its own lock.
    pub(crate) sessions: Arc<RwLock<HashMap<String, Arc<RwLock<Session>>>>>,
    /// Default LLM client used when a session has no per-session client configured.
    /// Wrapped in RwLock so it can be hot-swapped at runtime without restart.
    pub(crate) llm_client: Arc<RwLock<Option<Arc<dyn LlmClient>>>>,
    /// Tool executor shared by all sessions; dynamically added tools (the
    /// `manage_skill` tool and MCP tools) are registered on it as well.
    pub(crate) tool_executor: Arc<ToolExecutor>,
    /// Session stores keyed by storage backend
    pub(crate) stores: Arc<RwLock<HashMap<crate::config::StorageBackend, Arc<dyn SessionStore>>>>,
    /// Storage backend recorded for each session ID. A session with no entry is
    /// treated as memory-only when saving.
    pub(crate) session_storage_types: Arc<RwLock<HashMap<String, crate::config::StorageBackend>>>,
    /// LLM configurations for sessions (stored separately for persistence)
    pub(crate) llm_configs: Arc<RwLock<HashMap<String, LlmConfigData>>>,
    /// Ongoing operations (session_id -> tokio `AbortHandle`)
    pub(crate) ongoing_operations: Arc<RwLock<HashMap<String, tokio::task::AbortHandle>>>,
    /// Skill registry for runtime skill management
    pub(crate) skill_registry: Arc<RwLock<Option<Arc<SkillRegistry>>>>,
    /// Shared memory store for agent long-term memory.
    /// When set, each `generate`/`generate_streaming` call wraps this in
    /// `AgentMemory` and injects it into `AgentConfig.memory`.
    pub(crate) memory_store: Arc<RwLock<Option<Arc<dyn MemoryStore>>>>,
    /// Shared MCP manager. When it is installed (`set_mcp_manager`) or a server
    /// is added (`add_mcp_server`), the MCP tools are registered into the
    /// `ToolExecutor` so all sessions can use them.
    pub(crate) mcp_manager: Arc<RwLock<Option<Arc<McpManager>>>>,
}
45
46impl SessionManager {
47    /// Create a new session manager without persistence
48    pub fn new(llm_client: Option<Arc<dyn LlmClient>>, tool_executor: Arc<ToolExecutor>) -> Self {
49        Self {
50            sessions: Arc::new(RwLock::new(HashMap::new())),
51            llm_client: Arc::new(RwLock::new(llm_client)),
52            tool_executor,
53            stores: Arc::new(RwLock::new(HashMap::new())),
54            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
55            llm_configs: Arc::new(RwLock::new(HashMap::new())),
56            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
57            skill_registry: Arc::new(RwLock::new(None)),
58            memory_store: Arc::new(RwLock::new(None)),
59            mcp_manager: Arc::new(RwLock::new(None)),
60        }
61    }
62
63    /// Create a session manager with file-based persistence
64    ///
65    /// Sessions will be automatically saved to disk and restored on startup.
66    pub async fn with_persistence<P: AsRef<std::path::Path>>(
67        llm_client: Option<Arc<dyn LlmClient>>,
68        tool_executor: Arc<ToolExecutor>,
69        sessions_dir: P,
70    ) -> Result<Self> {
71        let store = FileSessionStore::new(sessions_dir).await?;
72        let mut stores = HashMap::new();
73        stores.insert(
74            crate::config::StorageBackend::File,
75            Arc::new(store) as Arc<dyn SessionStore>,
76        );
77
78        let manager = Self {
79            sessions: Arc::new(RwLock::new(HashMap::new())),
80            llm_client: Arc::new(RwLock::new(llm_client)),
81            tool_executor,
82            stores: Arc::new(RwLock::new(stores)),
83            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
84            llm_configs: Arc::new(RwLock::new(HashMap::new())),
85            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
86            skill_registry: Arc::new(RwLock::new(None)),
87            memory_store: Arc::new(RwLock::new(None)),
88            mcp_manager: Arc::new(RwLock::new(None)),
89        };
90
91        Ok(manager)
92    }
93
94    /// Create a session manager with a custom store
95    ///
96    /// The `backend` parameter determines which `StorageBackend` key the store is registered under.
97    /// Sessions created with a matching `storage_type` will use this store.
98    pub fn with_store(
99        llm_client: Option<Arc<dyn LlmClient>>,
100        tool_executor: Arc<ToolExecutor>,
101        store: Arc<dyn SessionStore>,
102        backend: crate::config::StorageBackend,
103    ) -> Self {
104        let mut stores = HashMap::new();
105        stores.insert(backend, store);
106
107        Self {
108            sessions: Arc::new(RwLock::new(HashMap::new())),
109            llm_client: Arc::new(RwLock::new(llm_client)),
110            tool_executor,
111            stores: Arc::new(RwLock::new(stores)),
112            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
113            llm_configs: Arc::new(RwLock::new(HashMap::new())),
114            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
115            skill_registry: Arc::new(RwLock::new(None)),
116            memory_store: Arc::new(RwLock::new(None)),
117            mcp_manager: Arc::new(RwLock::new(None)),
118        }
119    }
120
121    /// Hot-swap the default LLM client used by sessions without a per-session client.
122    ///
123    /// Called by SafeClaw when `PUT /api/agent/config` updates the model config.
124    /// All subsequent `generate` / `generate_streaming` calls will use the new client.
125    pub async fn set_default_llm(&self, client: Option<Arc<dyn LlmClient>>) {
126        *self.llm_client.write().await = client;
127    }
128
129    /// Set the skill registry for runtime skill management.
130    ///
131    /// Skills registered here will be injected into the system prompt
132    /// for all subsequent `generate` / `generate_streaming` calls.
133    /// Also registers the `manage_skill` tool on the `ToolExecutor` so
134    /// the Agent can create, list, and remove skills at runtime.
135    pub async fn set_skill_registry(
136        &self,
137        registry: Arc<SkillRegistry>,
138        skills_dir: std::path::PathBuf,
139    ) {
140        // Register the manage_skill tool for self-bootstrap
141        let manage_tool = crate::skills::ManageSkillTool::new(registry.clone(), skills_dir);
142        self.tool_executor
143            .register_dynamic_tool(Arc::new(manage_tool));
144
145        *self.skill_registry.write().await = Some(registry);
146    }
147
148    /// Get the current skill registry, if any.
149    pub async fn skill_registry(&self) -> Option<Arc<SkillRegistry>> {
150        self.skill_registry.read().await.clone()
151    }
152
153    /// Set the shared memory store for agent long-term memory.
154    ///
155    /// When set, every `generate`/`generate_streaming` call wraps this store
156    /// in an `AgentMemory` and injects it into `AgentConfig.memory`, enabling
157    /// automatic `recall_similar` before prompts and `remember_success`/
158    /// `remember_failure` after tool execution.
159    pub async fn set_memory_store(&self, store: Arc<dyn MemoryStore>) {
160        *self.memory_store.write().await = Some(store);
161    }
162
163    /// Get the current memory store, if any.
164    pub async fn memory_store(&self) -> Option<Arc<dyn MemoryStore>> {
165        self.memory_store.read().await.clone()
166    }
167
168    /// Set the shared MCP manager.
169    ///
170    /// When set, all tools from connected MCP servers are registered into the
171    /// `ToolExecutor` so they are available in every session.
172    pub async fn set_mcp_manager(&self, manager: Arc<McpManager>) {
173        let all_tools = manager.get_all_tools().await;
174        let mut by_server: HashMap<String, Vec<crate::mcp::McpTool>> = HashMap::new();
175        for (server, tool) in all_tools {
176            by_server.entry(server).or_default().push(tool);
177        }
178        for (server_name, tools) in by_server {
179            for tool in
180                crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&manager))
181            {
182                self.tool_executor.register_dynamic_tool(tool);
183            }
184        }
185        *self.mcp_manager.write().await = Some(manager);
186    }
187
188    /// Dynamically add and connect an MCP server to the shared manager.
189    ///
190    /// Registers the server config, connects it, and registers its tools into
191    /// the `ToolExecutor` so all subsequent sessions can use them.
192    pub async fn add_mcp_server(&self, config: crate::mcp::McpServerConfig) -> Result<()> {
193        let manager = {
194            let guard = self.mcp_manager.read().await;
195            match guard.clone() {
196                Some(m) => m,
197                None => {
198                    drop(guard);
199                    let m = Arc::new(McpManager::new());
200                    *self.mcp_manager.write().await = Some(Arc::clone(&m));
201                    m
202                }
203            }
204        };
205        let name = config.name.clone();
206        manager.register_server(config).await;
207        manager.connect(&name).await?;
208        let tools = manager.get_server_tools(&name).await;
209        for tool in crate::mcp::tools::create_mcp_tools(&name, tools, Arc::clone(&manager)) {
210            self.tool_executor.register_dynamic_tool(tool);
211        }
212        Ok(())
213    }
214
215    /// Disconnect and remove an MCP server from the shared manager.
216    ///
217    /// Also unregisters its tools from the `ToolExecutor`.
218    pub async fn remove_mcp_server(&self, name: &str) -> Result<()> {
219        let guard = self.mcp_manager.read().await;
220        if let Some(ref manager) = *guard {
221            manager.disconnect(name).await?;
222        }
223        self.tool_executor
224            .unregister_tools_by_prefix(&format!("mcp__{name}__"));
225        Ok(())
226    }
227
228    /// Get status of all connected MCP servers.
229    pub async fn mcp_status(
230        &self,
231    ) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
232        let guard = self.mcp_manager.read().await;
233        match guard.as_ref() {
234            Some(m) => {
235                let m = Arc::clone(m);
236                drop(guard);
237                m.get_status().await
238            }
239            None => std::collections::HashMap::new(),
240        }
241    }
242
243    /// Restore a single session by ID from the store
244    ///
245    /// Searches all registered stores for the given session ID and restores it
246    /// into the in-memory session map. Returns an error if not found.
247    pub async fn restore_session_by_id(&self, session_id: &str) -> Result<()> {
248        // Check if already loaded
249        {
250            let sessions = self.sessions.read().await;
251            if sessions.contains_key(session_id) {
252                return Ok(());
253            }
254        }
255
256        let stores = self.stores.read().await;
257        for (backend, store) in stores.iter() {
258            match store.load(session_id).await {
259                Ok(Some(data)) => {
260                    {
261                        let mut storage_types = self.session_storage_types.write().await;
262                        storage_types.insert(data.id.clone(), backend.clone());
263                    }
264                    self.restore_session(data).await?;
265                    return Ok(());
266                }
267                Ok(None) => continue,
268                Err(e) => {
269                    tracing::warn!(
270                        "Failed to load session {} from {:?}: {}",
271                        session_id,
272                        backend,
273                        e
274                    );
275                    continue;
276                }
277            }
278        }
279
280        Err(anyhow::anyhow!(
281            "Session {} not found in any store",
282            session_id
283        ))
284    }
285
286    /// Load all sessions from all registered stores
287    pub async fn load_all_sessions(&mut self) -> Result<usize> {
288        let stores = self.stores.read().await;
289        let mut loaded = 0;
290
291        for (backend, store) in stores.iter() {
292            let session_ids = match store.list().await {
293                Ok(ids) => ids,
294                Err(e) => {
295                    tracing::warn!("Failed to list sessions from {:?} store: {}", backend, e);
296                    continue;
297                }
298            };
299
300            for id in session_ids {
301                match store.load(&id).await {
302                    Ok(Some(data)) => {
303                        // Record the storage type for this session
304                        {
305                            let mut storage_types = self.session_storage_types.write().await;
306                            storage_types.insert(data.id.clone(), backend.clone());
307                        }
308
309                        if let Err(e) = self.restore_session(data).await {
310                            tracing::warn!("Failed to restore session {}: {}", id, e);
311                        } else {
312                            loaded += 1;
313                        }
314                    }
315                    Ok(None) => {
316                        tracing::warn!("Session {} not found in store", id);
317                    }
318                    Err(e) => {
319                        tracing::warn!("Failed to load session {}: {}", id, e);
320                    }
321                }
322            }
323        }
324
325        tracing::info!("Loaded {} sessions from store", loaded);
326        Ok(loaded)
327    }
328
329    /// Restore a session from SessionData
330    async fn restore_session(&self, data: SessionData) -> Result<()> {
331        let tools = self.tool_executor.definitions();
332        let mut session = Session::new(data.id.clone(), data.config.clone(), tools).await?;
333
334        // Restore serializable state
335        session.restore_from_data(&data);
336
337        // Restore LLM config if present (without API key - must be reconfigured)
338        if let Some(llm_config) = &data.llm_config {
339            let mut configs = self.llm_configs.write().await;
340            configs.insert(data.id.clone(), llm_config.clone());
341        }
342
343        let mut sessions = self.sessions.write().await;
344        sessions.insert(data.id.clone(), Arc::new(RwLock::new(session)));
345
346        tracing::info!("Restored session: {}", data.id);
347        Ok(())
348    }
349
350    /// Save a session to the store
351    async fn save_session(&self, session_id: &str) -> Result<()> {
352        // Get the storage type for this session
353        let storage_type = {
354            let storage_types = self.session_storage_types.read().await;
355            storage_types.get(session_id).cloned()
356        };
357
358        let Some(storage_type) = storage_type else {
359            // No storage type means memory-only session
360            return Ok(());
361        };
362
363        // Skip saving for memory storage
364        if storage_type == crate::config::StorageBackend::Memory {
365            return Ok(());
366        }
367
368        // Get the appropriate store
369        let stores = self.stores.read().await;
370        let Some(store) = stores.get(&storage_type) else {
371            tracing::warn!("No store available for storage type: {:?}", storage_type);
372            return Ok(());
373        };
374
375        let session_lock = self.get_session(session_id).await?;
376        let session = session_lock.read().await;
377
378        // Get LLM config if set
379        let llm_config = {
380            let configs = self.llm_configs.read().await;
381            configs.get(session_id).cloned()
382        };
383
384        let data = session.to_session_data(llm_config);
385        store.save(&data).await?;
386
387        tracing::debug!("Saved session: {}", session_id);
388        Ok(())
389    }
390
391    /// Persist a session to store, logging and emitting an event on failure.
392    /// This is a non-fatal wrapper around `save_session` — the operation
393    /// succeeds in memory even if persistence fails.
394    async fn persist_or_warn(&self, session_id: &str, operation: &str) {
395        if let Err(e) = self.save_session(session_id).await {
396            tracing::warn!(
397                "Failed to persist session {} after {}: {}",
398                session_id,
399                operation,
400                e
401            );
402            // Emit event so SDK clients can react
403            if let Ok(session_lock) = self.get_session(session_id).await {
404                let session = session_lock.read().await;
405                let _ = session.event_tx().send(AgentEvent::PersistenceFailed {
406                    session_id: session_id.to_string(),
407                    operation: operation.to_string(),
408                    error: e.to_string(),
409                });
410            }
411        }
412    }
413
414    /// Spawn persistence as a background task (non-blocking).
415    ///
416    /// All `SessionManager` fields are `Arc`-wrapped, so `clone()` is a
417    /// cheap pointer-copy. This keeps file I/O off the response path.
418    fn persist_in_background(&self, session_id: &str, operation: &str) {
419        let mgr = self.clone();
420        let sid = session_id.to_string();
421        let op = operation.to_string();
422        tokio::spawn(async move {
423            mgr.persist_or_warn(&sid, &op).await;
424        });
425    }
426
427    /// Create a new session
428    pub async fn create_session(&self, id: String, config: SessionConfig) -> Result<String> {
429        tracing::info!(name: "a3s.session.create", session_id = %id, "Creating session");
430
431        // Record the storage type for this session
432        {
433            let mut storage_types = self.session_storage_types.write().await;
434            storage_types.insert(id.clone(), config.storage_type.clone());
435        }
436
437        // Get tool definitions from the executor
438        let tools = self.tool_executor.definitions();
439        let mut session = Session::new(id.clone(), config, tools).await?;
440
441        // Start the command queue
442        session.start_queue().await?;
443
444        // Set max context length if provided
445        if session.config.max_context_length > 0 {
446            session.context_usage.max_tokens = session.config.max_context_length as usize;
447        }
448
449        {
450            let mut sessions = self.sessions.write().await;
451            sessions.insert(id.clone(), Arc::new(RwLock::new(session)));
452        }
453
454        // Persist to store
455        self.persist_in_background(&id, "create");
456
457        tracing::info!("Created session: {}", id);
458        Ok(id)
459    }
460
461    /// Destroy a session
462    pub async fn destroy_session(&self, id: &str) -> Result<()> {
463        tracing::info!(name: "a3s.session.destroy", session_id = %id, "Destroying session");
464
465        // Get the storage type before removing the session
466        let storage_type = {
467            let storage_types = self.session_storage_types.read().await;
468            storage_types.get(id).cloned()
469        };
470
471        {
472            let mut sessions = self.sessions.write().await;
473            sessions.remove(id);
474        }
475
476        // Remove LLM config
477        {
478            let mut configs = self.llm_configs.write().await;
479            configs.remove(id);
480        }
481
482        // Remove storage type tracking
483        {
484            let mut storage_types = self.session_storage_types.write().await;
485            storage_types.remove(id);
486        }
487
488        // Delete from store if applicable
489        if let Some(storage_type) = storage_type {
490            if storage_type != crate::config::StorageBackend::Memory {
491                let stores = self.stores.read().await;
492                if let Some(store) = stores.get(&storage_type) {
493                    if let Err(e) = store.delete(id).await {
494                        tracing::warn!("Failed to delete session {} from store: {}", id, e);
495                    }
496                }
497            }
498        }
499
500        tracing::info!("Destroyed session: {}", id);
501        Ok(())
502    }
503
504    /// Get a session by ID
505    pub async fn get_session(&self, id: &str) -> Result<Arc<RwLock<Session>>> {
506        let sessions = self.sessions.read().await;
507        sessions
508            .get(id)
509            .cloned()
510            .context(format!("Session not found: {}", id))
511    }
512
513    /// Create a child session for a subagent
514    ///
515    /// Child sessions inherit the parent's LLM client but have their own
516    /// permission policy and configuration.
517    pub async fn create_child_session(
518        &self,
519        parent_id: &str,
520        child_id: String,
521        mut config: SessionConfig,
522    ) -> Result<String> {
523        // Verify parent exists and inherit HITL policy
524        let parent_lock = self.get_session(parent_id).await?;
525        let parent_llm_client = {
526            let parent = parent_lock.read().await;
527
528            // Inherit parent's confirmation policy if child doesn't have one
529            if config.confirmation_policy.is_none() {
530                let parent_policy = parent.confirmation_manager.policy().await;
531                config.confirmation_policy = Some(parent_policy);
532            }
533
534            parent.llm_client.clone()
535        };
536
537        // Set parent_id in config
538        config.parent_id = Some(parent_id.to_string());
539
540        // Get tool definitions from the executor
541        let tools = self.tool_executor.definitions();
542        let mut session = Session::new(child_id.clone(), config, tools).await?;
543
544        // Inherit LLM client from parent if not set
545        if session.llm_client.is_none() {
546            let default_llm = self.llm_client.read().await.clone();
547            session.llm_client = parent_llm_client.or(default_llm);
548        }
549
550        // Start the command queue
551        session.start_queue().await?;
552
553        // Set max context length if provided
554        if session.config.max_context_length > 0 {
555            session.context_usage.max_tokens = session.config.max_context_length as usize;
556        }
557
558        {
559            let mut sessions = self.sessions.write().await;
560            sessions.insert(child_id.clone(), Arc::new(RwLock::new(session)));
561        }
562
563        // Persist to store
564        self.persist_in_background(&child_id, "create_child");
565
566        tracing::info!(
567            "Created child session: {} (parent: {})",
568            child_id,
569            parent_id
570        );
571        Ok(child_id)
572    }
573
574    /// Get all child sessions for a parent session
575    pub async fn get_child_sessions(&self, parent_id: &str) -> Vec<String> {
576        let sessions = self.sessions.read().await;
577        let mut children = Vec::new();
578
579        for (id, session_lock) in sessions.iter() {
580            let session = session_lock.read().await;
581            if session.parent_id.as_deref() == Some(parent_id) {
582                children.push(id.clone());
583            }
584        }
585
586        children
587    }
588
589    /// Check if a session is a child session
590    pub async fn is_child_session(&self, session_id: &str) -> Result<bool> {
591        let session_lock = self.get_session(session_id).await?;
592        let session = session_lock.read().await;
593        Ok(session.is_child_session())
594    }
595
    /// Generate a response for `prompt` in session `session_id` (non-streaming).
    ///
    /// Steps:
    /// 1. Reject paused sessions.
    /// 2. Snapshot the session's history and agent settings under a read lock.
    /// 3. Pick the session's LLM client, or else the manager default.
    /// 4. Build a `ToolContext` from the session workspace, or else the
    ///    executor's workspace.
    /// 5. Append the skill registry's system prompt, and prepend any
    ///    prompt-matched skill content.
    /// 6. Run an `AgentLoop`.
    /// 7. Write the resulting messages and usage back to the session.
    /// 8. Schedule background persistence, then attempt auto-compaction. A
    ///    compaction failure is logged, not returned.
    ///
    /// The session lock is *not* held while the agent runs. History is
    /// snapshotted up front and `session.messages` is overwritten afterwards,
    /// so overlapping `generate` calls on the same session can overwrite each
    /// other's history (last writer wins).
    ///
    /// # Errors
    ///
    /// Fails if any of the following holds:
    /// - the session does not exist;
    /// - the session is paused;
    /// - no LLM client is configured for the session or as default;
    /// - agent execution fails.
    pub async fn generate(&self, session_id: &str, prompt: &str) -> Result<AgentResult> {
        let session_lock = self.get_session(session_id).await?;

        // Check if session is paused
        {
            let session = session_lock.read().await;
            if session.state == SessionState::Paused {
                anyhow::bail!(
                    "Session {} is paused. Call Resume before generating.",
                    session_id
                );
            }
        }

        // Get session state and LLM client.
        // Everything is cloned out under one short read lock so the agent run
        // below does not hold the session lock.
        let (
            history,
            system,
            tools,
            session_llm_client,
            permission_checker,
            confirmation_manager,
            context_providers,
            session_workspace,
            tool_metrics,
            hook_engine,
            planning_enabled,
            goal_tracking,
        ) = {
            let session = session_lock.read().await;
            (
                session.messages.clone(),
                session.system().map(String::from),
                session.tools.clone(),
                session.llm_client.clone(),
                session.permission_checker.clone(),
                session.confirmation_manager.clone(),
                session.context_providers.clone(),
                session.config.workspace.clone(),
                session.tool_metrics.clone(),
                session.config.hook_engine.clone(),
                session.config.planning_enabled,
                session.config.goal_tracking,
            )
        };

        // Use session's LLM client if configured, otherwise use default
        let llm_client = if let Some(client) = session_llm_client {
            client
        } else if let Some(client) = self.llm_client.read().await.clone() {
            client
        } else {
            anyhow::bail!(
                "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
                session_id
            );
        };

        // Construct per-session ToolContext from session workspace, falling back to server default
        let tool_context = if session_workspace.is_empty() {
            crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
                .with_session_id(session_id)
        } else {
            crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
                .with_session_id(session_id)
        };

        // Inject skill registry into system prompt and agent config.
        // The skill prompt is appended after any existing system prompt, separated by a blank line.
        let skill_registry = self.skill_registry.read().await.clone();
        let system = if let Some(ref registry) = skill_registry {
            let skill_prompt = registry.to_system_prompt();
            if skill_prompt.is_empty() {
                system
            } else {
                match system {
                    Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
                    None => Some(skill_prompt),
                }
            }
        } else {
            system
        };

        // Inject matched skill content on-demand before skill_registry is moved into AgentConfig
        let effective_prompt = if let Some(ref registry) = skill_registry {
            let skill_content = registry.match_skills(prompt);
            if skill_content.is_empty() {
                prompt.to_string()
            } else {
                format!("{}\n\n---\n\n{}", skill_content, prompt)
            }
        } else {
            prompt.to_string()
        };

        // Build AgentMemory from shared store (if configured).
        // A fresh `AgentMemory` wrapper is created per call over the shared store.
        let memory = self
            .memory_store
            .read()
            .await
            .as_ref()
            .map(|store| Arc::new(AgentMemory::new(store.clone())));

        // Create agent loop with permission policy, confirmation manager, and context providers.
        // NOTE(review): `max_tool_rounds` is hard-coded to 50 here rather than
        // taken from the session config; presumably a per-call cap on tool rounds.
        let config = AgentConfig {
            prompt_slots: match system {
                Some(s) => SystemPromptSlots::from_legacy(s),
                None => SystemPromptSlots::default(),
            },
            tools,
            max_tool_rounds: 50,
            permission_checker: Some(permission_checker),
            confirmation_manager: Some(confirmation_manager),
            context_providers,
            planning_enabled,
            goal_tracking,
            hook_engine,
            skill_registry,
            memory,
            ..AgentConfig::default()
        };

        let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
            .with_tool_metrics(tool_metrics);

        // Execute with session context
        let result = agent
            .execute_with_session(&history, &effective_prompt, Some(session_id), None)
            .await?;

        // Update session: replace the history with the agent's full message list
        // (not an append), then accumulate usage.
        {
            let mut session = session_lock.write().await;
            session.messages = result.messages.clone();
            session.update_usage(&result.usage);
        }

        // Persist to store.
        // NOTE(review): this is spawned before auto-compaction below, and the
        // background save reads the session when it runs. Whether the persisted
        // copy is pre- or post-compaction therefore depends on task timing,
        // unless `maybe_auto_compact` persists on its own — confirm.
        self.persist_in_background(session_id, "generate");

        // Auto-compact if context usage exceeds threshold
        if let Err(e) = self.maybe_auto_compact(session_id).await {
            tracing::warn!("Auto-compact failed for session {}: {}", session_id, e);
        }

        Ok(result)
    }
744
745    /// Generate response with streaming events
746    pub async fn generate_streaming(
747        &self,
748        session_id: &str,
749        prompt: &str,
750    ) -> Result<(
751        mpsc::Receiver<AgentEvent>,
752        tokio::task::JoinHandle<Result<AgentResult>>,
753    )> {
754        let session_lock = self.get_session(session_id).await?;
755
756        // Check if session is paused
757        {
758            let session = session_lock.read().await;
759            if session.state == SessionState::Paused {
760                anyhow::bail!(
761                    "Session {} is paused. Call Resume before generating.",
762                    session_id
763                );
764            }
765        }
766
767        // Get session state and LLM client
768        let (
769            history,
770            system,
771            tools,
772            session_llm_client,
773            permission_checker,
774            confirmation_manager,
775            context_providers,
776            session_workspace,
777            tool_metrics,
778            hook_engine,
779            planning_enabled,
780            goal_tracking,
781        ) = {
782            let session = session_lock.read().await;
783            (
784                session.messages.clone(),
785                session.system().map(String::from),
786                session.tools.clone(),
787                session.llm_client.clone(),
788                session.permission_checker.clone(),
789                session.confirmation_manager.clone(),
790                session.context_providers.clone(),
791                session.config.workspace.clone(),
792                session.tool_metrics.clone(),
793                session.config.hook_engine.clone(),
794                session.config.planning_enabled,
795                session.config.goal_tracking,
796            )
797        };
798
799        // Use session's LLM client if configured, otherwise use default
800        let llm_client = if let Some(client) = session_llm_client {
801            client
802        } else if let Some(client) = self.llm_client.read().await.clone() {
803            client
804        } else {
805            anyhow::bail!(
806                "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
807                session_id
808            );
809        };
810
811        // Construct per-session ToolContext from session workspace, falling back to server default
812        let tool_context = if session_workspace.is_empty() {
813            crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
814                .with_session_id(session_id)
815        } else {
816            crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
817                .with_session_id(session_id)
818        };
819
820        // Inject skill registry into system prompt and agent config
821        let skill_registry = self.skill_registry.read().await.clone();
822        let system = if let Some(ref registry) = skill_registry {
823            let skill_prompt = registry.to_system_prompt();
824            if skill_prompt.is_empty() {
825                system
826            } else {
827                match system {
828                    Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
829                    None => Some(skill_prompt),
830                }
831            }
832        } else {
833            system
834        };
835
836        // Inject matched skill content on-demand before skill_registry is moved into AgentConfig
837        let effective_prompt = if let Some(ref registry) = skill_registry {
838            let skill_content = registry.match_skills(prompt);
839            if skill_content.is_empty() {
840                prompt.to_string()
841            } else {
842                format!("{}\n\n---\n\n{}", skill_content, prompt)
843            }
844        } else {
845            prompt.to_string()
846        };
847
848        // Build AgentMemory from shared store (if configured)
849        let memory = self
850            .memory_store
851            .read()
852            .await
853            .as_ref()
854            .map(|store| Arc::new(AgentMemory::new(store.clone())));
855
856        // Create agent loop with permission policy, confirmation manager, and context providers
857        let config = AgentConfig {
858            prompt_slots: match system {
859                Some(s) => SystemPromptSlots::from_legacy(s),
860                None => SystemPromptSlots::default(),
861            },
862            tools,
863            max_tool_rounds: 50,
864            permission_checker: Some(permission_checker),
865            confirmation_manager: Some(confirmation_manager),
866            context_providers,
867            planning_enabled,
868            goal_tracking,
869            hook_engine,
870            skill_registry,
871            memory,
872            ..AgentConfig::default()
873        };
874
875        let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
876            .with_tool_metrics(tool_metrics);
877
878        // Execute with streaming
879        let (rx, handle) = agent.execute_streaming(&history, &effective_prompt).await?;
880
881        // Store the abort handle for cancellation support
882        let abort_handle = handle.abort_handle();
883        {
884            let mut ops = self.ongoing_operations.write().await;
885            ops.insert(session_id.to_string(), abort_handle);
886        }
887
888        // Spawn task to update session after completion
889        let session_lock_clone = session_lock.clone();
890        let original_handle = handle;
891        let stores = self.stores.clone();
892        let session_storage_types = self.session_storage_types.clone();
893        let llm_configs = self.llm_configs.clone();
894        let session_id_owned = session_id.to_string();
895        let ongoing_operations = self.ongoing_operations.clone();
896        let session_manager = self.clone();
897
898        let wrapped_handle = tokio::spawn(async move {
899            let result = original_handle.await??;
900
901            // Remove from ongoing operations
902            {
903                let mut ops = ongoing_operations.write().await;
904                ops.remove(&session_id_owned);
905            }
906
907            // Update session
908            {
909                let mut session = session_lock_clone.write().await;
910                session.messages = result.messages.clone();
911                session.update_usage(&result.usage);
912            }
913
914            // Persist to store
915            let storage_type = {
916                let storage_types = session_storage_types.read().await;
917                storage_types.get(&session_id_owned).cloned()
918            };
919
920            if let Some(storage_type) = storage_type {
921                if storage_type != crate::config::StorageBackend::Memory {
922                    let stores_guard = stores.read().await;
923                    if let Some(store) = stores_guard.get(&storage_type) {
924                        let session = session_lock_clone.read().await;
925                        let llm_config = {
926                            let configs = llm_configs.read().await;
927                            configs.get(&session_id_owned).cloned()
928                        };
929                        let data = session.to_session_data(llm_config);
930                        if let Err(e) = store.save(&data).await {
931                            tracing::warn!(
932                                "Failed to persist session {} after streaming: {}",
933                                session_id_owned,
934                                e
935                            );
936                        }
937                    }
938                }
939            }
940
941            // Auto-compact if context usage exceeds threshold
942            if let Err(e) = session_manager.maybe_auto_compact(&session_id_owned).await {
943                tracing::warn!(
944                    "Auto-compact failed for session {}: {}",
945                    session_id_owned,
946                    e
947                );
948            }
949
950            Ok(result)
951        });
952
953        Ok((rx, wrapped_handle))
954    }
955
956    /// Get context usage for a session
957    pub async fn context_usage(&self, session_id: &str) -> Result<ContextUsage> {
958        let session_lock = self.get_session(session_id).await?;
959        let session = session_lock.read().await;
960        Ok(session.context_usage.clone())
961    }
962
963    /// Get conversation history for a session
964    pub async fn history(&self, session_id: &str) -> Result<Vec<Message>> {
965        let session_lock = self.get_session(session_id).await?;
966        let session = session_lock.read().await;
967        Ok(session.messages.clone())
968    }
969
970    /// Clear session history
971    pub async fn clear(&self, session_id: &str) -> Result<()> {
972        {
973            let session_lock = self.get_session(session_id).await?;
974            let mut session = session_lock.write().await;
975            session.clear();
976        }
977
978        // Persist to store
979        self.persist_in_background(session_id, "clear");
980
981        Ok(())
982    }
983
984    /// Compact session context
985    pub async fn compact(&self, session_id: &str) -> Result<()> {
986        tracing::info!(name: "a3s.session.compact", session_id = %session_id, "Compacting session context");
987
988        {
989            let session_lock = self.get_session(session_id).await?;
990            let mut session = session_lock.write().await;
991
992            // Get LLM client for compaction (if available)
993            let llm_client = if let Some(client) = &session.llm_client {
994                client.clone()
995            } else if let Some(client) = self.llm_client.read().await.clone() {
996                client
997            } else {
998                // If no LLM client available, just do simple truncation
999                tracing::warn!("No LLM client configured for compaction, using simple truncation");
1000                let keep_messages = 20;
1001                if session.messages.len() > keep_messages {
1002                    let len = session.messages.len();
1003                    session.messages = session.messages.split_off(len - keep_messages);
1004                }
1005                // Persist after truncation
1006                drop(session);
1007                self.persist_in_background(session_id, "compact");
1008                return Ok(());
1009            };
1010
1011            session.compact(&llm_client).await?;
1012        }
1013
1014        // Persist to store
1015        self.persist_in_background(session_id, "compact");
1016
1017        Ok(())
1018    }
1019
1020    /// Check if auto-compaction should be triggered and perform it if needed.
1021    ///
1022    /// Called after `generate()` / `generate_streaming()` updates session usage.
1023    /// Triggers compaction when:
1024    /// - `auto_compact` is enabled in session config
1025    /// - `context_usage.percent` exceeds `auto_compact_threshold`
1026    pub async fn maybe_auto_compact(&self, session_id: &str) -> Result<bool> {
1027        let (should_compact, percent_before, messages_before) = {
1028            let session_lock = self.get_session(session_id).await?;
1029            let session = session_lock.read().await;
1030
1031            if !session.config.auto_compact {
1032                return Ok(false);
1033            }
1034
1035            let threshold = session.config.auto_compact_threshold;
1036            let percent = session.context_usage.percent;
1037            let msg_count = session.messages.len();
1038
1039            tracing::debug!(
1040                "Auto-compact check for session {}: percent={:.2}%, threshold={:.2}%, messages={}",
1041                session_id,
1042                percent * 100.0,
1043                threshold * 100.0,
1044                msg_count,
1045            );
1046
1047            (percent >= threshold, percent, msg_count)
1048        };
1049
1050        if !should_compact {
1051            return Ok(false);
1052        }
1053
1054        tracing::info!(
1055            name: "a3s.session.auto_compact",
1056            session_id = %session_id,
1057            percent_before = %format!("{:.1}%", percent_before * 100.0),
1058            messages_before = %messages_before,
1059            "Auto-compacting session due to high context usage"
1060        );
1061
1062        // Perform compaction (reuses existing compact logic)
1063        self.compact(session_id).await?;
1064
1065        // Get post-compaction message count
1066        let messages_after = {
1067            let session_lock = self.get_session(session_id).await?;
1068            let session = session_lock.read().await;
1069            session.messages.len()
1070        };
1071
1072        // Broadcast event to notify clients
1073        let event = AgentEvent::ContextCompacted {
1074            session_id: session_id.to_string(),
1075            before_messages: messages_before,
1076            after_messages: messages_after,
1077            percent_before,
1078        };
1079
1080        // Try to send via session's event broadcaster
1081        if let Ok(session_lock) = self.get_session(session_id).await {
1082            let session = session_lock.read().await;
1083            let _ = session.event_tx.send(event);
1084        }
1085
1086        tracing::info!(
1087            name: "a3s.session.auto_compact.done",
1088            session_id = %session_id,
1089            messages_before = %messages_before,
1090            messages_after = %messages_after,
1091            "Auto-compaction complete"
1092        );
1093
1094        Ok(true)
1095    }
1096
1097    /// Resolve the LLM client for a session (session-level -> default fallback)
1098    ///
1099    /// Returns `None` if no LLM client is configured at either level.
1100    pub async fn get_llm_for_session(
1101        &self,
1102        session_id: &str,
1103    ) -> Result<Option<Arc<dyn LlmClient>>> {
1104        let session_lock = self.get_session(session_id).await?;
1105        let session = session_lock.read().await;
1106
1107        if let Some(client) = &session.llm_client {
1108            return Ok(Some(client.clone()));
1109        }
1110
1111        Ok(self.llm_client.read().await.clone())
1112    }
1113
1114    /// Configure session
1115    pub async fn configure(
1116        &self,
1117        session_id: &str,
1118        thinking: Option<bool>,
1119        budget: Option<usize>,
1120        model_config: Option<LlmConfig>,
1121    ) -> Result<()> {
1122        {
1123            let session_lock = self.get_session(session_id).await?;
1124            let mut session = session_lock.write().await;
1125
1126            if let Some(t) = thinking {
1127                session.thinking_enabled = t;
1128            }
1129            if let Some(b) = budget {
1130                session.thinking_budget = Some(b);
1131            }
1132            if let Some(ref config) = model_config {
1133                tracing::info!(
1134                    "Configuring session {} with LLM: provider={}, model={}",
1135                    session_id,
1136                    config.provider,
1137                    config.model
1138                );
1139                session.model_name = Some(config.model.clone());
1140                session.llm_client = Some(llm::create_client_with_config(config.clone()));
1141            }
1142        }
1143
1144        // Store LLM config for persistence (without API key)
1145        if let Some(config) = model_config {
1146            let llm_config_data = LlmConfigData {
1147                provider: config.provider,
1148                model: config.model,
1149                api_key: None, // Don't persist API key
1150                base_url: config.base_url,
1151            };
1152            let mut configs = self.llm_configs.write().await;
1153            configs.insert(session_id.to_string(), llm_config_data);
1154        }
1155
1156        // Persist to store
1157        self.persist_in_background(session_id, "configure");
1158
1159        Ok(())
1160    }
1161
1162    /// Get session count
1163    pub async fn session_count(&self) -> usize {
1164        let sessions = self.sessions.read().await;
1165        sessions.len()
1166    }
1167
1168    /// Check health of all registered stores
1169    pub async fn store_health(&self) -> Vec<(String, Result<()>)> {
1170        let stores = self.stores.read().await;
1171        let mut results = Vec::new();
1172        for (_, store) in stores.iter() {
1173            let name = store.backend_name().to_string();
1174            let result = store.health_check().await;
1175            results.push((name, result));
1176        }
1177        results
1178    }
1179
1180    /// List all loaded tools (built-in tools)
1181    pub fn list_tools(&self) -> Vec<crate::llm::ToolDefinition> {
1182        self.tool_executor.definitions()
1183    }
1184
1185    /// Pause a session
1186    pub async fn pause_session(&self, session_id: &str) -> Result<bool> {
1187        let paused = {
1188            let session_lock = self.get_session(session_id).await?;
1189            let mut session = session_lock.write().await;
1190            session.pause()
1191        };
1192
1193        if paused {
1194            self.persist_in_background(session_id, "pause");
1195        }
1196
1197        Ok(paused)
1198    }
1199
1200    /// Resume a session
1201    pub async fn resume_session(&self, session_id: &str) -> Result<bool> {
1202        let resumed = {
1203            let session_lock = self.get_session(session_id).await?;
1204            let mut session = session_lock.write().await;
1205            session.resume()
1206        };
1207
1208        if resumed {
1209            self.persist_in_background(session_id, "resume");
1210        }
1211
1212        Ok(resumed)
1213    }
1214
1215    /// Cancel an ongoing operation for a session
1216    ///
1217    /// Returns true if an operation was cancelled, false if no operation was running.
1218    pub async fn cancel_operation(&self, session_id: &str) -> Result<bool> {
1219        // First, cancel any pending HITL confirmations
1220        let session_lock = self.get_session(session_id).await?;
1221        let cancelled_confirmations = {
1222            let session = session_lock.read().await;
1223            session.confirmation_manager.cancel_all().await
1224        };
1225
1226        if cancelled_confirmations > 0 {
1227            tracing::info!(
1228                "Cancelled {} pending confirmations for session {}",
1229                cancelled_confirmations,
1230                session_id
1231            );
1232        }
1233
1234        // Then, abort the ongoing operation if any
1235        let abort_handle = {
1236            let mut ops = self.ongoing_operations.write().await;
1237            ops.remove(session_id)
1238        };
1239
1240        if let Some(handle) = abort_handle {
1241            handle.abort();
1242            tracing::info!("Cancelled ongoing operation for session {}", session_id);
1243            Ok(true)
1244        } else if cancelled_confirmations > 0 {
1245            // We cancelled confirmations but no main operation
1246            Ok(true)
1247        } else {
1248            tracing::debug!("No ongoing operation to cancel for session {}", session_id);
1249            Ok(false)
1250        }
1251    }
1252
1253    /// Get all sessions (returns session locks for iteration)
1254    pub async fn get_all_sessions(&self) -> Vec<Arc<RwLock<Session>>> {
1255        let sessions = self.sessions.read().await;
1256        sessions.values().cloned().collect()
1257    }
1258
1259    /// Get tool executor reference
1260    pub fn tool_executor(&self) -> &Arc<ToolExecutor> {
1261        &self.tool_executor
1262    }
1263
1264    /// Confirm a tool execution (HITL)
1265    pub async fn confirm_tool(
1266        &self,
1267        session_id: &str,
1268        tool_id: &str,
1269        approved: bool,
1270        reason: Option<String>,
1271    ) -> Result<bool> {
1272        let session_lock = self.get_session(session_id).await?;
1273        let session = session_lock.read().await;
1274        session
1275            .confirmation_manager
1276            .confirm(tool_id, approved, reason)
1277            .await
1278            .map_err(|e| anyhow::anyhow!(e))
1279    }
1280
1281    /// Set confirmation policy for a session (HITL)
1282    pub async fn set_confirmation_policy(
1283        &self,
1284        session_id: &str,
1285        policy: ConfirmationPolicy,
1286    ) -> Result<ConfirmationPolicy> {
1287        {
1288            let session_lock = self.get_session(session_id).await?;
1289            let session = session_lock.read().await;
1290            session.set_confirmation_policy(policy.clone()).await;
1291        }
1292
1293        // Update config for persistence
1294        {
1295            let session_lock = self.get_session(session_id).await?;
1296            let mut session = session_lock.write().await;
1297            session.config.confirmation_policy = Some(policy.clone());
1298        }
1299
1300        // Persist to store
1301        self.persist_in_background(session_id, "set_confirmation_policy");
1302
1303        Ok(policy)
1304    }
1305
1306    /// Get confirmation policy for a session (HITL)
1307    pub async fn get_confirmation_policy(&self, session_id: &str) -> Result<ConfirmationPolicy> {
1308        let session_lock = self.get_session(session_id).await?;
1309        let session = session_lock.read().await;
1310        Ok(session.confirmation_policy().await)
1311    }
1312}