1use super::{ContextUsage, Session, SessionConfig, SessionState};
4use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
5use crate::hitl::ConfirmationPolicy;
6use crate::llm::{self, LlmClient, LlmConfig, Message};
7use crate::mcp::McpManager;
8use crate::memory::AgentMemory;
9use crate::prompts::SystemPromptSlots;
10use crate::skills::SkillRegistry;
11use crate::store::{FileSessionStore, LlmConfigData, SessionData, SessionStore};
12use crate::tools::ToolExecutor;
13use a3s_memory::MemoryStore;
14use anyhow::{Context, Result};
15use std::collections::HashMap;
16use std::sync::Arc;
17use tokio::sync::{mpsc, RwLock};
18
/// Central registry that owns every live [`Session`] together with the shared
/// services (default LLM client, tools, stores, skills, memory, MCP) that
/// sessions draw on.
///
/// Every field sits behind an `Arc`, so cloning the manager yields another
/// handle to the same underlying state.
#[derive(Clone)]
pub struct SessionManager {
    /// In-memory sessions keyed by session id.
    pub(crate) sessions: Arc<RwLock<HashMap<String, Arc<RwLock<Session>>>>>,
    /// Default LLM client, used when a session has no client of its own.
    pub(crate) llm_client: Arc<RwLock<Option<Arc<dyn LlmClient>>>>,
    /// Tool executor shared by all sessions.
    pub(crate) tool_executor: Arc<ToolExecutor>,
    /// Persistence stores keyed by storage backend.
    pub(crate) stores: Arc<RwLock<HashMap<crate::config::StorageBackend, Arc<dyn SessionStore>>>>,
    /// Storage backend recorded for each session id.
    pub(crate) session_storage_types: Arc<RwLock<HashMap<String, crate::config::StorageBackend>>>,
    /// Persistable LLM configuration recorded per session id.
    pub(crate) llm_configs: Arc<RwLock<HashMap<String, LlmConfigData>>>,
    /// Cancellation tokens of in-flight streaming operations, keyed by session id.
    pub(crate) ongoing_operations:
        Arc<RwLock<HashMap<String, tokio_util::sync::CancellationToken>>>,
    /// Global skill registry, used when a session has no registry override.
    pub(crate) skill_registry: Arc<RwLock<Option<Arc<SkillRegistry>>>>,
    /// Per-session skill registry overrides, keyed by session id.
    pub(crate) session_skill_registries: Arc<RwLock<HashMap<String, Arc<SkillRegistry>>>>,
    /// Optional memory store from which agent memory is built for each generation.
    pub(crate) memory_store: Arc<RwLock<Option<Arc<dyn MemoryStore>>>>,
    /// Optional MCP manager whose tools are registered on the tool executor.
    pub(crate) mcp_manager: Arc<RwLock<Option<Arc<McpManager>>>>,
}
48
49impl SessionManager {
50 pub fn new(llm_client: Option<Arc<dyn LlmClient>>, tool_executor: Arc<ToolExecutor>) -> Self {
52 Self {
53 sessions: Arc::new(RwLock::new(HashMap::new())),
54 llm_client: Arc::new(RwLock::new(llm_client)),
55 tool_executor,
56 stores: Arc::new(RwLock::new(HashMap::new())),
57 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
58 llm_configs: Arc::new(RwLock::new(HashMap::new())),
59 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
60 skill_registry: Arc::new(RwLock::new(None)),
61 session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
62 memory_store: Arc::new(RwLock::new(None)),
63 mcp_manager: Arc::new(RwLock::new(None)),
64 }
65 }
66
67 pub async fn with_persistence<P: AsRef<std::path::Path>>(
71 llm_client: Option<Arc<dyn LlmClient>>,
72 tool_executor: Arc<ToolExecutor>,
73 sessions_dir: P,
74 ) -> Result<Self> {
75 let store = FileSessionStore::new(sessions_dir).await?;
76 let mut stores = HashMap::new();
77 stores.insert(
78 crate::config::StorageBackend::File,
79 Arc::new(store) as Arc<dyn SessionStore>,
80 );
81
82 let manager = Self {
83 sessions: Arc::new(RwLock::new(HashMap::new())),
84 llm_client: Arc::new(RwLock::new(llm_client)),
85 tool_executor,
86 stores: Arc::new(RwLock::new(stores)),
87 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
88 llm_configs: Arc::new(RwLock::new(HashMap::new())),
89 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
90 skill_registry: Arc::new(RwLock::new(None)),
91 session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
92 memory_store: Arc::new(RwLock::new(None)),
93 mcp_manager: Arc::new(RwLock::new(None)),
94 };
95
96 Ok(manager)
97 }
98
99 pub fn with_store(
104 llm_client: Option<Arc<dyn LlmClient>>,
105 tool_executor: Arc<ToolExecutor>,
106 store: Arc<dyn SessionStore>,
107 backend: crate::config::StorageBackend,
108 ) -> Self {
109 let mut stores = HashMap::new();
110 stores.insert(backend, store);
111
112 Self {
113 sessions: Arc::new(RwLock::new(HashMap::new())),
114 llm_client: Arc::new(RwLock::new(llm_client)),
115 tool_executor,
116 stores: Arc::new(RwLock::new(stores)),
117 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
118 llm_configs: Arc::new(RwLock::new(HashMap::new())),
119 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
120 skill_registry: Arc::new(RwLock::new(None)),
121 session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
122 memory_store: Arc::new(RwLock::new(None)),
123 mcp_manager: Arc::new(RwLock::new(None)),
124 }
125 }
126
127 pub async fn set_default_llm(&self, client: Option<Arc<dyn LlmClient>>) {
132 *self.llm_client.write().await = client;
133 }
134
135 pub async fn set_skill_registry(
142 &self,
143 registry: Arc<SkillRegistry>,
144 skills_dir: std::path::PathBuf,
145 ) {
146 let manage_tool = crate::skills::ManageSkillTool::new(registry.clone(), skills_dir);
148 self.tool_executor
149 .register_dynamic_tool(Arc::new(manage_tool));
150
151 *self.skill_registry.write().await = Some(registry);
152 }
153
154 pub async fn skill_registry(&self) -> Option<Arc<SkillRegistry>> {
156 self.skill_registry.read().await.clone()
157 }
158
159 pub async fn set_session_skill_registry(
161 &self,
162 session_id: impl Into<String>,
163 registry: Arc<SkillRegistry>,
164 ) {
165 self.session_skill_registries
166 .write()
167 .await
168 .insert(session_id.into(), registry);
169 }
170
171 pub async fn session_skill_registry(&self, session_id: &str) -> Option<Arc<SkillRegistry>> {
173 self.session_skill_registries
174 .read()
175 .await
176 .get(session_id)
177 .cloned()
178 }
179
180 pub async fn set_memory_store(&self, store: Arc<dyn MemoryStore>) {
187 *self.memory_store.write().await = Some(store);
188 }
189
190 pub async fn memory_store(&self) -> Option<Arc<dyn MemoryStore>> {
192 self.memory_store.read().await.clone()
193 }
194
195 pub async fn set_mcp_manager(&self, manager: Arc<McpManager>) {
200 let all_tools = manager.get_all_tools().await;
201 let mut by_server: HashMap<String, Vec<crate::mcp::McpTool>> = HashMap::new();
202 for (server, tool) in all_tools {
203 by_server.entry(server).or_default().push(tool);
204 }
205 for (server_name, tools) in by_server {
206 for tool in
207 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&manager))
208 {
209 self.tool_executor.register_dynamic_tool(tool);
210 }
211 }
212 *self.mcp_manager.write().await = Some(manager);
213 }
214
215 pub async fn add_mcp_server(&self, config: crate::mcp::McpServerConfig) -> Result<()> {
220 let manager = {
221 let guard = self.mcp_manager.read().await;
222 match guard.clone() {
223 Some(m) => m,
224 None => {
225 drop(guard);
226 let m = Arc::new(McpManager::new());
227 *self.mcp_manager.write().await = Some(Arc::clone(&m));
228 m
229 }
230 }
231 };
232 let name = config.name.clone();
233 manager.register_server(config).await;
234 manager.connect(&name).await?;
235 let tools = manager.get_server_tools(&name).await;
236 for tool in crate::mcp::tools::create_mcp_tools(&name, tools, Arc::clone(&manager)) {
237 self.tool_executor.register_dynamic_tool(tool);
238 }
239 Ok(())
240 }
241
242 pub async fn remove_mcp_server(&self, name: &str) -> Result<()> {
246 let guard = self.mcp_manager.read().await;
247 if let Some(ref manager) = *guard {
248 manager.disconnect(name).await?;
249 }
250 self.tool_executor
251 .unregister_tools_by_prefix(&format!("mcp__{name}__"));
252 Ok(())
253 }
254
255 pub async fn mcp_status(
257 &self,
258 ) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
259 let guard = self.mcp_manager.read().await;
260 match guard.as_ref() {
261 Some(m) => {
262 let m = Arc::clone(m);
263 drop(guard);
264 m.get_status().await
265 }
266 None => std::collections::HashMap::new(),
267 }
268 }
269
270 pub async fn restore_session_by_id(&self, session_id: &str) -> Result<()> {
275 {
277 let sessions = self.sessions.read().await;
278 if sessions.contains_key(session_id) {
279 return Ok(());
280 }
281 }
282
283 let stores = self.stores.read().await;
284 for (backend, store) in stores.iter() {
285 match store.load(session_id).await {
286 Ok(Some(data)) => {
287 {
288 let mut storage_types = self.session_storage_types.write().await;
289 storage_types.insert(data.id.clone(), backend.clone());
290 }
291 self.restore_session(data).await?;
292 return Ok(());
293 }
294 Ok(None) => continue,
295 Err(e) => {
296 tracing::warn!(
297 "Failed to load session {} from {:?}: {}",
298 session_id,
299 backend,
300 e
301 );
302 continue;
303 }
304 }
305 }
306
307 Err(anyhow::anyhow!(
308 "Session {} not found in any store",
309 session_id
310 ))
311 }
312
313 pub async fn load_all_sessions(&mut self) -> Result<usize> {
315 let stores = self.stores.read().await;
316 let mut loaded = 0;
317
318 for (backend, store) in stores.iter() {
319 let session_ids = match store.list().await {
320 Ok(ids) => ids,
321 Err(e) => {
322 tracing::warn!("Failed to list sessions from {:?} store: {}", backend, e);
323 continue;
324 }
325 };
326
327 for id in session_ids {
328 match store.load(&id).await {
329 Ok(Some(data)) => {
330 {
332 let mut storage_types = self.session_storage_types.write().await;
333 storage_types.insert(data.id.clone(), backend.clone());
334 }
335
336 if let Err(e) = self.restore_session(data).await {
337 tracing::warn!("Failed to restore session {}: {}", id, e);
338 } else {
339 loaded += 1;
340 }
341 }
342 Ok(None) => {
343 tracing::warn!("Session {} not found in store", id);
344 }
345 Err(e) => {
346 tracing::warn!("Failed to load session {}: {}", id, e);
347 }
348 }
349 }
350 }
351
352 tracing::info!("Loaded {} sessions from store", loaded);
353 Ok(loaded)
354 }
355
356 async fn restore_session(&self, data: SessionData) -> Result<()> {
358 let tools = self.tool_executor.definitions();
359 let mut session = Session::new(data.id.clone(), data.config.clone(), tools).await?;
360
361 session.restore_from_data(&data);
363
364 if let Some(llm_config) = &data.llm_config {
366 let mut configs = self.llm_configs.write().await;
367 configs.insert(data.id.clone(), llm_config.clone());
368 }
369
370 let mut sessions = self.sessions.write().await;
371 sessions.insert(data.id.clone(), Arc::new(RwLock::new(session)));
372
373 tracing::info!("Restored session: {}", data.id);
374 Ok(())
375 }
376
377 async fn save_session(&self, session_id: &str) -> Result<()> {
379 let storage_type = {
381 let storage_types = self.session_storage_types.read().await;
382 storage_types.get(session_id).cloned()
383 };
384
385 let Some(storage_type) = storage_type else {
386 return Ok(());
388 };
389
390 if storage_type == crate::config::StorageBackend::Memory {
392 return Ok(());
393 }
394
395 let stores = self.stores.read().await;
397 let Some(store) = stores.get(&storage_type) else {
398 tracing::warn!("No store available for storage type: {:?}", storage_type);
399 return Ok(());
400 };
401
402 let session_lock = self.get_session(session_id).await?;
403 let session = session_lock.read().await;
404
405 let llm_config = {
407 let configs = self.llm_configs.read().await;
408 configs.get(session_id).cloned()
409 };
410
411 let data = session.to_session_data(llm_config);
412 store.save(&data).await?;
413
414 tracing::debug!("Saved session: {}", session_id);
415 Ok(())
416 }
417
418 async fn persist_or_warn(&self, session_id: &str, operation: &str) {
422 if let Err(e) = self.save_session(session_id).await {
423 tracing::warn!(
424 "Failed to persist session {} after {}: {}",
425 session_id,
426 operation,
427 e
428 );
429 if let Ok(session_lock) = self.get_session(session_id).await {
431 let session = session_lock.read().await;
432 let _ = session.event_tx().send(AgentEvent::PersistenceFailed {
433 session_id: session_id.to_string(),
434 operation: operation.to_string(),
435 error: e.to_string(),
436 });
437 }
438 }
439 }
440
441 fn persist_in_background(&self, session_id: &str, operation: &str) {
446 let mgr = self.clone();
447 let sid = session_id.to_string();
448 let op = operation.to_string();
449 tokio::spawn(async move {
450 mgr.persist_or_warn(&sid, &op).await;
451 });
452 }
453
454 pub async fn create_session(&self, id: String, config: SessionConfig) -> Result<String> {
456 tracing::info!(name: "a3s.session.create", session_id = %id, "Creating session");
457
458 {
460 let mut storage_types = self.session_storage_types.write().await;
461 storage_types.insert(id.clone(), config.storage_type.clone());
462 }
463
464 let tools = self.tool_executor.definitions();
466 let mut session = Session::new(id.clone(), config, tools).await?;
467
468 session.start_queue().await?;
470
471 if session.config.max_context_length > 0 {
473 session.context_usage.max_tokens = session.config.max_context_length as usize;
474 }
475
476 {
477 let mut sessions = self.sessions.write().await;
478 sessions.insert(id.clone(), Arc::new(RwLock::new(session)));
479 }
480
481 self.persist_in_background(&id, "create");
483
484 tracing::info!("Created session: {}", id);
485 Ok(id)
486 }
487
488 pub async fn destroy_session(&self, id: &str) -> Result<()> {
490 tracing::info!(name: "a3s.session.destroy", session_id = %id, "Destroying session");
491
492 let storage_type = {
494 let storage_types = self.session_storage_types.read().await;
495 storage_types.get(id).cloned()
496 };
497
498 {
499 let mut sessions = self.sessions.write().await;
500 sessions.remove(id);
501 }
502
503 {
505 let mut configs = self.llm_configs.write().await;
506 configs.remove(id);
507 }
508
509 {
510 let mut registries = self.session_skill_registries.write().await;
511 registries.remove(id);
512 }
513
514 {
516 let mut storage_types = self.session_storage_types.write().await;
517 storage_types.remove(id);
518 }
519
520 if let Some(storage_type) = storage_type {
522 if storage_type != crate::config::StorageBackend::Memory {
523 let stores = self.stores.read().await;
524 if let Some(store) = stores.get(&storage_type) {
525 if let Err(e) = store.delete(id).await {
526 tracing::warn!("Failed to delete session {} from store: {}", id, e);
527 }
528 }
529 }
530 }
531
532 tracing::info!("Destroyed session: {}", id);
533 Ok(())
534 }
535
536 pub async fn get_session(&self, id: &str) -> Result<Arc<RwLock<Session>>> {
538 let sessions = self.sessions.read().await;
539 sessions
540 .get(id)
541 .cloned()
542 .context(format!("Session not found: {}", id))
543 }
544
545 pub async fn create_child_session(
550 &self,
551 parent_id: &str,
552 child_id: String,
553 mut config: SessionConfig,
554 ) -> Result<String> {
555 let parent_lock = self.get_session(parent_id).await?;
557 let parent_llm_client = {
558 let parent = parent_lock.read().await;
559
560 if config.confirmation_policy.is_none() {
562 let parent_policy = parent.confirmation_manager.policy().await;
563 config.confirmation_policy = Some(parent_policy);
564 }
565
566 parent.llm_client.clone()
567 };
568
569 config.parent_id = Some(parent_id.to_string());
571
572 let tools = self.tool_executor.definitions();
574 let mut session = Session::new(child_id.clone(), config, tools).await?;
575
576 if session.llm_client.is_none() {
578 let default_llm = self.llm_client.read().await.clone();
579 session.llm_client = parent_llm_client.or(default_llm);
580 }
581
582 session.start_queue().await?;
584
585 if session.config.max_context_length > 0 {
587 session.context_usage.max_tokens = session.config.max_context_length as usize;
588 }
589
590 {
591 let mut sessions = self.sessions.write().await;
592 sessions.insert(child_id.clone(), Arc::new(RwLock::new(session)));
593 }
594
595 self.persist_in_background(&child_id, "create_child");
597
598 tracing::info!(
599 "Created child session: {} (parent: {})",
600 child_id,
601 parent_id
602 );
603 Ok(child_id)
604 }
605
606 pub async fn get_child_sessions(&self, parent_id: &str) -> Vec<String> {
608 let sessions = self.sessions.read().await;
609 let mut children = Vec::new();
610
611 for (id, session_lock) in sessions.iter() {
612 let session = session_lock.read().await;
613 if session.parent_id.as_deref() == Some(parent_id) {
614 children.push(id.clone());
615 }
616 }
617
618 children
619 }
620
621 pub async fn is_child_session(&self, session_id: &str) -> Result<bool> {
623 let session_lock = self.get_session(session_id).await?;
624 let session = session_lock.read().await;
625 Ok(session.is_child_session())
626 }
627
628 pub async fn generate(&self, session_id: &str, prompt: &str) -> Result<AgentResult> {
630 let session_lock = self.get_session(session_id).await?;
631
632 {
634 let session = session_lock.read().await;
635 if session.state == SessionState::Paused {
636 anyhow::bail!(
637 "Session {} is paused. Call Resume before generating.",
638 session_id
639 );
640 }
641 }
642
643 let (
645 history,
646 system,
647 tools,
648 session_llm_client,
649 permission_checker,
650 confirmation_manager,
651 context_providers,
652 session_workspace,
653 tool_metrics,
654 hook_engine,
655 planning_enabled,
656 goal_tracking,
657 ) = {
658 let session = session_lock.read().await;
659 (
660 session.messages.clone(),
661 session.system().map(String::from),
662 session.tools.clone(),
663 session.llm_client.clone(),
664 session.permission_checker.clone(),
665 session.confirmation_manager.clone(),
666 session.context_providers.clone(),
667 session.config.workspace.clone(),
668 session.tool_metrics.clone(),
669 session.config.hook_engine.clone(),
670 session.config.planning_enabled,
671 session.config.goal_tracking,
672 )
673 };
674
675 let llm_client = if let Some(client) = session_llm_client {
677 client
678 } else if let Some(client) = self.llm_client.read().await.clone() {
679 client
680 } else {
681 anyhow::bail!(
682 "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
683 session_id
684 );
685 };
686
687 let tool_context = if session_workspace.is_empty() {
689 crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
690 .with_session_id(session_id)
691 } else {
692 crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
693 .with_session_id(session_id)
694 };
695
696 let skill_registry = match self.session_skill_registry(session_id).await {
698 Some(registry) => Some(registry),
699 None => self.skill_registry.read().await.clone(),
700 };
701 let system = if let Some(ref registry) = skill_registry {
702 let skill_prompt = registry.to_system_prompt();
703 if skill_prompt.is_empty() {
704 system
705 } else {
706 match system {
707 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
708 None => Some(skill_prompt),
709 }
710 }
711 } else {
712 system
713 };
714
715 let effective_prompt = if let Some(ref registry) = skill_registry {
717 let skill_content = registry.match_skills(prompt);
718 if skill_content.is_empty() {
719 prompt.to_string()
720 } else {
721 format!("{}\n\n---\n\n{}", skill_content, prompt)
722 }
723 } else {
724 prompt.to_string()
725 };
726
727 let memory = self
729 .memory_store
730 .read()
731 .await
732 .as_ref()
733 .map(|store| Arc::new(AgentMemory::new(store.clone())));
734
735 let config = AgentConfig {
737 prompt_slots: match system {
738 Some(s) => SystemPromptSlots::from_legacy(s),
739 None => SystemPromptSlots::default(),
740 },
741 tools,
742 max_tool_rounds: 50,
743 permission_checker: Some(permission_checker),
744 confirmation_manager: Some(confirmation_manager),
745 context_providers,
746 planning_enabled,
747 goal_tracking,
748 hook_engine,
749 skill_registry,
750 memory,
751 ..AgentConfig::default()
752 };
753
754 let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
755 .with_tool_metrics(tool_metrics);
756
757 let result = agent
759 .execute_with_session(&history, &effective_prompt, Some(session_id), None, None)
760 .await?;
761
762 {
764 let mut session = session_lock.write().await;
765 session.messages = result.messages.clone();
766 session.update_usage(&result.usage);
767 }
768
769 self.persist_in_background(session_id, "generate");
771
772 if let Err(e) = self.maybe_auto_compact(session_id).await {
774 tracing::warn!("Auto-compact failed for session {}: {}", session_id, e);
775 }
776
777 Ok(result)
778 }
779
780 pub async fn generate_streaming(
782 &self,
783 session_id: &str,
784 prompt: &str,
785 ) -> Result<(
786 mpsc::Receiver<AgentEvent>,
787 tokio::task::JoinHandle<Result<AgentResult>>,
788 tokio_util::sync::CancellationToken,
789 )> {
790 let session_lock = self.get_session(session_id).await?;
791
792 {
794 let session = session_lock.read().await;
795 if session.state == SessionState::Paused {
796 anyhow::bail!(
797 "Session {} is paused. Call Resume before generating.",
798 session_id
799 );
800 }
801 }
802
803 let (
805 history,
806 system,
807 tools,
808 session_llm_client,
809 permission_checker,
810 confirmation_manager,
811 context_providers,
812 session_workspace,
813 tool_metrics,
814 hook_engine,
815 planning_enabled,
816 goal_tracking,
817 ) = {
818 let session = session_lock.read().await;
819 (
820 session.messages.clone(),
821 session.system().map(String::from),
822 session.tools.clone(),
823 session.llm_client.clone(),
824 session.permission_checker.clone(),
825 session.confirmation_manager.clone(),
826 session.context_providers.clone(),
827 session.config.workspace.clone(),
828 session.tool_metrics.clone(),
829 session.config.hook_engine.clone(),
830 session.config.planning_enabled,
831 session.config.goal_tracking,
832 )
833 };
834
835 let llm_client = if let Some(client) = session_llm_client {
837 client
838 } else if let Some(client) = self.llm_client.read().await.clone() {
839 client
840 } else {
841 anyhow::bail!(
842 "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
843 session_id
844 );
845 };
846
847 let tool_context = if session_workspace.is_empty() {
849 crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
850 .with_session_id(session_id)
851 } else {
852 crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
853 .with_session_id(session_id)
854 };
855
856 let skill_registry = match self.session_skill_registry(session_id).await {
858 Some(registry) => Some(registry),
859 None => self.skill_registry.read().await.clone(),
860 };
861 let system = if let Some(ref registry) = skill_registry {
862 let skill_prompt = registry.to_system_prompt();
863 if skill_prompt.is_empty() {
864 system
865 } else {
866 match system {
867 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
868 None => Some(skill_prompt),
869 }
870 }
871 } else {
872 system
873 };
874
875 let effective_prompt = if let Some(ref registry) = skill_registry {
877 let skill_content = registry.match_skills(prompt);
878 if skill_content.is_empty() {
879 prompt.to_string()
880 } else {
881 format!("{}\n\n---\n\n{}", skill_content, prompt)
882 }
883 } else {
884 prompt.to_string()
885 };
886
887 let memory = self
889 .memory_store
890 .read()
891 .await
892 .as_ref()
893 .map(|store| Arc::new(AgentMemory::new(store.clone())));
894
895 let config = AgentConfig {
897 prompt_slots: match system {
898 Some(s) => SystemPromptSlots::from_legacy(s),
899 None => SystemPromptSlots::default(),
900 },
901 tools,
902 max_tool_rounds: 50,
903 permission_checker: Some(permission_checker),
904 confirmation_manager: Some(confirmation_manager),
905 context_providers,
906 planning_enabled,
907 goal_tracking,
908 hook_engine,
909 skill_registry,
910 memory,
911 ..AgentConfig::default()
912 };
913
914 let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
915 .with_tool_metrics(tool_metrics);
916
917 let (rx, handle, cancel_token) =
919 agent.execute_streaming(&history, &effective_prompt).await?;
920
921 let cancel_token_clone = cancel_token.clone();
923 {
924 let mut ops = self.ongoing_operations.write().await;
925 ops.insert(session_id.to_string(), cancel_token);
926 }
927
928 let session_lock_clone = session_lock.clone();
930 let original_handle = handle;
931 let stores = self.stores.clone();
932 let session_storage_types = self.session_storage_types.clone();
933 let llm_configs = self.llm_configs.clone();
934 let session_id_owned = session_id.to_string();
935 let ongoing_operations = self.ongoing_operations.clone();
936 let session_manager = self.clone();
937
938 let wrapped_handle = tokio::spawn(async move {
939 let result = original_handle.await??;
940
941 {
943 let mut ops = ongoing_operations.write().await;
944 ops.remove(&session_id_owned);
945 }
946
947 {
949 let mut session = session_lock_clone.write().await;
950 session.messages = result.messages.clone();
951 session.update_usage(&result.usage);
952 }
953
954 let storage_type = {
956 let storage_types = session_storage_types.read().await;
957 storage_types.get(&session_id_owned).cloned()
958 };
959
960 if let Some(storage_type) = storage_type {
961 if storage_type != crate::config::StorageBackend::Memory {
962 let stores_guard = stores.read().await;
963 if let Some(store) = stores_guard.get(&storage_type) {
964 let session = session_lock_clone.read().await;
965 let llm_config = {
966 let configs = llm_configs.read().await;
967 configs.get(&session_id_owned).cloned()
968 };
969 let data = session.to_session_data(llm_config);
970 if let Err(e) = store.save(&data).await {
971 tracing::warn!(
972 "Failed to persist session {} after streaming: {}",
973 session_id_owned,
974 e
975 );
976 }
977 }
978 }
979 }
980
981 if let Err(e) = session_manager.maybe_auto_compact(&session_id_owned).await {
983 tracing::warn!(
984 "Auto-compact failed for session {}: {}",
985 session_id_owned,
986 e
987 );
988 }
989
990 Ok(result)
991 });
992
993 Ok((rx, wrapped_handle, cancel_token_clone))
994 }
995
996 pub async fn context_usage(&self, session_id: &str) -> Result<ContextUsage> {
998 let session_lock = self.get_session(session_id).await?;
999 let session = session_lock.read().await;
1000 Ok(session.context_usage.clone())
1001 }
1002
1003 pub async fn history(&self, session_id: &str) -> Result<Vec<Message>> {
1005 let session_lock = self.get_session(session_id).await?;
1006 let session = session_lock.read().await;
1007 Ok(session.messages.clone())
1008 }
1009
1010 pub async fn clear(&self, session_id: &str) -> Result<()> {
1012 {
1013 let session_lock = self.get_session(session_id).await?;
1014 let mut session = session_lock.write().await;
1015 session.clear();
1016 }
1017
1018 self.persist_in_background(session_id, "clear");
1020
1021 Ok(())
1022 }
1023
1024 pub async fn compact(&self, session_id: &str) -> Result<()> {
1026 tracing::info!(name: "a3s.session.compact", session_id = %session_id, "Compacting session context");
1027
1028 {
1029 let session_lock = self.get_session(session_id).await?;
1030 let mut session = session_lock.write().await;
1031
1032 let llm_client = if let Some(client) = &session.llm_client {
1034 client.clone()
1035 } else if let Some(client) = self.llm_client.read().await.clone() {
1036 client
1037 } else {
1038 tracing::warn!("No LLM client configured for compaction, using simple truncation");
1040 let keep_messages = 20;
1041 if session.messages.len() > keep_messages {
1042 let len = session.messages.len();
1043 session.messages = session.messages.split_off(len - keep_messages);
1044 }
1045 drop(session);
1047 self.persist_in_background(session_id, "compact");
1048 return Ok(());
1049 };
1050
1051 session.compact(&llm_client).await?;
1052 }
1053
1054 self.persist_in_background(session_id, "compact");
1056
1057 Ok(())
1058 }
1059
1060 pub async fn maybe_auto_compact(&self, session_id: &str) -> Result<bool> {
1067 let (should_compact, percent_before, messages_before) = {
1068 let session_lock = self.get_session(session_id).await?;
1069 let session = session_lock.read().await;
1070
1071 if !session.config.auto_compact {
1072 return Ok(false);
1073 }
1074
1075 let threshold = session.config.auto_compact_threshold;
1076 let percent = session.context_usage.percent;
1077 let msg_count = session.messages.len();
1078
1079 tracing::debug!(
1080 "Auto-compact check for session {}: percent={:.2}%, threshold={:.2}%, messages={}",
1081 session_id,
1082 percent * 100.0,
1083 threshold * 100.0,
1084 msg_count,
1085 );
1086
1087 (percent >= threshold, percent, msg_count)
1088 };
1089
1090 if !should_compact {
1091 return Ok(false);
1092 }
1093
1094 tracing::info!(
1095 name: "a3s.session.auto_compact",
1096 session_id = %session_id,
1097 percent_before = %format!("{:.1}%", percent_before * 100.0),
1098 messages_before = %messages_before,
1099 "Auto-compacting session due to high context usage"
1100 );
1101
1102 self.compact(session_id).await?;
1104
1105 let messages_after = {
1107 let session_lock = self.get_session(session_id).await?;
1108 let session = session_lock.read().await;
1109 session.messages.len()
1110 };
1111
1112 let event = AgentEvent::ContextCompacted {
1114 session_id: session_id.to_string(),
1115 before_messages: messages_before,
1116 after_messages: messages_after,
1117 percent_before,
1118 };
1119
1120 if let Ok(session_lock) = self.get_session(session_id).await {
1122 let session = session_lock.read().await;
1123 let _ = session.event_tx.send(event);
1124 }
1125
1126 tracing::info!(
1127 name: "a3s.session.auto_compact.done",
1128 session_id = %session_id,
1129 messages_before = %messages_before,
1130 messages_after = %messages_after,
1131 "Auto-compaction complete"
1132 );
1133
1134 Ok(true)
1135 }
1136
1137 pub async fn get_llm_for_session(
1141 &self,
1142 session_id: &str,
1143 ) -> Result<Option<Arc<dyn LlmClient>>> {
1144 let session_lock = self.get_session(session_id).await?;
1145 let session = session_lock.read().await;
1146
1147 if let Some(client) = &session.llm_client {
1148 return Ok(Some(client.clone()));
1149 }
1150
1151 Ok(self.llm_client.read().await.clone())
1152 }
1153
1154 pub async fn configure(
1156 &self,
1157 session_id: &str,
1158 thinking: Option<bool>,
1159 budget: Option<usize>,
1160 model_config: Option<LlmConfig>,
1161 ) -> Result<()> {
1162 {
1163 let session_lock = self.get_session(session_id).await?;
1164 let mut session = session_lock.write().await;
1165
1166 if let Some(t) = thinking {
1167 session.thinking_enabled = t;
1168 }
1169 if let Some(b) = budget {
1170 session.thinking_budget = Some(b);
1171 }
1172 if let Some(ref config) = model_config {
1173 tracing::info!(
1174 "Configuring session {} with LLM: provider={}, model={}",
1175 session_id,
1176 config.provider,
1177 config.model
1178 );
1179 session.model_name = Some(config.model.clone());
1180 session.llm_client = Some(llm::create_client_with_config(config.clone()));
1181 }
1182 }
1183
1184 if let Some(config) = model_config {
1186 let llm_config_data = LlmConfigData {
1187 provider: config.provider,
1188 model: config.model,
1189 api_key: None, base_url: config.base_url,
1191 };
1192 let mut configs = self.llm_configs.write().await;
1193 configs.insert(session_id.to_string(), llm_config_data);
1194 }
1195
1196 self.persist_in_background(session_id, "configure");
1198
1199 Ok(())
1200 }
1201
1202 pub async fn set_system_prompt(
1204 &self,
1205 session_id: &str,
1206 system_prompt: Option<String>,
1207 ) -> Result<()> {
1208 {
1209 let session_lock = self.get_session(session_id).await?;
1210 let mut session = session_lock.write().await;
1211 session.config.system_prompt = system_prompt;
1212 }
1213
1214 self.persist_in_background(session_id, "set_system_prompt");
1215 Ok(())
1216 }
1217
1218 pub async fn session_count(&self) -> usize {
1220 let sessions = self.sessions.read().await;
1221 sessions.len()
1222 }
1223
1224 pub async fn store_health(&self) -> Vec<(String, Result<()>)> {
1226 let stores = self.stores.read().await;
1227 let mut results = Vec::new();
1228 for (_, store) in stores.iter() {
1229 let name = store.backend_name().to_string();
1230 let result = store.health_check().await;
1231 results.push((name, result));
1232 }
1233 results
1234 }
1235
1236 pub fn list_tools(&self) -> Vec<crate::llm::ToolDefinition> {
1238 self.tool_executor.definitions()
1239 }
1240
1241 pub async fn pause_session(&self, session_id: &str) -> Result<bool> {
1243 let paused = {
1244 let session_lock = self.get_session(session_id).await?;
1245 let mut session = session_lock.write().await;
1246 session.pause()
1247 };
1248
1249 if paused {
1250 self.persist_in_background(session_id, "pause");
1251 }
1252
1253 Ok(paused)
1254 }
1255
1256 pub async fn resume_session(&self, session_id: &str) -> Result<bool> {
1258 let resumed = {
1259 let session_lock = self.get_session(session_id).await?;
1260 let mut session = session_lock.write().await;
1261 session.resume()
1262 };
1263
1264 if resumed {
1265 self.persist_in_background(session_id, "resume");
1266 }
1267
1268 Ok(resumed)
1269 }
1270
1271 pub async fn cancel_operation(&self, session_id: &str) -> Result<bool> {
1275 let session_lock = self.get_session(session_id).await?;
1277 let cancelled_confirmations = {
1278 let session = session_lock.read().await;
1279 session.confirmation_manager.cancel_all().await
1280 };
1281
1282 if cancelled_confirmations > 0 {
1283 tracing::info!(
1284 "Cancelled {} pending confirmations for session {}",
1285 cancelled_confirmations,
1286 session_id
1287 );
1288 }
1289
1290 let cancel_token = {
1292 let mut ops = self.ongoing_operations.write().await;
1293 ops.remove(session_id)
1294 };
1295
1296 if let Some(token) = cancel_token {
1297 token.cancel();
1298 tracing::info!("Cancelled ongoing operation for session {}", session_id);
1299 Ok(true)
1300 } else if cancelled_confirmations > 0 {
1301 Ok(true)
1303 } else {
1304 tracing::debug!("No ongoing operation to cancel for session {}", session_id);
1305 Ok(false)
1306 }
1307 }
1308
1309 pub async fn get_all_sessions(&self) -> Vec<Arc<RwLock<Session>>> {
1311 let sessions = self.sessions.read().await;
1312 sessions.values().cloned().collect()
1313 }
1314
1315 pub fn tool_executor(&self) -> &Arc<ToolExecutor> {
1317 &self.tool_executor
1318 }
1319
1320 pub async fn confirm_tool(
1322 &self,
1323 session_id: &str,
1324 tool_id: &str,
1325 approved: bool,
1326 reason: Option<String>,
1327 ) -> Result<bool> {
1328 let session_lock = self.get_session(session_id).await?;
1329 let session = session_lock.read().await;
1330 session
1331 .confirmation_manager
1332 .confirm(tool_id, approved, reason)
1333 .await
1334 .map_err(|e| anyhow::anyhow!(e))
1335 }
1336
1337 pub async fn set_confirmation_policy(
1339 &self,
1340 session_id: &str,
1341 policy: ConfirmationPolicy,
1342 ) -> Result<ConfirmationPolicy> {
1343 {
1344 let session_lock = self.get_session(session_id).await?;
1345 let session = session_lock.read().await;
1346 session.set_confirmation_policy(policy.clone()).await;
1347 }
1348
1349 {
1351 let session_lock = self.get_session(session_id).await?;
1352 let mut session = session_lock.write().await;
1353 session.config.confirmation_policy = Some(policy.clone());
1354 }
1355
1356 self.persist_in_background(session_id, "set_confirmation_policy");
1358
1359 Ok(policy)
1360 }
1361
1362 pub async fn get_confirmation_policy(&self, session_id: &str) -> Result<ConfirmationPolicy> {
1364 let session_lock = self.get_session(session_id).await?;
1365 let session = session_lock.read().await;
1366 Ok(session.confirmation_policy().await)
1367 }
1368}