Skip to main content

a3s_code_core/session/
manager.rs

1//! SessionManager — manages multiple concurrent sessions
2
3use super::{ContextUsage, Session, SessionConfig, SessionState};
4use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
5use crate::hitl::ConfirmationPolicy;
6use crate::llm::{self, ContentBlock, LlmClient, LlmConfig, Message};
7use crate::permissions::{PermissionDecision, PermissionPolicy};
8use crate::planning::Task;
9use crate::store::{FileSessionStore, LlmConfigData, SessionData, SessionStore};
10use crate::tools::ToolExecutor;
11use anyhow::{Context, Result};
12use std::collections::HashMap;
13use std::sync::Arc;
14use tokio::sync::{mpsc, RwLock};
15
16/// Session manager handles multiple concurrent sessions
17#[derive(Clone)]
18pub struct SessionManager {
19    pub(crate) sessions: Arc<RwLock<HashMap<String, Arc<RwLock<Session>>>>>,
20    pub(crate) llm_client: Option<Arc<dyn LlmClient>>,
21    pub(crate) tool_executor: Arc<ToolExecutor>,
22    /// Session stores by storage type
23    pub(crate) stores: Arc<RwLock<HashMap<crate::config::StorageBackend, Arc<dyn SessionStore>>>>,
24    /// Track which storage type each session uses
25    pub(crate) session_storage_types: Arc<RwLock<HashMap<String, crate::config::StorageBackend>>>,
26    /// LLM configurations for sessions (stored separately for persistence)
27    pub(crate) llm_configs: Arc<RwLock<HashMap<String, LlmConfigData>>>,
28    /// Ongoing operations (session_id -> JoinHandle)
29    pub(crate) ongoing_operations: Arc<RwLock<HashMap<String, tokio::task::AbortHandle>>>,
30}
31
32impl SessionManager {
33    /// Create a new session manager without persistence
34    pub fn new(llm_client: Option<Arc<dyn LlmClient>>, tool_executor: Arc<ToolExecutor>) -> Self {
35        Self {
36            sessions: Arc::new(RwLock::new(HashMap::new())),
37            llm_client,
38            tool_executor,
39            stores: Arc::new(RwLock::new(HashMap::new())),
40            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
41            llm_configs: Arc::new(RwLock::new(HashMap::new())),
42            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
43        }
44    }
45
46    /// Create a session manager with file-based persistence
47    ///
48    /// Sessions will be automatically saved to disk and restored on startup.
49    pub async fn with_persistence<P: AsRef<std::path::Path>>(
50        llm_client: Option<Arc<dyn LlmClient>>,
51        tool_executor: Arc<ToolExecutor>,
52        sessions_dir: P,
53    ) -> Result<Self> {
54        let store = FileSessionStore::new(sessions_dir).await?;
55        let mut stores = HashMap::new();
56        stores.insert(
57            crate::config::StorageBackend::File,
58            Arc::new(store) as Arc<dyn SessionStore>,
59        );
60
61        let manager = Self {
62            sessions: Arc::new(RwLock::new(HashMap::new())),
63            llm_client,
64            tool_executor,
65            stores: Arc::new(RwLock::new(stores)),
66            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
67            llm_configs: Arc::new(RwLock::new(HashMap::new())),
68            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
69        };
70
71        Ok(manager)
72    }
73
74    /// Create a session manager with a custom store
75    ///
76    /// The `backend` parameter determines which `StorageBackend` key the store is registered under.
77    /// Sessions created with a matching `storage_type` will use this store.
78    pub fn with_store(
79        llm_client: Option<Arc<dyn LlmClient>>,
80        tool_executor: Arc<ToolExecutor>,
81        store: Arc<dyn SessionStore>,
82        backend: crate::config::StorageBackend,
83    ) -> Self {
84        let mut stores = HashMap::new();
85        stores.insert(backend, store);
86
87        Self {
88            sessions: Arc::new(RwLock::new(HashMap::new())),
89            llm_client,
90            tool_executor,
91            stores: Arc::new(RwLock::new(stores)),
92            session_storage_types: Arc::new(RwLock::new(HashMap::new())),
93            llm_configs: Arc::new(RwLock::new(HashMap::new())),
94            ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
95        }
96    }
97
98    /// Restore a single session by ID from the store
99    ///
100    /// Searches all registered stores for the given session ID and restores it
101    /// into the in-memory session map. Returns an error if not found.
102    pub async fn restore_session_by_id(&self, session_id: &str) -> Result<()> {
103        // Check if already loaded
104        {
105            let sessions = self.sessions.read().await;
106            if sessions.contains_key(session_id) {
107                return Ok(());
108            }
109        }
110
111        let stores = self.stores.read().await;
112        for (backend, store) in stores.iter() {
113            match store.load(session_id).await {
114                Ok(Some(data)) => {
115                    {
116                        let mut storage_types = self.session_storage_types.write().await;
117                        storage_types.insert(data.id.clone(), backend.clone());
118                    }
119                    self.restore_session(data).await?;
120                    return Ok(());
121                }
122                Ok(None) => continue,
123                Err(e) => {
124                    tracing::warn!(
125                        "Failed to load session {} from {:?}: {}",
126                        session_id,
127                        backend,
128                        e
129                    );
130                    continue;
131                }
132            }
133        }
134
135        Err(anyhow::anyhow!(
136            "Session {} not found in any store",
137            session_id
138        ))
139    }
140
141    /// Load all sessions from all registered stores
142    pub async fn load_all_sessions(&mut self) -> Result<usize> {
143        let stores = self.stores.read().await;
144        let mut loaded = 0;
145
146        for (backend, store) in stores.iter() {
147            let session_ids = match store.list().await {
148                Ok(ids) => ids,
149                Err(e) => {
150                    tracing::warn!("Failed to list sessions from {:?} store: {}", backend, e);
151                    continue;
152                }
153            };
154
155            for id in session_ids {
156                match store.load(&id).await {
157                    Ok(Some(data)) => {
158                        // Record the storage type for this session
159                        {
160                            let mut storage_types = self.session_storage_types.write().await;
161                            storage_types.insert(data.id.clone(), backend.clone());
162                        }
163
164                        if let Err(e) = self.restore_session(data).await {
165                            tracing::warn!("Failed to restore session {}: {}", id, e);
166                        } else {
167                            loaded += 1;
168                        }
169                    }
170                    Ok(None) => {
171                        tracing::warn!("Session {} not found in store", id);
172                    }
173                    Err(e) => {
174                        tracing::warn!("Failed to load session {}: {}", id, e);
175                    }
176                }
177            }
178        }
179
180        tracing::info!("Loaded {} sessions from store", loaded);
181        Ok(loaded)
182    }
183
184    /// Restore a session from SessionData
185    async fn restore_session(&self, data: SessionData) -> Result<()> {
186        let tools = self.tool_executor.definitions();
187        let mut session = Session::new(data.id.clone(), data.config.clone(), tools).await?;
188
189        // Restore serializable state
190        session.restore_from_data(&data);
191
192        // Restore LLM config if present (without API key - must be reconfigured)
193        if let Some(llm_config) = &data.llm_config {
194            let mut configs = self.llm_configs.write().await;
195            configs.insert(data.id.clone(), llm_config.clone());
196        }
197
198        let mut sessions = self.sessions.write().await;
199        sessions.insert(data.id.clone(), Arc::new(RwLock::new(session)));
200
201        tracing::info!("Restored session: {}", data.id);
202        Ok(())
203    }
204
205    /// Save a session to the store
206    async fn save_session(&self, session_id: &str) -> Result<()> {
207        // Get the storage type for this session
208        let storage_type = {
209            let storage_types = self.session_storage_types.read().await;
210            storage_types.get(session_id).cloned()
211        };
212
213        let Some(storage_type) = storage_type else {
214            // No storage type means memory-only session
215            return Ok(());
216        };
217
218        // Skip saving for memory storage
219        if storage_type == crate::config::StorageBackend::Memory {
220            return Ok(());
221        }
222
223        // Get the appropriate store
224        let stores = self.stores.read().await;
225        let Some(store) = stores.get(&storage_type) else {
226            tracing::warn!("No store available for storage type: {:?}", storage_type);
227            return Ok(());
228        };
229
230        let session_lock = self.get_session(session_id).await?;
231        let session = session_lock.read().await;
232
233        // Get LLM config if set
234        let llm_config = {
235            let configs = self.llm_configs.read().await;
236            configs.get(session_id).cloned()
237        };
238
239        let data = session.to_session_data(llm_config);
240        store.save(&data).await?;
241
242        tracing::debug!("Saved session: {}", session_id);
243        Ok(())
244    }
245
246    /// Persist a session to store, logging and emitting an event on failure.
247    /// This is a non-fatal wrapper around `save_session` — the operation
248    /// succeeds in memory even if persistence fails.
249    async fn persist_or_warn(&self, session_id: &str, operation: &str) {
250        if let Err(e) = self.save_session(session_id).await {
251            tracing::warn!(
252                "Failed to persist session {} after {}: {}",
253                session_id,
254                operation,
255                e
256            );
257            // Emit event so SDK clients can react
258            if let Ok(session_lock) = self.get_session(session_id).await {
259                let session = session_lock.read().await;
260                let _ = session.event_tx().send(AgentEvent::PersistenceFailed {
261                    session_id: session_id.to_string(),
262                    operation: operation.to_string(),
263                    error: e.to_string(),
264                });
265            }
266        }
267    }
268
269    /// Spawn persistence as a background task (non-blocking).
270    ///
271    /// All `SessionManager` fields are `Arc`-wrapped, so `clone()` is a
272    /// cheap pointer-copy. This keeps file I/O off the response path.
273    fn persist_in_background(&self, session_id: &str, operation: &str) {
274        let mgr = self.clone();
275        let sid = session_id.to_string();
276        let op = operation.to_string();
277        tokio::spawn(async move {
278            mgr.persist_or_warn(&sid, &op).await;
279        });
280    }
281
282    /// Create a new session
283    pub async fn create_session(&self, id: String, config: SessionConfig) -> Result<String> {
284        tracing::info!(name: "a3s.session.create", session_id = %id, "Creating session");
285
286        // Record the storage type for this session
287        {
288            let mut storage_types = self.session_storage_types.write().await;
289            storage_types.insert(id.clone(), config.storage_type.clone());
290        }
291
292        // Get tool definitions from the executor
293        let tools = self.tool_executor.definitions();
294        let mut session = Session::new(id.clone(), config, tools).await?;
295
296        // Start the command queue
297        session.start_queue().await?;
298
299        // Set max context length if provided
300        if session.config.max_context_length > 0 {
301            session.context_usage.max_tokens = session.config.max_context_length as usize;
302        }
303
304        {
305            let mut sessions = self.sessions.write().await;
306            sessions.insert(id.clone(), Arc::new(RwLock::new(session)));
307        }
308
309        // Persist to store
310        self.persist_in_background(&id, "create");
311
312        tracing::info!("Created session: {}", id);
313        Ok(id)
314    }
315
316    /// Destroy a session
317    pub async fn destroy_session(&self, id: &str) -> Result<()> {
318        tracing::info!(name: "a3s.session.destroy", session_id = %id, "Destroying session");
319
320        // Get the storage type before removing the session
321        let storage_type = {
322            let storage_types = self.session_storage_types.read().await;
323            storage_types.get(id).cloned()
324        };
325
326        {
327            let mut sessions = self.sessions.write().await;
328            sessions.remove(id);
329        }
330
331        // Remove LLM config
332        {
333            let mut configs = self.llm_configs.write().await;
334            configs.remove(id);
335        }
336
337        // Remove storage type tracking
338        {
339            let mut storage_types = self.session_storage_types.write().await;
340            storage_types.remove(id);
341        }
342
343        // Delete from store if applicable
344        if let Some(storage_type) = storage_type {
345            if storage_type != crate::config::StorageBackend::Memory {
346                let stores = self.stores.read().await;
347                if let Some(store) = stores.get(&storage_type) {
348                    if let Err(e) = store.delete(id).await {
349                        tracing::warn!("Failed to delete session {} from store: {}", id, e);
350                    }
351                }
352            }
353        }
354
355        tracing::info!("Destroyed session: {}", id);
356        Ok(())
357    }
358
359    /// Get a session by ID
360    pub async fn get_session(&self, id: &str) -> Result<Arc<RwLock<Session>>> {
361        let sessions = self.sessions.read().await;
362        sessions
363            .get(id)
364            .cloned()
365            .context(format!("Session not found: {}", id))
366    }
367
368    /// Create a child session for a subagent
369    ///
370    /// Child sessions inherit the parent's LLM client but have their own
371    /// permission policy and configuration.
372    pub async fn create_child_session(
373        &self,
374        parent_id: &str,
375        child_id: String,
376        mut config: SessionConfig,
377    ) -> Result<String> {
378        // Verify parent exists and inherit HITL policy
379        let parent_lock = self.get_session(parent_id).await?;
380        let parent_llm_client = {
381            let parent = parent_lock.read().await;
382
383            // Inherit parent's confirmation policy if child doesn't have one
384            if config.confirmation_policy.is_none() {
385                let parent_policy = parent.confirmation_manager.policy().await;
386                config.confirmation_policy = Some(parent_policy);
387            }
388
389            parent.llm_client.clone()
390        };
391
392        // Set parent_id in config
393        config.parent_id = Some(parent_id.to_string());
394
395        // Get tool definitions from the executor
396        let tools = self.tool_executor.definitions();
397        let mut session = Session::new(child_id.clone(), config, tools).await?;
398
399        // Inherit LLM client from parent if not set
400        if session.llm_client.is_none() {
401            session.llm_client = parent_llm_client.or_else(|| self.llm_client.clone());
402        }
403
404        // Start the command queue
405        session.start_queue().await?;
406
407        // Set max context length if provided
408        if session.config.max_context_length > 0 {
409            session.context_usage.max_tokens = session.config.max_context_length as usize;
410        }
411
412        {
413            let mut sessions = self.sessions.write().await;
414            sessions.insert(child_id.clone(), Arc::new(RwLock::new(session)));
415        }
416
417        // Persist to store
418        self.persist_in_background(&child_id, "create_child");
419
420        tracing::info!(
421            "Created child session: {} (parent: {})",
422            child_id,
423            parent_id
424        );
425        Ok(child_id)
426    }
427
428    /// Get all child sessions for a parent session
429    pub async fn get_child_sessions(&self, parent_id: &str) -> Vec<String> {
430        let sessions = self.sessions.read().await;
431        let mut children = Vec::new();
432
433        for (id, session_lock) in sessions.iter() {
434            let session = session_lock.read().await;
435            if session.parent_id.as_deref() == Some(parent_id) {
436                children.push(id.clone());
437            }
438        }
439
440        children
441    }
442
443    /// Check if a session is a child session
444    pub async fn is_child_session(&self, session_id: &str) -> Result<bool> {
445        let session_lock = self.get_session(session_id).await?;
446        let session = session_lock.read().await;
447        Ok(session.is_child_session())
448    }
449
450    /// Generate response for a prompt
451    pub async fn generate(&self, session_id: &str, prompt: &str) -> Result<AgentResult> {
452        let session_lock = self.get_session(session_id).await?;
453
454        // Check if session is paused
455        {
456            let session = session_lock.read().await;
457            if session.state == SessionState::Paused {
458                anyhow::bail!(
459                    "Session {} is paused. Call Resume before generating.",
460                    session_id
461                );
462            }
463        }
464
465        // Get session state and LLM client
466        let (
467            history,
468            system,
469            tools,
470            session_llm_client,
471            permission_policy,
472            confirmation_manager,
473            context_providers,
474            session_workspace,
475            tool_metrics,
476            hook_engine,
477            planning_enabled,
478            goal_tracking,
479            loaded_skills,
480        ) = {
481            let session = session_lock.read().await;
482            (
483                session.messages.clone(),
484                session.system().map(String::from),
485                session.tools.clone(),
486                session.llm_client.clone(),
487                session.permission_policy.clone(),
488                session.confirmation_manager.clone(),
489                session.context_providers.clone(),
490                session.config.workspace.clone(),
491                session.tool_metrics.clone(),
492                session.config.hook_engine.clone(),
493                session.config.planning_enabled,
494                session.config.goal_tracking,
495                session.loaded_skills.clone(),
496            )
497        };
498
499        // Use session's LLM client if configured, otherwise use default
500        let llm_client = if let Some(client) = session_llm_client {
501            client
502        } else if let Some(client) = &self.llm_client {
503            client.clone()
504        } else {
505            anyhow::bail!(
506                "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
507                session_id
508            );
509        };
510
511        // Construct per-session ToolContext from session workspace, falling back to server default
512        let tool_context = if session_workspace.is_empty() {
513            crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
514                .with_session_id(session_id)
515        } else {
516            crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
517                .with_session_id(session_id)
518        };
519
520        // Create agent loop with permission policy, confirmation manager, and context providers
521        let config = AgentConfig {
522            system_prompt: system,
523            tools,
524            max_tool_rounds: 50,
525            permission_policy: Some(permission_policy),
526            confirmation_manager: Some(confirmation_manager),
527            context_providers,
528            planning_enabled,
529            goal_tracking,
530            skill_tool_filters: loaded_skills,
531            hook_engine,
532        };
533
534        let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
535            .with_tool_metrics(tool_metrics);
536
537        // Execute with session context
538        let result = agent
539            .execute_with_session(&history, prompt, Some(session_id), None)
540            .await?;
541
542        // Update session
543        {
544            let mut session = session_lock.write().await;
545            session.messages = result.messages.clone();
546            session.update_usage(&result.usage);
547        }
548
549        // Persist to store
550        self.persist_in_background(session_id, "generate");
551
552        // Auto-compact if context usage exceeds threshold
553        if let Err(e) = self.maybe_auto_compact(session_id).await {
554            tracing::warn!("Auto-compact failed for session {}: {}", session_id, e);
555        }
556
557        Ok(result)
558    }
559
560    /// Generate response with streaming events
561    pub async fn generate_streaming(
562        &self,
563        session_id: &str,
564        prompt: &str,
565    ) -> Result<(
566        mpsc::Receiver<AgentEvent>,
567        tokio::task::JoinHandle<Result<AgentResult>>,
568    )> {
569        let session_lock = self.get_session(session_id).await?;
570
571        // Check if session is paused
572        {
573            let session = session_lock.read().await;
574            if session.state == SessionState::Paused {
575                anyhow::bail!(
576                    "Session {} is paused. Call Resume before generating.",
577                    session_id
578                );
579            }
580        }
581
582        // Get session state and LLM client
583        let (
584            history,
585            system,
586            tools,
587            session_llm_client,
588            permission_policy,
589            confirmation_manager,
590            context_providers,
591            session_workspace,
592            tool_metrics,
593            hook_engine,
594            planning_enabled,
595            goal_tracking,
596            loaded_skills,
597        ) = {
598            let session = session_lock.read().await;
599            (
600                session.messages.clone(),
601                session.system().map(String::from),
602                session.tools.clone(),
603                session.llm_client.clone(),
604                session.permission_policy.clone(),
605                session.confirmation_manager.clone(),
606                session.context_providers.clone(),
607                session.config.workspace.clone(),
608                session.tool_metrics.clone(),
609                session.config.hook_engine.clone(),
610                session.config.planning_enabled,
611                session.config.goal_tracking,
612                session.loaded_skills.clone(),
613            )
614        };
615
616        // Use session's LLM client if configured, otherwise use default
617        let llm_client = if let Some(client) = session_llm_client {
618            client
619        } else if let Some(client) = &self.llm_client {
620            client.clone()
621        } else {
622            anyhow::bail!(
623                "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
624                session_id
625            );
626        };
627
628        // Construct per-session ToolContext from session workspace, falling back to server default
629        let tool_context = if session_workspace.is_empty() {
630            crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
631                .with_session_id(session_id)
632        } else {
633            crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
634                .with_session_id(session_id)
635        };
636
637        // Create agent loop with permission policy, confirmation manager, and context providers
638        let config = AgentConfig {
639            system_prompt: system,
640            tools,
641            max_tool_rounds: 50,
642            permission_policy: Some(permission_policy),
643            confirmation_manager: Some(confirmation_manager),
644            context_providers,
645            planning_enabled,
646            goal_tracking,
647            skill_tool_filters: loaded_skills,
648            hook_engine,
649        };
650
651        let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
652            .with_tool_metrics(tool_metrics);
653
654        // Execute with streaming
655        let (rx, handle) = agent.execute_streaming(&history, prompt).await?;
656
657        // Store the abort handle for cancellation support
658        let abort_handle = handle.abort_handle();
659        {
660            let mut ops = self.ongoing_operations.write().await;
661            ops.insert(session_id.to_string(), abort_handle);
662        }
663
664        // Spawn task to update session after completion
665        let session_lock_clone = session_lock.clone();
666        let original_handle = handle;
667        let stores = self.stores.clone();
668        let session_storage_types = self.session_storage_types.clone();
669        let llm_configs = self.llm_configs.clone();
670        let session_id_owned = session_id.to_string();
671        let ongoing_operations = self.ongoing_operations.clone();
672        let session_manager = self.clone();
673
674        let wrapped_handle = tokio::spawn(async move {
675            let result = original_handle.await??;
676
677            // Remove from ongoing operations
678            {
679                let mut ops = ongoing_operations.write().await;
680                ops.remove(&session_id_owned);
681            }
682
683            // Update session
684            {
685                let mut session = session_lock_clone.write().await;
686                session.messages = result.messages.clone();
687                session.update_usage(&result.usage);
688            }
689
690            // Persist to store
691            let storage_type = {
692                let storage_types = session_storage_types.read().await;
693                storage_types.get(&session_id_owned).cloned()
694            };
695
696            if let Some(storage_type) = storage_type {
697                if storage_type != crate::config::StorageBackend::Memory {
698                    let stores_guard = stores.read().await;
699                    if let Some(store) = stores_guard.get(&storage_type) {
700                        let session = session_lock_clone.read().await;
701                        let llm_config = {
702                            let configs = llm_configs.read().await;
703                            configs.get(&session_id_owned).cloned()
704                        };
705                        let data = session.to_session_data(llm_config);
706                        if let Err(e) = store.save(&data).await {
707                            tracing::warn!(
708                                "Failed to persist session {} after streaming: {}",
709                                session_id_owned,
710                                e
711                            );
712                        }
713                    }
714                }
715            }
716
717            // Auto-compact if context usage exceeds threshold
718            if let Err(e) = session_manager.maybe_auto_compact(&session_id_owned).await {
719                tracing::warn!(
720                    "Auto-compact failed for session {}: {}",
721                    session_id_owned,
722                    e
723                );
724            }
725
726            Ok(result)
727        });
728
729        Ok((rx, wrapped_handle))
730    }
731
732    /// Get context usage for a session
733    pub async fn context_usage(&self, session_id: &str) -> Result<ContextUsage> {
734        let session_lock = self.get_session(session_id).await?;
735        let session = session_lock.read().await;
736        Ok(session.context_usage.clone())
737    }
738
739    /// Get conversation history for a session
740    pub async fn history(&self, session_id: &str) -> Result<Vec<Message>> {
741        let session_lock = self.get_session(session_id).await?;
742        let session = session_lock.read().await;
743        Ok(session.messages.clone())
744    }
745
746    /// Clear session history
747    pub async fn clear(&self, session_id: &str) -> Result<()> {
748        {
749            let session_lock = self.get_session(session_id).await?;
750            let mut session = session_lock.write().await;
751            session.clear();
752        }
753
754        // Persist to store
755        self.persist_in_background(session_id, "clear");
756
757        Ok(())
758    }
759
760    /// Compact session context
761    pub async fn compact(&self, session_id: &str) -> Result<()> {
762        tracing::info!(name: "a3s.session.compact", session_id = %session_id, "Compacting session context");
763
764        {
765            let session_lock = self.get_session(session_id).await?;
766            let mut session = session_lock.write().await;
767
768            // Get LLM client for compaction (if available)
769            let llm_client = if let Some(client) = &session.llm_client {
770                client.clone()
771            } else if let Some(client) = &self.llm_client {
772                client.clone()
773            } else {
774                // If no LLM client available, just do simple truncation
775                tracing::warn!("No LLM client configured for compaction, using simple truncation");
776                let keep_messages = 20;
777                if session.messages.len() > keep_messages {
778                    let len = session.messages.len();
779                    session.messages = session.messages.split_off(len - keep_messages);
780                }
781                // Persist after truncation
782                drop(session);
783                self.persist_in_background(session_id, "compact");
784                return Ok(());
785            };
786
787            session.compact(&llm_client).await?;
788        }
789
790        // Persist to store
791        self.persist_in_background(session_id, "compact");
792
793        Ok(())
794    }
795
796    /// Check if auto-compaction should be triggered and perform it if needed.
797    ///
798    /// Called after `generate()` / `generate_streaming()` updates session usage.
799    /// Triggers compaction when:
800    /// - `auto_compact` is enabled in session config
801    /// - `context_usage.percent` exceeds `auto_compact_threshold`
802    pub async fn maybe_auto_compact(&self, session_id: &str) -> Result<bool> {
803        let (should_compact, percent_before, messages_before) = {
804            let session_lock = self.get_session(session_id).await?;
805            let session = session_lock.read().await;
806
807            if !session.config.auto_compact {
808                return Ok(false);
809            }
810
811            let threshold = session.config.auto_compact_threshold;
812            let percent = session.context_usage.percent;
813            let msg_count = session.messages.len();
814
815            tracing::debug!(
816                "Auto-compact check for session {}: percent={:.2}%, threshold={:.2}%, messages={}",
817                session_id,
818                percent * 100.0,
819                threshold * 100.0,
820                msg_count,
821            );
822
823            (percent >= threshold, percent, msg_count)
824        };
825
826        if !should_compact {
827            return Ok(false);
828        }
829
830        tracing::info!(
831            name: "a3s.session.auto_compact",
832            session_id = %session_id,
833            percent_before = %format!("{:.1}%", percent_before * 100.0),
834            messages_before = %messages_before,
835            "Auto-compacting session due to high context usage"
836        );
837
838        // Perform compaction (reuses existing compact logic)
839        self.compact(session_id).await?;
840
841        // Get post-compaction message count
842        let messages_after = {
843            let session_lock = self.get_session(session_id).await?;
844            let session = session_lock.read().await;
845            session.messages.len()
846        };
847
848        // Broadcast event to notify clients
849        let event = AgentEvent::ContextCompacted {
850            session_id: session_id.to_string(),
851            before_messages: messages_before,
852            after_messages: messages_after,
853            percent_before,
854        };
855
856        // Try to send via session's event broadcaster
857        if let Ok(session_lock) = self.get_session(session_id).await {
858            let session = session_lock.read().await;
859            let _ = session.event_tx.send(event);
860        }
861
862        tracing::info!(
863            name: "a3s.session.auto_compact.done",
864            session_id = %session_id,
865            messages_before = %messages_before,
866            messages_after = %messages_after,
867            "Auto-compaction complete"
868        );
869
870        Ok(true)
871    }
872
873    /// Resolve the LLM client for a session (session-level -> default fallback)
874    ///
875    /// Returns `None` if no LLM client is configured at either level.
876    pub async fn get_llm_for_session(
877        &self,
878        session_id: &str,
879    ) -> Result<Option<Arc<dyn LlmClient>>> {
880        let session_lock = self.get_session(session_id).await?;
881        let session = session_lock.read().await;
882
883        if let Some(client) = &session.llm_client {
884            return Ok(Some(client.clone()));
885        }
886
887        Ok(self.llm_client.clone())
888    }
889
890    /// Configure session
891    pub async fn configure(
892        &self,
893        session_id: &str,
894        thinking: Option<bool>,
895        budget: Option<usize>,
896        model_config: Option<LlmConfig>,
897    ) -> Result<()> {
898        {
899            let session_lock = self.get_session(session_id).await?;
900            let mut session = session_lock.write().await;
901
902            if let Some(t) = thinking {
903                session.thinking_enabled = t;
904            }
905            if let Some(b) = budget {
906                session.thinking_budget = Some(b);
907            }
908            if let Some(ref config) = model_config {
909                tracing::info!(
910                    "Configuring session {} with LLM: provider={}, model={}",
911                    session_id,
912                    config.provider,
913                    config.model
914                );
915                session.model_name = Some(config.model.clone());
916                session.llm_client = Some(llm::create_client_with_config(config.clone()));
917            }
918        }
919
920        // Store LLM config for persistence (without API key)
921        if let Some(config) = model_config {
922            let llm_config_data = LlmConfigData {
923                provider: config.provider,
924                model: config.model,
925                api_key: None, // Don't persist API key
926                base_url: config.base_url,
927            };
928            let mut configs = self.llm_configs.write().await;
929            configs.insert(session_id.to_string(), llm_config_data);
930        }
931
932        // Persist to store
933        self.persist_in_background(session_id, "configure");
934
935        Ok(())
936    }
937
938    /// Get session count
939    pub async fn session_count(&self) -> usize {
940        let sessions = self.sessions.read().await;
941        sessions.len()
942    }
943
944    /// Check health of all registered stores
945    pub async fn store_health(&self) -> Vec<(String, Result<()>)> {
946        let stores = self.stores.read().await;
947        let mut results = Vec::new();
948        for (_, store) in stores.iter() {
949            let name = store.backend_name().to_string();
950            let result = store.health_check().await;
951            results.push((name, result));
952        }
953        results
954    }
955
956    /// List all loaded tools (built-in tools)
957    pub fn list_tools(&self) -> Vec<crate::llm::ToolDefinition> {
958        self.tool_executor.definitions()
959    }
960
961    /// Pause a session
962    pub async fn pause_session(&self, session_id: &str) -> Result<bool> {
963        let paused = {
964            let session_lock = self.get_session(session_id).await?;
965            let mut session = session_lock.write().await;
966            session.pause()
967        };
968
969        if paused {
970            self.persist_in_background(session_id, "pause");
971        }
972
973        Ok(paused)
974    }
975
976    /// Resume a session
977    pub async fn resume_session(&self, session_id: &str) -> Result<bool> {
978        let resumed = {
979            let session_lock = self.get_session(session_id).await?;
980            let mut session = session_lock.write().await;
981            session.resume()
982        };
983
984        if resumed {
985            self.persist_in_background(session_id, "resume");
986        }
987
988        Ok(resumed)
989    }
990
991    /// Cancel an ongoing operation for a session
992    ///
993    /// Returns true if an operation was cancelled, false if no operation was running.
994    pub async fn cancel_operation(&self, session_id: &str) -> Result<bool> {
995        // First, cancel any pending HITL confirmations
996        let session_lock = self.get_session(session_id).await?;
997        let cancelled_confirmations = {
998            let session = session_lock.read().await;
999            session.confirmation_manager.cancel_all().await
1000        };
1001
1002        if cancelled_confirmations > 0 {
1003            tracing::info!(
1004                "Cancelled {} pending confirmations for session {}",
1005                cancelled_confirmations,
1006                session_id
1007            );
1008        }
1009
1010        // Then, abort the ongoing operation if any
1011        let abort_handle = {
1012            let mut ops = self.ongoing_operations.write().await;
1013            ops.remove(session_id)
1014        };
1015
1016        if let Some(handle) = abort_handle {
1017            handle.abort();
1018            tracing::info!("Cancelled ongoing operation for session {}", session_id);
1019            Ok(true)
1020        } else if cancelled_confirmations > 0 {
1021            // We cancelled confirmations but no main operation
1022            Ok(true)
1023        } else {
1024            tracing::debug!("No ongoing operation to cancel for session {}", session_id);
1025            Ok(false)
1026        }
1027    }
1028
1029    /// Get all sessions (returns session locks for iteration)
1030    pub async fn get_all_sessions(&self) -> Vec<Arc<RwLock<Session>>> {
1031        let sessions = self.sessions.read().await;
1032        sessions.values().cloned().collect()
1033    }
1034
1035    /// Get tool executor reference
1036    pub fn tool_executor(&self) -> &Arc<ToolExecutor> {
1037        &self.tool_executor
1038    }
1039
1040    /// Confirm a tool execution (HITL)
1041    pub async fn confirm_tool(
1042        &self,
1043        session_id: &str,
1044        tool_id: &str,
1045        approved: bool,
1046        reason: Option<String>,
1047    ) -> Result<bool> {
1048        let session_lock = self.get_session(session_id).await?;
1049        let session = session_lock.read().await;
1050        session
1051            .confirmation_manager
1052            .confirm(tool_id, approved, reason)
1053            .await
1054            .map_err(|e| anyhow::anyhow!(e))
1055    }
1056
1057    /// Set confirmation policy for a session (HITL)
1058    pub async fn set_confirmation_policy(
1059        &self,
1060        session_id: &str,
1061        policy: ConfirmationPolicy,
1062    ) -> Result<ConfirmationPolicy> {
1063        {
1064            let session_lock = self.get_session(session_id).await?;
1065            let session = session_lock.read().await;
1066            session.set_confirmation_policy(policy.clone()).await;
1067        }
1068
1069        // Update config for persistence
1070        {
1071            let session_lock = self.get_session(session_id).await?;
1072            let mut session = session_lock.write().await;
1073            session.config.confirmation_policy = Some(policy.clone());
1074        }
1075
1076        // Persist to store
1077        self.persist_in_background(session_id, "set_confirmation_policy");
1078
1079        Ok(policy)
1080    }
1081
1082    /// Get confirmation policy for a session (HITL)
1083    pub async fn get_confirmation_policy(&self, session_id: &str) -> Result<ConfirmationPolicy> {
1084        let session_lock = self.get_session(session_id).await?;
1085        let session = session_lock.read().await;
1086        Ok(session.confirmation_policy().await)
1087    }
1088
1089    /// Set permission policy for a session
1090    pub async fn set_permission_policy(
1091        &self,
1092        session_id: &str,
1093        policy: PermissionPolicy,
1094    ) -> Result<PermissionPolicy> {
1095        {
1096            let session_lock = self.get_session(session_id).await?;
1097            let session = session_lock.read().await;
1098            session.set_permission_policy(policy.clone()).await;
1099        }
1100
1101        // Update config for persistence
1102        {
1103            let session_lock = self.get_session(session_id).await?;
1104            let mut session = session_lock.write().await;
1105            session.config.permission_policy = Some(policy.clone());
1106        }
1107
1108        // Persist to store
1109        self.persist_in_background(session_id, "set_permission_policy");
1110
1111        Ok(policy)
1112    }
1113
1114    /// Get permission policy for a session
1115    pub async fn get_permission_policy(&self, session_id: &str) -> Result<PermissionPolicy> {
1116        let session_lock = self.get_session(session_id).await?;
1117        let session = session_lock.read().await;
1118        Ok(session.permission_policy().await)
1119    }
1120
1121    /// Check permission for a tool invocation
1122    pub async fn check_permission(
1123        &self,
1124        session_id: &str,
1125        tool_name: &str,
1126        args: &serde_json::Value,
1127    ) -> Result<PermissionDecision> {
1128        let session_lock = self.get_session(session_id).await?;
1129        let session = session_lock.read().await;
1130        Ok(session.check_permission(tool_name, args).await)
1131    }
1132
1133    /// Add a permission rule
1134    pub async fn add_permission_rule(
1135        &self,
1136        session_id: &str,
1137        rule_type: &str,
1138        rule: &str,
1139    ) -> Result<()> {
1140        let session_lock = self.get_session(session_id).await?;
1141        let session = session_lock.read().await;
1142        match rule_type {
1143            "allow" => session.add_allow_rule(rule).await,
1144            "deny" => session.add_deny_rule(rule).await,
1145            "ask" => session.add_ask_rule(rule).await,
1146            _ => anyhow::bail!("Unknown rule type: {}", rule_type),
1147        }
1148        Ok(())
1149    }
1150
1151    /// Add a context provider to a session
1152    pub async fn add_context_provider(
1153        &self,
1154        session_id: &str,
1155        provider: Arc<dyn crate::context::ContextProvider>,
1156    ) -> Result<()> {
1157        let session_lock = self.get_session(session_id).await?;
1158        let mut session = session_lock.write().await;
1159        session.add_context_provider(provider);
1160        Ok(())
1161    }
1162
1163    /// Remove a context provider from a session by name
1164    pub async fn remove_context_provider(&self, session_id: &str, name: &str) -> Result<bool> {
1165        let session_lock = self.get_session(session_id).await?;
1166        let mut session = session_lock.write().await;
1167        Ok(session.remove_context_provider(name))
1168    }
1169
1170    /// List context provider names for a session
1171    pub async fn list_context_providers(&self, session_id: &str) -> Result<Vec<String>> {
1172        let session_lock = self.get_session(session_id).await?;
1173        let session = session_lock.read().await;
1174        Ok(session.context_provider_names())
1175    }
1176
1177    /// Set lane handler configuration
1178    pub async fn set_lane_handler(
1179        &self,
1180        session_id: &str,
1181        lane: crate::hitl::SessionLane,
1182        config: crate::queue::LaneHandlerConfig,
1183    ) -> Result<()> {
1184        let session_lock = self.get_session(session_id).await?;
1185        let session = session_lock.read().await;
1186        session.set_lane_handler(lane, config).await;
1187        Ok(())
1188    }
1189
1190    /// Get lane handler configuration
1191    pub async fn get_lane_handler(
1192        &self,
1193        session_id: &str,
1194        lane: crate::hitl::SessionLane,
1195    ) -> Result<crate::queue::LaneHandlerConfig> {
1196        let session_lock = self.get_session(session_id).await?;
1197        let session = session_lock.read().await;
1198        Ok(session.get_lane_handler(lane).await)
1199    }
1200
1201    /// Complete an external task
1202    pub async fn complete_external_task(
1203        &self,
1204        session_id: &str,
1205        task_id: &str,
1206        result: crate::queue::ExternalTaskResult,
1207    ) -> Result<bool> {
1208        let session_lock = self.get_session(session_id).await?;
1209        let session = session_lock.read().await;
1210        Ok(session.complete_external_task(task_id, result).await)
1211    }
1212
1213    /// Get pending external tasks for a session
1214    pub async fn pending_external_tasks(
1215        &self,
1216        session_id: &str,
1217    ) -> Result<Vec<crate::queue::ExternalTask>> {
1218        let session_lock = self.get_session(session_id).await?;
1219        let session = session_lock.read().await;
1220        Ok(session.pending_external_tasks().await)
1221    }
1222
1223    // ========================================================================
1224    // Task Management
1225    // ========================================================================
1226
1227    /// Get tasks for a session
1228    pub async fn get_tasks(&self, session_id: &str) -> Result<Vec<Task>> {
1229        let session_lock = self.get_session(session_id).await?;
1230        let session = session_lock.read().await;
1231        Ok(session.get_tasks().to_vec())
1232    }
1233
1234    /// Set tasks for a session
1235    pub async fn set_tasks(&self, session_id: &str, tasks: Vec<Task>) -> Result<Vec<Task>> {
1236        {
1237            let session_lock = self.get_session(session_id).await?;
1238            let mut session = session_lock.write().await;
1239            session.set_tasks(tasks);
1240        }
1241
1242        // Save session after updating tasks
1243        self.persist_in_background(session_id, "task_update");
1244
1245        // Return updated tasks
1246        self.get_tasks(session_id).await
1247    }
1248
1249    /// Fork a session, creating a new session with copied history and configuration
1250    ///
1251    /// The forked session gets:
1252    /// - A new unique ID
1253    /// - Copied conversation history (messages)
1254    /// - Copied configuration (with optional name override)
1255    /// - Copied usage statistics and cost
1256    /// - Copied tasks
1257    /// - `parent_id` set to the source session ID
1258    /// - Fresh timestamps (created_at = now)
1259    ///
1260    /// Non-serializable state (queue, HITL, permissions) is freshly initialized.
1261    pub async fn fork_session(
1262        &self,
1263        source_id: &str,
1264        new_id: String,
1265        new_name: Option<String>,
1266    ) -> Result<String> {
1267        tracing::info!(
1268            name: "a3s.session.fork",
1269            source_id = %source_id,
1270            new_id = %new_id,
1271            "Forking session"
1272        );
1273
1274        // Read source session data
1275        let (
1276            source_config,
1277            source_messages,
1278            source_usage,
1279            source_cost,
1280            source_model_name,
1281            source_thinking_enabled,
1282            source_thinking_budget,
1283            source_tasks,
1284            source_context_usage,
1285        ) = {
1286            let session_lock = self
1287                .get_session(source_id)
1288                .await
1289                .context(format!("Source session '{}' not found for fork", source_id))?;
1290            let session = session_lock.read().await;
1291            (
1292                session.config.clone(),
1293                session.messages.clone(),
1294                session.total_usage.clone(),
1295                session.total_cost,
1296                session.model_name.clone(),
1297                session.thinking_enabled,
1298                session.thinking_budget,
1299                session.tasks.clone(),
1300                session.context_usage.clone(),
1301            )
1302        };
1303
1304        // Copy LLM config if source has one
1305        let source_llm_config = {
1306            let configs = self.llm_configs.read().await;
1307            configs.get(source_id).cloned()
1308        };
1309
1310        // Build forked config
1311        let mut forked_config = source_config;
1312        if let Some(name) = new_name {
1313            forked_config.name = name;
1314        } else {
1315            forked_config.name = format!("{} (fork)", forked_config.name);
1316        }
1317        forked_config.parent_id = Some(source_id.to_string());
1318
1319        // Create the new session
1320        let tools = self.tool_executor.definitions();
1321        let mut new_session = Session::new(new_id.clone(), forked_config, tools).await?;
1322
1323        // Copy state from source
1324        new_session.messages = source_messages;
1325        new_session.total_usage = source_usage;
1326        new_session.total_cost = source_cost;
1327        new_session.model_name = source_model_name;
1328        new_session.thinking_enabled = source_thinking_enabled;
1329        new_session.thinking_budget = source_thinking_budget;
1330        new_session.tasks = source_tasks;
1331        new_session.context_usage = source_context_usage;
1332
1333        // Start the command queue
1334        new_session.start_queue().await?;
1335
1336        // Record storage type
1337        {
1338            let mut storage_types = self.session_storage_types.write().await;
1339            storage_types.insert(new_id.clone(), new_session.config.storage_type.clone());
1340        }
1341
1342        // Copy LLM config if source had one
1343        if let Some(llm_config) = source_llm_config {
1344            let mut configs = self.llm_configs.write().await;
1345            configs.insert(new_id.clone(), llm_config);
1346        }
1347
1348        // Insert the new session
1349        {
1350            let mut sessions = self.sessions.write().await;
1351            sessions.insert(new_id.clone(), Arc::new(RwLock::new(new_session)));
1352        }
1353
1354        // Persist to store
1355        self.persist_in_background(&new_id, "fork");
1356
1357        tracing::info!(
1358            "Forked session '{}' -> '{}' with parent_id set",
1359            source_id,
1360            new_id,
1361        );
1362
1363        Ok(new_id)
1364    }
1365
1366    /// Generate a short title for a session based on its conversation content
1367    ///
1368    /// Uses the session's LLM client (or default) to generate a concise title
1369    /// from the first few messages. The title is automatically set on the session.
1370    ///
1371    /// Returns the generated title, or None if no LLM client is available
1372    /// or the session has no messages.
1373    pub async fn generate_title(&self, session_id: &str) -> Result<Option<String>> {
1374        tracing::info!(
1375            name: "a3s.session.generate_title",
1376            session_id = %session_id,
1377            "Generating session title"
1378        );
1379
1380        // Get the first few messages for context
1381        let messages = {
1382            let session_lock = self.get_session(session_id).await?;
1383            let session = session_lock.read().await;
1384
1385            if session.messages.is_empty() {
1386                return Ok(None);
1387            }
1388
1389            // Take up to the first 4 messages for title generation
1390            session.messages.iter().take(4).cloned().collect::<Vec<_>>()
1391        };
1392
1393        // Get LLM client
1394        let llm_client = self.get_llm_for_session(session_id).await?;
1395        let Some(client) = llm_client else {
1396            tracing::debug!("No LLM client available for title generation");
1397            return Ok(None);
1398        };
1399
1400        // Build a summary of the conversation for the title prompt
1401        let mut conversation_summary = String::new();
1402        for msg in &messages {
1403            let role = &msg.role;
1404            for block in &msg.content {
1405                if let ContentBlock::Text { text } = block {
1406                    // Limit each message to 200 chars for the title prompt
1407                    let truncated = if text.len() > 200 {
1408                        format!("{}...", &text[..200])
1409                    } else {
1410                        text.clone()
1411                    };
1412                    conversation_summary.push_str(&format!("{}: {}\n", role, truncated));
1413                }
1414            }
1415        }
1416
1417        if conversation_summary.is_empty() {
1418            return Ok(None);
1419        }
1420
1421        // Ask LLM to generate a title
1422        let title_prompt = Message::user(&crate::prompts::render(
1423            crate::prompts::TITLE_GENERATE,
1424            &[("conversation", &conversation_summary)],
1425        ));
1426
1427        let response = client
1428            .complete(&[title_prompt], None, &[])
1429            .await
1430            .context("Failed to generate session title")?;
1431
1432        // Extract title from response
1433        let title = response
1434            .message
1435            .content
1436            .iter()
1437            .find_map(|block| {
1438                if let ContentBlock::Text { text } = block {
1439                    Some(text.trim().to_string())
1440                } else {
1441                    None
1442                }
1443            })
1444            .unwrap_or_default();
1445
1446        if title.is_empty() {
1447            return Ok(None);
1448        }
1449
1450        // Truncate if too long (safety net)
1451        let title = if title.len() > 80 {
1452            format!("{}...", &title[..77])
1453        } else {
1454            title
1455        };
1456
1457        // Update session name
1458        {
1459            let session_lock = self.get_session(session_id).await?;
1460            let mut session = session_lock.write().await;
1461            session.config.name = title.clone();
1462            session.touch();
1463        }
1464
1465        // Persist
1466        self.persist_in_background(session_id, "title_generation");
1467
1468        tracing::info!("Generated title for session '{}': '{}'", session_id, title);
1469        Ok(Some(title))
1470    }
1471}