1use super::{ContextUsage, Session, SessionConfig, SessionState};
4use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
5use crate::hitl::ConfirmationPolicy;
6use crate::llm::{self, LlmClient, LlmConfig, Message};
7use crate::mcp::McpManager;
8use crate::memory::AgentMemory;
9use crate::prompts::SystemPromptSlots;
10use crate::skills::SkillRegistry;
11use crate::store::{FileSessionStore, LlmConfigData, SessionData, SessionStore};
12use crate::tools::ToolExecutor;
13use a3s_memory::MemoryStore;
14use anyhow::{Context, Result};
15use std::collections::HashMap;
16use std::sync::Arc;
17use tokio::sync::{mpsc, RwLock};
18
19#[derive(Clone)]
21pub struct SessionManager {
22 pub(crate) sessions: Arc<RwLock<HashMap<String, Arc<RwLock<Session>>>>>,
23 pub(crate) llm_client: Arc<RwLock<Option<Arc<dyn LlmClient>>>>,
26 pub(crate) tool_executor: Arc<ToolExecutor>,
27 pub(crate) stores: Arc<RwLock<HashMap<crate::config::StorageBackend, Arc<dyn SessionStore>>>>,
29 pub(crate) session_storage_types: Arc<RwLock<HashMap<String, crate::config::StorageBackend>>>,
31 pub(crate) llm_configs: Arc<RwLock<HashMap<String, LlmConfigData>>>,
33 pub(crate) ongoing_operations: Arc<RwLock<HashMap<String, tokio::task::AbortHandle>>>,
35 pub(crate) skill_registry: Arc<RwLock<Option<Arc<SkillRegistry>>>>,
37 pub(crate) memory_store: Arc<RwLock<Option<Arc<dyn MemoryStore>>>>,
41 pub(crate) mcp_manager: Arc<RwLock<Option<Arc<McpManager>>>>,
44}
45
46impl SessionManager {
47 pub fn new(llm_client: Option<Arc<dyn LlmClient>>, tool_executor: Arc<ToolExecutor>) -> Self {
49 Self {
50 sessions: Arc::new(RwLock::new(HashMap::new())),
51 llm_client: Arc::new(RwLock::new(llm_client)),
52 tool_executor,
53 stores: Arc::new(RwLock::new(HashMap::new())),
54 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
55 llm_configs: Arc::new(RwLock::new(HashMap::new())),
56 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
57 skill_registry: Arc::new(RwLock::new(None)),
58 memory_store: Arc::new(RwLock::new(None)),
59 mcp_manager: Arc::new(RwLock::new(None)),
60 }
61 }
62
63 pub async fn with_persistence<P: AsRef<std::path::Path>>(
67 llm_client: Option<Arc<dyn LlmClient>>,
68 tool_executor: Arc<ToolExecutor>,
69 sessions_dir: P,
70 ) -> Result<Self> {
71 let store = FileSessionStore::new(sessions_dir).await?;
72 let mut stores = HashMap::new();
73 stores.insert(
74 crate::config::StorageBackend::File,
75 Arc::new(store) as Arc<dyn SessionStore>,
76 );
77
78 let manager = Self {
79 sessions: Arc::new(RwLock::new(HashMap::new())),
80 llm_client: Arc::new(RwLock::new(llm_client)),
81 tool_executor,
82 stores: Arc::new(RwLock::new(stores)),
83 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
84 llm_configs: Arc::new(RwLock::new(HashMap::new())),
85 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
86 skill_registry: Arc::new(RwLock::new(None)),
87 memory_store: Arc::new(RwLock::new(None)),
88 mcp_manager: Arc::new(RwLock::new(None)),
89 };
90
91 Ok(manager)
92 }
93
94 pub fn with_store(
99 llm_client: Option<Arc<dyn LlmClient>>,
100 tool_executor: Arc<ToolExecutor>,
101 store: Arc<dyn SessionStore>,
102 backend: crate::config::StorageBackend,
103 ) -> Self {
104 let mut stores = HashMap::new();
105 stores.insert(backend, store);
106
107 Self {
108 sessions: Arc::new(RwLock::new(HashMap::new())),
109 llm_client: Arc::new(RwLock::new(llm_client)),
110 tool_executor,
111 stores: Arc::new(RwLock::new(stores)),
112 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
113 llm_configs: Arc::new(RwLock::new(HashMap::new())),
114 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
115 skill_registry: Arc::new(RwLock::new(None)),
116 memory_store: Arc::new(RwLock::new(None)),
117 mcp_manager: Arc::new(RwLock::new(None)),
118 }
119 }
120
121 pub async fn set_default_llm(&self, client: Option<Arc<dyn LlmClient>>) {
126 *self.llm_client.write().await = client;
127 }
128
129 pub async fn set_skill_registry(
136 &self,
137 registry: Arc<SkillRegistry>,
138 skills_dir: std::path::PathBuf,
139 ) {
140 let manage_tool = crate::skills::ManageSkillTool::new(registry.clone(), skills_dir);
142 self.tool_executor
143 .register_dynamic_tool(Arc::new(manage_tool));
144
145 *self.skill_registry.write().await = Some(registry);
146 }
147
148 pub async fn skill_registry(&self) -> Option<Arc<SkillRegistry>> {
150 self.skill_registry.read().await.clone()
151 }
152
153 pub async fn set_memory_store(&self, store: Arc<dyn MemoryStore>) {
160 *self.memory_store.write().await = Some(store);
161 }
162
163 pub async fn memory_store(&self) -> Option<Arc<dyn MemoryStore>> {
165 self.memory_store.read().await.clone()
166 }
167
168 pub async fn set_mcp_manager(&self, manager: Arc<McpManager>) {
173 let all_tools = manager.get_all_tools().await;
174 let mut by_server: HashMap<String, Vec<crate::mcp::McpTool>> = HashMap::new();
175 for (server, tool) in all_tools {
176 by_server.entry(server).or_default().push(tool);
177 }
178 for (server_name, tools) in by_server {
179 for tool in
180 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&manager))
181 {
182 self.tool_executor.register_dynamic_tool(tool);
183 }
184 }
185 *self.mcp_manager.write().await = Some(manager);
186 }
187
188 pub async fn add_mcp_server(&self, config: crate::mcp::McpServerConfig) -> Result<()> {
193 let manager = {
194 let guard = self.mcp_manager.read().await;
195 match guard.clone() {
196 Some(m) => m,
197 None => {
198 drop(guard);
199 let m = Arc::new(McpManager::new());
200 *self.mcp_manager.write().await = Some(Arc::clone(&m));
201 m
202 }
203 }
204 };
205 let name = config.name.clone();
206 manager.register_server(config).await;
207 manager.connect(&name).await?;
208 let tools = manager.get_server_tools(&name).await;
209 for tool in crate::mcp::tools::create_mcp_tools(&name, tools, Arc::clone(&manager)) {
210 self.tool_executor.register_dynamic_tool(tool);
211 }
212 Ok(())
213 }
214
215 pub async fn remove_mcp_server(&self, name: &str) -> Result<()> {
219 let guard = self.mcp_manager.read().await;
220 if let Some(ref manager) = *guard {
221 manager.disconnect(name).await?;
222 }
223 self.tool_executor
224 .unregister_tools_by_prefix(&format!("mcp__{name}__"));
225 Ok(())
226 }
227
228 pub async fn mcp_status(
230 &self,
231 ) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
232 let guard = self.mcp_manager.read().await;
233 match guard.as_ref() {
234 Some(m) => {
235 let m = Arc::clone(m);
236 drop(guard);
237 m.get_status().await
238 }
239 None => std::collections::HashMap::new(),
240 }
241 }
242
243 pub async fn restore_session_by_id(&self, session_id: &str) -> Result<()> {
248 {
250 let sessions = self.sessions.read().await;
251 if sessions.contains_key(session_id) {
252 return Ok(());
253 }
254 }
255
256 let stores = self.stores.read().await;
257 for (backend, store) in stores.iter() {
258 match store.load(session_id).await {
259 Ok(Some(data)) => {
260 {
261 let mut storage_types = self.session_storage_types.write().await;
262 storage_types.insert(data.id.clone(), backend.clone());
263 }
264 self.restore_session(data).await?;
265 return Ok(());
266 }
267 Ok(None) => continue,
268 Err(e) => {
269 tracing::warn!(
270 "Failed to load session {} from {:?}: {}",
271 session_id,
272 backend,
273 e
274 );
275 continue;
276 }
277 }
278 }
279
280 Err(anyhow::anyhow!(
281 "Session {} not found in any store",
282 session_id
283 ))
284 }
285
286 pub async fn load_all_sessions(&mut self) -> Result<usize> {
288 let stores = self.stores.read().await;
289 let mut loaded = 0;
290
291 for (backend, store) in stores.iter() {
292 let session_ids = match store.list().await {
293 Ok(ids) => ids,
294 Err(e) => {
295 tracing::warn!("Failed to list sessions from {:?} store: {}", backend, e);
296 continue;
297 }
298 };
299
300 for id in session_ids {
301 match store.load(&id).await {
302 Ok(Some(data)) => {
303 {
305 let mut storage_types = self.session_storage_types.write().await;
306 storage_types.insert(data.id.clone(), backend.clone());
307 }
308
309 if let Err(e) = self.restore_session(data).await {
310 tracing::warn!("Failed to restore session {}: {}", id, e);
311 } else {
312 loaded += 1;
313 }
314 }
315 Ok(None) => {
316 tracing::warn!("Session {} not found in store", id);
317 }
318 Err(e) => {
319 tracing::warn!("Failed to load session {}: {}", id, e);
320 }
321 }
322 }
323 }
324
325 tracing::info!("Loaded {} sessions from store", loaded);
326 Ok(loaded)
327 }
328
329 async fn restore_session(&self, data: SessionData) -> Result<()> {
331 let tools = self.tool_executor.definitions();
332 let mut session = Session::new(data.id.clone(), data.config.clone(), tools).await?;
333
334 session.restore_from_data(&data);
336
337 if let Some(llm_config) = &data.llm_config {
339 let mut configs = self.llm_configs.write().await;
340 configs.insert(data.id.clone(), llm_config.clone());
341 }
342
343 let mut sessions = self.sessions.write().await;
344 sessions.insert(data.id.clone(), Arc::new(RwLock::new(session)));
345
346 tracing::info!("Restored session: {}", data.id);
347 Ok(())
348 }
349
350 async fn save_session(&self, session_id: &str) -> Result<()> {
352 let storage_type = {
354 let storage_types = self.session_storage_types.read().await;
355 storage_types.get(session_id).cloned()
356 };
357
358 let Some(storage_type) = storage_type else {
359 return Ok(());
361 };
362
363 if storage_type == crate::config::StorageBackend::Memory {
365 return Ok(());
366 }
367
368 let stores = self.stores.read().await;
370 let Some(store) = stores.get(&storage_type) else {
371 tracing::warn!("No store available for storage type: {:?}", storage_type);
372 return Ok(());
373 };
374
375 let session_lock = self.get_session(session_id).await?;
376 let session = session_lock.read().await;
377
378 let llm_config = {
380 let configs = self.llm_configs.read().await;
381 configs.get(session_id).cloned()
382 };
383
384 let data = session.to_session_data(llm_config);
385 store.save(&data).await?;
386
387 tracing::debug!("Saved session: {}", session_id);
388 Ok(())
389 }
390
391 async fn persist_or_warn(&self, session_id: &str, operation: &str) {
395 if let Err(e) = self.save_session(session_id).await {
396 tracing::warn!(
397 "Failed to persist session {} after {}: {}",
398 session_id,
399 operation,
400 e
401 );
402 if let Ok(session_lock) = self.get_session(session_id).await {
404 let session = session_lock.read().await;
405 let _ = session.event_tx().send(AgentEvent::PersistenceFailed {
406 session_id: session_id.to_string(),
407 operation: operation.to_string(),
408 error: e.to_string(),
409 });
410 }
411 }
412 }
413
414 fn persist_in_background(&self, session_id: &str, operation: &str) {
419 let mgr = self.clone();
420 let sid = session_id.to_string();
421 let op = operation.to_string();
422 tokio::spawn(async move {
423 mgr.persist_or_warn(&sid, &op).await;
424 });
425 }
426
427 pub async fn create_session(&self, id: String, config: SessionConfig) -> Result<String> {
429 tracing::info!(name: "a3s.session.create", session_id = %id, "Creating session");
430
431 {
433 let mut storage_types = self.session_storage_types.write().await;
434 storage_types.insert(id.clone(), config.storage_type.clone());
435 }
436
437 let tools = self.tool_executor.definitions();
439 let mut session = Session::new(id.clone(), config, tools).await?;
440
441 session.start_queue().await?;
443
444 if session.config.max_context_length > 0 {
446 session.context_usage.max_tokens = session.config.max_context_length as usize;
447 }
448
449 {
450 let mut sessions = self.sessions.write().await;
451 sessions.insert(id.clone(), Arc::new(RwLock::new(session)));
452 }
453
454 self.persist_in_background(&id, "create");
456
457 tracing::info!("Created session: {}", id);
458 Ok(id)
459 }
460
461 pub async fn destroy_session(&self, id: &str) -> Result<()> {
463 tracing::info!(name: "a3s.session.destroy", session_id = %id, "Destroying session");
464
465 let storage_type = {
467 let storage_types = self.session_storage_types.read().await;
468 storage_types.get(id).cloned()
469 };
470
471 {
472 let mut sessions = self.sessions.write().await;
473 sessions.remove(id);
474 }
475
476 {
478 let mut configs = self.llm_configs.write().await;
479 configs.remove(id);
480 }
481
482 {
484 let mut storage_types = self.session_storage_types.write().await;
485 storage_types.remove(id);
486 }
487
488 if let Some(storage_type) = storage_type {
490 if storage_type != crate::config::StorageBackend::Memory {
491 let stores = self.stores.read().await;
492 if let Some(store) = stores.get(&storage_type) {
493 if let Err(e) = store.delete(id).await {
494 tracing::warn!("Failed to delete session {} from store: {}", id, e);
495 }
496 }
497 }
498 }
499
500 tracing::info!("Destroyed session: {}", id);
501 Ok(())
502 }
503
504 pub async fn get_session(&self, id: &str) -> Result<Arc<RwLock<Session>>> {
506 let sessions = self.sessions.read().await;
507 sessions
508 .get(id)
509 .cloned()
510 .context(format!("Session not found: {}", id))
511 }
512
513 pub async fn create_child_session(
518 &self,
519 parent_id: &str,
520 child_id: String,
521 mut config: SessionConfig,
522 ) -> Result<String> {
523 let parent_lock = self.get_session(parent_id).await?;
525 let parent_llm_client = {
526 let parent = parent_lock.read().await;
527
528 if config.confirmation_policy.is_none() {
530 let parent_policy = parent.confirmation_manager.policy().await;
531 config.confirmation_policy = Some(parent_policy);
532 }
533
534 parent.llm_client.clone()
535 };
536
537 config.parent_id = Some(parent_id.to_string());
539
540 let tools = self.tool_executor.definitions();
542 let mut session = Session::new(child_id.clone(), config, tools).await?;
543
544 if session.llm_client.is_none() {
546 let default_llm = self.llm_client.read().await.clone();
547 session.llm_client = parent_llm_client.or(default_llm);
548 }
549
550 session.start_queue().await?;
552
553 if session.config.max_context_length > 0 {
555 session.context_usage.max_tokens = session.config.max_context_length as usize;
556 }
557
558 {
559 let mut sessions = self.sessions.write().await;
560 sessions.insert(child_id.clone(), Arc::new(RwLock::new(session)));
561 }
562
563 self.persist_in_background(&child_id, "create_child");
565
566 tracing::info!(
567 "Created child session: {} (parent: {})",
568 child_id,
569 parent_id
570 );
571 Ok(child_id)
572 }
573
574 pub async fn get_child_sessions(&self, parent_id: &str) -> Vec<String> {
576 let sessions = self.sessions.read().await;
577 let mut children = Vec::new();
578
579 for (id, session_lock) in sessions.iter() {
580 let session = session_lock.read().await;
581 if session.parent_id.as_deref() == Some(parent_id) {
582 children.push(id.clone());
583 }
584 }
585
586 children
587 }
588
589 pub async fn is_child_session(&self, session_id: &str) -> Result<bool> {
591 let session_lock = self.get_session(session_id).await?;
592 let session = session_lock.read().await;
593 Ok(session.is_child_session())
594 }
595
596 pub async fn generate(&self, session_id: &str, prompt: &str) -> Result<AgentResult> {
598 let session_lock = self.get_session(session_id).await?;
599
600 {
602 let session = session_lock.read().await;
603 if session.state == SessionState::Paused {
604 anyhow::bail!(
605 "Session {} is paused. Call Resume before generating.",
606 session_id
607 );
608 }
609 }
610
611 let (
613 history,
614 system,
615 tools,
616 session_llm_client,
617 permission_checker,
618 confirmation_manager,
619 context_providers,
620 session_workspace,
621 tool_metrics,
622 hook_engine,
623 planning_enabled,
624 goal_tracking,
625 ) = {
626 let session = session_lock.read().await;
627 (
628 session.messages.clone(),
629 session.system().map(String::from),
630 session.tools.clone(),
631 session.llm_client.clone(),
632 session.permission_checker.clone(),
633 session.confirmation_manager.clone(),
634 session.context_providers.clone(),
635 session.config.workspace.clone(),
636 session.tool_metrics.clone(),
637 session.config.hook_engine.clone(),
638 session.config.planning_enabled,
639 session.config.goal_tracking,
640 )
641 };
642
643 let llm_client = if let Some(client) = session_llm_client {
645 client
646 } else if let Some(client) = self.llm_client.read().await.clone() {
647 client
648 } else {
649 anyhow::bail!(
650 "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
651 session_id
652 );
653 };
654
655 let tool_context = if session_workspace.is_empty() {
657 crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
658 .with_session_id(session_id)
659 } else {
660 crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
661 .with_session_id(session_id)
662 };
663
664 let skill_registry = self.skill_registry.read().await.clone();
666 let system = if let Some(ref registry) = skill_registry {
667 let skill_prompt = registry.to_system_prompt();
668 if skill_prompt.is_empty() {
669 system
670 } else {
671 match system {
672 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
673 None => Some(skill_prompt),
674 }
675 }
676 } else {
677 system
678 };
679
680 let effective_prompt = if let Some(ref registry) = skill_registry {
682 let skill_content = registry.match_skills(prompt);
683 if skill_content.is_empty() {
684 prompt.to_string()
685 } else {
686 format!("{}\n\n---\n\n{}", skill_content, prompt)
687 }
688 } else {
689 prompt.to_string()
690 };
691
692 let memory = self
694 .memory_store
695 .read()
696 .await
697 .as_ref()
698 .map(|store| Arc::new(AgentMemory::new(store.clone())));
699
700 let config = AgentConfig {
702 prompt_slots: match system {
703 Some(s) => SystemPromptSlots::from_legacy(s),
704 None => SystemPromptSlots::default(),
705 },
706 tools,
707 max_tool_rounds: 50,
708 permission_checker: Some(permission_checker),
709 confirmation_manager: Some(confirmation_manager),
710 context_providers,
711 planning_enabled,
712 goal_tracking,
713 hook_engine,
714 skill_registry,
715 memory,
716 ..AgentConfig::default()
717 };
718
719 let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
720 .with_tool_metrics(tool_metrics);
721
722 let result = agent
724 .execute_with_session(&history, &effective_prompt, Some(session_id), None)
725 .await?;
726
727 {
729 let mut session = session_lock.write().await;
730 session.messages = result.messages.clone();
731 session.update_usage(&result.usage);
732 }
733
734 self.persist_in_background(session_id, "generate");
736
737 if let Err(e) = self.maybe_auto_compact(session_id).await {
739 tracing::warn!("Auto-compact failed for session {}: {}", session_id, e);
740 }
741
742 Ok(result)
743 }
744
745 pub async fn generate_streaming(
747 &self,
748 session_id: &str,
749 prompt: &str,
750 ) -> Result<(
751 mpsc::Receiver<AgentEvent>,
752 tokio::task::JoinHandle<Result<AgentResult>>,
753 )> {
754 let session_lock = self.get_session(session_id).await?;
755
756 {
758 let session = session_lock.read().await;
759 if session.state == SessionState::Paused {
760 anyhow::bail!(
761 "Session {} is paused. Call Resume before generating.",
762 session_id
763 );
764 }
765 }
766
767 let (
769 history,
770 system,
771 tools,
772 session_llm_client,
773 permission_checker,
774 confirmation_manager,
775 context_providers,
776 session_workspace,
777 tool_metrics,
778 hook_engine,
779 planning_enabled,
780 goal_tracking,
781 ) = {
782 let session = session_lock.read().await;
783 (
784 session.messages.clone(),
785 session.system().map(String::from),
786 session.tools.clone(),
787 session.llm_client.clone(),
788 session.permission_checker.clone(),
789 session.confirmation_manager.clone(),
790 session.context_providers.clone(),
791 session.config.workspace.clone(),
792 session.tool_metrics.clone(),
793 session.config.hook_engine.clone(),
794 session.config.planning_enabled,
795 session.config.goal_tracking,
796 )
797 };
798
799 let llm_client = if let Some(client) = session_llm_client {
801 client
802 } else if let Some(client) = self.llm_client.read().await.clone() {
803 client
804 } else {
805 anyhow::bail!(
806 "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
807 session_id
808 );
809 };
810
811 let tool_context = if session_workspace.is_empty() {
813 crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
814 .with_session_id(session_id)
815 } else {
816 crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
817 .with_session_id(session_id)
818 };
819
820 let skill_registry = self.skill_registry.read().await.clone();
822 let system = if let Some(ref registry) = skill_registry {
823 let skill_prompt = registry.to_system_prompt();
824 if skill_prompt.is_empty() {
825 system
826 } else {
827 match system {
828 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
829 None => Some(skill_prompt),
830 }
831 }
832 } else {
833 system
834 };
835
836 let effective_prompt = if let Some(ref registry) = skill_registry {
838 let skill_content = registry.match_skills(prompt);
839 if skill_content.is_empty() {
840 prompt.to_string()
841 } else {
842 format!("{}\n\n---\n\n{}", skill_content, prompt)
843 }
844 } else {
845 prompt.to_string()
846 };
847
848 let memory = self
850 .memory_store
851 .read()
852 .await
853 .as_ref()
854 .map(|store| Arc::new(AgentMemory::new(store.clone())));
855
856 let config = AgentConfig {
858 prompt_slots: match system {
859 Some(s) => SystemPromptSlots::from_legacy(s),
860 None => SystemPromptSlots::default(),
861 },
862 tools,
863 max_tool_rounds: 50,
864 permission_checker: Some(permission_checker),
865 confirmation_manager: Some(confirmation_manager),
866 context_providers,
867 planning_enabled,
868 goal_tracking,
869 hook_engine,
870 skill_registry,
871 memory,
872 ..AgentConfig::default()
873 };
874
875 let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
876 .with_tool_metrics(tool_metrics);
877
878 let (rx, handle) = agent.execute_streaming(&history, &effective_prompt).await?;
880
881 let abort_handle = handle.abort_handle();
883 {
884 let mut ops = self.ongoing_operations.write().await;
885 ops.insert(session_id.to_string(), abort_handle);
886 }
887
888 let session_lock_clone = session_lock.clone();
890 let original_handle = handle;
891 let stores = self.stores.clone();
892 let session_storage_types = self.session_storage_types.clone();
893 let llm_configs = self.llm_configs.clone();
894 let session_id_owned = session_id.to_string();
895 let ongoing_operations = self.ongoing_operations.clone();
896 let session_manager = self.clone();
897
898 let wrapped_handle = tokio::spawn(async move {
899 let result = original_handle.await??;
900
901 {
903 let mut ops = ongoing_operations.write().await;
904 ops.remove(&session_id_owned);
905 }
906
907 {
909 let mut session = session_lock_clone.write().await;
910 session.messages = result.messages.clone();
911 session.update_usage(&result.usage);
912 }
913
914 let storage_type = {
916 let storage_types = session_storage_types.read().await;
917 storage_types.get(&session_id_owned).cloned()
918 };
919
920 if let Some(storage_type) = storage_type {
921 if storage_type != crate::config::StorageBackend::Memory {
922 let stores_guard = stores.read().await;
923 if let Some(store) = stores_guard.get(&storage_type) {
924 let session = session_lock_clone.read().await;
925 let llm_config = {
926 let configs = llm_configs.read().await;
927 configs.get(&session_id_owned).cloned()
928 };
929 let data = session.to_session_data(llm_config);
930 if let Err(e) = store.save(&data).await {
931 tracing::warn!(
932 "Failed to persist session {} after streaming: {}",
933 session_id_owned,
934 e
935 );
936 }
937 }
938 }
939 }
940
941 if let Err(e) = session_manager.maybe_auto_compact(&session_id_owned).await {
943 tracing::warn!(
944 "Auto-compact failed for session {}: {}",
945 session_id_owned,
946 e
947 );
948 }
949
950 Ok(result)
951 });
952
953 Ok((rx, wrapped_handle))
954 }
955
956 pub async fn context_usage(&self, session_id: &str) -> Result<ContextUsage> {
958 let session_lock = self.get_session(session_id).await?;
959 let session = session_lock.read().await;
960 Ok(session.context_usage.clone())
961 }
962
963 pub async fn history(&self, session_id: &str) -> Result<Vec<Message>> {
965 let session_lock = self.get_session(session_id).await?;
966 let session = session_lock.read().await;
967 Ok(session.messages.clone())
968 }
969
970 pub async fn clear(&self, session_id: &str) -> Result<()> {
972 {
973 let session_lock = self.get_session(session_id).await?;
974 let mut session = session_lock.write().await;
975 session.clear();
976 }
977
978 self.persist_in_background(session_id, "clear");
980
981 Ok(())
982 }
983
984 pub async fn compact(&self, session_id: &str) -> Result<()> {
986 tracing::info!(name: "a3s.session.compact", session_id = %session_id, "Compacting session context");
987
988 {
989 let session_lock = self.get_session(session_id).await?;
990 let mut session = session_lock.write().await;
991
992 let llm_client = if let Some(client) = &session.llm_client {
994 client.clone()
995 } else if let Some(client) = self.llm_client.read().await.clone() {
996 client
997 } else {
998 tracing::warn!("No LLM client configured for compaction, using simple truncation");
1000 let keep_messages = 20;
1001 if session.messages.len() > keep_messages {
1002 let len = session.messages.len();
1003 session.messages = session.messages.split_off(len - keep_messages);
1004 }
1005 drop(session);
1007 self.persist_in_background(session_id, "compact");
1008 return Ok(());
1009 };
1010
1011 session.compact(&llm_client).await?;
1012 }
1013
1014 self.persist_in_background(session_id, "compact");
1016
1017 Ok(())
1018 }
1019
1020 pub async fn maybe_auto_compact(&self, session_id: &str) -> Result<bool> {
1027 let (should_compact, percent_before, messages_before) = {
1028 let session_lock = self.get_session(session_id).await?;
1029 let session = session_lock.read().await;
1030
1031 if !session.config.auto_compact {
1032 return Ok(false);
1033 }
1034
1035 let threshold = session.config.auto_compact_threshold;
1036 let percent = session.context_usage.percent;
1037 let msg_count = session.messages.len();
1038
1039 tracing::debug!(
1040 "Auto-compact check for session {}: percent={:.2}%, threshold={:.2}%, messages={}",
1041 session_id,
1042 percent * 100.0,
1043 threshold * 100.0,
1044 msg_count,
1045 );
1046
1047 (percent >= threshold, percent, msg_count)
1048 };
1049
1050 if !should_compact {
1051 return Ok(false);
1052 }
1053
1054 tracing::info!(
1055 name: "a3s.session.auto_compact",
1056 session_id = %session_id,
1057 percent_before = %format!("{:.1}%", percent_before * 100.0),
1058 messages_before = %messages_before,
1059 "Auto-compacting session due to high context usage"
1060 );
1061
1062 self.compact(session_id).await?;
1064
1065 let messages_after = {
1067 let session_lock = self.get_session(session_id).await?;
1068 let session = session_lock.read().await;
1069 session.messages.len()
1070 };
1071
1072 let event = AgentEvent::ContextCompacted {
1074 session_id: session_id.to_string(),
1075 before_messages: messages_before,
1076 after_messages: messages_after,
1077 percent_before,
1078 };
1079
1080 if let Ok(session_lock) = self.get_session(session_id).await {
1082 let session = session_lock.read().await;
1083 let _ = session.event_tx.send(event);
1084 }
1085
1086 tracing::info!(
1087 name: "a3s.session.auto_compact.done",
1088 session_id = %session_id,
1089 messages_before = %messages_before,
1090 messages_after = %messages_after,
1091 "Auto-compaction complete"
1092 );
1093
1094 Ok(true)
1095 }
1096
1097 pub async fn get_llm_for_session(
1101 &self,
1102 session_id: &str,
1103 ) -> Result<Option<Arc<dyn LlmClient>>> {
1104 let session_lock = self.get_session(session_id).await?;
1105 let session = session_lock.read().await;
1106
1107 if let Some(client) = &session.llm_client {
1108 return Ok(Some(client.clone()));
1109 }
1110
1111 Ok(self.llm_client.read().await.clone())
1112 }
1113
1114 pub async fn configure(
1116 &self,
1117 session_id: &str,
1118 thinking: Option<bool>,
1119 budget: Option<usize>,
1120 model_config: Option<LlmConfig>,
1121 ) -> Result<()> {
1122 {
1123 let session_lock = self.get_session(session_id).await?;
1124 let mut session = session_lock.write().await;
1125
1126 if let Some(t) = thinking {
1127 session.thinking_enabled = t;
1128 }
1129 if let Some(b) = budget {
1130 session.thinking_budget = Some(b);
1131 }
1132 if let Some(ref config) = model_config {
1133 tracing::info!(
1134 "Configuring session {} with LLM: provider={}, model={}",
1135 session_id,
1136 config.provider,
1137 config.model
1138 );
1139 session.model_name = Some(config.model.clone());
1140 session.llm_client = Some(llm::create_client_with_config(config.clone()));
1141 }
1142 }
1143
1144 if let Some(config) = model_config {
1146 let llm_config_data = LlmConfigData {
1147 provider: config.provider,
1148 model: config.model,
1149 api_key: None, base_url: config.base_url,
1151 };
1152 let mut configs = self.llm_configs.write().await;
1153 configs.insert(session_id.to_string(), llm_config_data);
1154 }
1155
1156 self.persist_in_background(session_id, "configure");
1158
1159 Ok(())
1160 }
1161
1162 pub async fn session_count(&self) -> usize {
1164 let sessions = self.sessions.read().await;
1165 sessions.len()
1166 }
1167
1168 pub async fn store_health(&self) -> Vec<(String, Result<()>)> {
1170 let stores = self.stores.read().await;
1171 let mut results = Vec::new();
1172 for (_, store) in stores.iter() {
1173 let name = store.backend_name().to_string();
1174 let result = store.health_check().await;
1175 results.push((name, result));
1176 }
1177 results
1178 }
1179
1180 pub fn list_tools(&self) -> Vec<crate::llm::ToolDefinition> {
1182 self.tool_executor.definitions()
1183 }
1184
1185 pub async fn pause_session(&self, session_id: &str) -> Result<bool> {
1187 let paused = {
1188 let session_lock = self.get_session(session_id).await?;
1189 let mut session = session_lock.write().await;
1190 session.pause()
1191 };
1192
1193 if paused {
1194 self.persist_in_background(session_id, "pause");
1195 }
1196
1197 Ok(paused)
1198 }
1199
1200 pub async fn resume_session(&self, session_id: &str) -> Result<bool> {
1202 let resumed = {
1203 let session_lock = self.get_session(session_id).await?;
1204 let mut session = session_lock.write().await;
1205 session.resume()
1206 };
1207
1208 if resumed {
1209 self.persist_in_background(session_id, "resume");
1210 }
1211
1212 Ok(resumed)
1213 }
1214
1215 pub async fn cancel_operation(&self, session_id: &str) -> Result<bool> {
1219 let session_lock = self.get_session(session_id).await?;
1221 let cancelled_confirmations = {
1222 let session = session_lock.read().await;
1223 session.confirmation_manager.cancel_all().await
1224 };
1225
1226 if cancelled_confirmations > 0 {
1227 tracing::info!(
1228 "Cancelled {} pending confirmations for session {}",
1229 cancelled_confirmations,
1230 session_id
1231 );
1232 }
1233
1234 let abort_handle = {
1236 let mut ops = self.ongoing_operations.write().await;
1237 ops.remove(session_id)
1238 };
1239
1240 if let Some(handle) = abort_handle {
1241 handle.abort();
1242 tracing::info!("Cancelled ongoing operation for session {}", session_id);
1243 Ok(true)
1244 } else if cancelled_confirmations > 0 {
1245 Ok(true)
1247 } else {
1248 tracing::debug!("No ongoing operation to cancel for session {}", session_id);
1249 Ok(false)
1250 }
1251 }
1252
1253 pub async fn get_all_sessions(&self) -> Vec<Arc<RwLock<Session>>> {
1255 let sessions = self.sessions.read().await;
1256 sessions.values().cloned().collect()
1257 }
1258
1259 pub fn tool_executor(&self) -> &Arc<ToolExecutor> {
1261 &self.tool_executor
1262 }
1263
1264 pub async fn confirm_tool(
1266 &self,
1267 session_id: &str,
1268 tool_id: &str,
1269 approved: bool,
1270 reason: Option<String>,
1271 ) -> Result<bool> {
1272 let session_lock = self.get_session(session_id).await?;
1273 let session = session_lock.read().await;
1274 session
1275 .confirmation_manager
1276 .confirm(tool_id, approved, reason)
1277 .await
1278 .map_err(|e| anyhow::anyhow!(e))
1279 }
1280
1281 pub async fn set_confirmation_policy(
1283 &self,
1284 session_id: &str,
1285 policy: ConfirmationPolicy,
1286 ) -> Result<ConfirmationPolicy> {
1287 {
1288 let session_lock = self.get_session(session_id).await?;
1289 let session = session_lock.read().await;
1290 session.set_confirmation_policy(policy.clone()).await;
1291 }
1292
1293 {
1295 let session_lock = self.get_session(session_id).await?;
1296 let mut session = session_lock.write().await;
1297 session.config.confirmation_policy = Some(policy.clone());
1298 }
1299
1300 self.persist_in_background(session_id, "set_confirmation_policy");
1302
1303 Ok(policy)
1304 }
1305
1306 pub async fn get_confirmation_policy(&self, session_id: &str) -> Result<ConfirmationPolicy> {
1308 let session_lock = self.get_session(session_id).await?;
1309 let session = session_lock.read().await;
1310 Ok(session.confirmation_policy().await)
1311 }
1312}