1use super::{ContextUsage, Session, SessionConfig, SessionState};
4use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
5use crate::hitl::ConfirmationPolicy;
6use crate::llm::{self, LlmClient, LlmConfig, Message};
7use crate::mcp::McpManager;
8use crate::memory::AgentMemory;
9use crate::permissions::PermissionPolicy;
10use crate::prompts::SystemPromptSlots;
11use crate::skills::SkillRegistry;
12use crate::store::{FileSessionStore, LlmConfigData, SessionData, SessionStore};
13use crate::text::truncate_utf8;
14use crate::tools::ToolExecutor;
15use crate::PlanningMode;
16use a3s_memory::MemoryStore;
17use anyhow::{Context, Result};
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::{mpsc, RwLock};
21
22#[derive(Clone)]
24pub struct SessionManager {
25 pub(crate) sessions: Arc<RwLock<HashMap<String, Arc<RwLock<Session>>>>>,
26 pub(crate) llm_client: Arc<RwLock<Option<Arc<dyn LlmClient>>>>,
29 pub(crate) tool_executor: Arc<ToolExecutor>,
30 pub(crate) stores: Arc<RwLock<HashMap<crate::config::StorageBackend, Arc<dyn SessionStore>>>>,
32 pub(crate) session_storage_types: Arc<RwLock<HashMap<String, crate::config::StorageBackend>>>,
34 pub(crate) llm_configs: Arc<RwLock<HashMap<String, LlmConfigData>>>,
36 pub(crate) ongoing_operations:
38 Arc<RwLock<HashMap<String, tokio_util::sync::CancellationToken>>>,
39 pub(crate) skill_registry: Arc<RwLock<Option<Arc<SkillRegistry>>>>,
41 pub(crate) session_skill_registries: Arc<RwLock<HashMap<String, Arc<SkillRegistry>>>>,
43 pub(crate) memory_store: Arc<RwLock<Option<Arc<dyn MemoryStore>>>>,
47 pub(crate) mcp_manager: Arc<RwLock<Option<Arc<McpManager>>>>,
50}
51
52impl SessionManager {
53 fn compact_json_value(value: &serde_json::Value) -> String {
54 let raw = match value {
55 serde_json::Value::Null => String::new(),
56 serde_json::Value::String(s) => s.clone(),
57 _ => serde_json::to_string(value).unwrap_or_default(),
58 };
59 let compact = raw.split_whitespace().collect::<Vec<_>>().join(" ");
60 if compact.len() > 180 {
61 format!("{}...", truncate_utf8(&compact, 180))
62 } else {
63 compact
64 }
65 }
66
67 pub fn new(llm_client: Option<Arc<dyn LlmClient>>, tool_executor: Arc<ToolExecutor>) -> Self {
69 Self {
70 sessions: Arc::new(RwLock::new(HashMap::new())),
71 llm_client: Arc::new(RwLock::new(llm_client)),
72 tool_executor,
73 stores: Arc::new(RwLock::new(HashMap::new())),
74 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
75 llm_configs: Arc::new(RwLock::new(HashMap::new())),
76 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
77 skill_registry: Arc::new(RwLock::new(None)),
78 session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
79 memory_store: Arc::new(RwLock::new(None)),
80 mcp_manager: Arc::new(RwLock::new(None)),
81 }
82 }
83
84 pub async fn with_persistence<P: AsRef<std::path::Path>>(
88 llm_client: Option<Arc<dyn LlmClient>>,
89 tool_executor: Arc<ToolExecutor>,
90 sessions_dir: P,
91 ) -> Result<Self> {
92 let store = FileSessionStore::new(sessions_dir).await?;
93 let mut stores = HashMap::new();
94 stores.insert(
95 crate::config::StorageBackend::File,
96 Arc::new(store) as Arc<dyn SessionStore>,
97 );
98
99 let manager = Self {
100 sessions: Arc::new(RwLock::new(HashMap::new())),
101 llm_client: Arc::new(RwLock::new(llm_client)),
102 tool_executor,
103 stores: Arc::new(RwLock::new(stores)),
104 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
105 llm_configs: Arc::new(RwLock::new(HashMap::new())),
106 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
107 skill_registry: Arc::new(RwLock::new(None)),
108 session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
109 memory_store: Arc::new(RwLock::new(None)),
110 mcp_manager: Arc::new(RwLock::new(None)),
111 };
112
113 Ok(manager)
114 }
115
116 pub fn with_store(
121 llm_client: Option<Arc<dyn LlmClient>>,
122 tool_executor: Arc<ToolExecutor>,
123 store: Arc<dyn SessionStore>,
124 backend: crate::config::StorageBackend,
125 ) -> Self {
126 let mut stores = HashMap::new();
127 stores.insert(backend, store);
128
129 Self {
130 sessions: Arc::new(RwLock::new(HashMap::new())),
131 llm_client: Arc::new(RwLock::new(llm_client)),
132 tool_executor,
133 stores: Arc::new(RwLock::new(stores)),
134 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
135 llm_configs: Arc::new(RwLock::new(HashMap::new())),
136 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
137 skill_registry: Arc::new(RwLock::new(None)),
138 session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
139 memory_store: Arc::new(RwLock::new(None)),
140 mcp_manager: Arc::new(RwLock::new(None)),
141 }
142 }
143
144 pub async fn set_default_llm(&self, client: Option<Arc<dyn LlmClient>>) {
149 *self.llm_client.write().await = client;
150 }
151
152 pub async fn set_skill_registry(
159 &self,
160 registry: Arc<SkillRegistry>,
161 skills_dir: std::path::PathBuf,
162 ) {
163 let manage_tool = crate::skills::ManageSkillTool::new(registry.clone(), skills_dir);
165 self.tool_executor
166 .register_dynamic_tool(Arc::new(manage_tool));
167
168 *self.skill_registry.write().await = Some(registry);
169 }
170
171 pub async fn skill_registry(&self) -> Option<Arc<SkillRegistry>> {
173 self.skill_registry.read().await.clone()
174 }
175
176 pub async fn set_session_skill_registry(
178 &self,
179 session_id: impl Into<String>,
180 registry: Arc<SkillRegistry>,
181 ) {
182 self.session_skill_registries
183 .write()
184 .await
185 .insert(session_id.into(), registry);
186 }
187
188 pub async fn session_skill_registry(&self, session_id: &str) -> Option<Arc<SkillRegistry>> {
190 self.session_skill_registries
191 .read()
192 .await
193 .get(session_id)
194 .cloned()
195 }
196
197 pub async fn set_memory_store(&self, store: Arc<dyn MemoryStore>) {
204 *self.memory_store.write().await = Some(store);
205 }
206
207 pub async fn memory_store(&self) -> Option<Arc<dyn MemoryStore>> {
209 self.memory_store.read().await.clone()
210 }
211
212 pub async fn set_mcp_manager(&self, manager: Arc<McpManager>) {
217 let all_tools = manager.get_all_tools().await;
218 let mut by_server: HashMap<String, Vec<crate::mcp::McpTool>> = HashMap::new();
219 for (server, tool) in all_tools {
220 by_server.entry(server).or_default().push(tool);
221 }
222 for (server_name, tools) in by_server {
223 for tool in
224 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&manager))
225 {
226 self.tool_executor.register_dynamic_tool(tool);
227 }
228 }
229 *self.mcp_manager.write().await = Some(manager);
230 }
231
232 pub async fn add_mcp_server(&self, config: crate::mcp::McpServerConfig) -> Result<()> {
237 let manager = {
238 let guard = self.mcp_manager.read().await;
239 match guard.clone() {
240 Some(m) => m,
241 None => {
242 drop(guard);
243 let m = Arc::new(McpManager::new());
244 *self.mcp_manager.write().await = Some(Arc::clone(&m));
245 m
246 }
247 }
248 };
249 let name = config.name.clone();
250 manager.register_server(config).await;
251 manager.connect(&name).await?;
252 let tools = manager.get_server_tools(&name).await;
253 for tool in crate::mcp::tools::create_mcp_tools(&name, tools, Arc::clone(&manager)) {
254 self.tool_executor.register_dynamic_tool(tool);
255 }
256 Ok(())
257 }
258
259 pub async fn remove_mcp_server(&self, name: &str) -> Result<()> {
263 let guard = self.mcp_manager.read().await;
264 if let Some(ref manager) = *guard {
265 manager.disconnect(name).await?;
266 }
267 self.tool_executor
268 .unregister_tools_by_prefix(&format!("mcp__{name}__"));
269 Ok(())
270 }
271
272 pub async fn mcp_status(
274 &self,
275 ) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
276 let guard = self.mcp_manager.read().await;
277 match guard.as_ref() {
278 Some(m) => {
279 let m = Arc::clone(m);
280 drop(guard);
281 m.get_status().await
282 }
283 None => std::collections::HashMap::new(),
284 }
285 }
286
287 pub async fn restore_session_by_id(&self, session_id: &str) -> Result<()> {
292 {
294 let sessions = self.sessions.read().await;
295 if sessions.contains_key(session_id) {
296 return Ok(());
297 }
298 }
299
300 let stores = self.stores.read().await;
301 for (backend, store) in stores.iter() {
302 match store.load(session_id).await {
303 Ok(Some(data)) => {
304 {
305 let mut storage_types = self.session_storage_types.write().await;
306 storage_types.insert(data.id.clone(), backend.clone());
307 }
308 self.restore_session(data).await?;
309 return Ok(());
310 }
311 Ok(None) => continue,
312 Err(e) => {
313 tracing::warn!(
314 "Failed to load session {} from {:?}: {}",
315 session_id,
316 backend,
317 e
318 );
319 continue;
320 }
321 }
322 }
323
324 Err(anyhow::anyhow!(
325 "Session {} not found in any store",
326 session_id
327 ))
328 }
329
330 pub async fn load_all_sessions(&mut self) -> Result<usize> {
332 let stores = self.stores.read().await;
333 let mut loaded = 0;
334
335 for (backend, store) in stores.iter() {
336 let session_ids = match store.list().await {
337 Ok(ids) => ids,
338 Err(e) => {
339 tracing::warn!("Failed to list sessions from {:?} store: {}", backend, e);
340 continue;
341 }
342 };
343
344 for id in session_ids {
345 match store.load(&id).await {
346 Ok(Some(data)) => {
347 {
349 let mut storage_types = self.session_storage_types.write().await;
350 storage_types.insert(data.id.clone(), backend.clone());
351 }
352
353 if let Err(e) = self.restore_session(data).await {
354 tracing::warn!("Failed to restore session {}: {}", id, e);
355 } else {
356 loaded += 1;
357 }
358 }
359 Ok(None) => {
360 tracing::warn!("Session {} not found in store", id);
361 }
362 Err(e) => {
363 tracing::warn!("Failed to load session {}: {}", id, e);
364 }
365 }
366 }
367 }
368
369 tracing::info!("Loaded {} sessions from store", loaded);
370 Ok(loaded)
371 }
372
373 async fn restore_session(&self, data: SessionData) -> Result<()> {
375 let tools = self.tool_executor.definitions();
376 let mut session = Session::new(data.id.clone(), data.config.clone(), tools).await?;
377
378 session.restore_from_data(&data);
380
381 if let Some(llm_config) = &data.llm_config {
383 let mut configs = self.llm_configs.write().await;
384 configs.insert(data.id.clone(), llm_config.clone());
385 }
386
387 let mut sessions = self.sessions.write().await;
388 sessions.insert(data.id.clone(), Arc::new(RwLock::new(session)));
389
390 tracing::info!("Restored session: {}", data.id);
391 Ok(())
392 }
393
394 async fn save_session(&self, session_id: &str) -> Result<()> {
396 let storage_type = {
398 let storage_types = self.session_storage_types.read().await;
399 storage_types.get(session_id).cloned()
400 };
401
402 let Some(storage_type) = storage_type else {
403 return Ok(());
405 };
406
407 if storage_type == crate::config::StorageBackend::Memory {
409 return Ok(());
410 }
411
412 let stores = self.stores.read().await;
414 let Some(store) = stores.get(&storage_type) else {
415 tracing::warn!("No store available for storage type: {:?}", storage_type);
416 return Ok(());
417 };
418
419 let session_lock = self.get_session(session_id).await?;
420 let session = session_lock.read().await;
421
422 let llm_config = {
424 let configs = self.llm_configs.read().await;
425 configs.get(session_id).cloned()
426 };
427
428 let data = session.to_session_data(llm_config);
429 store.save(&data).await?;
430
431 tracing::debug!("Saved session: {}", session_id);
432 Ok(())
433 }
434
435 async fn persist_or_warn(&self, session_id: &str, operation: &str) {
439 if let Err(e) = self.save_session(session_id).await {
440 tracing::warn!(
441 "Failed to persist session {} after {}: {}",
442 session_id,
443 operation,
444 e
445 );
446 if let Ok(session_lock) = self.get_session(session_id).await {
448 let session = session_lock.read().await;
449 let _ = session.event_tx().send(AgentEvent::PersistenceFailed {
450 session_id: session_id.to_string(),
451 operation: operation.to_string(),
452 error: e.to_string(),
453 });
454 }
455 }
456 }
457
458 fn persist_in_background(&self, session_id: &str, operation: &str) {
463 let mgr = self.clone();
464 let sid = session_id.to_string();
465 let op = operation.to_string();
466 tokio::spawn(async move {
467 mgr.persist_or_warn(&sid, &op).await;
468 });
469 }
470
471 pub async fn create_session(&self, id: String, config: SessionConfig) -> Result<String> {
473 tracing::info!(name: "a3s.session.create", session_id = %id, "Creating session");
474
475 {
477 let mut storage_types = self.session_storage_types.write().await;
478 storage_types.insert(id.clone(), config.storage_type.clone());
479 }
480
481 let tools = self.tool_executor.definitions();
483 let mut session = Session::new(id.clone(), config, tools).await?;
484
485 session.start_queue().await?;
487
488 if session.config.max_context_length > 0 {
490 session.context_usage.max_tokens = session.config.max_context_length as usize;
491 }
492
493 {
494 let mut sessions = self.sessions.write().await;
495 sessions.insert(id.clone(), Arc::new(RwLock::new(session)));
496 }
497
498 self.persist_in_background(&id, "create");
500
501 tracing::info!("Created session: {}", id);
502 Ok(id)
503 }
504
505 pub async fn destroy_session(&self, id: &str) -> Result<()> {
507 tracing::info!(name: "a3s.session.destroy", session_id = %id, "Destroying session");
508
509 let storage_type = {
511 let storage_types = self.session_storage_types.read().await;
512 storage_types.get(id).cloned()
513 };
514
515 {
516 let mut sessions = self.sessions.write().await;
517 sessions.remove(id);
518 }
519
520 {
522 let mut configs = self.llm_configs.write().await;
523 configs.remove(id);
524 }
525
526 {
527 let mut registries = self.session_skill_registries.write().await;
528 registries.remove(id);
529 }
530
531 {
533 let mut storage_types = self.session_storage_types.write().await;
534 storage_types.remove(id);
535 }
536
537 if let Some(storage_type) = storage_type {
539 if storage_type != crate::config::StorageBackend::Memory {
540 let stores = self.stores.read().await;
541 if let Some(store) = stores.get(&storage_type) {
542 if let Err(e) = store.delete(id).await {
543 tracing::warn!("Failed to delete session {} from store: {}", id, e);
544 }
545 }
546 }
547 }
548
549 tracing::info!("Destroyed session: {}", id);
550 Ok(())
551 }
552
553 pub async fn get_session(&self, id: &str) -> Result<Arc<RwLock<Session>>> {
555 let sessions = self.sessions.read().await;
556 sessions
557 .get(id)
558 .cloned()
559 .context(format!("Session not found: {}", id))
560 }
561
562 pub async fn create_child_session(
567 &self,
568 parent_id: &str,
569 child_id: String,
570 mut config: SessionConfig,
571 ) -> Result<String> {
572 let parent_lock = self.get_session(parent_id).await?;
574 let parent_llm_client = {
575 let parent = parent_lock.read().await;
576
577 if config.confirmation_policy.is_none() {
579 let parent_policy = parent.confirmation_manager.policy().await;
580 config.confirmation_policy = Some(parent_policy);
581 }
582
583 parent.llm_client.clone()
584 };
585
586 config.parent_id = Some(parent_id.to_string());
588
589 let tools = self.tool_executor.definitions();
591 let mut session = Session::new(child_id.clone(), config, tools).await?;
592
593 if session.llm_client.is_none() {
595 let default_llm = self.llm_client.read().await.clone();
596 session.llm_client = parent_llm_client.or(default_llm);
597 }
598
599 session.start_queue().await?;
601
602 if session.config.max_context_length > 0 {
604 session.context_usage.max_tokens = session.config.max_context_length as usize;
605 }
606
607 {
608 let mut sessions = self.sessions.write().await;
609 sessions.insert(child_id.clone(), Arc::new(RwLock::new(session)));
610 }
611
612 self.persist_in_background(&child_id, "create_child");
614
615 tracing::info!(
616 "Created child session: {} (parent: {})",
617 child_id,
618 parent_id
619 );
620 Ok(child_id)
621 }
622
623 pub async fn get_child_sessions(&self, parent_id: &str) -> Vec<String> {
625 let sessions = self.sessions.read().await;
626 let mut children = Vec::new();
627
628 for (id, session_lock) in sessions.iter() {
629 let session = session_lock.read().await;
630 if session.parent_id.as_deref() == Some(parent_id) {
631 children.push(id.clone());
632 }
633 }
634
635 children
636 }
637
638 pub async fn is_child_session(&self, session_id: &str) -> Result<bool> {
640 let session_lock = self.get_session(session_id).await?;
641 let session = session_lock.read().await;
642 Ok(session.is_child_session())
643 }
644
645 pub async fn generate(&self, session_id: &str, prompt: &str) -> Result<AgentResult> {
647 let session_lock = self.get_session(session_id).await?;
648
649 {
651 let session = session_lock.read().await;
652 if session.state == SessionState::Paused {
653 anyhow::bail!(
654 "Session {} is paused. Call Resume before generating.",
655 session_id
656 );
657 }
658 }
659
660 let (
662 history,
663 system,
664 tools,
665 session_llm_client,
666 permission_checker,
667 confirmation_manager,
668 context_providers,
669 session_workspace,
670 tool_metrics,
671 hook_engine,
672 planning_mode,
673 goal_tracking,
674 ) = {
675 let session = session_lock.read().await;
676 (
677 session.messages.clone(),
678 session.system().map(String::from),
679 session.tools.clone(),
680 session.llm_client.clone(),
681 session.permission_checker.clone(),
682 session.confirmation_manager.clone(),
683 session.context_providers.clone(),
684 session.config.workspace.clone(),
685 session.tool_metrics.clone(),
686 session.config.hook_engine.clone(),
687 session.config.planning_mode,
688 session.config.goal_tracking,
689 )
690 };
691
692 let llm_client = if let Some(client) = session_llm_client {
694 client
695 } else if let Some(client) = self.llm_client.read().await.clone() {
696 client
697 } else {
698 anyhow::bail!(
699 "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
700 session_id
701 );
702 };
703
704 let tool_context = if session_workspace.is_empty() {
706 crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
707 .with_session_id(session_id)
708 } else {
709 crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
710 .with_session_id(session_id)
711 };
712 let tool_context = if let Some(command_env) = self.tool_executor.command_env() {
713 tool_context.with_command_env(command_env)
714 } else {
715 tool_context
716 };
717
718 let skill_registry = match self.session_skill_registry(session_id).await {
720 Some(registry) => Some(registry),
721 None => self.skill_registry.read().await.clone(),
722 };
723 let system = if let Some(ref registry) = skill_registry {
724 let skill_prompt = registry.to_system_prompt();
725 if skill_prompt.is_empty() {
726 system
727 } else {
728 match system {
729 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
730 None => Some(skill_prompt),
731 }
732 }
733 } else {
734 system
735 };
736
737 let effective_prompt = if let Some(ref registry) = skill_registry {
739 let skill_content = registry.match_skills(prompt);
740 if skill_content.is_empty() {
741 prompt.to_string()
742 } else {
743 format!("{}\n\n---\n\n{}", skill_content, prompt)
744 }
745 } else {
746 prompt.to_string()
747 };
748
749 let memory = self
751 .memory_store
752 .read()
753 .await
754 .as_ref()
755 .map(|store| Arc::new(AgentMemory::new(store.clone())));
756
757 let config = AgentConfig {
759 prompt_slots: match system {
760 Some(s) => SystemPromptSlots::from_legacy(s),
761 None => SystemPromptSlots::default(),
762 },
763 tools,
764 max_tool_rounds: 50,
765 permission_checker: Some(permission_checker),
766 confirmation_manager: Some(confirmation_manager),
767 context_providers,
768 planning_mode,
769 goal_tracking,
770 hook_engine,
771 skill_registry,
772 memory,
773 ..AgentConfig::default()
774 };
775
776 let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
777 .with_tool_metrics(tool_metrics);
778
779 let result = agent
781 .execute_with_session(&history, &effective_prompt, Some(session_id), None, None)
782 .await?;
783
784 {
786 let mut session = session_lock.write().await;
787 session.messages = result.messages.clone();
788 session.update_usage(&result.usage);
789 }
790
791 self.persist_in_background(session_id, "generate");
793
794 if let Err(e) = self.maybe_auto_compact(session_id).await {
796 tracing::warn!("Auto-compact failed for session {}: {}", session_id, e);
797 }
798
799 Ok(result)
800 }
801
802 pub async fn generate_streaming(
804 &self,
805 session_id: &str,
806 prompt: &str,
807 ) -> Result<(
808 mpsc::Receiver<AgentEvent>,
809 tokio::task::JoinHandle<Result<AgentResult>>,
810 tokio_util::sync::CancellationToken,
811 )> {
812 let session_lock = self.get_session(session_id).await?;
813
814 {
816 let session = session_lock.read().await;
817 if session.state == SessionState::Paused {
818 anyhow::bail!(
819 "Session {} is paused. Call Resume before generating.",
820 session_id
821 );
822 }
823 }
824
825 let (
827 history,
828 system,
829 tools,
830 session_llm_client,
831 permission_checker,
832 confirmation_manager,
833 context_providers,
834 session_workspace,
835 tool_metrics,
836 hook_engine,
837 planning_mode,
838 goal_tracking,
839 ) = {
840 let session = session_lock.read().await;
841 (
842 session.messages.clone(),
843 session.system().map(String::from),
844 session.tools.clone(),
845 session.llm_client.clone(),
846 session.permission_checker.clone(),
847 session.confirmation_manager.clone(),
848 session.context_providers.clone(),
849 session.config.workspace.clone(),
850 session.tool_metrics.clone(),
851 session.config.hook_engine.clone(),
852 session.config.planning_mode,
853 session.config.goal_tracking,
854 )
855 };
856
857 let llm_client = if let Some(client) = session_llm_client {
859 client
860 } else if let Some(client) = self.llm_client.read().await.clone() {
861 client
862 } else {
863 anyhow::bail!(
864 "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
865 session_id
866 );
867 };
868
869 let tool_context = if session_workspace.is_empty() {
871 crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
872 .with_session_id(session_id)
873 } else {
874 crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
875 .with_session_id(session_id)
876 };
877 let tool_context = if let Some(command_env) = self.tool_executor.command_env() {
878 tool_context.with_command_env(command_env)
879 } else {
880 tool_context
881 };
882
883 let skill_registry = match self.session_skill_registry(session_id).await {
885 Some(registry) => Some(registry),
886 None => self.skill_registry.read().await.clone(),
887 };
888 let system = if let Some(ref registry) = skill_registry {
889 let skill_prompt = registry.to_system_prompt();
890 if skill_prompt.is_empty() {
891 system
892 } else {
893 match system {
894 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
895 None => Some(skill_prompt),
896 }
897 }
898 } else {
899 system
900 };
901
902 let effective_prompt = if let Some(ref registry) = skill_registry {
904 let skill_content = registry.match_skills(prompt);
905 if skill_content.is_empty() {
906 prompt.to_string()
907 } else {
908 format!("{}\n\n---\n\n{}", skill_content, prompt)
909 }
910 } else {
911 prompt.to_string()
912 };
913
914 let memory = self
916 .memory_store
917 .read()
918 .await
919 .as_ref()
920 .map(|store| Arc::new(AgentMemory::new(store.clone())));
921
922 let config = AgentConfig {
924 prompt_slots: match system {
925 Some(s) => SystemPromptSlots::from_legacy(s),
926 None => SystemPromptSlots::default(),
927 },
928 tools,
929 max_tool_rounds: 50,
930 permission_checker: Some(permission_checker),
931 confirmation_manager: Some(confirmation_manager),
932 context_providers,
933 planning_mode,
934 goal_tracking,
935 hook_engine,
936 skill_registry,
937 memory,
938 ..AgentConfig::default()
939 };
940
941 let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
942 .with_tool_metrics(tool_metrics);
943
944 let (rx, handle, cancel_token) =
946 agent.execute_streaming(&history, &effective_prompt).await?;
947
948 let cancel_token_clone = cancel_token.clone();
950 {
951 let mut ops = self.ongoing_operations.write().await;
952 ops.insert(session_id.to_string(), cancel_token);
953 }
954
955 let session_lock_clone = session_lock.clone();
957 let original_handle = handle;
958 let stores = self.stores.clone();
959 let session_storage_types = self.session_storage_types.clone();
960 let llm_configs = self.llm_configs.clone();
961 let session_id_owned = session_id.to_string();
962 let ongoing_operations = self.ongoing_operations.clone();
963 let session_manager = self.clone();
964
965 let wrapped_handle = tokio::spawn(async move {
966 let result = original_handle.await??;
967
968 {
970 let mut ops = ongoing_operations.write().await;
971 ops.remove(&session_id_owned);
972 }
973
974 {
976 let mut session = session_lock_clone.write().await;
977 session.messages = result.messages.clone();
978 session.update_usage(&result.usage);
979 }
980
981 let storage_type = {
983 let storage_types = session_storage_types.read().await;
984 storage_types.get(&session_id_owned).cloned()
985 };
986
987 if let Some(storage_type) = storage_type {
988 if storage_type != crate::config::StorageBackend::Memory {
989 let stores_guard = stores.read().await;
990 if let Some(store) = stores_guard.get(&storage_type) {
991 let session = session_lock_clone.read().await;
992 let llm_config = {
993 let configs = llm_configs.read().await;
994 configs.get(&session_id_owned).cloned()
995 };
996 let data = session.to_session_data(llm_config);
997 if let Err(e) = store.save(&data).await {
998 tracing::warn!(
999 "Failed to persist session {} after streaming: {}",
1000 session_id_owned,
1001 e
1002 );
1003 }
1004 }
1005 }
1006 }
1007
1008 if let Err(e) = session_manager.maybe_auto_compact(&session_id_owned).await {
1010 tracing::warn!(
1011 "Auto-compact failed for session {}: {}",
1012 session_id_owned,
1013 e
1014 );
1015 }
1016
1017 Ok(result)
1018 });
1019
1020 Ok((rx, wrapped_handle, cancel_token_clone))
1021 }
1022
1023 pub async fn context_usage(&self, session_id: &str) -> Result<ContextUsage> {
1025 let session_lock = self.get_session(session_id).await?;
1026 let session = session_lock.read().await;
1027 Ok(session.context_usage.clone())
1028 }
1029
1030 pub async fn history(&self, session_id: &str) -> Result<Vec<Message>> {
1032 let session_lock = self.get_session(session_id).await?;
1033 let session = session_lock.read().await;
1034 Ok(session.messages.clone())
1035 }
1036
1037 pub async fn clear(&self, session_id: &str) -> Result<()> {
1039 {
1040 let session_lock = self.get_session(session_id).await?;
1041 let mut session = session_lock.write().await;
1042 session.clear();
1043 }
1044
1045 self.persist_in_background(session_id, "clear");
1047
1048 Ok(())
1049 }
1050
1051 pub async fn compact(&self, session_id: &str) -> Result<()> {
1053 tracing::info!(name: "a3s.session.compact", session_id = %session_id, "Compacting session context");
1054
1055 {
1056 let session_lock = self.get_session(session_id).await?;
1057 let mut session = session_lock.write().await;
1058
1059 let llm_client = if let Some(client) = &session.llm_client {
1061 client.clone()
1062 } else if let Some(client) = self.llm_client.read().await.clone() {
1063 client
1064 } else {
1065 tracing::warn!("No LLM client configured for compaction, using simple truncation");
1067 let keep_messages = 20;
1068 if session.messages.len() > keep_messages {
1069 let len = session.messages.len();
1070 session.messages = session.messages.split_off(len - keep_messages);
1071 }
1072 drop(session);
1074 self.persist_in_background(session_id, "compact");
1075 return Ok(());
1076 };
1077
1078 session.compact(&llm_client).await?;
1079 }
1080
1081 self.persist_in_background(session_id, "compact");
1083
1084 Ok(())
1085 }
1086
1087 pub async fn maybe_auto_compact(&self, session_id: &str) -> Result<bool> {
1094 let (should_compact, percent_before, messages_before) = {
1095 let session_lock = self.get_session(session_id).await?;
1096 let session = session_lock.read().await;
1097
1098 if !session.config.auto_compact {
1099 return Ok(false);
1100 }
1101
1102 let threshold = session.config.auto_compact_threshold;
1103 let percent = session.context_usage.percent;
1104 let msg_count = session.messages.len();
1105
1106 tracing::debug!(
1107 "Auto-compact check for session {}: percent={:.2}%, threshold={:.2}%, messages={}",
1108 session_id,
1109 percent * 100.0,
1110 threshold * 100.0,
1111 msg_count,
1112 );
1113
1114 (percent >= threshold, percent, msg_count)
1115 };
1116
1117 if !should_compact {
1118 return Ok(false);
1119 }
1120
1121 tracing::info!(
1122 name: "a3s.session.auto_compact",
1123 session_id = %session_id,
1124 percent_before = %format!("{:.1}%", percent_before * 100.0),
1125 messages_before = %messages_before,
1126 "Auto-compacting session due to high context usage"
1127 );
1128
1129 self.compact(session_id).await?;
1131
1132 let messages_after = {
1134 let session_lock = self.get_session(session_id).await?;
1135 let session = session_lock.read().await;
1136 session.messages.len()
1137 };
1138
1139 let event = AgentEvent::ContextCompacted {
1141 session_id: session_id.to_string(),
1142 before_messages: messages_before,
1143 after_messages: messages_after,
1144 percent_before,
1145 };
1146
1147 if let Ok(session_lock) = self.get_session(session_id).await {
1149 let session = session_lock.read().await;
1150 let _ = session.event_tx.send(event);
1151 }
1152
1153 tracing::info!(
1154 name: "a3s.session.auto_compact.done",
1155 session_id = %session_id,
1156 messages_before = %messages_before,
1157 messages_after = %messages_after,
1158 "Auto-compaction complete"
1159 );
1160
1161 Ok(true)
1162 }
1163
1164 pub async fn get_llm_for_session(
1168 &self,
1169 session_id: &str,
1170 ) -> Result<Option<Arc<dyn LlmClient>>> {
1171 let session_lock = self.get_session(session_id).await?;
1172 let session = session_lock.read().await;
1173
1174 if let Some(client) = &session.llm_client {
1175 return Ok(Some(client.clone()));
1176 }
1177
1178 Ok(self.llm_client.read().await.clone())
1179 }
1180
1181 pub async fn btw_with_context(
1187 &self,
1188 session_id: &str,
1189 question: &str,
1190 runtime_context: Option<&str>,
1191 ) -> Result<crate::agent_api::BtwResult> {
1192 let question = question.trim();
1193 if question.is_empty() {
1194 anyhow::bail!("btw: question cannot be empty");
1195 }
1196
1197 let session_lock = self.get_session(session_id).await?;
1198 let (history_snapshot, session_runtime) = {
1199 let session = session_lock.read().await;
1200 let mut sections = Vec::new();
1201
1202 let pending_confirmations = session.confirmation_manager.pending_confirmations().await;
1203 if !pending_confirmations.is_empty() {
1204 let mut lines = pending_confirmations
1205 .into_iter()
1206 .map(|pending| {
1207 let arg_summary = Self::compact_json_value(&pending.args);
1208 if arg_summary.is_empty() {
1209 format!(
1210 "- {} [{}] remaining={}ms",
1211 pending.tool_name, pending.tool_id, pending.remaining_ms
1212 )
1213 } else {
1214 format!(
1215 "- {} [{}] remaining={}ms {}",
1216 pending.tool_name,
1217 pending.tool_id,
1218 pending.remaining_ms,
1219 arg_summary
1220 )
1221 }
1222 })
1223 .collect::<Vec<_>>();
1224 lines.sort();
1225 sections.push(format!("[pending confirmations]\n{}", lines.join("\n")));
1226 }
1227
1228 let stats = session.command_queue.stats().await;
1229 if stats.total_active > 0 || stats.total_pending > 0 || stats.external_pending > 0 {
1230 let mut lines = vec![format!(
1231 "active={}, pending={}, external_pending={}",
1232 stats.total_active, stats.total_pending, stats.external_pending
1233 )];
1234 let mut lanes = stats
1235 .lanes
1236 .into_values()
1237 .filter(|lane| lane.active > 0 || lane.pending > 0)
1238 .map(|lane| {
1239 format!(
1240 "- {:?}: active={}, pending={}, handler={:?}",
1241 lane.lane, lane.active, lane.pending, lane.handler_mode
1242 )
1243 })
1244 .collect::<Vec<_>>();
1245 lanes.sort();
1246 lines.extend(lanes);
1247 sections.push(format!("[session queue]\n{}", lines.join("\n")));
1248 }
1249
1250 let external_tasks = session.command_queue.pending_external_tasks().await;
1251 if !external_tasks.is_empty() {
1252 let mut lines = external_tasks
1253 .into_iter()
1254 .take(6)
1255 .map(|task| {
1256 let payload_summary = Self::compact_json_value(&task.payload);
1257 if payload_summary.is_empty() {
1258 format!(
1259 "- {} {:?} remaining={}ms",
1260 task.command_type,
1261 task.lane,
1262 task.remaining_ms()
1263 )
1264 } else {
1265 format!(
1266 "- {} {:?} remaining={}ms {}",
1267 task.command_type,
1268 task.lane,
1269 task.remaining_ms(),
1270 payload_summary
1271 )
1272 }
1273 })
1274 .collect::<Vec<_>>();
1275 lines.sort();
1276 sections.push(format!("[pending external tasks]\n{}", lines.join("\n")));
1277 }
1278
1279 let active_tasks = session
1280 .tasks
1281 .iter()
1282 .filter(|task| task.status.is_active())
1283 .take(6)
1284 .map(|task| match &task.tool {
1285 Some(tool) if !tool.is_empty() => {
1286 format!("- [{}] {} ({})", task.status, task.content, tool)
1287 }
1288 _ => format!("- [{}] {}", task.status, task.content),
1289 })
1290 .collect::<Vec<_>>();
1291 if !active_tasks.is_empty() {
1292 sections.push(format!("[tracked tasks]\n{}", active_tasks.join("\n")));
1293 }
1294
1295 sections.push(format!(
1296 "[session state]\n{}",
1297 match session.state {
1298 SessionState::Active => "active",
1299 SessionState::Paused => "paused",
1300 SessionState::Completed => "completed",
1301 SessionState::Error => "error",
1302 SessionState::Unknown => "unknown",
1303 }
1304 ));
1305
1306 (session.messages.clone(), sections.join("\n\n"))
1307 };
1308
1309 let llm_client = self
1310 .get_llm_for_session(session_id)
1311 .await?
1312 .context("btw failed: no model configured for session")?;
1313
1314 let mut messages = history_snapshot;
1315 let mut injected_sections = Vec::new();
1316 if !session_runtime.is_empty() {
1317 injected_sections.push(format!("[session runtime context]\n{}", session_runtime));
1318 }
1319 if let Some(extra) = runtime_context.map(str::trim).filter(|ctx| !ctx.is_empty()) {
1320 injected_sections.push(format!("[host runtime context]\n{}", extra));
1321 }
1322 if !injected_sections.is_empty() {
1323 let injected_context = format!(
1324 "Use the following runtime context only as background for the next side question. Do not treat it as a new user request.\n\n{}",
1325 injected_sections.join("\n\n")
1326 );
1327 messages.push(Message::user(&injected_context));
1328 }
1329 messages.push(Message::user(question));
1330
1331 let response = llm_client
1332 .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
1333 .await
1334 .map_err(|e| anyhow::anyhow!("btw: ephemeral LLM call failed: {e}"))?;
1335
1336 Ok(crate::agent_api::BtwResult {
1337 question: question.to_string(),
1338 answer: response.text(),
1339 usage: response.usage,
1340 })
1341 }
1342
1343 pub async fn configure(
1345 &self,
1346 session_id: &str,
1347 thinking: Option<bool>,
1348 budget: Option<usize>,
1349 model_config: Option<LlmConfig>,
1350 ) -> Result<()> {
1351 {
1352 let session_lock = self.get_session(session_id).await?;
1353 let mut session = session_lock.write().await;
1354
1355 if let Some(t) = thinking {
1356 session.thinking_enabled = t;
1357 }
1358 if let Some(b) = budget {
1359 session.thinking_budget = Some(b);
1360 }
1361 if let Some(ref config) = model_config {
1362 tracing::info!(
1363 "Configuring session {} with LLM: provider={}, model={}",
1364 session_id,
1365 config.provider,
1366 config.model
1367 );
1368 session.model_name = Some(config.model.clone());
1369 session.llm_client = Some(llm::create_client_with_config(config.clone()));
1370 }
1371 }
1372
1373 if let Some(config) = model_config {
1375 let llm_config_data = LlmConfigData {
1376 provider: config.provider,
1377 model: config.model,
1378 api_key: None, base_url: config.base_url,
1380 };
1381 let mut configs = self.llm_configs.write().await;
1382 configs.insert(session_id.to_string(), llm_config_data);
1383 }
1384
1385 self.persist_in_background(session_id, "configure");
1387
1388 Ok(())
1389 }
1390
1391 pub async fn set_system_prompt(
1393 &self,
1394 session_id: &str,
1395 system_prompt: Option<String>,
1396 ) -> Result<()> {
1397 {
1398 let session_lock = self.get_session(session_id).await?;
1399 let mut session = session_lock.write().await;
1400 session.config.system_prompt = system_prompt;
1401 }
1402
1403 self.persist_in_background(session_id, "set_system_prompt");
1404 Ok(())
1405 }
1406
1407 pub async fn session_count(&self) -> usize {
1409 let sessions = self.sessions.read().await;
1410 sessions.len()
1411 }
1412
1413 pub async fn store_health(&self) -> Vec<(String, Result<()>)> {
1415 let stores = self.stores.read().await;
1416 let mut results = Vec::new();
1417 for (_, store) in stores.iter() {
1418 let name = store.backend_name().to_string();
1419 let result = store.health_check().await;
1420 results.push((name, result));
1421 }
1422 results
1423 }
1424
1425 pub fn list_tools(&self) -> Vec<crate::llm::ToolDefinition> {
1427 self.tool_executor.definitions()
1428 }
1429
1430 pub async fn pause_session(&self, session_id: &str) -> Result<bool> {
1432 let paused = {
1433 let session_lock = self.get_session(session_id).await?;
1434 let mut session = session_lock.write().await;
1435 session.pause()
1436 };
1437
1438 if paused {
1439 self.persist_in_background(session_id, "pause");
1440 }
1441
1442 Ok(paused)
1443 }
1444
1445 pub async fn resume_session(&self, session_id: &str) -> Result<bool> {
1447 let resumed = {
1448 let session_lock = self.get_session(session_id).await?;
1449 let mut session = session_lock.write().await;
1450 session.resume()
1451 };
1452
1453 if resumed {
1454 self.persist_in_background(session_id, "resume");
1455 }
1456
1457 Ok(resumed)
1458 }
1459
1460 pub async fn cancel_operation(&self, session_id: &str) -> Result<bool> {
1464 let session_lock = self.get_session(session_id).await?;
1466 let cancelled_confirmations = {
1467 let session = session_lock.read().await;
1468 session.confirmation_manager.cancel_all().await
1469 };
1470
1471 if cancelled_confirmations > 0 {
1472 tracing::info!(
1473 "Cancelled {} pending confirmations for session {}",
1474 cancelled_confirmations,
1475 session_id
1476 );
1477 }
1478
1479 let cancel_token = {
1481 let mut ops = self.ongoing_operations.write().await;
1482 ops.remove(session_id)
1483 };
1484
1485 if let Some(token) = cancel_token {
1486 token.cancel();
1487 tracing::info!("Cancelled ongoing operation for session {}", session_id);
1488 Ok(true)
1489 } else if cancelled_confirmations > 0 {
1490 Ok(true)
1492 } else {
1493 tracing::debug!("No ongoing operation to cancel for session {}", session_id);
1494 Ok(false)
1495 }
1496 }
1497
1498 pub async fn get_all_sessions(&self) -> Vec<Arc<RwLock<Session>>> {
1500 let sessions = self.sessions.read().await;
1501 sessions.values().cloned().collect()
1502 }
1503
1504 pub fn tool_executor(&self) -> &Arc<ToolExecutor> {
1506 &self.tool_executor
1507 }
1508
1509 pub async fn confirm_tool(
1511 &self,
1512 session_id: &str,
1513 tool_id: &str,
1514 approved: bool,
1515 reason: Option<String>,
1516 ) -> Result<bool> {
1517 let session_lock = self.get_session(session_id).await?;
1518 let session = session_lock.read().await;
1519 session
1520 .confirmation_manager
1521 .confirm(tool_id, approved, reason)
1522 .await
1523 .map_err(|e| anyhow::anyhow!(e))
1524 }
1525
1526 pub async fn set_confirmation_policy(
1528 &self,
1529 session_id: &str,
1530 policy: ConfirmationPolicy,
1531 ) -> Result<ConfirmationPolicy> {
1532 {
1533 let session_lock = self.get_session(session_id).await?;
1534 let session = session_lock.read().await;
1535 session.set_confirmation_policy(policy.clone()).await;
1536 }
1537
1538 {
1540 let session_lock = self.get_session(session_id).await?;
1541 let mut session = session_lock.write().await;
1542 session.config.confirmation_policy = Some(policy.clone());
1543 }
1544
1545 self.persist_in_background(session_id, "set_confirmation_policy");
1547
1548 Ok(policy)
1549 }
1550
1551 pub async fn set_permission_policy(
1553 &self,
1554 session_id: &str,
1555 policy: PermissionPolicy,
1556 ) -> Result<PermissionPolicy> {
1557 {
1558 let session_lock = self.get_session(session_id).await?;
1559 let mut session = session_lock.write().await;
1560 session.set_permission_policy(policy.clone());
1561 }
1562
1563 self.persist_in_background(session_id, "set_permission_policy");
1564
1565 Ok(policy)
1566 }
1567
1568 pub async fn get_confirmation_policy(&self, session_id: &str) -> Result<ConfirmationPolicy> {
1570 let session_lock = self.get_session(session_id).await?;
1571 let session = session_lock.read().await;
1572 Ok(session.confirmation_policy().await)
1573 }
1574
1575 pub async fn set_planning_mode(
1581 &self,
1582 session_id: &str,
1583 mode: PlanningMode,
1584 ) -> Result<PlanningMode> {
1585 {
1586 let session_lock = self.get_session(session_id).await?;
1587 let mut session = session_lock.write().await;
1588 session.config.planning_mode = mode;
1589 }
1590
1591 self.persist_in_background(session_id, "set_planning_mode");
1593
1594 Ok(mode)
1595 }
1596}