1use super::{ContextUsage, Session, SessionConfig, SessionState};
4use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
5use crate::hitl::ConfirmationPolicy;
6use crate::llm::{self, LlmClient, LlmConfig, Message};
7use crate::mcp::McpManager;
8use crate::memory::AgentMemory;
9use crate::permissions::PermissionPolicy;
10use crate::prompts::SystemPromptSlots;
11use crate::skills::SkillRegistry;
12use crate::store::{FileSessionStore, LlmConfigData, SessionData, SessionStore};
13use crate::text::truncate_utf8;
14use crate::tools::ToolExecutor;
15use a3s_memory::MemoryStore;
16use anyhow::{Context, Result};
17use std::collections::HashMap;
18use std::sync::Arc;
19use tokio::sync::{mpsc, RwLock};
20
/// Central registry of sessions plus the shared services they run against.
///
/// Every field sits behind an `Arc`, so cloning the manager is cheap and all clones
/// observe the same state.
#[derive(Clone)]
pub struct SessionManager {
    /// Live sessions keyed by session id.
    pub(crate) sessions: Arc<RwLock<HashMap<String, Arc<RwLock<Session>>>>>,
    /// Default LLM client, used when a session has no client of its own.
    pub(crate) llm_client: Arc<RwLock<Option<Arc<dyn LlmClient>>>>,
    /// Tool executor shared by every session.
    pub(crate) tool_executor: Arc<ToolExecutor>,
    /// Available session stores keyed by storage backend.
    pub(crate) stores: Arc<RwLock<HashMap<crate::config::StorageBackend, Arc<dyn SessionStore>>>>,
    /// Storage backend recorded for each session id; consulted when saving and deleting.
    pub(crate) session_storage_types: Arc<RwLock<HashMap<String, crate::config::StorageBackend>>>,
    /// Per-session LLM configuration data that is written out together with the session.
    pub(crate) llm_configs: Arc<RwLock<HashMap<String, LlmConfigData>>>,
    /// Cancellation tokens of in-flight streaming generations, keyed by session id.
    pub(crate) ongoing_operations:
        Arc<RwLock<HashMap<String, tokio_util::sync::CancellationToken>>>,
    /// Manager-wide skill registry; used when a session has no registry of its own.
    pub(crate) skill_registry: Arc<RwLock<Option<Arc<SkillRegistry>>>>,
    /// Session-specific skill registries, which take precedence over `skill_registry`.
    pub(crate) session_skill_registries: Arc<RwLock<HashMap<String, Arc<SkillRegistry>>>>,
    /// Optional memory store; wrapped in an `AgentMemory` for each generation.
    pub(crate) memory_store: Arc<RwLock<Option<Arc<dyn MemoryStore>>>>,
    /// Optional MCP manager whose server tools are registered on `tool_executor`.
    pub(crate) mcp_manager: Arc<RwLock<Option<Arc<McpManager>>>>,
}
50
51impl SessionManager {
52 fn compact_json_value(value: &serde_json::Value) -> String {
53 let raw = match value {
54 serde_json::Value::Null => String::new(),
55 serde_json::Value::String(s) => s.clone(),
56 _ => serde_json::to_string(value).unwrap_or_default(),
57 };
58 let compact = raw.split_whitespace().collect::<Vec<_>>().join(" ");
59 if compact.len() > 180 {
60 format!("{}...", truncate_utf8(&compact, 180))
61 } else {
62 compact
63 }
64 }
65
66 pub fn new(llm_client: Option<Arc<dyn LlmClient>>, tool_executor: Arc<ToolExecutor>) -> Self {
68 Self {
69 sessions: Arc::new(RwLock::new(HashMap::new())),
70 llm_client: Arc::new(RwLock::new(llm_client)),
71 tool_executor,
72 stores: Arc::new(RwLock::new(HashMap::new())),
73 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
74 llm_configs: Arc::new(RwLock::new(HashMap::new())),
75 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
76 skill_registry: Arc::new(RwLock::new(None)),
77 session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
78 memory_store: Arc::new(RwLock::new(None)),
79 mcp_manager: Arc::new(RwLock::new(None)),
80 }
81 }
82
83 pub async fn with_persistence<P: AsRef<std::path::Path>>(
87 llm_client: Option<Arc<dyn LlmClient>>,
88 tool_executor: Arc<ToolExecutor>,
89 sessions_dir: P,
90 ) -> Result<Self> {
91 let store = FileSessionStore::new(sessions_dir).await?;
92 let mut stores = HashMap::new();
93 stores.insert(
94 crate::config::StorageBackend::File,
95 Arc::new(store) as Arc<dyn SessionStore>,
96 );
97
98 let manager = Self {
99 sessions: Arc::new(RwLock::new(HashMap::new())),
100 llm_client: Arc::new(RwLock::new(llm_client)),
101 tool_executor,
102 stores: Arc::new(RwLock::new(stores)),
103 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
104 llm_configs: Arc::new(RwLock::new(HashMap::new())),
105 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
106 skill_registry: Arc::new(RwLock::new(None)),
107 session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
108 memory_store: Arc::new(RwLock::new(None)),
109 mcp_manager: Arc::new(RwLock::new(None)),
110 };
111
112 Ok(manager)
113 }
114
115 pub fn with_store(
120 llm_client: Option<Arc<dyn LlmClient>>,
121 tool_executor: Arc<ToolExecutor>,
122 store: Arc<dyn SessionStore>,
123 backend: crate::config::StorageBackend,
124 ) -> Self {
125 let mut stores = HashMap::new();
126 stores.insert(backend, store);
127
128 Self {
129 sessions: Arc::new(RwLock::new(HashMap::new())),
130 llm_client: Arc::new(RwLock::new(llm_client)),
131 tool_executor,
132 stores: Arc::new(RwLock::new(stores)),
133 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
134 llm_configs: Arc::new(RwLock::new(HashMap::new())),
135 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
136 skill_registry: Arc::new(RwLock::new(None)),
137 session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
138 memory_store: Arc::new(RwLock::new(None)),
139 mcp_manager: Arc::new(RwLock::new(None)),
140 }
141 }
142
143 pub async fn set_default_llm(&self, client: Option<Arc<dyn LlmClient>>) {
148 *self.llm_client.write().await = client;
149 }
150
151 pub async fn set_skill_registry(
158 &self,
159 registry: Arc<SkillRegistry>,
160 skills_dir: std::path::PathBuf,
161 ) {
162 let manage_tool = crate::skills::ManageSkillTool::new(registry.clone(), skills_dir);
164 self.tool_executor
165 .register_dynamic_tool(Arc::new(manage_tool));
166
167 *self.skill_registry.write().await = Some(registry);
168 }
169
170 pub async fn skill_registry(&self) -> Option<Arc<SkillRegistry>> {
172 self.skill_registry.read().await.clone()
173 }
174
175 pub async fn set_session_skill_registry(
177 &self,
178 session_id: impl Into<String>,
179 registry: Arc<SkillRegistry>,
180 ) {
181 self.session_skill_registries
182 .write()
183 .await
184 .insert(session_id.into(), registry);
185 }
186
187 pub async fn session_skill_registry(&self, session_id: &str) -> Option<Arc<SkillRegistry>> {
189 self.session_skill_registries
190 .read()
191 .await
192 .get(session_id)
193 .cloned()
194 }
195
196 pub async fn set_memory_store(&self, store: Arc<dyn MemoryStore>) {
203 *self.memory_store.write().await = Some(store);
204 }
205
206 pub async fn memory_store(&self) -> Option<Arc<dyn MemoryStore>> {
208 self.memory_store.read().await.clone()
209 }
210
211 pub async fn set_mcp_manager(&self, manager: Arc<McpManager>) {
216 let all_tools = manager.get_all_tools().await;
217 let mut by_server: HashMap<String, Vec<crate::mcp::McpTool>> = HashMap::new();
218 for (server, tool) in all_tools {
219 by_server.entry(server).or_default().push(tool);
220 }
221 for (server_name, tools) in by_server {
222 for tool in
223 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&manager))
224 {
225 self.tool_executor.register_dynamic_tool(tool);
226 }
227 }
228 *self.mcp_manager.write().await = Some(manager);
229 }
230
231 pub async fn add_mcp_server(&self, config: crate::mcp::McpServerConfig) -> Result<()> {
236 let manager = {
237 let guard = self.mcp_manager.read().await;
238 match guard.clone() {
239 Some(m) => m,
240 None => {
241 drop(guard);
242 let m = Arc::new(McpManager::new());
243 *self.mcp_manager.write().await = Some(Arc::clone(&m));
244 m
245 }
246 }
247 };
248 let name = config.name.clone();
249 manager.register_server(config).await;
250 manager.connect(&name).await?;
251 let tools = manager.get_server_tools(&name).await;
252 for tool in crate::mcp::tools::create_mcp_tools(&name, tools, Arc::clone(&manager)) {
253 self.tool_executor.register_dynamic_tool(tool);
254 }
255 Ok(())
256 }
257
258 pub async fn remove_mcp_server(&self, name: &str) -> Result<()> {
262 let guard = self.mcp_manager.read().await;
263 if let Some(ref manager) = *guard {
264 manager.disconnect(name).await?;
265 }
266 self.tool_executor
267 .unregister_tools_by_prefix(&format!("mcp__{name}__"));
268 Ok(())
269 }
270
271 pub async fn mcp_status(
273 &self,
274 ) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
275 let guard = self.mcp_manager.read().await;
276 match guard.as_ref() {
277 Some(m) => {
278 let m = Arc::clone(m);
279 drop(guard);
280 m.get_status().await
281 }
282 None => std::collections::HashMap::new(),
283 }
284 }
285
286 pub async fn restore_session_by_id(&self, session_id: &str) -> Result<()> {
291 {
293 let sessions = self.sessions.read().await;
294 if sessions.contains_key(session_id) {
295 return Ok(());
296 }
297 }
298
299 let stores = self.stores.read().await;
300 for (backend, store) in stores.iter() {
301 match store.load(session_id).await {
302 Ok(Some(data)) => {
303 {
304 let mut storage_types = self.session_storage_types.write().await;
305 storage_types.insert(data.id.clone(), backend.clone());
306 }
307 self.restore_session(data).await?;
308 return Ok(());
309 }
310 Ok(None) => continue,
311 Err(e) => {
312 tracing::warn!(
313 "Failed to load session {} from {:?}: {}",
314 session_id,
315 backend,
316 e
317 );
318 continue;
319 }
320 }
321 }
322
323 Err(anyhow::anyhow!(
324 "Session {} not found in any store",
325 session_id
326 ))
327 }
328
329 pub async fn load_all_sessions(&mut self) -> Result<usize> {
331 let stores = self.stores.read().await;
332 let mut loaded = 0;
333
334 for (backend, store) in stores.iter() {
335 let session_ids = match store.list().await {
336 Ok(ids) => ids,
337 Err(e) => {
338 tracing::warn!("Failed to list sessions from {:?} store: {}", backend, e);
339 continue;
340 }
341 };
342
343 for id in session_ids {
344 match store.load(&id).await {
345 Ok(Some(data)) => {
346 {
348 let mut storage_types = self.session_storage_types.write().await;
349 storage_types.insert(data.id.clone(), backend.clone());
350 }
351
352 if let Err(e) = self.restore_session(data).await {
353 tracing::warn!("Failed to restore session {}: {}", id, e);
354 } else {
355 loaded += 1;
356 }
357 }
358 Ok(None) => {
359 tracing::warn!("Session {} not found in store", id);
360 }
361 Err(e) => {
362 tracing::warn!("Failed to load session {}: {}", id, e);
363 }
364 }
365 }
366 }
367
368 tracing::info!("Loaded {} sessions from store", loaded);
369 Ok(loaded)
370 }
371
372 async fn restore_session(&self, data: SessionData) -> Result<()> {
374 let tools = self.tool_executor.definitions();
375 let mut session = Session::new(data.id.clone(), data.config.clone(), tools).await?;
376
377 session.restore_from_data(&data);
379
380 if let Some(llm_config) = &data.llm_config {
382 let mut configs = self.llm_configs.write().await;
383 configs.insert(data.id.clone(), llm_config.clone());
384 }
385
386 let mut sessions = self.sessions.write().await;
387 sessions.insert(data.id.clone(), Arc::new(RwLock::new(session)));
388
389 tracing::info!("Restored session: {}", data.id);
390 Ok(())
391 }
392
393 async fn save_session(&self, session_id: &str) -> Result<()> {
395 let storage_type = {
397 let storage_types = self.session_storage_types.read().await;
398 storage_types.get(session_id).cloned()
399 };
400
401 let Some(storage_type) = storage_type else {
402 return Ok(());
404 };
405
406 if storage_type == crate::config::StorageBackend::Memory {
408 return Ok(());
409 }
410
411 let stores = self.stores.read().await;
413 let Some(store) = stores.get(&storage_type) else {
414 tracing::warn!("No store available for storage type: {:?}", storage_type);
415 return Ok(());
416 };
417
418 let session_lock = self.get_session(session_id).await?;
419 let session = session_lock.read().await;
420
421 let llm_config = {
423 let configs = self.llm_configs.read().await;
424 configs.get(session_id).cloned()
425 };
426
427 let data = session.to_session_data(llm_config);
428 store.save(&data).await?;
429
430 tracing::debug!("Saved session: {}", session_id);
431 Ok(())
432 }
433
434 async fn persist_or_warn(&self, session_id: &str, operation: &str) {
438 if let Err(e) = self.save_session(session_id).await {
439 tracing::warn!(
440 "Failed to persist session {} after {}: {}",
441 session_id,
442 operation,
443 e
444 );
445 if let Ok(session_lock) = self.get_session(session_id).await {
447 let session = session_lock.read().await;
448 let _ = session.event_tx().send(AgentEvent::PersistenceFailed {
449 session_id: session_id.to_string(),
450 operation: operation.to_string(),
451 error: e.to_string(),
452 });
453 }
454 }
455 }
456
457 fn persist_in_background(&self, session_id: &str, operation: &str) {
462 let mgr = self.clone();
463 let sid = session_id.to_string();
464 let op = operation.to_string();
465 tokio::spawn(async move {
466 mgr.persist_or_warn(&sid, &op).await;
467 });
468 }
469
470 pub async fn create_session(&self, id: String, config: SessionConfig) -> Result<String> {
472 tracing::info!(name: "a3s.session.create", session_id = %id, "Creating session");
473
474 {
476 let mut storage_types = self.session_storage_types.write().await;
477 storage_types.insert(id.clone(), config.storage_type.clone());
478 }
479
480 let tools = self.tool_executor.definitions();
482 let mut session = Session::new(id.clone(), config, tools).await?;
483
484 session.start_queue().await?;
486
487 if session.config.max_context_length > 0 {
489 session.context_usage.max_tokens = session.config.max_context_length as usize;
490 }
491
492 {
493 let mut sessions = self.sessions.write().await;
494 sessions.insert(id.clone(), Arc::new(RwLock::new(session)));
495 }
496
497 self.persist_in_background(&id, "create");
499
500 tracing::info!("Created session: {}", id);
501 Ok(id)
502 }
503
504 pub async fn destroy_session(&self, id: &str) -> Result<()> {
506 tracing::info!(name: "a3s.session.destroy", session_id = %id, "Destroying session");
507
508 let storage_type = {
510 let storage_types = self.session_storage_types.read().await;
511 storage_types.get(id).cloned()
512 };
513
514 {
515 let mut sessions = self.sessions.write().await;
516 sessions.remove(id);
517 }
518
519 {
521 let mut configs = self.llm_configs.write().await;
522 configs.remove(id);
523 }
524
525 {
526 let mut registries = self.session_skill_registries.write().await;
527 registries.remove(id);
528 }
529
530 {
532 let mut storage_types = self.session_storage_types.write().await;
533 storage_types.remove(id);
534 }
535
536 if let Some(storage_type) = storage_type {
538 if storage_type != crate::config::StorageBackend::Memory {
539 let stores = self.stores.read().await;
540 if let Some(store) = stores.get(&storage_type) {
541 if let Err(e) = store.delete(id).await {
542 tracing::warn!("Failed to delete session {} from store: {}", id, e);
543 }
544 }
545 }
546 }
547
548 tracing::info!("Destroyed session: {}", id);
549 Ok(())
550 }
551
552 pub async fn get_session(&self, id: &str) -> Result<Arc<RwLock<Session>>> {
554 let sessions = self.sessions.read().await;
555 sessions
556 .get(id)
557 .cloned()
558 .context(format!("Session not found: {}", id))
559 }
560
561 pub async fn create_child_session(
566 &self,
567 parent_id: &str,
568 child_id: String,
569 mut config: SessionConfig,
570 ) -> Result<String> {
571 let parent_lock = self.get_session(parent_id).await?;
573 let parent_llm_client = {
574 let parent = parent_lock.read().await;
575
576 if config.confirmation_policy.is_none() {
578 let parent_policy = parent.confirmation_manager.policy().await;
579 config.confirmation_policy = Some(parent_policy);
580 }
581
582 parent.llm_client.clone()
583 };
584
585 config.parent_id = Some(parent_id.to_string());
587
588 let tools = self.tool_executor.definitions();
590 let mut session = Session::new(child_id.clone(), config, tools).await?;
591
592 if session.llm_client.is_none() {
594 let default_llm = self.llm_client.read().await.clone();
595 session.llm_client = parent_llm_client.or(default_llm);
596 }
597
598 session.start_queue().await?;
600
601 if session.config.max_context_length > 0 {
603 session.context_usage.max_tokens = session.config.max_context_length as usize;
604 }
605
606 {
607 let mut sessions = self.sessions.write().await;
608 sessions.insert(child_id.clone(), Arc::new(RwLock::new(session)));
609 }
610
611 self.persist_in_background(&child_id, "create_child");
613
614 tracing::info!(
615 "Created child session: {} (parent: {})",
616 child_id,
617 parent_id
618 );
619 Ok(child_id)
620 }
621
622 pub async fn get_child_sessions(&self, parent_id: &str) -> Vec<String> {
624 let sessions = self.sessions.read().await;
625 let mut children = Vec::new();
626
627 for (id, session_lock) in sessions.iter() {
628 let session = session_lock.read().await;
629 if session.parent_id.as_deref() == Some(parent_id) {
630 children.push(id.clone());
631 }
632 }
633
634 children
635 }
636
637 pub async fn is_child_session(&self, session_id: &str) -> Result<bool> {
639 let session_lock = self.get_session(session_id).await?;
640 let session = session_lock.read().await;
641 Ok(session.is_child_session())
642 }
643
/// Runs one non-streaming agent turn for `session_id` with `prompt`.
///
/// The session's state is snapshotted, an `AgentLoop` is built from it and executed, and
/// the resulting messages and usage are written back to the session. Afterwards the
/// session is persisted in the background and auto-compaction is attempted (a failure
/// there is only logged).
///
/// # Errors
///
/// Returns an error if the session is not loaded, is paused, has no LLM client (neither
/// its own nor the manager default), or if the agent run fails.
pub async fn generate(&self, session_id: &str, prompt: &str) -> Result<AgentResult> {
    let session_lock = self.get_session(session_id).await?;

    // Refuse to run while the session is paused.
    {
        let session = session_lock.read().await;
        if session.state == SessionState::Paused {
            anyhow::bail!(
                "Session {} is paused. Call Resume before generating.",
                session_id
            );
        }
    }

    // Copy everything the agent needs out of the session so that the session lock is
    // not held while the agent runs.
    let (
        history,
        system,
        tools,
        session_llm_client,
        permission_checker,
        confirmation_manager,
        context_providers,
        session_workspace,
        tool_metrics,
        hook_engine,
        planning_mode,
        goal_tracking,
    ) = {
        let session = session_lock.read().await;
        (
            session.messages.clone(),
            session.system().map(String::from),
            session.tools.clone(),
            session.llm_client.clone(),
            session.permission_checker.clone(),
            session.confirmation_manager.clone(),
            session.context_providers.clone(),
            session.config.workspace.clone(),
            session.tool_metrics.clone(),
            session.config.hook_engine.clone(),
            session.config.planning_mode,
            session.config.goal_tracking,
        )
    };

    // The session's own client wins over the manager-wide default.
    let llm_client = if let Some(client) = session_llm_client {
        client
    } else if let Some(client) = self.llm_client.read().await.clone() {
        client
    } else {
        anyhow::bail!(
            "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
            session_id
        );
    };

    // Tools run in the session's workspace, or in the executor's workspace when the
    // session does not configure one.
    let tool_context = if session_workspace.is_empty() {
        crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
            .with_session_id(session_id)
    } else {
        crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
            .with_session_id(session_id)
    };
    let tool_context = if let Some(command_env) = self.tool_executor.command_env() {
        tool_context.with_command_env(command_env)
    } else {
        tool_context
    };

    // A session-specific skill registry takes precedence over the manager-wide one.
    let skill_registry = match self.session_skill_registry(session_id).await {
        Some(registry) => Some(registry),
        None => self.skill_registry.read().await.clone(),
    };
    // Append the registry's system prompt (if non-empty) to the session's system prompt.
    let system = if let Some(ref registry) = skill_registry {
        let skill_prompt = registry.to_system_prompt();
        if skill_prompt.is_empty() {
            system
        } else {
            match system {
                Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
                None => Some(skill_prompt),
            }
        }
    } else {
        system
    };

    // Prepend the content of skills matching the prompt, separated by a `---` rule.
    let effective_prompt = if let Some(ref registry) = skill_registry {
        let skill_content = registry.match_skills(prompt);
        if skill_content.is_empty() {
            prompt.to_string()
        } else {
            format!("{}\n\n---\n\n{}", skill_content, prompt)
        }
    } else {
        prompt.to_string()
    };

    // Wrap the configured memory store, if any, for this run.
    let memory = self
        .memory_store
        .read()
        .await
        .as_ref()
        .map(|store| Arc::new(AgentMemory::new(store.clone())));

    let config = AgentConfig {
        prompt_slots: match system {
            Some(s) => SystemPromptSlots::from_legacy(s),
            None => SystemPromptSlots::default(),
        },
        tools,
        max_tool_rounds: 50,
        permission_checker: Some(permission_checker),
        confirmation_manager: Some(confirmation_manager),
        context_providers,
        planning_mode,
        goal_tracking,
        hook_engine,
        skill_registry,
        memory,
        ..AgentConfig::default()
    };

    let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
        .with_tool_metrics(tool_metrics);

    let result = agent
        .execute_with_session(&history, &effective_prompt, Some(session_id), None, None)
        .await?;

    // Write the new conversation state back to the session.
    {
        let mut session = session_lock.write().await;
        session.messages = result.messages.clone();
        session.update_usage(&result.usage);
    }

    self.persist_in_background(session_id, "generate");

    // Best effort: a failed auto-compaction must not fail the generation.
    if let Err(e) = self.maybe_auto_compact(session_id).await {
        tracing::warn!("Auto-compact failed for session {}: {}", session_id, e);
    }

    Ok(result)
}
800
801 pub async fn generate_streaming(
803 &self,
804 session_id: &str,
805 prompt: &str,
806 ) -> Result<(
807 mpsc::Receiver<AgentEvent>,
808 tokio::task::JoinHandle<Result<AgentResult>>,
809 tokio_util::sync::CancellationToken,
810 )> {
811 let session_lock = self.get_session(session_id).await?;
812
813 {
815 let session = session_lock.read().await;
816 if session.state == SessionState::Paused {
817 anyhow::bail!(
818 "Session {} is paused. Call Resume before generating.",
819 session_id
820 );
821 }
822 }
823
824 let (
826 history,
827 system,
828 tools,
829 session_llm_client,
830 permission_checker,
831 confirmation_manager,
832 context_providers,
833 session_workspace,
834 tool_metrics,
835 hook_engine,
836 planning_mode,
837 goal_tracking,
838 ) = {
839 let session = session_lock.read().await;
840 (
841 session.messages.clone(),
842 session.system().map(String::from),
843 session.tools.clone(),
844 session.llm_client.clone(),
845 session.permission_checker.clone(),
846 session.confirmation_manager.clone(),
847 session.context_providers.clone(),
848 session.config.workspace.clone(),
849 session.tool_metrics.clone(),
850 session.config.hook_engine.clone(),
851 session.config.planning_mode,
852 session.config.goal_tracking,
853 )
854 };
855
856 let llm_client = if let Some(client) = session_llm_client {
858 client
859 } else if let Some(client) = self.llm_client.read().await.clone() {
860 client
861 } else {
862 anyhow::bail!(
863 "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
864 session_id
865 );
866 };
867
868 let tool_context = if session_workspace.is_empty() {
870 crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
871 .with_session_id(session_id)
872 } else {
873 crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
874 .with_session_id(session_id)
875 };
876 let tool_context = if let Some(command_env) = self.tool_executor.command_env() {
877 tool_context.with_command_env(command_env)
878 } else {
879 tool_context
880 };
881
882 let skill_registry = match self.session_skill_registry(session_id).await {
884 Some(registry) => Some(registry),
885 None => self.skill_registry.read().await.clone(),
886 };
887 let system = if let Some(ref registry) = skill_registry {
888 let skill_prompt = registry.to_system_prompt();
889 if skill_prompt.is_empty() {
890 system
891 } else {
892 match system {
893 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
894 None => Some(skill_prompt),
895 }
896 }
897 } else {
898 system
899 };
900
901 let effective_prompt = if let Some(ref registry) = skill_registry {
903 let skill_content = registry.match_skills(prompt);
904 if skill_content.is_empty() {
905 prompt.to_string()
906 } else {
907 format!("{}\n\n---\n\n{}", skill_content, prompt)
908 }
909 } else {
910 prompt.to_string()
911 };
912
913 let memory = self
915 .memory_store
916 .read()
917 .await
918 .as_ref()
919 .map(|store| Arc::new(AgentMemory::new(store.clone())));
920
921 let config = AgentConfig {
923 prompt_slots: match system {
924 Some(s) => SystemPromptSlots::from_legacy(s),
925 None => SystemPromptSlots::default(),
926 },
927 tools,
928 max_tool_rounds: 50,
929 permission_checker: Some(permission_checker),
930 confirmation_manager: Some(confirmation_manager),
931 context_providers,
932 planning_mode,
933 goal_tracking,
934 hook_engine,
935 skill_registry,
936 memory,
937 ..AgentConfig::default()
938 };
939
940 let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
941 .with_tool_metrics(tool_metrics);
942
943 let (rx, handle, cancel_token) =
945 agent.execute_streaming(&history, &effective_prompt).await?;
946
947 let cancel_token_clone = cancel_token.clone();
949 {
950 let mut ops = self.ongoing_operations.write().await;
951 ops.insert(session_id.to_string(), cancel_token);
952 }
953
954 let session_lock_clone = session_lock.clone();
956 let original_handle = handle;
957 let stores = self.stores.clone();
958 let session_storage_types = self.session_storage_types.clone();
959 let llm_configs = self.llm_configs.clone();
960 let session_id_owned = session_id.to_string();
961 let ongoing_operations = self.ongoing_operations.clone();
962 let session_manager = self.clone();
963
964 let wrapped_handle = tokio::spawn(async move {
965 let result = original_handle.await??;
966
967 {
969 let mut ops = ongoing_operations.write().await;
970 ops.remove(&session_id_owned);
971 }
972
973 {
975 let mut session = session_lock_clone.write().await;
976 session.messages = result.messages.clone();
977 session.update_usage(&result.usage);
978 }
979
980 let storage_type = {
982 let storage_types = session_storage_types.read().await;
983 storage_types.get(&session_id_owned).cloned()
984 };
985
986 if let Some(storage_type) = storage_type {
987 if storage_type != crate::config::StorageBackend::Memory {
988 let stores_guard = stores.read().await;
989 if let Some(store) = stores_guard.get(&storage_type) {
990 let session = session_lock_clone.read().await;
991 let llm_config = {
992 let configs = llm_configs.read().await;
993 configs.get(&session_id_owned).cloned()
994 };
995 let data = session.to_session_data(llm_config);
996 if let Err(e) = store.save(&data).await {
997 tracing::warn!(
998 "Failed to persist session {} after streaming: {}",
999 session_id_owned,
1000 e
1001 );
1002 }
1003 }
1004 }
1005 }
1006
1007 if let Err(e) = session_manager.maybe_auto_compact(&session_id_owned).await {
1009 tracing::warn!(
1010 "Auto-compact failed for session {}: {}",
1011 session_id_owned,
1012 e
1013 );
1014 }
1015
1016 Ok(result)
1017 });
1018
1019 Ok((rx, wrapped_handle, cancel_token_clone))
1020 }
1021
1022 pub async fn context_usage(&self, session_id: &str) -> Result<ContextUsage> {
1024 let session_lock = self.get_session(session_id).await?;
1025 let session = session_lock.read().await;
1026 Ok(session.context_usage.clone())
1027 }
1028
1029 pub async fn history(&self, session_id: &str) -> Result<Vec<Message>> {
1031 let session_lock = self.get_session(session_id).await?;
1032 let session = session_lock.read().await;
1033 Ok(session.messages.clone())
1034 }
1035
1036 pub async fn clear(&self, session_id: &str) -> Result<()> {
1038 {
1039 let session_lock = self.get_session(session_id).await?;
1040 let mut session = session_lock.write().await;
1041 session.clear();
1042 }
1043
1044 self.persist_in_background(session_id, "clear");
1046
1047 Ok(())
1048 }
1049
1050 pub async fn compact(&self, session_id: &str) -> Result<()> {
1052 tracing::info!(name: "a3s.session.compact", session_id = %session_id, "Compacting session context");
1053
1054 {
1055 let session_lock = self.get_session(session_id).await?;
1056 let mut session = session_lock.write().await;
1057
1058 let llm_client = if let Some(client) = &session.llm_client {
1060 client.clone()
1061 } else if let Some(client) = self.llm_client.read().await.clone() {
1062 client
1063 } else {
1064 tracing::warn!("No LLM client configured for compaction, using simple truncation");
1066 let keep_messages = 20;
1067 if session.messages.len() > keep_messages {
1068 let len = session.messages.len();
1069 session.messages = session.messages.split_off(len - keep_messages);
1070 }
1071 drop(session);
1073 self.persist_in_background(session_id, "compact");
1074 return Ok(());
1075 };
1076
1077 session.compact(&llm_client).await?;
1078 }
1079
1080 self.persist_in_background(session_id, "compact");
1082
1083 Ok(())
1084 }
1085
1086 pub async fn maybe_auto_compact(&self, session_id: &str) -> Result<bool> {
1093 let (should_compact, percent_before, messages_before) = {
1094 let session_lock = self.get_session(session_id).await?;
1095 let session = session_lock.read().await;
1096
1097 if !session.config.auto_compact {
1098 return Ok(false);
1099 }
1100
1101 let threshold = session.config.auto_compact_threshold;
1102 let percent = session.context_usage.percent;
1103 let msg_count = session.messages.len();
1104
1105 tracing::debug!(
1106 "Auto-compact check for session {}: percent={:.2}%, threshold={:.2}%, messages={}",
1107 session_id,
1108 percent * 100.0,
1109 threshold * 100.0,
1110 msg_count,
1111 );
1112
1113 (percent >= threshold, percent, msg_count)
1114 };
1115
1116 if !should_compact {
1117 return Ok(false);
1118 }
1119
1120 tracing::info!(
1121 name: "a3s.session.auto_compact",
1122 session_id = %session_id,
1123 percent_before = %format!("{:.1}%", percent_before * 100.0),
1124 messages_before = %messages_before,
1125 "Auto-compacting session due to high context usage"
1126 );
1127
1128 self.compact(session_id).await?;
1130
1131 let messages_after = {
1133 let session_lock = self.get_session(session_id).await?;
1134 let session = session_lock.read().await;
1135 session.messages.len()
1136 };
1137
1138 let event = AgentEvent::ContextCompacted {
1140 session_id: session_id.to_string(),
1141 before_messages: messages_before,
1142 after_messages: messages_after,
1143 percent_before,
1144 };
1145
1146 if let Ok(session_lock) = self.get_session(session_id).await {
1148 let session = session_lock.read().await;
1149 let _ = session.event_tx.send(event);
1150 }
1151
1152 tracing::info!(
1153 name: "a3s.session.auto_compact.done",
1154 session_id = %session_id,
1155 messages_before = %messages_before,
1156 messages_after = %messages_after,
1157 "Auto-compaction complete"
1158 );
1159
1160 Ok(true)
1161 }
1162
1163 pub async fn get_llm_for_session(
1167 &self,
1168 session_id: &str,
1169 ) -> Result<Option<Arc<dyn LlmClient>>> {
1170 let session_lock = self.get_session(session_id).await?;
1171 let session = session_lock.read().await;
1172
1173 if let Some(client) = &session.llm_client {
1174 return Ok(Some(client.clone()));
1175 }
1176
1177 Ok(self.llm_client.read().await.clone())
1178 }
1179
1180 pub async fn btw_with_context(
1186 &self,
1187 session_id: &str,
1188 question: &str,
1189 runtime_context: Option<&str>,
1190 ) -> Result<crate::agent_api::BtwResult> {
1191 let question = question.trim();
1192 if question.is_empty() {
1193 anyhow::bail!("btw: question cannot be empty");
1194 }
1195
1196 let session_lock = self.get_session(session_id).await?;
1197 let (history_snapshot, session_runtime) = {
1198 let session = session_lock.read().await;
1199 let mut sections = Vec::new();
1200
1201 let pending_confirmations = session.confirmation_manager.pending_confirmations().await;
1202 if !pending_confirmations.is_empty() {
1203 let mut lines = pending_confirmations
1204 .into_iter()
1205 .map(|pending| {
1206 let arg_summary = Self::compact_json_value(&pending.args);
1207 if arg_summary.is_empty() {
1208 format!(
1209 "- {} [{}] remaining={}ms",
1210 pending.tool_name, pending.tool_id, pending.remaining_ms
1211 )
1212 } else {
1213 format!(
1214 "- {} [{}] remaining={}ms {}",
1215 pending.tool_name,
1216 pending.tool_id,
1217 pending.remaining_ms,
1218 arg_summary
1219 )
1220 }
1221 })
1222 .collect::<Vec<_>>();
1223 lines.sort();
1224 sections.push(format!("[pending confirmations]\n{}", lines.join("\n")));
1225 }
1226
1227 let stats = session.command_queue.stats().await;
1228 if stats.total_active > 0 || stats.total_pending > 0 || stats.external_pending > 0 {
1229 let mut lines = vec![format!(
1230 "active={}, pending={}, external_pending={}",
1231 stats.total_active, stats.total_pending, stats.external_pending
1232 )];
1233 let mut lanes = stats
1234 .lanes
1235 .into_values()
1236 .filter(|lane| lane.active > 0 || lane.pending > 0)
1237 .map(|lane| {
1238 format!(
1239 "- {:?}: active={}, pending={}, handler={:?}",
1240 lane.lane, lane.active, lane.pending, lane.handler_mode
1241 )
1242 })
1243 .collect::<Vec<_>>();
1244 lanes.sort();
1245 lines.extend(lanes);
1246 sections.push(format!("[session queue]\n{}", lines.join("\n")));
1247 }
1248
1249 let external_tasks = session.command_queue.pending_external_tasks().await;
1250 if !external_tasks.is_empty() {
1251 let mut lines = external_tasks
1252 .into_iter()
1253 .take(6)
1254 .map(|task| {
1255 let payload_summary = Self::compact_json_value(&task.payload);
1256 if payload_summary.is_empty() {
1257 format!(
1258 "- {} {:?} remaining={}ms",
1259 task.command_type,
1260 task.lane,
1261 task.remaining_ms()
1262 )
1263 } else {
1264 format!(
1265 "- {} {:?} remaining={}ms {}",
1266 task.command_type,
1267 task.lane,
1268 task.remaining_ms(),
1269 payload_summary
1270 )
1271 }
1272 })
1273 .collect::<Vec<_>>();
1274 lines.sort();
1275 sections.push(format!("[pending external tasks]\n{}", lines.join("\n")));
1276 }
1277
1278 let active_tasks = session
1279 .tasks
1280 .iter()
1281 .filter(|task| task.status.is_active())
1282 .take(6)
1283 .map(|task| match &task.tool {
1284 Some(tool) if !tool.is_empty() => {
1285 format!("- [{}] {} ({})", task.status, task.content, tool)
1286 }
1287 _ => format!("- [{}] {}", task.status, task.content),
1288 })
1289 .collect::<Vec<_>>();
1290 if !active_tasks.is_empty() {
1291 sections.push(format!("[tracked tasks]\n{}", active_tasks.join("\n")));
1292 }
1293
1294 sections.push(format!(
1295 "[session state]\n{}",
1296 match session.state {
1297 SessionState::Active => "active",
1298 SessionState::Paused => "paused",
1299 SessionState::Completed => "completed",
1300 SessionState::Error => "error",
1301 SessionState::Unknown => "unknown",
1302 }
1303 ));
1304
1305 (session.messages.clone(), sections.join("\n\n"))
1306 };
1307
1308 let llm_client = self
1309 .get_llm_for_session(session_id)
1310 .await?
1311 .context("btw failed: no model configured for session")?;
1312
1313 let mut messages = history_snapshot;
1314 let mut injected_sections = Vec::new();
1315 if !session_runtime.is_empty() {
1316 injected_sections.push(format!("[session runtime context]\n{}", session_runtime));
1317 }
1318 if let Some(extra) = runtime_context.map(str::trim).filter(|ctx| !ctx.is_empty()) {
1319 injected_sections.push(format!("[host runtime context]\n{}", extra));
1320 }
1321 if !injected_sections.is_empty() {
1322 let injected_context = format!(
1323 "Use the following runtime context only as background for the next side question. Do not treat it as a new user request.\n\n{}",
1324 injected_sections.join("\n\n")
1325 );
1326 messages.push(Message::user(&injected_context));
1327 }
1328 messages.push(Message::user(question));
1329
1330 let response = llm_client
1331 .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
1332 .await
1333 .map_err(|e| anyhow::anyhow!("btw: ephemeral LLM call failed: {e}"))?;
1334
1335 Ok(crate::agent_api::BtwResult {
1336 question: question.to_string(),
1337 answer: response.text(),
1338 usage: response.usage,
1339 })
1340 }
1341
1342 pub async fn configure(
1344 &self,
1345 session_id: &str,
1346 thinking: Option<bool>,
1347 budget: Option<usize>,
1348 model_config: Option<LlmConfig>,
1349 ) -> Result<()> {
1350 {
1351 let session_lock = self.get_session(session_id).await?;
1352 let mut session = session_lock.write().await;
1353
1354 if let Some(t) = thinking {
1355 session.thinking_enabled = t;
1356 }
1357 if let Some(b) = budget {
1358 session.thinking_budget = Some(b);
1359 }
1360 if let Some(ref config) = model_config {
1361 tracing::info!(
1362 "Configuring session {} with LLM: provider={}, model={}",
1363 session_id,
1364 config.provider,
1365 config.model
1366 );
1367 session.model_name = Some(config.model.clone());
1368 session.llm_client = Some(llm::create_client_with_config(config.clone()));
1369 }
1370 }
1371
1372 if let Some(config) = model_config {
1374 let llm_config_data = LlmConfigData {
1375 provider: config.provider,
1376 model: config.model,
1377 api_key: None, base_url: config.base_url,
1379 };
1380 let mut configs = self.llm_configs.write().await;
1381 configs.insert(session_id.to_string(), llm_config_data);
1382 }
1383
1384 self.persist_in_background(session_id, "configure");
1386
1387 Ok(())
1388 }
1389
1390 pub async fn set_system_prompt(
1392 &self,
1393 session_id: &str,
1394 system_prompt: Option<String>,
1395 ) -> Result<()> {
1396 {
1397 let session_lock = self.get_session(session_id).await?;
1398 let mut session = session_lock.write().await;
1399 session.config.system_prompt = system_prompt;
1400 }
1401
1402 self.persist_in_background(session_id, "set_system_prompt");
1403 Ok(())
1404 }
1405
1406 pub async fn session_count(&self) -> usize {
1408 let sessions = self.sessions.read().await;
1409 sessions.len()
1410 }
1411
1412 pub async fn store_health(&self) -> Vec<(String, Result<()>)> {
1414 let stores = self.stores.read().await;
1415 let mut results = Vec::new();
1416 for (_, store) in stores.iter() {
1417 let name = store.backend_name().to_string();
1418 let result = store.health_check().await;
1419 results.push((name, result));
1420 }
1421 results
1422 }
1423
1424 pub fn list_tools(&self) -> Vec<crate::llm::ToolDefinition> {
1426 self.tool_executor.definitions()
1427 }
1428
1429 pub async fn pause_session(&self, session_id: &str) -> Result<bool> {
1431 let paused = {
1432 let session_lock = self.get_session(session_id).await?;
1433 let mut session = session_lock.write().await;
1434 session.pause()
1435 };
1436
1437 if paused {
1438 self.persist_in_background(session_id, "pause");
1439 }
1440
1441 Ok(paused)
1442 }
1443
1444 pub async fn resume_session(&self, session_id: &str) -> Result<bool> {
1446 let resumed = {
1447 let session_lock = self.get_session(session_id).await?;
1448 let mut session = session_lock.write().await;
1449 session.resume()
1450 };
1451
1452 if resumed {
1453 self.persist_in_background(session_id, "resume");
1454 }
1455
1456 Ok(resumed)
1457 }
1458
1459 pub async fn cancel_operation(&self, session_id: &str) -> Result<bool> {
1463 let session_lock = self.get_session(session_id).await?;
1465 let cancelled_confirmations = {
1466 let session = session_lock.read().await;
1467 session.confirmation_manager.cancel_all().await
1468 };
1469
1470 if cancelled_confirmations > 0 {
1471 tracing::info!(
1472 "Cancelled {} pending confirmations for session {}",
1473 cancelled_confirmations,
1474 session_id
1475 );
1476 }
1477
1478 let cancel_token = {
1480 let mut ops = self.ongoing_operations.write().await;
1481 ops.remove(session_id)
1482 };
1483
1484 if let Some(token) = cancel_token {
1485 token.cancel();
1486 tracing::info!("Cancelled ongoing operation for session {}", session_id);
1487 Ok(true)
1488 } else if cancelled_confirmations > 0 {
1489 Ok(true)
1491 } else {
1492 tracing::debug!("No ongoing operation to cancel for session {}", session_id);
1493 Ok(false)
1494 }
1495 }
1496
1497 pub async fn get_all_sessions(&self) -> Vec<Arc<RwLock<Session>>> {
1499 let sessions = self.sessions.read().await;
1500 sessions.values().cloned().collect()
1501 }
1502
1503 pub fn tool_executor(&self) -> &Arc<ToolExecutor> {
1505 &self.tool_executor
1506 }
1507
1508 pub async fn confirm_tool(
1510 &self,
1511 session_id: &str,
1512 tool_id: &str,
1513 approved: bool,
1514 reason: Option<String>,
1515 ) -> Result<bool> {
1516 let session_lock = self.get_session(session_id).await?;
1517 let session = session_lock.read().await;
1518 session
1519 .confirmation_manager
1520 .confirm(tool_id, approved, reason)
1521 .await
1522 .map_err(|e| anyhow::anyhow!(e))
1523 }
1524
1525 pub async fn set_confirmation_policy(
1527 &self,
1528 session_id: &str,
1529 policy: ConfirmationPolicy,
1530 ) -> Result<ConfirmationPolicy> {
1531 {
1532 let session_lock = self.get_session(session_id).await?;
1533 let session = session_lock.read().await;
1534 session.set_confirmation_policy(policy.clone()).await;
1535 }
1536
1537 {
1539 let session_lock = self.get_session(session_id).await?;
1540 let mut session = session_lock.write().await;
1541 session.config.confirmation_policy = Some(policy.clone());
1542 }
1543
1544 self.persist_in_background(session_id, "set_confirmation_policy");
1546
1547 Ok(policy)
1548 }
1549
1550 pub async fn set_permission_policy(
1552 &self,
1553 session_id: &str,
1554 policy: PermissionPolicy,
1555 ) -> Result<PermissionPolicy> {
1556 {
1557 let session_lock = self.get_session(session_id).await?;
1558 let mut session = session_lock.write().await;
1559 session.set_permission_policy(policy.clone());
1560 }
1561
1562 self.persist_in_background(session_id, "set_permission_policy");
1563
1564 Ok(policy)
1565 }
1566
1567 pub async fn get_confirmation_policy(&self, session_id: &str) -> Result<ConfirmationPolicy> {
1569 let session_lock = self.get_session(session_id).await?;
1570 let session = session_lock.read().await;
1571 Ok(session.confirmation_policy().await)
1572 }
1573}