1use super::{ContextUsage, Session, SessionConfig, SessionState};
4use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
5use crate::hitl::ConfirmationPolicy;
6use crate::llm::{self, LlmClient, LlmConfig, Message};
7use crate::memory::AgentMemory;
8use crate::prompts::SystemPromptSlots;
9use crate::skills::SkillRegistry;
10use crate::store::{FileSessionStore, LlmConfigData, SessionData, SessionStore};
11use crate::mcp::McpManager;
12use crate::tools::ToolExecutor;
13use a3s_memory::MemoryStore;
14use anyhow::{Context, Result};
15use std::collections::HashMap;
16use std::sync::Arc;
17use tokio::sync::{mpsc, RwLock};
18
19#[derive(Clone)]
21pub struct SessionManager {
22 pub(crate) sessions: Arc<RwLock<HashMap<String, Arc<RwLock<Session>>>>>,
23 pub(crate) llm_client: Arc<RwLock<Option<Arc<dyn LlmClient>>>>,
26 pub(crate) tool_executor: Arc<ToolExecutor>,
27 pub(crate) stores: Arc<RwLock<HashMap<crate::config::StorageBackend, Arc<dyn SessionStore>>>>,
29 pub(crate) session_storage_types: Arc<RwLock<HashMap<String, crate::config::StorageBackend>>>,
31 pub(crate) llm_configs: Arc<RwLock<HashMap<String, LlmConfigData>>>,
33 pub(crate) ongoing_operations: Arc<RwLock<HashMap<String, tokio::task::AbortHandle>>>,
35 pub(crate) skill_registry: Arc<RwLock<Option<Arc<SkillRegistry>>>>,
37 pub(crate) memory_store: Arc<RwLock<Option<Arc<dyn MemoryStore>>>>,
41 pub(crate) mcp_manager: Arc<RwLock<Option<Arc<McpManager>>>>,
44}
45
46impl SessionManager {
47 pub fn new(llm_client: Option<Arc<dyn LlmClient>>, tool_executor: Arc<ToolExecutor>) -> Self {
49 Self {
50 sessions: Arc::new(RwLock::new(HashMap::new())),
51 llm_client: Arc::new(RwLock::new(llm_client)),
52 tool_executor,
53 stores: Arc::new(RwLock::new(HashMap::new())),
54 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
55 llm_configs: Arc::new(RwLock::new(HashMap::new())),
56 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
57 skill_registry: Arc::new(RwLock::new(None)),
58 memory_store: Arc::new(RwLock::new(None)),
59 mcp_manager: Arc::new(RwLock::new(None)),
60 }
61 }
62
63 pub async fn with_persistence<P: AsRef<std::path::Path>>(
67 llm_client: Option<Arc<dyn LlmClient>>,
68 tool_executor: Arc<ToolExecutor>,
69 sessions_dir: P,
70 ) -> Result<Self> {
71 let store = FileSessionStore::new(sessions_dir).await?;
72 let mut stores = HashMap::new();
73 stores.insert(
74 crate::config::StorageBackend::File,
75 Arc::new(store) as Arc<dyn SessionStore>,
76 );
77
78 let manager = Self {
79 sessions: Arc::new(RwLock::new(HashMap::new())),
80 llm_client: Arc::new(RwLock::new(llm_client)),
81 tool_executor,
82 stores: Arc::new(RwLock::new(stores)),
83 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
84 llm_configs: Arc::new(RwLock::new(HashMap::new())),
85 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
86 skill_registry: Arc::new(RwLock::new(None)),
87 memory_store: Arc::new(RwLock::new(None)),
88 mcp_manager: Arc::new(RwLock::new(None)),
89 };
90
91 Ok(manager)
92 }
93
94 pub fn with_store(
99 llm_client: Option<Arc<dyn LlmClient>>,
100 tool_executor: Arc<ToolExecutor>,
101 store: Arc<dyn SessionStore>,
102 backend: crate::config::StorageBackend,
103 ) -> Self {
104 let mut stores = HashMap::new();
105 stores.insert(backend, store);
106
107 Self {
108 sessions: Arc::new(RwLock::new(HashMap::new())),
109 llm_client: Arc::new(RwLock::new(llm_client)),
110 tool_executor,
111 stores: Arc::new(RwLock::new(stores)),
112 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
113 llm_configs: Arc::new(RwLock::new(HashMap::new())),
114 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
115 skill_registry: Arc::new(RwLock::new(None)),
116 memory_store: Arc::new(RwLock::new(None)),
117 mcp_manager: Arc::new(RwLock::new(None)),
118 }
119 }
120
121 pub async fn set_default_llm(&self, client: Option<Arc<dyn LlmClient>>) {
126 *self.llm_client.write().await = client;
127 }
128
129 pub async fn set_skill_registry(
136 &self,
137 registry: Arc<SkillRegistry>,
138 skills_dir: std::path::PathBuf,
139 ) {
140 let manage_tool = crate::skills::ManageSkillTool::new(registry.clone(), skills_dir);
142 self.tool_executor
143 .register_dynamic_tool(Arc::new(manage_tool));
144
145 *self.skill_registry.write().await = Some(registry);
146 }
147
148 pub async fn skill_registry(&self) -> Option<Arc<SkillRegistry>> {
150 self.skill_registry.read().await.clone()
151 }
152
153 pub async fn set_memory_store(&self, store: Arc<dyn MemoryStore>) {
160 *self.memory_store.write().await = Some(store);
161 }
162
163 pub async fn memory_store(&self) -> Option<Arc<dyn MemoryStore>> {
165 self.memory_store.read().await.clone()
166 }
167
168 pub async fn set_mcp_manager(&self, manager: Arc<McpManager>) {
173 let all_tools = manager.get_all_tools().await;
174 let mut by_server: HashMap<String, Vec<crate::mcp::McpTool>> = HashMap::new();
175 for (server, tool) in all_tools {
176 by_server.entry(server).or_default().push(tool);
177 }
178 for (server_name, tools) in by_server {
179 for tool in crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&manager)) {
180 self.tool_executor.register_dynamic_tool(tool);
181 }
182 }
183 *self.mcp_manager.write().await = Some(manager);
184 }
185
186 pub async fn add_mcp_server(&self, config: crate::mcp::McpServerConfig) -> Result<()> {
191 let manager = {
192 let guard = self.mcp_manager.read().await;
193 match guard.clone() {
194 Some(m) => m,
195 None => {
196 drop(guard);
197 let m = Arc::new(McpManager::new());
198 *self.mcp_manager.write().await = Some(Arc::clone(&m));
199 m
200 }
201 }
202 };
203 let name = config.name.clone();
204 manager.register_server(config).await;
205 manager.connect(&name).await?;
206 let tools = manager.get_server_tools(&name).await;
207 for tool in crate::mcp::tools::create_mcp_tools(&name, tools, Arc::clone(&manager)) {
208 self.tool_executor.register_dynamic_tool(tool);
209 }
210 Ok(())
211 }
212
213 pub async fn remove_mcp_server(&self, name: &str) -> Result<()> {
217 let guard = self.mcp_manager.read().await;
218 if let Some(ref manager) = *guard {
219 manager.disconnect(name).await?;
220 }
221 self.tool_executor.unregister_tools_by_prefix(&format!("mcp__{name}__"));
222 Ok(())
223 }
224
225 pub async fn mcp_status(&self) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
227 let guard = self.mcp_manager.read().await;
228 match guard.as_ref() {
229 Some(m) => {
230 let m = Arc::clone(m);
231 drop(guard);
232 m.get_status().await
233 }
234 None => std::collections::HashMap::new(),
235 }
236 }
237
238 pub async fn restore_session_by_id(&self, session_id: &str) -> Result<()> {
243 {
245 let sessions = self.sessions.read().await;
246 if sessions.contains_key(session_id) {
247 return Ok(());
248 }
249 }
250
251 let stores = self.stores.read().await;
252 for (backend, store) in stores.iter() {
253 match store.load(session_id).await {
254 Ok(Some(data)) => {
255 {
256 let mut storage_types = self.session_storage_types.write().await;
257 storage_types.insert(data.id.clone(), backend.clone());
258 }
259 self.restore_session(data).await?;
260 return Ok(());
261 }
262 Ok(None) => continue,
263 Err(e) => {
264 tracing::warn!(
265 "Failed to load session {} from {:?}: {}",
266 session_id,
267 backend,
268 e
269 );
270 continue;
271 }
272 }
273 }
274
275 Err(anyhow::anyhow!(
276 "Session {} not found in any store",
277 session_id
278 ))
279 }
280
281 pub async fn load_all_sessions(&mut self) -> Result<usize> {
283 let stores = self.stores.read().await;
284 let mut loaded = 0;
285
286 for (backend, store) in stores.iter() {
287 let session_ids = match store.list().await {
288 Ok(ids) => ids,
289 Err(e) => {
290 tracing::warn!("Failed to list sessions from {:?} store: {}", backend, e);
291 continue;
292 }
293 };
294
295 for id in session_ids {
296 match store.load(&id).await {
297 Ok(Some(data)) => {
298 {
300 let mut storage_types = self.session_storage_types.write().await;
301 storage_types.insert(data.id.clone(), backend.clone());
302 }
303
304 if let Err(e) = self.restore_session(data).await {
305 tracing::warn!("Failed to restore session {}: {}", id, e);
306 } else {
307 loaded += 1;
308 }
309 }
310 Ok(None) => {
311 tracing::warn!("Session {} not found in store", id);
312 }
313 Err(e) => {
314 tracing::warn!("Failed to load session {}: {}", id, e);
315 }
316 }
317 }
318 }
319
320 tracing::info!("Loaded {} sessions from store", loaded);
321 Ok(loaded)
322 }
323
324 async fn restore_session(&self, data: SessionData) -> Result<()> {
326 let tools = self.tool_executor.definitions();
327 let mut session = Session::new(data.id.clone(), data.config.clone(), tools).await?;
328
329 session.restore_from_data(&data);
331
332 if let Some(llm_config) = &data.llm_config {
334 let mut configs = self.llm_configs.write().await;
335 configs.insert(data.id.clone(), llm_config.clone());
336 }
337
338 let mut sessions = self.sessions.write().await;
339 sessions.insert(data.id.clone(), Arc::new(RwLock::new(session)));
340
341 tracing::info!("Restored session: {}", data.id);
342 Ok(())
343 }
344
345 async fn save_session(&self, session_id: &str) -> Result<()> {
347 let storage_type = {
349 let storage_types = self.session_storage_types.read().await;
350 storage_types.get(session_id).cloned()
351 };
352
353 let Some(storage_type) = storage_type else {
354 return Ok(());
356 };
357
358 if storage_type == crate::config::StorageBackend::Memory {
360 return Ok(());
361 }
362
363 let stores = self.stores.read().await;
365 let Some(store) = stores.get(&storage_type) else {
366 tracing::warn!("No store available for storage type: {:?}", storage_type);
367 return Ok(());
368 };
369
370 let session_lock = self.get_session(session_id).await?;
371 let session = session_lock.read().await;
372
373 let llm_config = {
375 let configs = self.llm_configs.read().await;
376 configs.get(session_id).cloned()
377 };
378
379 let data = session.to_session_data(llm_config);
380 store.save(&data).await?;
381
382 tracing::debug!("Saved session: {}", session_id);
383 Ok(())
384 }
385
386 async fn persist_or_warn(&self, session_id: &str, operation: &str) {
390 if let Err(e) = self.save_session(session_id).await {
391 tracing::warn!(
392 "Failed to persist session {} after {}: {}",
393 session_id,
394 operation,
395 e
396 );
397 if let Ok(session_lock) = self.get_session(session_id).await {
399 let session = session_lock.read().await;
400 let _ = session.event_tx().send(AgentEvent::PersistenceFailed {
401 session_id: session_id.to_string(),
402 operation: operation.to_string(),
403 error: e.to_string(),
404 });
405 }
406 }
407 }
408
409 fn persist_in_background(&self, session_id: &str, operation: &str) {
414 let mgr = self.clone();
415 let sid = session_id.to_string();
416 let op = operation.to_string();
417 tokio::spawn(async move {
418 mgr.persist_or_warn(&sid, &op).await;
419 });
420 }
421
422 pub async fn create_session(&self, id: String, config: SessionConfig) -> Result<String> {
424 tracing::info!(name: "a3s.session.create", session_id = %id, "Creating session");
425
426 {
428 let mut storage_types = self.session_storage_types.write().await;
429 storage_types.insert(id.clone(), config.storage_type.clone());
430 }
431
432 let tools = self.tool_executor.definitions();
434 let mut session = Session::new(id.clone(), config, tools).await?;
435
436 session.start_queue().await?;
438
439 if session.config.max_context_length > 0 {
441 session.context_usage.max_tokens = session.config.max_context_length as usize;
442 }
443
444 {
445 let mut sessions = self.sessions.write().await;
446 sessions.insert(id.clone(), Arc::new(RwLock::new(session)));
447 }
448
449 self.persist_in_background(&id, "create");
451
452 tracing::info!("Created session: {}", id);
453 Ok(id)
454 }
455
456 pub async fn destroy_session(&self, id: &str) -> Result<()> {
458 tracing::info!(name: "a3s.session.destroy", session_id = %id, "Destroying session");
459
460 let storage_type = {
462 let storage_types = self.session_storage_types.read().await;
463 storage_types.get(id).cloned()
464 };
465
466 {
467 let mut sessions = self.sessions.write().await;
468 sessions.remove(id);
469 }
470
471 {
473 let mut configs = self.llm_configs.write().await;
474 configs.remove(id);
475 }
476
477 {
479 let mut storage_types = self.session_storage_types.write().await;
480 storage_types.remove(id);
481 }
482
483 if let Some(storage_type) = storage_type {
485 if storage_type != crate::config::StorageBackend::Memory {
486 let stores = self.stores.read().await;
487 if let Some(store) = stores.get(&storage_type) {
488 if let Err(e) = store.delete(id).await {
489 tracing::warn!("Failed to delete session {} from store: {}", id, e);
490 }
491 }
492 }
493 }
494
495 tracing::info!("Destroyed session: {}", id);
496 Ok(())
497 }
498
499 pub async fn get_session(&self, id: &str) -> Result<Arc<RwLock<Session>>> {
501 let sessions = self.sessions.read().await;
502 sessions
503 .get(id)
504 .cloned()
505 .context(format!("Session not found: {}", id))
506 }
507
508 pub async fn create_child_session(
513 &self,
514 parent_id: &str,
515 child_id: String,
516 mut config: SessionConfig,
517 ) -> Result<String> {
518 let parent_lock = self.get_session(parent_id).await?;
520 let parent_llm_client = {
521 let parent = parent_lock.read().await;
522
523 if config.confirmation_policy.is_none() {
525 let parent_policy = parent.confirmation_manager.policy().await;
526 config.confirmation_policy = Some(parent_policy);
527 }
528
529 parent.llm_client.clone()
530 };
531
532 config.parent_id = Some(parent_id.to_string());
534
535 let tools = self.tool_executor.definitions();
537 let mut session = Session::new(child_id.clone(), config, tools).await?;
538
539 if session.llm_client.is_none() {
541 let default_llm = self.llm_client.read().await.clone();
542 session.llm_client = parent_llm_client.or(default_llm);
543 }
544
545 session.start_queue().await?;
547
548 if session.config.max_context_length > 0 {
550 session.context_usage.max_tokens = session.config.max_context_length as usize;
551 }
552
553 {
554 let mut sessions = self.sessions.write().await;
555 sessions.insert(child_id.clone(), Arc::new(RwLock::new(session)));
556 }
557
558 self.persist_in_background(&child_id, "create_child");
560
561 tracing::info!(
562 "Created child session: {} (parent: {})",
563 child_id,
564 parent_id
565 );
566 Ok(child_id)
567 }
568
569 pub async fn get_child_sessions(&self, parent_id: &str) -> Vec<String> {
571 let sessions = self.sessions.read().await;
572 let mut children = Vec::new();
573
574 for (id, session_lock) in sessions.iter() {
575 let session = session_lock.read().await;
576 if session.parent_id.as_deref() == Some(parent_id) {
577 children.push(id.clone());
578 }
579 }
580
581 children
582 }
583
584 pub async fn is_child_session(&self, session_id: &str) -> Result<bool> {
586 let session_lock = self.get_session(session_id).await?;
587 let session = session_lock.read().await;
588 Ok(session.is_child_session())
589 }
590
591 pub async fn generate(&self, session_id: &str, prompt: &str) -> Result<AgentResult> {
593 let session_lock = self.get_session(session_id).await?;
594
595 {
597 let session = session_lock.read().await;
598 if session.state == SessionState::Paused {
599 anyhow::bail!(
600 "Session {} is paused. Call Resume before generating.",
601 session_id
602 );
603 }
604 }
605
606 let (
608 history,
609 system,
610 tools,
611 session_llm_client,
612 permission_checker,
613 confirmation_manager,
614 context_providers,
615 session_workspace,
616 tool_metrics,
617 hook_engine,
618 planning_enabled,
619 goal_tracking,
620 ) = {
621 let session = session_lock.read().await;
622 (
623 session.messages.clone(),
624 session.system().map(String::from),
625 session.tools.clone(),
626 session.llm_client.clone(),
627 session.permission_checker.clone(),
628 session.confirmation_manager.clone(),
629 session.context_providers.clone(),
630 session.config.workspace.clone(),
631 session.tool_metrics.clone(),
632 session.config.hook_engine.clone(),
633 session.config.planning_enabled,
634 session.config.goal_tracking,
635 )
636 };
637
638 let llm_client = if let Some(client) = session_llm_client {
640 client
641 } else if let Some(client) = self.llm_client.read().await.clone() {
642 client
643 } else {
644 anyhow::bail!(
645 "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
646 session_id
647 );
648 };
649
650 let tool_context = if session_workspace.is_empty() {
652 crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
653 .with_session_id(session_id)
654 } else {
655 crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
656 .with_session_id(session_id)
657 };
658
659 let skill_registry = self.skill_registry.read().await.clone();
661 let system = if let Some(ref registry) = skill_registry {
662 let skill_prompt = registry.to_system_prompt();
663 if skill_prompt.is_empty() {
664 system
665 } else {
666 match system {
667 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
668 None => Some(skill_prompt),
669 }
670 }
671 } else {
672 system
673 };
674
675 let effective_prompt = if let Some(ref registry) = skill_registry {
677 let skill_content = registry.match_skills(prompt);
678 if skill_content.is_empty() {
679 prompt.to_string()
680 } else {
681 format!("{}\n\n---\n\n{}", skill_content, prompt)
682 }
683 } else {
684 prompt.to_string()
685 };
686
687 let memory = self
689 .memory_store
690 .read()
691 .await
692 .as_ref()
693 .map(|store| Arc::new(AgentMemory::new(store.clone())));
694
695 let config = AgentConfig {
697 prompt_slots: match system {
698 Some(s) => SystemPromptSlots::from_legacy(s),
699 None => SystemPromptSlots::default(),
700 },
701 tools,
702 max_tool_rounds: 50,
703 permission_checker: Some(permission_checker),
704 confirmation_manager: Some(confirmation_manager),
705 context_providers,
706 planning_enabled,
707 goal_tracking,
708 hook_engine,
709 skill_registry,
710 memory,
711 ..AgentConfig::default()
712 };
713
714 let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
715 .with_tool_metrics(tool_metrics);
716
717 let result = agent
719 .execute_with_session(&history, &effective_prompt, Some(session_id), None)
720 .await?;
721
722 {
724 let mut session = session_lock.write().await;
725 session.messages = result.messages.clone();
726 session.update_usage(&result.usage);
727 }
728
729 self.persist_in_background(session_id, "generate");
731
732 if let Err(e) = self.maybe_auto_compact(session_id).await {
734 tracing::warn!("Auto-compact failed for session {}: {}", session_id, e);
735 }
736
737 Ok(result)
738 }
739
740 pub async fn generate_streaming(
742 &self,
743 session_id: &str,
744 prompt: &str,
745 ) -> Result<(
746 mpsc::Receiver<AgentEvent>,
747 tokio::task::JoinHandle<Result<AgentResult>>,
748 )> {
749 let session_lock = self.get_session(session_id).await?;
750
751 {
753 let session = session_lock.read().await;
754 if session.state == SessionState::Paused {
755 anyhow::bail!(
756 "Session {} is paused. Call Resume before generating.",
757 session_id
758 );
759 }
760 }
761
762 let (
764 history,
765 system,
766 tools,
767 session_llm_client,
768 permission_checker,
769 confirmation_manager,
770 context_providers,
771 session_workspace,
772 tool_metrics,
773 hook_engine,
774 planning_enabled,
775 goal_tracking,
776 ) = {
777 let session = session_lock.read().await;
778 (
779 session.messages.clone(),
780 session.system().map(String::from),
781 session.tools.clone(),
782 session.llm_client.clone(),
783 session.permission_checker.clone(),
784 session.confirmation_manager.clone(),
785 session.context_providers.clone(),
786 session.config.workspace.clone(),
787 session.tool_metrics.clone(),
788 session.config.hook_engine.clone(),
789 session.config.planning_enabled,
790 session.config.goal_tracking,
791 )
792 };
793
794 let llm_client = if let Some(client) = session_llm_client {
796 client
797 } else if let Some(client) = self.llm_client.read().await.clone() {
798 client
799 } else {
800 anyhow::bail!(
801 "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
802 session_id
803 );
804 };
805
806 let tool_context = if session_workspace.is_empty() {
808 crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
809 .with_session_id(session_id)
810 } else {
811 crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
812 .with_session_id(session_id)
813 };
814
815 let skill_registry = self.skill_registry.read().await.clone();
817 let system = if let Some(ref registry) = skill_registry {
818 let skill_prompt = registry.to_system_prompt();
819 if skill_prompt.is_empty() {
820 system
821 } else {
822 match system {
823 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
824 None => Some(skill_prompt),
825 }
826 }
827 } else {
828 system
829 };
830
831 let effective_prompt = if let Some(ref registry) = skill_registry {
833 let skill_content = registry.match_skills(prompt);
834 if skill_content.is_empty() {
835 prompt.to_string()
836 } else {
837 format!("{}\n\n---\n\n{}", skill_content, prompt)
838 }
839 } else {
840 prompt.to_string()
841 };
842
843 let memory = self
845 .memory_store
846 .read()
847 .await
848 .as_ref()
849 .map(|store| Arc::new(AgentMemory::new(store.clone())));
850
851 let config = AgentConfig {
853 prompt_slots: match system {
854 Some(s) => SystemPromptSlots::from_legacy(s),
855 None => SystemPromptSlots::default(),
856 },
857 tools,
858 max_tool_rounds: 50,
859 permission_checker: Some(permission_checker),
860 confirmation_manager: Some(confirmation_manager),
861 context_providers,
862 planning_enabled,
863 goal_tracking,
864 hook_engine,
865 skill_registry,
866 memory,
867 ..AgentConfig::default()
868 };
869
870 let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
871 .with_tool_metrics(tool_metrics);
872
873 let (rx, handle) = agent.execute_streaming(&history, &effective_prompt).await?;
875
876 let abort_handle = handle.abort_handle();
878 {
879 let mut ops = self.ongoing_operations.write().await;
880 ops.insert(session_id.to_string(), abort_handle);
881 }
882
883 let session_lock_clone = session_lock.clone();
885 let original_handle = handle;
886 let stores = self.stores.clone();
887 let session_storage_types = self.session_storage_types.clone();
888 let llm_configs = self.llm_configs.clone();
889 let session_id_owned = session_id.to_string();
890 let ongoing_operations = self.ongoing_operations.clone();
891 let session_manager = self.clone();
892
893 let wrapped_handle = tokio::spawn(async move {
894 let result = original_handle.await??;
895
896 {
898 let mut ops = ongoing_operations.write().await;
899 ops.remove(&session_id_owned);
900 }
901
902 {
904 let mut session = session_lock_clone.write().await;
905 session.messages = result.messages.clone();
906 session.update_usage(&result.usage);
907 }
908
909 let storage_type = {
911 let storage_types = session_storage_types.read().await;
912 storage_types.get(&session_id_owned).cloned()
913 };
914
915 if let Some(storage_type) = storage_type {
916 if storage_type != crate::config::StorageBackend::Memory {
917 let stores_guard = stores.read().await;
918 if let Some(store) = stores_guard.get(&storage_type) {
919 let session = session_lock_clone.read().await;
920 let llm_config = {
921 let configs = llm_configs.read().await;
922 configs.get(&session_id_owned).cloned()
923 };
924 let data = session.to_session_data(llm_config);
925 if let Err(e) = store.save(&data).await {
926 tracing::warn!(
927 "Failed to persist session {} after streaming: {}",
928 session_id_owned,
929 e
930 );
931 }
932 }
933 }
934 }
935
936 if let Err(e) = session_manager.maybe_auto_compact(&session_id_owned).await {
938 tracing::warn!(
939 "Auto-compact failed for session {}: {}",
940 session_id_owned,
941 e
942 );
943 }
944
945 Ok(result)
946 });
947
948 Ok((rx, wrapped_handle))
949 }
950
951 pub async fn context_usage(&self, session_id: &str) -> Result<ContextUsage> {
953 let session_lock = self.get_session(session_id).await?;
954 let session = session_lock.read().await;
955 Ok(session.context_usage.clone())
956 }
957
958 pub async fn history(&self, session_id: &str) -> Result<Vec<Message>> {
960 let session_lock = self.get_session(session_id).await?;
961 let session = session_lock.read().await;
962 Ok(session.messages.clone())
963 }
964
965 pub async fn clear(&self, session_id: &str) -> Result<()> {
967 {
968 let session_lock = self.get_session(session_id).await?;
969 let mut session = session_lock.write().await;
970 session.clear();
971 }
972
973 self.persist_in_background(session_id, "clear");
975
976 Ok(())
977 }
978
979 pub async fn compact(&self, session_id: &str) -> Result<()> {
981 tracing::info!(name: "a3s.session.compact", session_id = %session_id, "Compacting session context");
982
983 {
984 let session_lock = self.get_session(session_id).await?;
985 let mut session = session_lock.write().await;
986
987 let llm_client = if let Some(client) = &session.llm_client {
989 client.clone()
990 } else if let Some(client) = self.llm_client.read().await.clone() {
991 client
992 } else {
993 tracing::warn!("No LLM client configured for compaction, using simple truncation");
995 let keep_messages = 20;
996 if session.messages.len() > keep_messages {
997 let len = session.messages.len();
998 session.messages = session.messages.split_off(len - keep_messages);
999 }
1000 drop(session);
1002 self.persist_in_background(session_id, "compact");
1003 return Ok(());
1004 };
1005
1006 session.compact(&llm_client).await?;
1007 }
1008
1009 self.persist_in_background(session_id, "compact");
1011
1012 Ok(())
1013 }
1014
1015 pub async fn maybe_auto_compact(&self, session_id: &str) -> Result<bool> {
1022 let (should_compact, percent_before, messages_before) = {
1023 let session_lock = self.get_session(session_id).await?;
1024 let session = session_lock.read().await;
1025
1026 if !session.config.auto_compact {
1027 return Ok(false);
1028 }
1029
1030 let threshold = session.config.auto_compact_threshold;
1031 let percent = session.context_usage.percent;
1032 let msg_count = session.messages.len();
1033
1034 tracing::debug!(
1035 "Auto-compact check for session {}: percent={:.2}%, threshold={:.2}%, messages={}",
1036 session_id,
1037 percent * 100.0,
1038 threshold * 100.0,
1039 msg_count,
1040 );
1041
1042 (percent >= threshold, percent, msg_count)
1043 };
1044
1045 if !should_compact {
1046 return Ok(false);
1047 }
1048
1049 tracing::info!(
1050 name: "a3s.session.auto_compact",
1051 session_id = %session_id,
1052 percent_before = %format!("{:.1}%", percent_before * 100.0),
1053 messages_before = %messages_before,
1054 "Auto-compacting session due to high context usage"
1055 );
1056
1057 self.compact(session_id).await?;
1059
1060 let messages_after = {
1062 let session_lock = self.get_session(session_id).await?;
1063 let session = session_lock.read().await;
1064 session.messages.len()
1065 };
1066
1067 let event = AgentEvent::ContextCompacted {
1069 session_id: session_id.to_string(),
1070 before_messages: messages_before,
1071 after_messages: messages_after,
1072 percent_before,
1073 };
1074
1075 if let Ok(session_lock) = self.get_session(session_id).await {
1077 let session = session_lock.read().await;
1078 let _ = session.event_tx.send(event);
1079 }
1080
1081 tracing::info!(
1082 name: "a3s.session.auto_compact.done",
1083 session_id = %session_id,
1084 messages_before = %messages_before,
1085 messages_after = %messages_after,
1086 "Auto-compaction complete"
1087 );
1088
1089 Ok(true)
1090 }
1091
1092 pub async fn get_llm_for_session(
1096 &self,
1097 session_id: &str,
1098 ) -> Result<Option<Arc<dyn LlmClient>>> {
1099 let session_lock = self.get_session(session_id).await?;
1100 let session = session_lock.read().await;
1101
1102 if let Some(client) = &session.llm_client {
1103 return Ok(Some(client.clone()));
1104 }
1105
1106 Ok(self.llm_client.read().await.clone())
1107 }
1108
1109 pub async fn configure(
1111 &self,
1112 session_id: &str,
1113 thinking: Option<bool>,
1114 budget: Option<usize>,
1115 model_config: Option<LlmConfig>,
1116 ) -> Result<()> {
1117 {
1118 let session_lock = self.get_session(session_id).await?;
1119 let mut session = session_lock.write().await;
1120
1121 if let Some(t) = thinking {
1122 session.thinking_enabled = t;
1123 }
1124 if let Some(b) = budget {
1125 session.thinking_budget = Some(b);
1126 }
1127 if let Some(ref config) = model_config {
1128 tracing::info!(
1129 "Configuring session {} with LLM: provider={}, model={}",
1130 session_id,
1131 config.provider,
1132 config.model
1133 );
1134 session.model_name = Some(config.model.clone());
1135 session.llm_client = Some(llm::create_client_with_config(config.clone()));
1136 }
1137 }
1138
1139 if let Some(config) = model_config {
1141 let llm_config_data = LlmConfigData {
1142 provider: config.provider,
1143 model: config.model,
1144 api_key: None, base_url: config.base_url,
1146 };
1147 let mut configs = self.llm_configs.write().await;
1148 configs.insert(session_id.to_string(), llm_config_data);
1149 }
1150
1151 self.persist_in_background(session_id, "configure");
1153
1154 Ok(())
1155 }
1156
1157 pub async fn session_count(&self) -> usize {
1159 let sessions = self.sessions.read().await;
1160 sessions.len()
1161 }
1162
1163 pub async fn store_health(&self) -> Vec<(String, Result<()>)> {
1165 let stores = self.stores.read().await;
1166 let mut results = Vec::new();
1167 for (_, store) in stores.iter() {
1168 let name = store.backend_name().to_string();
1169 let result = store.health_check().await;
1170 results.push((name, result));
1171 }
1172 results
1173 }
1174
1175 pub fn list_tools(&self) -> Vec<crate::llm::ToolDefinition> {
1177 self.tool_executor.definitions()
1178 }
1179
1180 pub async fn pause_session(&self, session_id: &str) -> Result<bool> {
1182 let paused = {
1183 let session_lock = self.get_session(session_id).await?;
1184 let mut session = session_lock.write().await;
1185 session.pause()
1186 };
1187
1188 if paused {
1189 self.persist_in_background(session_id, "pause");
1190 }
1191
1192 Ok(paused)
1193 }
1194
1195 pub async fn resume_session(&self, session_id: &str) -> Result<bool> {
1197 let resumed = {
1198 let session_lock = self.get_session(session_id).await?;
1199 let mut session = session_lock.write().await;
1200 session.resume()
1201 };
1202
1203 if resumed {
1204 self.persist_in_background(session_id, "resume");
1205 }
1206
1207 Ok(resumed)
1208 }
1209
1210 pub async fn cancel_operation(&self, session_id: &str) -> Result<bool> {
1214 let session_lock = self.get_session(session_id).await?;
1216 let cancelled_confirmations = {
1217 let session = session_lock.read().await;
1218 session.confirmation_manager.cancel_all().await
1219 };
1220
1221 if cancelled_confirmations > 0 {
1222 tracing::info!(
1223 "Cancelled {} pending confirmations for session {}",
1224 cancelled_confirmations,
1225 session_id
1226 );
1227 }
1228
1229 let abort_handle = {
1231 let mut ops = self.ongoing_operations.write().await;
1232 ops.remove(session_id)
1233 };
1234
1235 if let Some(handle) = abort_handle {
1236 handle.abort();
1237 tracing::info!("Cancelled ongoing operation for session {}", session_id);
1238 Ok(true)
1239 } else if cancelled_confirmations > 0 {
1240 Ok(true)
1242 } else {
1243 tracing::debug!("No ongoing operation to cancel for session {}", session_id);
1244 Ok(false)
1245 }
1246 }
1247
1248 pub async fn get_all_sessions(&self) -> Vec<Arc<RwLock<Session>>> {
1250 let sessions = self.sessions.read().await;
1251 sessions.values().cloned().collect()
1252 }
1253
1254 pub fn tool_executor(&self) -> &Arc<ToolExecutor> {
1256 &self.tool_executor
1257 }
1258
1259 pub async fn confirm_tool(
1261 &self,
1262 session_id: &str,
1263 tool_id: &str,
1264 approved: bool,
1265 reason: Option<String>,
1266 ) -> Result<bool> {
1267 let session_lock = self.get_session(session_id).await?;
1268 let session = session_lock.read().await;
1269 session
1270 .confirmation_manager
1271 .confirm(tool_id, approved, reason)
1272 .await
1273 .map_err(|e| anyhow::anyhow!(e))
1274 }
1275
1276 pub async fn set_confirmation_policy(
1278 &self,
1279 session_id: &str,
1280 policy: ConfirmationPolicy,
1281 ) -> Result<ConfirmationPolicy> {
1282 {
1283 let session_lock = self.get_session(session_id).await?;
1284 let session = session_lock.read().await;
1285 session.set_confirmation_policy(policy.clone()).await;
1286 }
1287
1288 {
1290 let session_lock = self.get_session(session_id).await?;
1291 let mut session = session_lock.write().await;
1292 session.config.confirmation_policy = Some(policy.clone());
1293 }
1294
1295 self.persist_in_background(session_id, "set_confirmation_policy");
1297
1298 Ok(policy)
1299 }
1300
1301 pub async fn get_confirmation_policy(&self, session_id: &str) -> Result<ConfirmationPolicy> {
1303 let session_lock = self.get_session(session_id).await?;
1304 let session = session_lock.read().await;
1305 Ok(session.confirmation_policy().await)
1306 }
1307}