Skip to main content

a3s_code_core/session/
manager.rs

1//! SessionManager — manages multiple concurrent sessions
2
3use super::{ContextUsage, Session, SessionConfig, SessionState};
4use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
5use crate::hitl::ConfirmationPolicy;
6use crate::llm::{self, LlmClient, LlmConfig, Message};
7use crate::memory::AgentMemory;
8use crate::prompts::SystemPromptSlots;
9use crate::skills::SkillRegistry;
10use crate::store::{FileSessionStore, LlmConfigData, SessionData, SessionStore};
11use crate::tools::ToolExecutor;
12use a3s_memory::MemoryStore;
13use anyhow::{Context, Result};
14use std::collections::HashMap;
15use std::sync::Arc;
16use tokio::sync::{mpsc, RwLock};
17
/// Session manager handles multiple concurrent sessions.
///
/// Every field is held behind an `Arc`, so the derived `Clone` yields another
/// handle to the same underlying state rather than an independent copy.
#[derive(Clone)]
pub struct SessionManager {
    /// In-memory sessions keyed by session ID; each session has its own lock.
    pub(crate) sessions: Arc<RwLock<HashMap<String, Arc<RwLock<Session>>>>>,
    /// Default LLM client used when a session has no per-session client configured.
    /// Wrapped in RwLock so it can be hot-swapped at runtime without restart.
    pub(crate) llm_client: Arc<RwLock<Option<Arc<dyn LlmClient>>>>,
    /// Tool executor shared by every session managed here.
    pub(crate) tool_executor: Arc<ToolExecutor>,
    /// Session stores by storage type
    pub(crate) stores: Arc<RwLock<HashMap<crate::config::StorageBackend, Arc<dyn SessionStore>>>>,
    /// Track which storage type each session uses
    pub(crate) session_storage_types: Arc<RwLock<HashMap<String, crate::config::StorageBackend>>>,
    /// LLM configurations for sessions (stored separately for persistence)
    pub(crate) llm_configs: Arc<RwLock<HashMap<String, LlmConfigData>>>,
    /// Ongoing operations (session_id -> `AbortHandle` of the running task),
    /// kept so an in-flight operation can be cancelled.
    pub(crate) ongoing_operations: Arc<RwLock<HashMap<String, tokio::task::AbortHandle>>>,
    /// Skill registry for runtime skill management
    pub(crate) skill_registry: Arc<RwLock<Option<Arc<SkillRegistry>>>>,
    /// Shared memory store for agent long-term memory.
    /// When set, each `generate`/`generate_streaming` call wraps this in
    /// `AgentMemory` and injects it into `AgentConfig.memory`.
    pub(crate) memory_store: Arc<RwLock<Option<Arc<dyn MemoryStore>>>>,
}
41
42impl SessionManager {
43    /// Create a new session manager without persistence
44    pub fn new(llm_client: Option<Arc<dyn LlmClient>>, tool_executor: Arc<ToolExecutor>) -> Self {
45        Self {
46            sessions: Arc::new(RwLock::new(HashMap::new())),
47            llm_client: Arc::new(RwLock::new(llm_client)),
48            tool_executor,
49            stores: Arc::new(RwLock::new(HashMap::new())),
50            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
51            llm_configs: Arc::new(RwLock::new(HashMap::new())),
52            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
53            skill_registry: Arc::new(RwLock::new(None)),
54            memory_store: Arc::new(RwLock::new(None)),
55        }
56    }
57
58    /// Create a session manager with file-based persistence
59    ///
60    /// Sessions will be automatically saved to disk and restored on startup.
61    pub async fn with_persistence<P: AsRef<std::path::Path>>(
62        llm_client: Option<Arc<dyn LlmClient>>,
63        tool_executor: Arc<ToolExecutor>,
64        sessions_dir: P,
65    ) -> Result<Self> {
66        let store = FileSessionStore::new(sessions_dir).await?;
67        let mut stores = HashMap::new();
68        stores.insert(
69            crate::config::StorageBackend::File,
70            Arc::new(store) as Arc<dyn SessionStore>,
71        );
72
73        let manager = Self {
74            sessions: Arc::new(RwLock::new(HashMap::new())),
75            llm_client: Arc::new(RwLock::new(llm_client)),
76            tool_executor,
77            stores: Arc::new(RwLock::new(stores)),
78            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
79            llm_configs: Arc::new(RwLock::new(HashMap::new())),
80            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
81            skill_registry: Arc::new(RwLock::new(None)),
82            memory_store: Arc::new(RwLock::new(None)),
83        };
84
85        Ok(manager)
86    }
87
88    /// Create a session manager with a custom store
89    ///
90    /// The `backend` parameter determines which `StorageBackend` key the store is registered under.
91    /// Sessions created with a matching `storage_type` will use this store.
92    pub fn with_store(
93        llm_client: Option<Arc<dyn LlmClient>>,
94        tool_executor: Arc<ToolExecutor>,
95        store: Arc<dyn SessionStore>,
96        backend: crate::config::StorageBackend,
97    ) -> Self {
98        let mut stores = HashMap::new();
99        stores.insert(backend, store);
100
101        Self {
102            sessions: Arc::new(RwLock::new(HashMap::new())),
103            llm_client: Arc::new(RwLock::new(llm_client)),
104            tool_executor,
105            stores: Arc::new(RwLock::new(stores)),
106            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
107            llm_configs: Arc::new(RwLock::new(HashMap::new())),
108            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
109            skill_registry: Arc::new(RwLock::new(None)),
110            memory_store: Arc::new(RwLock::new(None)),
111        }
112    }
113
114    /// Hot-swap the default LLM client used by sessions without a per-session client.
115    ///
116    /// Called by SafeClaw when `PUT /api/agent/config` updates the model config.
117    /// All subsequent `generate` / `generate_streaming` calls will use the new client.
118    pub async fn set_default_llm(&self, client: Option<Arc<dyn LlmClient>>) {
119        *self.llm_client.write().await = client;
120    }
121
122    /// Set the skill registry for runtime skill management.
123    ///
124    /// Skills registered here will be injected into the system prompt
125    /// for all subsequent `generate` / `generate_streaming` calls.
126    /// Also registers the `manage_skill` tool on the `ToolExecutor` so
127    /// the Agent can create, list, and remove skills at runtime.
128    pub async fn set_skill_registry(
129        &self,
130        registry: Arc<SkillRegistry>,
131        skills_dir: std::path::PathBuf,
132    ) {
133        // Register the manage_skill tool for self-bootstrap
134        let manage_tool = crate::skills::ManageSkillTool::new(registry.clone(), skills_dir);
135        self.tool_executor
136            .register_dynamic_tool(Arc::new(manage_tool));
137
138        *self.skill_registry.write().await = Some(registry);
139    }
140
141    /// Get the current skill registry, if any.
142    pub async fn skill_registry(&self) -> Option<Arc<SkillRegistry>> {
143        self.skill_registry.read().await.clone()
144    }
145
146    /// Set the shared memory store for agent long-term memory.
147    ///
148    /// When set, every `generate`/`generate_streaming` call wraps this store
149    /// in an `AgentMemory` and injects it into `AgentConfig.memory`, enabling
150    /// automatic `recall_similar` before prompts and `remember_success`/
151    /// `remember_failure` after tool execution.
152    pub async fn set_memory_store(&self, store: Arc<dyn MemoryStore>) {
153        *self.memory_store.write().await = Some(store);
154    }
155
156    /// Get the current memory store, if any.
157    pub async fn memory_store(&self) -> Option<Arc<dyn MemoryStore>> {
158        self.memory_store.read().await.clone()
159    }
160
161    /// Restore a single session by ID from the store
162    ///
163    /// Searches all registered stores for the given session ID and restores it
164    /// into the in-memory session map. Returns an error if not found.
165    pub async fn restore_session_by_id(&self, session_id: &str) -> Result<()> {
166        // Check if already loaded
167        {
168            let sessions = self.sessions.read().await;
169            if sessions.contains_key(session_id) {
170                return Ok(());
171            }
172        }
173
174        let stores = self.stores.read().await;
175        for (backend, store) in stores.iter() {
176            match store.load(session_id).await {
177                Ok(Some(data)) => {
178                    {
179                        let mut storage_types = self.session_storage_types.write().await;
180                        storage_types.insert(data.id.clone(), backend.clone());
181                    }
182                    self.restore_session(data).await?;
183                    return Ok(());
184                }
185                Ok(None) => continue,
186                Err(e) => {
187                    tracing::warn!(
188                        "Failed to load session {} from {:?}: {}",
189                        session_id,
190                        backend,
191                        e
192                    );
193                    continue;
194                }
195            }
196        }
197
198        Err(anyhow::anyhow!(
199            "Session {} not found in any store",
200            session_id
201        ))
202    }
203
204    /// Load all sessions from all registered stores
205    pub async fn load_all_sessions(&mut self) -> Result<usize> {
206        let stores = self.stores.read().await;
207        let mut loaded = 0;
208
209        for (backend, store) in stores.iter() {
210            let session_ids = match store.list().await {
211                Ok(ids) => ids,
212                Err(e) => {
213                    tracing::warn!("Failed to list sessions from {:?} store: {}", backend, e);
214                    continue;
215                }
216            };
217
218            for id in session_ids {
219                match store.load(&id).await {
220                    Ok(Some(data)) => {
221                        // Record the storage type for this session
222                        {
223                            let mut storage_types = self.session_storage_types.write().await;
224                            storage_types.insert(data.id.clone(), backend.clone());
225                        }
226
227                        if let Err(e) = self.restore_session(data).await {
228                            tracing::warn!("Failed to restore session {}: {}", id, e);
229                        } else {
230                            loaded += 1;
231                        }
232                    }
233                    Ok(None) => {
234                        tracing::warn!("Session {} not found in store", id);
235                    }
236                    Err(e) => {
237                        tracing::warn!("Failed to load session {}: {}", id, e);
238                    }
239                }
240            }
241        }
242
243        tracing::info!("Loaded {} sessions from store", loaded);
244        Ok(loaded)
245    }
246
247    /// Restore a session from SessionData
248    async fn restore_session(&self, data: SessionData) -> Result<()> {
249        let tools = self.tool_executor.definitions();
250        let mut session = Session::new(data.id.clone(), data.config.clone(), tools).await?;
251
252        // Restore serializable state
253        session.restore_from_data(&data);
254
255        // Restore LLM config if present (without API key - must be reconfigured)
256        if let Some(llm_config) = &data.llm_config {
257            let mut configs = self.llm_configs.write().await;
258            configs.insert(data.id.clone(), llm_config.clone());
259        }
260
261        let mut sessions = self.sessions.write().await;
262        sessions.insert(data.id.clone(), Arc::new(RwLock::new(session)));
263
264        tracing::info!("Restored session: {}", data.id);
265        Ok(())
266    }
267
268    /// Save a session to the store
269    async fn save_session(&self, session_id: &str) -> Result<()> {
270        // Get the storage type for this session
271        let storage_type = {
272            let storage_types = self.session_storage_types.read().await;
273            storage_types.get(session_id).cloned()
274        };
275
276        let Some(storage_type) = storage_type else {
277            // No storage type means memory-only session
278            return Ok(());
279        };
280
281        // Skip saving for memory storage
282        if storage_type == crate::config::StorageBackend::Memory {
283            return Ok(());
284        }
285
286        // Get the appropriate store
287        let stores = self.stores.read().await;
288        let Some(store) = stores.get(&storage_type) else {
289            tracing::warn!("No store available for storage type: {:?}", storage_type);
290            return Ok(());
291        };
292
293        let session_lock = self.get_session(session_id).await?;
294        let session = session_lock.read().await;
295
296        // Get LLM config if set
297        let llm_config = {
298            let configs = self.llm_configs.read().await;
299            configs.get(session_id).cloned()
300        };
301
302        let data = session.to_session_data(llm_config);
303        store.save(&data).await?;
304
305        tracing::debug!("Saved session: {}", session_id);
306        Ok(())
307    }
308
309    /// Persist a session to store, logging and emitting an event on failure.
310    /// This is a non-fatal wrapper around `save_session` — the operation
311    /// succeeds in memory even if persistence fails.
312    async fn persist_or_warn(&self, session_id: &str, operation: &str) {
313        if let Err(e) = self.save_session(session_id).await {
314            tracing::warn!(
315                "Failed to persist session {} after {}: {}",
316                session_id,
317                operation,
318                e
319            );
320            // Emit event so SDK clients can react
321            if let Ok(session_lock) = self.get_session(session_id).await {
322                let session = session_lock.read().await;
323                let _ = session.event_tx().send(AgentEvent::PersistenceFailed {
324                    session_id: session_id.to_string(),
325                    operation: operation.to_string(),
326                    error: e.to_string(),
327                });
328            }
329        }
330    }
331
332    /// Spawn persistence as a background task (non-blocking).
333    ///
334    /// All `SessionManager` fields are `Arc`-wrapped, so `clone()` is a
335    /// cheap pointer-copy. This keeps file I/O off the response path.
336    fn persist_in_background(&self, session_id: &str, operation: &str) {
337        let mgr = self.clone();
338        let sid = session_id.to_string();
339        let op = operation.to_string();
340        tokio::spawn(async move {
341            mgr.persist_or_warn(&sid, &op).await;
342        });
343    }
344
345    /// Create a new session
346    pub async fn create_session(&self, id: String, config: SessionConfig) -> Result<String> {
347        tracing::info!(name: "a3s.session.create", session_id = %id, "Creating session");
348
349        // Record the storage type for this session
350        {
351            let mut storage_types = self.session_storage_types.write().await;
352            storage_types.insert(id.clone(), config.storage_type.clone());
353        }
354
355        // Get tool definitions from the executor
356        let tools = self.tool_executor.definitions();
357        let mut session = Session::new(id.clone(), config, tools).await?;
358
359        // Start the command queue
360        session.start_queue().await?;
361
362        // Set max context length if provided
363        if session.config.max_context_length > 0 {
364            session.context_usage.max_tokens = session.config.max_context_length as usize;
365        }
366
367        {
368            let mut sessions = self.sessions.write().await;
369            sessions.insert(id.clone(), Arc::new(RwLock::new(session)));
370        }
371
372        // Persist to store
373        self.persist_in_background(&id, "create");
374
375        tracing::info!("Created session: {}", id);
376        Ok(id)
377    }
378
379    /// Destroy a session
380    pub async fn destroy_session(&self, id: &str) -> Result<()> {
381        tracing::info!(name: "a3s.session.destroy", session_id = %id, "Destroying session");
382
383        // Get the storage type before removing the session
384        let storage_type = {
385            let storage_types = self.session_storage_types.read().await;
386            storage_types.get(id).cloned()
387        };
388
389        {
390            let mut sessions = self.sessions.write().await;
391            sessions.remove(id);
392        }
393
394        // Remove LLM config
395        {
396            let mut configs = self.llm_configs.write().await;
397            configs.remove(id);
398        }
399
400        // Remove storage type tracking
401        {
402            let mut storage_types = self.session_storage_types.write().await;
403            storage_types.remove(id);
404        }
405
406        // Delete from store if applicable
407        if let Some(storage_type) = storage_type {
408            if storage_type != crate::config::StorageBackend::Memory {
409                let stores = self.stores.read().await;
410                if let Some(store) = stores.get(&storage_type) {
411                    if let Err(e) = store.delete(id).await {
412                        tracing::warn!("Failed to delete session {} from store: {}", id, e);
413                    }
414                }
415            }
416        }
417
418        tracing::info!("Destroyed session: {}", id);
419        Ok(())
420    }
421
422    /// Get a session by ID
423    pub async fn get_session(&self, id: &str) -> Result<Arc<RwLock<Session>>> {
424        let sessions = self.sessions.read().await;
425        sessions
426            .get(id)
427            .cloned()
428            .context(format!("Session not found: {}", id))
429    }
430
431    /// Create a child session for a subagent
432    ///
433    /// Child sessions inherit the parent's LLM client but have their own
434    /// permission policy and configuration.
435    pub async fn create_child_session(
436        &self,
437        parent_id: &str,
438        child_id: String,
439        mut config: SessionConfig,
440    ) -> Result<String> {
441        // Verify parent exists and inherit HITL policy
442        let parent_lock = self.get_session(parent_id).await?;
443        let parent_llm_client = {
444            let parent = parent_lock.read().await;
445
446            // Inherit parent's confirmation policy if child doesn't have one
447            if config.confirmation_policy.is_none() {
448                let parent_policy = parent.confirmation_manager.policy().await;
449                config.confirmation_policy = Some(parent_policy);
450            }
451
452            parent.llm_client.clone()
453        };
454
455        // Set parent_id in config
456        config.parent_id = Some(parent_id.to_string());
457
458        // Get tool definitions from the executor
459        let tools = self.tool_executor.definitions();
460        let mut session = Session::new(child_id.clone(), config, tools).await?;
461
462        // Inherit LLM client from parent if not set
463        if session.llm_client.is_none() {
464            let default_llm = self.llm_client.read().await.clone();
465            session.llm_client = parent_llm_client.or(default_llm);
466        }
467
468        // Start the command queue
469        session.start_queue().await?;
470
471        // Set max context length if provided
472        if session.config.max_context_length > 0 {
473            session.context_usage.max_tokens = session.config.max_context_length as usize;
474        }
475
476        {
477            let mut sessions = self.sessions.write().await;
478            sessions.insert(child_id.clone(), Arc::new(RwLock::new(session)));
479        }
480
481        // Persist to store
482        self.persist_in_background(&child_id, "create_child");
483
484        tracing::info!(
485            "Created child session: {} (parent: {})",
486            child_id,
487            parent_id
488        );
489        Ok(child_id)
490    }
491
492    /// Get all child sessions for a parent session
493    pub async fn get_child_sessions(&self, parent_id: &str) -> Vec<String> {
494        let sessions = self.sessions.read().await;
495        let mut children = Vec::new();
496
497        for (id, session_lock) in sessions.iter() {
498            let session = session_lock.read().await;
499            if session.parent_id.as_deref() == Some(parent_id) {
500                children.push(id.clone());
501            }
502        }
503
504        children
505    }
506
507    /// Check if a session is a child session
508    pub async fn is_child_session(&self, session_id: &str) -> Result<bool> {
509        let session_lock = self.get_session(session_id).await?;
510        let session = session_lock.read().await;
511        Ok(session.is_child_session())
512    }
513
514    /// Generate response for a prompt
515    pub async fn generate(&self, session_id: &str, prompt: &str) -> Result<AgentResult> {
516        let session_lock = self.get_session(session_id).await?;
517
518        // Check if session is paused
519        {
520            let session = session_lock.read().await;
521            if session.state == SessionState::Paused {
522                anyhow::bail!(
523                    "Session {} is paused. Call Resume before generating.",
524                    session_id
525                );
526            }
527        }
528
529        // Get session state and LLM client
530        let (
531            history,
532            system,
533            tools,
534            session_llm_client,
535            permission_checker,
536            confirmation_manager,
537            context_providers,
538            session_workspace,
539            tool_metrics,
540            hook_engine,
541            planning_enabled,
542            goal_tracking,
543        ) = {
544            let session = session_lock.read().await;
545            (
546                session.messages.clone(),
547                session.system().map(String::from),
548                session.tools.clone(),
549                session.llm_client.clone(),
550                session.permission_checker.clone(),
551                session.confirmation_manager.clone(),
552                session.context_providers.clone(),
553                session.config.workspace.clone(),
554                session.tool_metrics.clone(),
555                session.config.hook_engine.clone(),
556                session.config.planning_enabled,
557                session.config.goal_tracking,
558            )
559        };
560
561        // Use session's LLM client if configured, otherwise use default
562        let llm_client = if let Some(client) = session_llm_client {
563            client
564        } else if let Some(client) = self.llm_client.read().await.clone() {
565            client
566        } else {
567            anyhow::bail!(
568                "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
569                session_id
570            );
571        };
572
573        // Construct per-session ToolContext from session workspace, falling back to server default
574        let tool_context = if session_workspace.is_empty() {
575            crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
576                .with_session_id(session_id)
577        } else {
578            crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
579                .with_session_id(session_id)
580        };
581
582        // Inject skill registry into system prompt and agent config
583        let skill_registry = self.skill_registry.read().await.clone();
584        let system = if let Some(ref registry) = skill_registry {
585            let skill_prompt = registry.to_system_prompt();
586            if skill_prompt.is_empty() {
587                system
588            } else {
589                match system {
590                    Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
591                    None => Some(skill_prompt),
592                }
593            }
594        } else {
595            system
596        };
597
598        // Inject matched skill content on-demand before skill_registry is moved into AgentConfig
599        let effective_prompt = if let Some(ref registry) = skill_registry {
600            let skill_content = registry.match_skills(prompt);
601            if skill_content.is_empty() {
602                prompt.to_string()
603            } else {
604                format!("{}\n\n---\n\n{}", skill_content, prompt)
605            }
606        } else {
607            prompt.to_string()
608        };
609
610        // Build AgentMemory from shared store (if configured)
611        let memory = self
612            .memory_store
613            .read()
614            .await
615            .as_ref()
616            .map(|store| Arc::new(AgentMemory::new(store.clone())));
617
618        // Create agent loop with permission policy, confirmation manager, and context providers
619        let config = AgentConfig {
620            prompt_slots: match system {
621                Some(s) => SystemPromptSlots::from_legacy(s),
622                None => SystemPromptSlots::default(),
623            },
624            tools,
625            max_tool_rounds: 50,
626            permission_checker: Some(permission_checker),
627            confirmation_manager: Some(confirmation_manager),
628            context_providers,
629            planning_enabled,
630            goal_tracking,
631            hook_engine,
632            skill_registry,
633            memory,
634            ..AgentConfig::default()
635        };
636
637        let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
638            .with_tool_metrics(tool_metrics);
639
640        // Execute with session context
641        let result = agent
642            .execute_with_session(&history, &effective_prompt, Some(session_id), None)
643            .await?;
644
645        // Update session
646        {
647            let mut session = session_lock.write().await;
648            session.messages = result.messages.clone();
649            session.update_usage(&result.usage);
650        }
651
652        // Persist to store
653        self.persist_in_background(session_id, "generate");
654
655        // Auto-compact if context usage exceeds threshold
656        if let Err(e) = self.maybe_auto_compact(session_id).await {
657            tracing::warn!("Auto-compact failed for session {}: {}", session_id, e);
658        }
659
660        Ok(result)
661    }
662
663    /// Generate response with streaming events
664    pub async fn generate_streaming(
665        &self,
666        session_id: &str,
667        prompt: &str,
668    ) -> Result<(
669        mpsc::Receiver<AgentEvent>,
670        tokio::task::JoinHandle<Result<AgentResult>>,
671    )> {
672        let session_lock = self.get_session(session_id).await?;
673
674        // Check if session is paused
675        {
676            let session = session_lock.read().await;
677            if session.state == SessionState::Paused {
678                anyhow::bail!(
679                    "Session {} is paused. Call Resume before generating.",
680                    session_id
681                );
682            }
683        }
684
685        // Get session state and LLM client
686        let (
687            history,
688            system,
689            tools,
690            session_llm_client,
691            permission_checker,
692            confirmation_manager,
693            context_providers,
694            session_workspace,
695            tool_metrics,
696            hook_engine,
697            planning_enabled,
698            goal_tracking,
699        ) = {
700            let session = session_lock.read().await;
701            (
702                session.messages.clone(),
703                session.system().map(String::from),
704                session.tools.clone(),
705                session.llm_client.clone(),
706                session.permission_checker.clone(),
707                session.confirmation_manager.clone(),
708                session.context_providers.clone(),
709                session.config.workspace.clone(),
710                session.tool_metrics.clone(),
711                session.config.hook_engine.clone(),
712                session.config.planning_enabled,
713                session.config.goal_tracking,
714            )
715        };
716
717        // Use session's LLM client if configured, otherwise use default
718        let llm_client = if let Some(client) = session_llm_client {
719            client
720        } else if let Some(client) = self.llm_client.read().await.clone() {
721            client
722        } else {
723            anyhow::bail!(
724                "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
725                session_id
726            );
727        };
728
729        // Construct per-session ToolContext from session workspace, falling back to server default
730        let tool_context = if session_workspace.is_empty() {
731            crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
732                .with_session_id(session_id)
733        } else {
734            crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
735                .with_session_id(session_id)
736        };
737
738        // Inject skill registry into system prompt and agent config
739        let skill_registry = self.skill_registry.read().await.clone();
740        let system = if let Some(ref registry) = skill_registry {
741            let skill_prompt = registry.to_system_prompt();
742            if skill_prompt.is_empty() {
743                system
744            } else {
745                match system {
746                    Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
747                    None => Some(skill_prompt),
748                }
749            }
750        } else {
751            system
752        };
753
754        // Inject matched skill content on-demand before skill_registry is moved into AgentConfig
755        let effective_prompt = if let Some(ref registry) = skill_registry {
756            let skill_content = registry.match_skills(prompt);
757            if skill_content.is_empty() {
758                prompt.to_string()
759            } else {
760                format!("{}\n\n---\n\n{}", skill_content, prompt)
761            }
762        } else {
763            prompt.to_string()
764        };
765
766        // Build AgentMemory from shared store (if configured)
767        let memory = self
768            .memory_store
769            .read()
770            .await
771            .as_ref()
772            .map(|store| Arc::new(AgentMemory::new(store.clone())));
773
774        // Create agent loop with permission policy, confirmation manager, and context providers
775        let config = AgentConfig {
776            prompt_slots: match system {
777                Some(s) => SystemPromptSlots::from_legacy(s),
778                None => SystemPromptSlots::default(),
779            },
780            tools,
781            max_tool_rounds: 50,
782            permission_checker: Some(permission_checker),
783            confirmation_manager: Some(confirmation_manager),
784            context_providers,
785            planning_enabled,
786            goal_tracking,
787            hook_engine,
788            skill_registry,
789            memory,
790            ..AgentConfig::default()
791        };
792
793        let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
794            .with_tool_metrics(tool_metrics);
795
796        // Execute with streaming
797        let (rx, handle) = agent.execute_streaming(&history, &effective_prompt).await?;
798
799        // Store the abort handle for cancellation support
800        let abort_handle = handle.abort_handle();
801        {
802            let mut ops = self.ongoing_operations.write().await;
803            ops.insert(session_id.to_string(), abort_handle);
804        }
805
806        // Spawn task to update session after completion
807        let session_lock_clone = session_lock.clone();
808        let original_handle = handle;
809        let stores = self.stores.clone();
810        let session_storage_types = self.session_storage_types.clone();
811        let llm_configs = self.llm_configs.clone();
812        let session_id_owned = session_id.to_string();
813        let ongoing_operations = self.ongoing_operations.clone();
814        let session_manager = self.clone();
815
816        let wrapped_handle = tokio::spawn(async move {
817            let result = original_handle.await??;
818
819            // Remove from ongoing operations
820            {
821                let mut ops = ongoing_operations.write().await;
822                ops.remove(&session_id_owned);
823            }
824
825            // Update session
826            {
827                let mut session = session_lock_clone.write().await;
828                session.messages = result.messages.clone();
829                session.update_usage(&result.usage);
830            }
831
832            // Persist to store
833            let storage_type = {
834                let storage_types = session_storage_types.read().await;
835                storage_types.get(&session_id_owned).cloned()
836            };
837
838            if let Some(storage_type) = storage_type {
839                if storage_type != crate::config::StorageBackend::Memory {
840                    let stores_guard = stores.read().await;
841                    if let Some(store) = stores_guard.get(&storage_type) {
842                        let session = session_lock_clone.read().await;
843                        let llm_config = {
844                            let configs = llm_configs.read().await;
845                            configs.get(&session_id_owned).cloned()
846                        };
847                        let data = session.to_session_data(llm_config);
848                        if let Err(e) = store.save(&data).await {
849                            tracing::warn!(
850                                "Failed to persist session {} after streaming: {}",
851                                session_id_owned,
852                                e
853                            );
854                        }
855                    }
856                }
857            }
858
859            // Auto-compact if context usage exceeds threshold
860            if let Err(e) = session_manager.maybe_auto_compact(&session_id_owned).await {
861                tracing::warn!(
862                    "Auto-compact failed for session {}: {}",
863                    session_id_owned,
864                    e
865                );
866            }
867
868            Ok(result)
869        });
870
871        Ok((rx, wrapped_handle))
872    }
873
874    /// Get context usage for a session
875    pub async fn context_usage(&self, session_id: &str) -> Result<ContextUsage> {
876        let session_lock = self.get_session(session_id).await?;
877        let session = session_lock.read().await;
878        Ok(session.context_usage.clone())
879    }
880
881    /// Get conversation history for a session
882    pub async fn history(&self, session_id: &str) -> Result<Vec<Message>> {
883        let session_lock = self.get_session(session_id).await?;
884        let session = session_lock.read().await;
885        Ok(session.messages.clone())
886    }
887
888    /// Clear session history
889    pub async fn clear(&self, session_id: &str) -> Result<()> {
890        {
891            let session_lock = self.get_session(session_id).await?;
892            let mut session = session_lock.write().await;
893            session.clear();
894        }
895
896        // Persist to store
897        self.persist_in_background(session_id, "clear");
898
899        Ok(())
900    }
901
902    /// Compact session context
903    pub async fn compact(&self, session_id: &str) -> Result<()> {
904        tracing::info!(name: "a3s.session.compact", session_id = %session_id, "Compacting session context");
905
906        {
907            let session_lock = self.get_session(session_id).await?;
908            let mut session = session_lock.write().await;
909
910            // Get LLM client for compaction (if available)
911            let llm_client = if let Some(client) = &session.llm_client {
912                client.clone()
913            } else if let Some(client) = self.llm_client.read().await.clone() {
914                client
915            } else {
916                // If no LLM client available, just do simple truncation
917                tracing::warn!("No LLM client configured for compaction, using simple truncation");
918                let keep_messages = 20;
919                if session.messages.len() > keep_messages {
920                    let len = session.messages.len();
921                    session.messages = session.messages.split_off(len - keep_messages);
922                }
923                // Persist after truncation
924                drop(session);
925                self.persist_in_background(session_id, "compact");
926                return Ok(());
927            };
928
929            session.compact(&llm_client).await?;
930        }
931
932        // Persist to store
933        self.persist_in_background(session_id, "compact");
934
935        Ok(())
936    }
937
938    /// Check if auto-compaction should be triggered and perform it if needed.
939    ///
940    /// Called after `generate()` / `generate_streaming()` updates session usage.
941    /// Triggers compaction when:
942    /// - `auto_compact` is enabled in session config
943    /// - `context_usage.percent` exceeds `auto_compact_threshold`
944    pub async fn maybe_auto_compact(&self, session_id: &str) -> Result<bool> {
945        let (should_compact, percent_before, messages_before) = {
946            let session_lock = self.get_session(session_id).await?;
947            let session = session_lock.read().await;
948
949            if !session.config.auto_compact {
950                return Ok(false);
951            }
952
953            let threshold = session.config.auto_compact_threshold;
954            let percent = session.context_usage.percent;
955            let msg_count = session.messages.len();
956
957            tracing::debug!(
958                "Auto-compact check for session {}: percent={:.2}%, threshold={:.2}%, messages={}",
959                session_id,
960                percent * 100.0,
961                threshold * 100.0,
962                msg_count,
963            );
964
965            (percent >= threshold, percent, msg_count)
966        };
967
968        if !should_compact {
969            return Ok(false);
970        }
971
972        tracing::info!(
973            name: "a3s.session.auto_compact",
974            session_id = %session_id,
975            percent_before = %format!("{:.1}%", percent_before * 100.0),
976            messages_before = %messages_before,
977            "Auto-compacting session due to high context usage"
978        );
979
980        // Perform compaction (reuses existing compact logic)
981        self.compact(session_id).await?;
982
983        // Get post-compaction message count
984        let messages_after = {
985            let session_lock = self.get_session(session_id).await?;
986            let session = session_lock.read().await;
987            session.messages.len()
988        };
989
990        // Broadcast event to notify clients
991        let event = AgentEvent::ContextCompacted {
992            session_id: session_id.to_string(),
993            before_messages: messages_before,
994            after_messages: messages_after,
995            percent_before,
996        };
997
998        // Try to send via session's event broadcaster
999        if let Ok(session_lock) = self.get_session(session_id).await {
1000            let session = session_lock.read().await;
1001            let _ = session.event_tx.send(event);
1002        }
1003
1004        tracing::info!(
1005            name: "a3s.session.auto_compact.done",
1006            session_id = %session_id,
1007            messages_before = %messages_before,
1008            messages_after = %messages_after,
1009            "Auto-compaction complete"
1010        );
1011
1012        Ok(true)
1013    }
1014
1015    /// Resolve the LLM client for a session (session-level -> default fallback)
1016    ///
1017    /// Returns `None` if no LLM client is configured at either level.
1018    pub async fn get_llm_for_session(
1019        &self,
1020        session_id: &str,
1021    ) -> Result<Option<Arc<dyn LlmClient>>> {
1022        let session_lock = self.get_session(session_id).await?;
1023        let session = session_lock.read().await;
1024
1025        if let Some(client) = &session.llm_client {
1026            return Ok(Some(client.clone()));
1027        }
1028
1029        Ok(self.llm_client.read().await.clone())
1030    }
1031
1032    /// Configure session
1033    pub async fn configure(
1034        &self,
1035        session_id: &str,
1036        thinking: Option<bool>,
1037        budget: Option<usize>,
1038        model_config: Option<LlmConfig>,
1039    ) -> Result<()> {
1040        {
1041            let session_lock = self.get_session(session_id).await?;
1042            let mut session = session_lock.write().await;
1043
1044            if let Some(t) = thinking {
1045                session.thinking_enabled = t;
1046            }
1047            if let Some(b) = budget {
1048                session.thinking_budget = Some(b);
1049            }
1050            if let Some(ref config) = model_config {
1051                tracing::info!(
1052                    "Configuring session {} with LLM: provider={}, model={}",
1053                    session_id,
1054                    config.provider,
1055                    config.model
1056                );
1057                session.model_name = Some(config.model.clone());
1058                session.llm_client = Some(llm::create_client_with_config(config.clone()));
1059            }
1060        }
1061
1062        // Store LLM config for persistence (without API key)
1063        if let Some(config) = model_config {
1064            let llm_config_data = LlmConfigData {
1065                provider: config.provider,
1066                model: config.model,
1067                api_key: None, // Don't persist API key
1068                base_url: config.base_url,
1069            };
1070            let mut configs = self.llm_configs.write().await;
1071            configs.insert(session_id.to_string(), llm_config_data);
1072        }
1073
1074        // Persist to store
1075        self.persist_in_background(session_id, "configure");
1076
1077        Ok(())
1078    }
1079
1080    /// Get session count
1081    pub async fn session_count(&self) -> usize {
1082        let sessions = self.sessions.read().await;
1083        sessions.len()
1084    }
1085
1086    /// Check health of all registered stores
1087    pub async fn store_health(&self) -> Vec<(String, Result<()>)> {
1088        let stores = self.stores.read().await;
1089        let mut results = Vec::new();
1090        for (_, store) in stores.iter() {
1091            let name = store.backend_name().to_string();
1092            let result = store.health_check().await;
1093            results.push((name, result));
1094        }
1095        results
1096    }
1097
1098    /// List all loaded tools (built-in tools)
1099    pub fn list_tools(&self) -> Vec<crate::llm::ToolDefinition> {
1100        self.tool_executor.definitions()
1101    }
1102
1103    /// Pause a session
1104    pub async fn pause_session(&self, session_id: &str) -> Result<bool> {
1105        let paused = {
1106            let session_lock = self.get_session(session_id).await?;
1107            let mut session = session_lock.write().await;
1108            session.pause()
1109        };
1110
1111        if paused {
1112            self.persist_in_background(session_id, "pause");
1113        }
1114
1115        Ok(paused)
1116    }
1117
1118    /// Resume a session
1119    pub async fn resume_session(&self, session_id: &str) -> Result<bool> {
1120        let resumed = {
1121            let session_lock = self.get_session(session_id).await?;
1122            let mut session = session_lock.write().await;
1123            session.resume()
1124        };
1125
1126        if resumed {
1127            self.persist_in_background(session_id, "resume");
1128        }
1129
1130        Ok(resumed)
1131    }
1132
1133    /// Cancel an ongoing operation for a session
1134    ///
1135    /// Returns true if an operation was cancelled, false if no operation was running.
1136    pub async fn cancel_operation(&self, session_id: &str) -> Result<bool> {
1137        // First, cancel any pending HITL confirmations
1138        let session_lock = self.get_session(session_id).await?;
1139        let cancelled_confirmations = {
1140            let session = session_lock.read().await;
1141            session.confirmation_manager.cancel_all().await
1142        };
1143
1144        if cancelled_confirmations > 0 {
1145            tracing::info!(
1146                "Cancelled {} pending confirmations for session {}",
1147                cancelled_confirmations,
1148                session_id
1149            );
1150        }
1151
1152        // Then, abort the ongoing operation if any
1153        let abort_handle = {
1154            let mut ops = self.ongoing_operations.write().await;
1155            ops.remove(session_id)
1156        };
1157
1158        if let Some(handle) = abort_handle {
1159            handle.abort();
1160            tracing::info!("Cancelled ongoing operation for session {}", session_id);
1161            Ok(true)
1162        } else if cancelled_confirmations > 0 {
1163            // We cancelled confirmations but no main operation
1164            Ok(true)
1165        } else {
1166            tracing::debug!("No ongoing operation to cancel for session {}", session_id);
1167            Ok(false)
1168        }
1169    }
1170
1171    /// Get all sessions (returns session locks for iteration)
1172    pub async fn get_all_sessions(&self) -> Vec<Arc<RwLock<Session>>> {
1173        let sessions = self.sessions.read().await;
1174        sessions.values().cloned().collect()
1175    }
1176
1177    /// Get tool executor reference
1178    pub fn tool_executor(&self) -> &Arc<ToolExecutor> {
1179        &self.tool_executor
1180    }
1181
1182    /// Confirm a tool execution (HITL)
1183    pub async fn confirm_tool(
1184        &self,
1185        session_id: &str,
1186        tool_id: &str,
1187        approved: bool,
1188        reason: Option<String>,
1189    ) -> Result<bool> {
1190        let session_lock = self.get_session(session_id).await?;
1191        let session = session_lock.read().await;
1192        session
1193            .confirmation_manager
1194            .confirm(tool_id, approved, reason)
1195            .await
1196            .map_err(|e| anyhow::anyhow!(e))
1197    }
1198
1199    /// Set confirmation policy for a session (HITL)
1200    pub async fn set_confirmation_policy(
1201        &self,
1202        session_id: &str,
1203        policy: ConfirmationPolicy,
1204    ) -> Result<ConfirmationPolicy> {
1205        {
1206            let session_lock = self.get_session(session_id).await?;
1207            let session = session_lock.read().await;
1208            session.set_confirmation_policy(policy.clone()).await;
1209        }
1210
1211        // Update config for persistence
1212        {
1213            let session_lock = self.get_session(session_id).await?;
1214            let mut session = session_lock.write().await;
1215            session.config.confirmation_policy = Some(policy.clone());
1216        }
1217
1218        // Persist to store
1219        self.persist_in_background(session_id, "set_confirmation_policy");
1220
1221        Ok(policy)
1222    }
1223
1224    /// Get confirmation policy for a session (HITL)
1225    pub async fn get_confirmation_policy(&self, session_id: &str) -> Result<ConfirmationPolicy> {
1226        let session_lock = self.get_session(session_id).await?;
1227        let session = session_lock.read().await;
1228        Ok(session.confirmation_policy().await)
1229    }
1230}