Skip to main content

a3s_code_core/session/
manager.rs

1//! SessionManager — manages multiple concurrent sessions
2
3use super::{ContextUsage, Session, SessionConfig, SessionState};
4use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
5use crate::hitl::ConfirmationPolicy;
6use crate::llm::{self, LlmClient, LlmConfig, Message};
7use crate::store::{FileSessionStore, LlmConfigData, SessionData, SessionStore};
8use crate::tools::ToolExecutor;
9use anyhow::{Context, Result};
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::{mpsc, RwLock};
13
/// Session manager handles multiple concurrent sessions.
///
/// The manager derives `Clone`; its map fields are `Arc`-wrapped, so a clone
/// shares the same underlying maps as the value it was cloned from.
#[derive(Clone)]
pub struct SessionManager {
    /// In-memory sessions keyed by session ID, each behind its own lock
    pub(crate) sessions: Arc<RwLock<HashMap<String, Arc<RwLock<Session>>>>>,
    /// Manager-level LLM client, used when a session has no client of its own
    pub(crate) llm_client: Option<Arc<dyn LlmClient>>,
    /// Tool executor that supplies tool definitions for new sessions and is
    /// handed to each agent loop
    pub(crate) tool_executor: Arc<ToolExecutor>,
    /// Session stores by storage type
    pub(crate) stores: Arc<RwLock<HashMap<crate::config::StorageBackend, Arc<dyn SessionStore>>>>,
    /// Track which storage type each session uses
    pub(crate) session_storage_types: Arc<RwLock<HashMap<String, crate::config::StorageBackend>>>,
    /// LLM configurations for sessions (stored separately for persistence)
    pub(crate) llm_configs: Arc<RwLock<HashMap<String, LlmConfigData>>>,
    /// Ongoing operations (session_id -> AbortHandle of the running task)
    pub(crate) ongoing_operations: Arc<RwLock<HashMap<String, tokio::task::AbortHandle>>>,
}
29
30impl SessionManager {
31    /// Create a new session manager without persistence
32    pub fn new(llm_client: Option<Arc<dyn LlmClient>>, tool_executor: Arc<ToolExecutor>) -> Self {
33        Self {
34            sessions: Arc::new(RwLock::new(HashMap::new())),
35            llm_client,
36            tool_executor,
37            stores: Arc::new(RwLock::new(HashMap::new())),
38            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
39            llm_configs: Arc::new(RwLock::new(HashMap::new())),
40            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
41        }
42    }
43
44    /// Create a session manager with file-based persistence
45    ///
46    /// Sessions will be automatically saved to disk and restored on startup.
47    pub async fn with_persistence<P: AsRef<std::path::Path>>(
48        llm_client: Option<Arc<dyn LlmClient>>,
49        tool_executor: Arc<ToolExecutor>,
50        sessions_dir: P,
51    ) -> Result<Self> {
52        let store = FileSessionStore::new(sessions_dir).await?;
53        let mut stores = HashMap::new();
54        stores.insert(
55            crate::config::StorageBackend::File,
56            Arc::new(store) as Arc<dyn SessionStore>,
57        );
58
59        let manager = Self {
60            sessions: Arc::new(RwLock::new(HashMap::new())),
61            llm_client,
62            tool_executor,
63            stores: Arc::new(RwLock::new(stores)),
64            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
65            llm_configs: Arc::new(RwLock::new(HashMap::new())),
66            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
67        };
68
69        Ok(manager)
70    }
71
72    /// Create a session manager with a custom store
73    ///
74    /// The `backend` parameter determines which `StorageBackend` key the store is registered under.
75    /// Sessions created with a matching `storage_type` will use this store.
76    pub fn with_store(
77        llm_client: Option<Arc<dyn LlmClient>>,
78        tool_executor: Arc<ToolExecutor>,
79        store: Arc<dyn SessionStore>,
80        backend: crate::config::StorageBackend,
81    ) -> Self {
82        let mut stores = HashMap::new();
83        stores.insert(backend, store);
84
85        Self {
86            sessions: Arc::new(RwLock::new(HashMap::new())),
87            llm_client,
88            tool_executor,
89            stores: Arc::new(RwLock::new(stores)),
90            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
91            llm_configs: Arc::new(RwLock::new(HashMap::new())),
92            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
93        }
94    }
95
96    /// Restore a single session by ID from the store
97    ///
98    /// Searches all registered stores for the given session ID and restores it
99    /// into the in-memory session map. Returns an error if not found.
100    pub async fn restore_session_by_id(&self, session_id: &str) -> Result<()> {
101        // Check if already loaded
102        {
103            let sessions = self.sessions.read().await;
104            if sessions.contains_key(session_id) {
105                return Ok(());
106            }
107        }
108
109        let stores = self.stores.read().await;
110        for (backend, store) in stores.iter() {
111            match store.load(session_id).await {
112                Ok(Some(data)) => {
113                    {
114                        let mut storage_types = self.session_storage_types.write().await;
115                        storage_types.insert(data.id.clone(), backend.clone());
116                    }
117                    self.restore_session(data).await?;
118                    return Ok(());
119                }
120                Ok(None) => continue,
121                Err(e) => {
122                    tracing::warn!(
123                        "Failed to load session {} from {:?}: {}",
124                        session_id,
125                        backend,
126                        e
127                    );
128                    continue;
129                }
130            }
131        }
132
133        Err(anyhow::anyhow!(
134            "Session {} not found in any store",
135            session_id
136        ))
137    }
138
139    /// Load all sessions from all registered stores
140    pub async fn load_all_sessions(&mut self) -> Result<usize> {
141        let stores = self.stores.read().await;
142        let mut loaded = 0;
143
144        for (backend, store) in stores.iter() {
145            let session_ids = match store.list().await {
146                Ok(ids) => ids,
147                Err(e) => {
148                    tracing::warn!("Failed to list sessions from {:?} store: {}", backend, e);
149                    continue;
150                }
151            };
152
153            for id in session_ids {
154                match store.load(&id).await {
155                    Ok(Some(data)) => {
156                        // Record the storage type for this session
157                        {
158                            let mut storage_types = self.session_storage_types.write().await;
159                            storage_types.insert(data.id.clone(), backend.clone());
160                        }
161
162                        if let Err(e) = self.restore_session(data).await {
163                            tracing::warn!("Failed to restore session {}: {}", id, e);
164                        } else {
165                            loaded += 1;
166                        }
167                    }
168                    Ok(None) => {
169                        tracing::warn!("Session {} not found in store", id);
170                    }
171                    Err(e) => {
172                        tracing::warn!("Failed to load session {}: {}", id, e);
173                    }
174                }
175            }
176        }
177
178        tracing::info!("Loaded {} sessions from store", loaded);
179        Ok(loaded)
180    }
181
182    /// Restore a session from SessionData
183    async fn restore_session(&self, data: SessionData) -> Result<()> {
184        let tools = self.tool_executor.definitions();
185        let mut session = Session::new(data.id.clone(), data.config.clone(), tools).await?;
186
187        // Restore serializable state
188        session.restore_from_data(&data);
189
190        // Restore LLM config if present (without API key - must be reconfigured)
191        if let Some(llm_config) = &data.llm_config {
192            let mut configs = self.llm_configs.write().await;
193            configs.insert(data.id.clone(), llm_config.clone());
194        }
195
196        let mut sessions = self.sessions.write().await;
197        sessions.insert(data.id.clone(), Arc::new(RwLock::new(session)));
198
199        tracing::info!("Restored session: {}", data.id);
200        Ok(())
201    }
202
203    /// Save a session to the store
204    async fn save_session(&self, session_id: &str) -> Result<()> {
205        // Get the storage type for this session
206        let storage_type = {
207            let storage_types = self.session_storage_types.read().await;
208            storage_types.get(session_id).cloned()
209        };
210
211        let Some(storage_type) = storage_type else {
212            // No storage type means memory-only session
213            return Ok(());
214        };
215
216        // Skip saving for memory storage
217        if storage_type == crate::config::StorageBackend::Memory {
218            return Ok(());
219        }
220
221        // Get the appropriate store
222        let stores = self.stores.read().await;
223        let Some(store) = stores.get(&storage_type) else {
224            tracing::warn!("No store available for storage type: {:?}", storage_type);
225            return Ok(());
226        };
227
228        let session_lock = self.get_session(session_id).await?;
229        let session = session_lock.read().await;
230
231        // Get LLM config if set
232        let llm_config = {
233            let configs = self.llm_configs.read().await;
234            configs.get(session_id).cloned()
235        };
236
237        let data = session.to_session_data(llm_config);
238        store.save(&data).await?;
239
240        tracing::debug!("Saved session: {}", session_id);
241        Ok(())
242    }
243
244    /// Persist a session to store, logging and emitting an event on failure.
245    /// This is a non-fatal wrapper around `save_session` — the operation
246    /// succeeds in memory even if persistence fails.
247    async fn persist_or_warn(&self, session_id: &str, operation: &str) {
248        if let Err(e) = self.save_session(session_id).await {
249            tracing::warn!(
250                "Failed to persist session {} after {}: {}",
251                session_id,
252                operation,
253                e
254            );
255            // Emit event so SDK clients can react
256            if let Ok(session_lock) = self.get_session(session_id).await {
257                let session = session_lock.read().await;
258                let _ = session.event_tx().send(AgentEvent::PersistenceFailed {
259                    session_id: session_id.to_string(),
260                    operation: operation.to_string(),
261                    error: e.to_string(),
262                });
263            }
264        }
265    }
266
267    /// Spawn persistence as a background task (non-blocking).
268    ///
269    /// All `SessionManager` fields are `Arc`-wrapped, so `clone()` is a
270    /// cheap pointer-copy. This keeps file I/O off the response path.
271    fn persist_in_background(&self, session_id: &str, operation: &str) {
272        let mgr = self.clone();
273        let sid = session_id.to_string();
274        let op = operation.to_string();
275        tokio::spawn(async move {
276            mgr.persist_or_warn(&sid, &op).await;
277        });
278    }
279
280    /// Create a new session
281    pub async fn create_session(&self, id: String, config: SessionConfig) -> Result<String> {
282        tracing::info!(name: "a3s.session.create", session_id = %id, "Creating session");
283
284        // Record the storage type for this session
285        {
286            let mut storage_types = self.session_storage_types.write().await;
287            storage_types.insert(id.clone(), config.storage_type.clone());
288        }
289
290        // Get tool definitions from the executor
291        let tools = self.tool_executor.definitions();
292        let mut session = Session::new(id.clone(), config, tools).await?;
293
294        // Start the command queue
295        session.start_queue().await?;
296
297        // Set max context length if provided
298        if session.config.max_context_length > 0 {
299            session.context_usage.max_tokens = session.config.max_context_length as usize;
300        }
301
302        {
303            let mut sessions = self.sessions.write().await;
304            sessions.insert(id.clone(), Arc::new(RwLock::new(session)));
305        }
306
307        // Persist to store
308        self.persist_in_background(&id, "create");
309
310        tracing::info!("Created session: {}", id);
311        Ok(id)
312    }
313
314    /// Destroy a session
315    pub async fn destroy_session(&self, id: &str) -> Result<()> {
316        tracing::info!(name: "a3s.session.destroy", session_id = %id, "Destroying session");
317
318        // Get the storage type before removing the session
319        let storage_type = {
320            let storage_types = self.session_storage_types.read().await;
321            storage_types.get(id).cloned()
322        };
323
324        {
325            let mut sessions = self.sessions.write().await;
326            sessions.remove(id);
327        }
328
329        // Remove LLM config
330        {
331            let mut configs = self.llm_configs.write().await;
332            configs.remove(id);
333        }
334
335        // Remove storage type tracking
336        {
337            let mut storage_types = self.session_storage_types.write().await;
338            storage_types.remove(id);
339        }
340
341        // Delete from store if applicable
342        if let Some(storage_type) = storage_type {
343            if storage_type != crate::config::StorageBackend::Memory {
344                let stores = self.stores.read().await;
345                if let Some(store) = stores.get(&storage_type) {
346                    if let Err(e) = store.delete(id).await {
347                        tracing::warn!("Failed to delete session {} from store: {}", id, e);
348                    }
349                }
350            }
351        }
352
353        tracing::info!("Destroyed session: {}", id);
354        Ok(())
355    }
356
357    /// Get a session by ID
358    pub async fn get_session(&self, id: &str) -> Result<Arc<RwLock<Session>>> {
359        let sessions = self.sessions.read().await;
360        sessions
361            .get(id)
362            .cloned()
363            .context(format!("Session not found: {}", id))
364    }
365
366    /// Create a child session for a subagent
367    ///
368    /// Child sessions inherit the parent's LLM client but have their own
369    /// permission policy and configuration.
370    pub async fn create_child_session(
371        &self,
372        parent_id: &str,
373        child_id: String,
374        mut config: SessionConfig,
375    ) -> Result<String> {
376        // Verify parent exists and inherit HITL policy
377        let parent_lock = self.get_session(parent_id).await?;
378        let parent_llm_client = {
379            let parent = parent_lock.read().await;
380
381            // Inherit parent's confirmation policy if child doesn't have one
382            if config.confirmation_policy.is_none() {
383                let parent_policy = parent.confirmation_manager.policy().await;
384                config.confirmation_policy = Some(parent_policy);
385            }
386
387            parent.llm_client.clone()
388        };
389
390        // Set parent_id in config
391        config.parent_id = Some(parent_id.to_string());
392
393        // Get tool definitions from the executor
394        let tools = self.tool_executor.definitions();
395        let mut session = Session::new(child_id.clone(), config, tools).await?;
396
397        // Inherit LLM client from parent if not set
398        if session.llm_client.is_none() {
399            session.llm_client = parent_llm_client.or_else(|| self.llm_client.clone());
400        }
401
402        // Start the command queue
403        session.start_queue().await?;
404
405        // Set max context length if provided
406        if session.config.max_context_length > 0 {
407            session.context_usage.max_tokens = session.config.max_context_length as usize;
408        }
409
410        {
411            let mut sessions = self.sessions.write().await;
412            sessions.insert(child_id.clone(), Arc::new(RwLock::new(session)));
413        }
414
415        // Persist to store
416        self.persist_in_background(&child_id, "create_child");
417
418        tracing::info!(
419            "Created child session: {} (parent: {})",
420            child_id,
421            parent_id
422        );
423        Ok(child_id)
424    }
425
426    /// Get all child sessions for a parent session
427    pub async fn get_child_sessions(&self, parent_id: &str) -> Vec<String> {
428        let sessions = self.sessions.read().await;
429        let mut children = Vec::new();
430
431        for (id, session_lock) in sessions.iter() {
432            let session = session_lock.read().await;
433            if session.parent_id.as_deref() == Some(parent_id) {
434                children.push(id.clone());
435            }
436        }
437
438        children
439    }
440
441    /// Check if a session is a child session
442    pub async fn is_child_session(&self, session_id: &str) -> Result<bool> {
443        let session_lock = self.get_session(session_id).await?;
444        let session = session_lock.read().await;
445        Ok(session.is_child_session())
446    }
447
448    /// Generate response for a prompt
449    pub async fn generate(&self, session_id: &str, prompt: &str) -> Result<AgentResult> {
450        let session_lock = self.get_session(session_id).await?;
451
452        // Check if session is paused
453        {
454            let session = session_lock.read().await;
455            if session.state == SessionState::Paused {
456                anyhow::bail!(
457                    "Session {} is paused. Call Resume before generating.",
458                    session_id
459                );
460            }
461        }
462
463        // Get session state and LLM client
464        let (
465            history,
466            system,
467            tools,
468            session_llm_client,
469            permission_checker,
470            confirmation_manager,
471            context_providers,
472            session_workspace,
473            tool_metrics,
474            hook_engine,
475            planning_enabled,
476            goal_tracking,
477        ) = {
478            let session = session_lock.read().await;
479            (
480                session.messages.clone(),
481                session.system().map(String::from),
482                session.tools.clone(),
483                session.llm_client.clone(),
484                session.permission_checker.clone(),
485                session.confirmation_manager.clone(),
486                session.context_providers.clone(),
487                session.config.workspace.clone(),
488                session.tool_metrics.clone(),
489                session.config.hook_engine.clone(),
490                session.config.planning_enabled,
491                session.config.goal_tracking,
492            )
493        };
494
495        // Use session's LLM client if configured, otherwise use default
496        let llm_client = if let Some(client) = session_llm_client {
497            client
498        } else if let Some(client) = &self.llm_client {
499            client.clone()
500        } else {
501            anyhow::bail!(
502                "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
503                session_id
504            );
505        };
506
507        // Construct per-session ToolContext from session workspace, falling back to server default
508        let tool_context = if session_workspace.is_empty() {
509            crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
510                .with_session_id(session_id)
511        } else {
512            crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
513                .with_session_id(session_id)
514        };
515
516        // Create agent loop with permission policy, confirmation manager, and context providers
517        let config = AgentConfig {
518            system_prompt: system,
519            tools,
520            max_tool_rounds: 50,
521            permission_checker: Some(permission_checker),
522            confirmation_manager: Some(confirmation_manager),
523            context_providers,
524            planning_enabled,
525            goal_tracking,
526            hook_engine,
527            skill_registry: None,
528        };
529
530        let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
531            .with_tool_metrics(tool_metrics);
532
533        // Execute with session context
534        let result = agent
535            .execute_with_session(&history, prompt, Some(session_id), None)
536            .await?;
537
538        // Update session
539        {
540            let mut session = session_lock.write().await;
541            session.messages = result.messages.clone();
542            session.update_usage(&result.usage);
543        }
544
545        // Persist to store
546        self.persist_in_background(session_id, "generate");
547
548        // Auto-compact if context usage exceeds threshold
549        if let Err(e) = self.maybe_auto_compact(session_id).await {
550            tracing::warn!("Auto-compact failed for session {}: {}", session_id, e);
551        }
552
553        Ok(result)
554    }
555
556    /// Generate response with streaming events
557    pub async fn generate_streaming(
558        &self,
559        session_id: &str,
560        prompt: &str,
561    ) -> Result<(
562        mpsc::Receiver<AgentEvent>,
563        tokio::task::JoinHandle<Result<AgentResult>>,
564    )> {
565        let session_lock = self.get_session(session_id).await?;
566
567        // Check if session is paused
568        {
569            let session = session_lock.read().await;
570            if session.state == SessionState::Paused {
571                anyhow::bail!(
572                    "Session {} is paused. Call Resume before generating.",
573                    session_id
574                );
575            }
576        }
577
578        // Get session state and LLM client
579        let (
580            history,
581            system,
582            tools,
583            session_llm_client,
584            permission_checker,
585            confirmation_manager,
586            context_providers,
587            session_workspace,
588            tool_metrics,
589            hook_engine,
590            planning_enabled,
591            goal_tracking,
592        ) = {
593            let session = session_lock.read().await;
594            (
595                session.messages.clone(),
596                session.system().map(String::from),
597                session.tools.clone(),
598                session.llm_client.clone(),
599                session.permission_checker.clone(),
600                session.confirmation_manager.clone(),
601                session.context_providers.clone(),
602                session.config.workspace.clone(),
603                session.tool_metrics.clone(),
604                session.config.hook_engine.clone(),
605                session.config.planning_enabled,
606                session.config.goal_tracking,
607            )
608        };
609
610        // Use session's LLM client if configured, otherwise use default
611        let llm_client = if let Some(client) = session_llm_client {
612            client
613        } else if let Some(client) = &self.llm_client {
614            client.clone()
615        } else {
616            anyhow::bail!(
617                "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
618                session_id
619            );
620        };
621
622        // Construct per-session ToolContext from session workspace, falling back to server default
623        let tool_context = if session_workspace.is_empty() {
624            crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
625                .with_session_id(session_id)
626        } else {
627            crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
628                .with_session_id(session_id)
629        };
630
631        // Create agent loop with permission policy, confirmation manager, and context providers
632        let config = AgentConfig {
633            system_prompt: system,
634            tools,
635            max_tool_rounds: 50,
636            permission_checker: Some(permission_checker),
637            confirmation_manager: Some(confirmation_manager),
638            context_providers,
639            planning_enabled,
640            goal_tracking,
641            hook_engine,
642            skill_registry: None,
643        };
644
645        let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
646            .with_tool_metrics(tool_metrics);
647
648        // Execute with streaming
649        let (rx, handle) = agent.execute_streaming(&history, prompt).await?;
650
651        // Store the abort handle for cancellation support
652        let abort_handle = handle.abort_handle();
653        {
654            let mut ops = self.ongoing_operations.write().await;
655            ops.insert(session_id.to_string(), abort_handle);
656        }
657
658        // Spawn task to update session after completion
659        let session_lock_clone = session_lock.clone();
660        let original_handle = handle;
661        let stores = self.stores.clone();
662        let session_storage_types = self.session_storage_types.clone();
663        let llm_configs = self.llm_configs.clone();
664        let session_id_owned = session_id.to_string();
665        let ongoing_operations = self.ongoing_operations.clone();
666        let session_manager = self.clone();
667
668        let wrapped_handle = tokio::spawn(async move {
669            let result = original_handle.await??;
670
671            // Remove from ongoing operations
672            {
673                let mut ops = ongoing_operations.write().await;
674                ops.remove(&session_id_owned);
675            }
676
677            // Update session
678            {
679                let mut session = session_lock_clone.write().await;
680                session.messages = result.messages.clone();
681                session.update_usage(&result.usage);
682            }
683
684            // Persist to store
685            let storage_type = {
686                let storage_types = session_storage_types.read().await;
687                storage_types.get(&session_id_owned).cloned()
688            };
689
690            if let Some(storage_type) = storage_type {
691                if storage_type != crate::config::StorageBackend::Memory {
692                    let stores_guard = stores.read().await;
693                    if let Some(store) = stores_guard.get(&storage_type) {
694                        let session = session_lock_clone.read().await;
695                        let llm_config = {
696                            let configs = llm_configs.read().await;
697                            configs.get(&session_id_owned).cloned()
698                        };
699                        let data = session.to_session_data(llm_config);
700                        if let Err(e) = store.save(&data).await {
701                            tracing::warn!(
702                                "Failed to persist session {} after streaming: {}",
703                                session_id_owned,
704                                e
705                            );
706                        }
707                    }
708                }
709            }
710
711            // Auto-compact if context usage exceeds threshold
712            if let Err(e) = session_manager.maybe_auto_compact(&session_id_owned).await {
713                tracing::warn!(
714                    "Auto-compact failed for session {}: {}",
715                    session_id_owned,
716                    e
717                );
718            }
719
720            Ok(result)
721        });
722
723        Ok((rx, wrapped_handle))
724    }
725
726    /// Get context usage for a session
727    pub async fn context_usage(&self, session_id: &str) -> Result<ContextUsage> {
728        let session_lock = self.get_session(session_id).await?;
729        let session = session_lock.read().await;
730        Ok(session.context_usage.clone())
731    }
732
733    /// Get conversation history for a session
734    pub async fn history(&self, session_id: &str) -> Result<Vec<Message>> {
735        let session_lock = self.get_session(session_id).await?;
736        let session = session_lock.read().await;
737        Ok(session.messages.clone())
738    }
739
740    /// Clear session history
741    pub async fn clear(&self, session_id: &str) -> Result<()> {
742        {
743            let session_lock = self.get_session(session_id).await?;
744            let mut session = session_lock.write().await;
745            session.clear();
746        }
747
748        // Persist to store
749        self.persist_in_background(session_id, "clear");
750
751        Ok(())
752    }
753
754    /// Compact session context
755    pub async fn compact(&self, session_id: &str) -> Result<()> {
756        tracing::info!(name: "a3s.session.compact", session_id = %session_id, "Compacting session context");
757
758        {
759            let session_lock = self.get_session(session_id).await?;
760            let mut session = session_lock.write().await;
761
762            // Get LLM client for compaction (if available)
763            let llm_client = if let Some(client) = &session.llm_client {
764                client.clone()
765            } else if let Some(client) = &self.llm_client {
766                client.clone()
767            } else {
768                // If no LLM client available, just do simple truncation
769                tracing::warn!("No LLM client configured for compaction, using simple truncation");
770                let keep_messages = 20;
771                if session.messages.len() > keep_messages {
772                    let len = session.messages.len();
773                    session.messages = session.messages.split_off(len - keep_messages);
774                }
775                // Persist after truncation
776                drop(session);
777                self.persist_in_background(session_id, "compact");
778                return Ok(());
779            };
780
781            session.compact(&llm_client).await?;
782        }
783
784        // Persist to store
785        self.persist_in_background(session_id, "compact");
786
787        Ok(())
788    }
789
790    /// Check if auto-compaction should be triggered and perform it if needed.
791    ///
792    /// Called after `generate()` / `generate_streaming()` updates session usage.
793    /// Triggers compaction when:
794    /// - `auto_compact` is enabled in session config
795    /// - `context_usage.percent` exceeds `auto_compact_threshold`
796    pub async fn maybe_auto_compact(&self, session_id: &str) -> Result<bool> {
797        let (should_compact, percent_before, messages_before) = {
798            let session_lock = self.get_session(session_id).await?;
799            let session = session_lock.read().await;
800
801            if !session.config.auto_compact {
802                return Ok(false);
803            }
804
805            let threshold = session.config.auto_compact_threshold;
806            let percent = session.context_usage.percent;
807            let msg_count = session.messages.len();
808
809            tracing::debug!(
810                "Auto-compact check for session {}: percent={:.2}%, threshold={:.2}%, messages={}",
811                session_id,
812                percent * 100.0,
813                threshold * 100.0,
814                msg_count,
815            );
816
817            (percent >= threshold, percent, msg_count)
818        };
819
820        if !should_compact {
821            return Ok(false);
822        }
823
824        tracing::info!(
825            name: "a3s.session.auto_compact",
826            session_id = %session_id,
827            percent_before = %format!("{:.1}%", percent_before * 100.0),
828            messages_before = %messages_before,
829            "Auto-compacting session due to high context usage"
830        );
831
832        // Perform compaction (reuses existing compact logic)
833        self.compact(session_id).await?;
834
835        // Get post-compaction message count
836        let messages_after = {
837            let session_lock = self.get_session(session_id).await?;
838            let session = session_lock.read().await;
839            session.messages.len()
840        };
841
842        // Broadcast event to notify clients
843        let event = AgentEvent::ContextCompacted {
844            session_id: session_id.to_string(),
845            before_messages: messages_before,
846            after_messages: messages_after,
847            percent_before,
848        };
849
850        // Try to send via session's event broadcaster
851        if let Ok(session_lock) = self.get_session(session_id).await {
852            let session = session_lock.read().await;
853            let _ = session.event_tx.send(event);
854        }
855
856        tracing::info!(
857            name: "a3s.session.auto_compact.done",
858            session_id = %session_id,
859            messages_before = %messages_before,
860            messages_after = %messages_after,
861            "Auto-compaction complete"
862        );
863
864        Ok(true)
865    }
866
867    /// Resolve the LLM client for a session (session-level -> default fallback)
868    ///
869    /// Returns `None` if no LLM client is configured at either level.
870    pub async fn get_llm_for_session(
871        &self,
872        session_id: &str,
873    ) -> Result<Option<Arc<dyn LlmClient>>> {
874        let session_lock = self.get_session(session_id).await?;
875        let session = session_lock.read().await;
876
877        if let Some(client) = &session.llm_client {
878            return Ok(Some(client.clone()));
879        }
880
881        Ok(self.llm_client.clone())
882    }
883
884    /// Configure session
885    pub async fn configure(
886        &self,
887        session_id: &str,
888        thinking: Option<bool>,
889        budget: Option<usize>,
890        model_config: Option<LlmConfig>,
891    ) -> Result<()> {
892        {
893            let session_lock = self.get_session(session_id).await?;
894            let mut session = session_lock.write().await;
895
896            if let Some(t) = thinking {
897                session.thinking_enabled = t;
898            }
899            if let Some(b) = budget {
900                session.thinking_budget = Some(b);
901            }
902            if let Some(ref config) = model_config {
903                tracing::info!(
904                    "Configuring session {} with LLM: provider={}, model={}",
905                    session_id,
906                    config.provider,
907                    config.model
908                );
909                session.model_name = Some(config.model.clone());
910                session.llm_client = Some(llm::create_client_with_config(config.clone()));
911            }
912        }
913
914        // Store LLM config for persistence (without API key)
915        if let Some(config) = model_config {
916            let llm_config_data = LlmConfigData {
917                provider: config.provider,
918                model: config.model,
919                api_key: None, // Don't persist API key
920                base_url: config.base_url,
921            };
922            let mut configs = self.llm_configs.write().await;
923            configs.insert(session_id.to_string(), llm_config_data);
924        }
925
926        // Persist to store
927        self.persist_in_background(session_id, "configure");
928
929        Ok(())
930    }
931
932    /// Get session count
933    pub async fn session_count(&self) -> usize {
934        let sessions = self.sessions.read().await;
935        sessions.len()
936    }
937
938    /// Check health of all registered stores
939    pub async fn store_health(&self) -> Vec<(String, Result<()>)> {
940        let stores = self.stores.read().await;
941        let mut results = Vec::new();
942        for (_, store) in stores.iter() {
943            let name = store.backend_name().to_string();
944            let result = store.health_check().await;
945            results.push((name, result));
946        }
947        results
948    }
949
950    /// List all loaded tools (built-in tools)
951    pub fn list_tools(&self) -> Vec<crate::llm::ToolDefinition> {
952        self.tool_executor.definitions()
953    }
954
955    /// Pause a session
956    pub async fn pause_session(&self, session_id: &str) -> Result<bool> {
957        let paused = {
958            let session_lock = self.get_session(session_id).await?;
959            let mut session = session_lock.write().await;
960            session.pause()
961        };
962
963        if paused {
964            self.persist_in_background(session_id, "pause");
965        }
966
967        Ok(paused)
968    }
969
970    /// Resume a session
971    pub async fn resume_session(&self, session_id: &str) -> Result<bool> {
972        let resumed = {
973            let session_lock = self.get_session(session_id).await?;
974            let mut session = session_lock.write().await;
975            session.resume()
976        };
977
978        if resumed {
979            self.persist_in_background(session_id, "resume");
980        }
981
982        Ok(resumed)
983    }
984
985    /// Cancel an ongoing operation for a session
986    ///
987    /// Returns true if an operation was cancelled, false if no operation was running.
988    pub async fn cancel_operation(&self, session_id: &str) -> Result<bool> {
989        // First, cancel any pending HITL confirmations
990        let session_lock = self.get_session(session_id).await?;
991        let cancelled_confirmations = {
992            let session = session_lock.read().await;
993            session.confirmation_manager.cancel_all().await
994        };
995
996        if cancelled_confirmations > 0 {
997            tracing::info!(
998                "Cancelled {} pending confirmations for session {}",
999                cancelled_confirmations,
1000                session_id
1001            );
1002        }
1003
1004        // Then, abort the ongoing operation if any
1005        let abort_handle = {
1006            let mut ops = self.ongoing_operations.write().await;
1007            ops.remove(session_id)
1008        };
1009
1010        if let Some(handle) = abort_handle {
1011            handle.abort();
1012            tracing::info!("Cancelled ongoing operation for session {}", session_id);
1013            Ok(true)
1014        } else if cancelled_confirmations > 0 {
1015            // We cancelled confirmations but no main operation
1016            Ok(true)
1017        } else {
1018            tracing::debug!("No ongoing operation to cancel for session {}", session_id);
1019            Ok(false)
1020        }
1021    }
1022
1023    /// Get all sessions (returns session locks for iteration)
1024    pub async fn get_all_sessions(&self) -> Vec<Arc<RwLock<Session>>> {
1025        let sessions = self.sessions.read().await;
1026        sessions.values().cloned().collect()
1027    }
1028
1029    /// Get tool executor reference
1030    pub fn tool_executor(&self) -> &Arc<ToolExecutor> {
1031        &self.tool_executor
1032    }
1033
1034    /// Confirm a tool execution (HITL)
1035    pub async fn confirm_tool(
1036        &self,
1037        session_id: &str,
1038        tool_id: &str,
1039        approved: bool,
1040        reason: Option<String>,
1041    ) -> Result<bool> {
1042        let session_lock = self.get_session(session_id).await?;
1043        let session = session_lock.read().await;
1044        session
1045            .confirmation_manager
1046            .confirm(tool_id, approved, reason)
1047            .await
1048            .map_err(|e| anyhow::anyhow!(e))
1049    }
1050
1051    /// Set confirmation policy for a session (HITL)
1052    pub async fn set_confirmation_policy(
1053        &self,
1054        session_id: &str,
1055        policy: ConfirmationPolicy,
1056    ) -> Result<ConfirmationPolicy> {
1057        {
1058            let session_lock = self.get_session(session_id).await?;
1059            let session = session_lock.read().await;
1060            session.set_confirmation_policy(policy.clone()).await;
1061        }
1062
1063        // Update config for persistence
1064        {
1065            let session_lock = self.get_session(session_id).await?;
1066            let mut session = session_lock.write().await;
1067            session.config.confirmation_policy = Some(policy.clone());
1068        }
1069
1070        // Persist to store
1071        self.persist_in_background(session_id, "set_confirmation_policy");
1072
1073        Ok(policy)
1074    }
1075
1076    /// Get confirmation policy for a session (HITL)
1077    pub async fn get_confirmation_policy(&self, session_id: &str) -> Result<ConfirmationPolicy> {
1078        let session_lock = self.get_session(session_id).await?;
1079        let session = session_lock.read().await;
1080        Ok(session.confirmation_policy().await)
1081    }
1082}