1use super::{ContextUsage, Session, SessionConfig, SessionState};
4use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
5use crate::hitl::ConfirmationPolicy;
6use crate::llm::{self, LlmClient, LlmConfig, Message};
7use crate::mcp::McpManager;
8use crate::memory::AgentMemory;
9use crate::permissions::PermissionPolicy;
10use crate::prompts::SystemPromptSlots;
11use crate::skills::SkillRegistry;
12use crate::store::{FileSessionStore, LlmConfigData, SessionData, SessionStore};
13use crate::tools::ToolExecutor;
14use crate::DocumentParserRegistry;
15use a3s_memory::MemoryStore;
16use anyhow::{Context, Result};
17use std::collections::HashMap;
18use std::sync::Arc;
19use tokio::sync::{mpsc, RwLock};
20
/// Owns the live sessions and the shared services they run against.
///
/// Every field is behind an `Arc`, so cloning the manager yields another handle to the same
/// underlying state.
#[derive(Clone)]
pub struct SessionManager {
    /// Live sessions keyed by session id.
    pub(crate) sessions: Arc<RwLock<HashMap<String, Arc<RwLock<Session>>>>>,
    /// Default LLM client, used when a session has no client of its own.
    pub(crate) llm_client: Arc<RwLock<Option<Arc<dyn LlmClient>>>>,
    /// Tool executor shared by all sessions; dynamic tools are registered on it.
    pub(crate) tool_executor: Arc<ToolExecutor>,
    /// Session stores keyed by storage backend.
    pub(crate) stores: Arc<RwLock<HashMap<crate::config::StorageBackend, Arc<dyn SessionStore>>>>,
    /// Storage backend recorded for each session id.
    pub(crate) session_storage_types: Arc<RwLock<HashMap<String, crate::config::StorageBackend>>>,
    /// Per-session LLM configuration data, saved together with the session.
    pub(crate) llm_configs: Arc<RwLock<HashMap<String, LlmConfigData>>>,
    /// Cancellation tokens of in-flight streaming generations, keyed by session id.
    pub(crate) ongoing_operations:
        Arc<RwLock<HashMap<String, tokio_util::sync::CancellationToken>>>,
    /// Skill registry used by sessions that have no registry of their own.
    pub(crate) skill_registry: Arc<RwLock<Option<Arc<SkillRegistry>>>>,
    /// Per-session skill registries; these take precedence over `skill_registry`.
    pub(crate) session_skill_registries: Arc<RwLock<HashMap<String, Arc<SkillRegistry>>>>,
    /// Optional memory store, wrapped in an `AgentMemory` for each generation.
    pub(crate) memory_store: Arc<RwLock<Option<Arc<dyn MemoryStore>>>>,
    /// Optional MCP manager whose server tools are registered on the tool executor.
    pub(crate) mcp_manager: Arc<RwLock<Option<Arc<McpManager>>>>,
    /// Optional document parser registry attached to each generation's tool context.
    pub(crate) document_parser_registry: Arc<RwLock<Option<Arc<DocumentParserRegistry>>>>,
}
52
53impl SessionManager {
54 fn compact_json_value(value: &serde_json::Value) -> String {
55 let raw = match value {
56 serde_json::Value::Null => String::new(),
57 serde_json::Value::String(s) => s.clone(),
58 _ => serde_json::to_string(value).unwrap_or_default(),
59 };
60 let compact = raw.split_whitespace().collect::<Vec<_>>().join(" ");
61 if compact.len() > 180 {
62 format!("{}...", &compact[..180])
63 } else {
64 compact
65 }
66 }
67
68 pub fn new(llm_client: Option<Arc<dyn LlmClient>>, tool_executor: Arc<ToolExecutor>) -> Self {
70 Self {
71 sessions: Arc::new(RwLock::new(HashMap::new())),
72 llm_client: Arc::new(RwLock::new(llm_client)),
73 tool_executor,
74 stores: Arc::new(RwLock::new(HashMap::new())),
75 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
76 llm_configs: Arc::new(RwLock::new(HashMap::new())),
77 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
78 skill_registry: Arc::new(RwLock::new(None)),
79 session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
80 memory_store: Arc::new(RwLock::new(None)),
81 mcp_manager: Arc::new(RwLock::new(None)),
82 document_parser_registry: Arc::new(RwLock::new(None)),
83 }
84 }
85
86 pub async fn with_persistence<P: AsRef<std::path::Path>>(
90 llm_client: Option<Arc<dyn LlmClient>>,
91 tool_executor: Arc<ToolExecutor>,
92 sessions_dir: P,
93 ) -> Result<Self> {
94 let store = FileSessionStore::new(sessions_dir).await?;
95 let mut stores = HashMap::new();
96 stores.insert(
97 crate::config::StorageBackend::File,
98 Arc::new(store) as Arc<dyn SessionStore>,
99 );
100
101 let manager = Self {
102 sessions: Arc::new(RwLock::new(HashMap::new())),
103 llm_client: Arc::new(RwLock::new(llm_client)),
104 tool_executor,
105 stores: Arc::new(RwLock::new(stores)),
106 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
107 llm_configs: Arc::new(RwLock::new(HashMap::new())),
108 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
109 skill_registry: Arc::new(RwLock::new(None)),
110 session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
111 memory_store: Arc::new(RwLock::new(None)),
112 mcp_manager: Arc::new(RwLock::new(None)),
113 document_parser_registry: Arc::new(RwLock::new(None)),
114 };
115
116 Ok(manager)
117 }
118
119 pub fn with_store(
124 llm_client: Option<Arc<dyn LlmClient>>,
125 tool_executor: Arc<ToolExecutor>,
126 store: Arc<dyn SessionStore>,
127 backend: crate::config::StorageBackend,
128 ) -> Self {
129 let mut stores = HashMap::new();
130 stores.insert(backend, store);
131
132 Self {
133 sessions: Arc::new(RwLock::new(HashMap::new())),
134 llm_client: Arc::new(RwLock::new(llm_client)),
135 tool_executor,
136 stores: Arc::new(RwLock::new(stores)),
137 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
138 llm_configs: Arc::new(RwLock::new(HashMap::new())),
139 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
140 skill_registry: Arc::new(RwLock::new(None)),
141 session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
142 memory_store: Arc::new(RwLock::new(None)),
143 mcp_manager: Arc::new(RwLock::new(None)),
144 document_parser_registry: Arc::new(RwLock::new(None)),
145 }
146 }
147
148 pub async fn set_default_llm(&self, client: Option<Arc<dyn LlmClient>>) {
153 *self.llm_client.write().await = client;
154 }
155
156 pub async fn set_skill_registry(
163 &self,
164 registry: Arc<SkillRegistry>,
165 skills_dir: std::path::PathBuf,
166 ) {
167 let manage_tool = crate::skills::ManageSkillTool::new(registry.clone(), skills_dir);
169 self.tool_executor
170 .register_dynamic_tool(Arc::new(manage_tool));
171
172 *self.skill_registry.write().await = Some(registry);
173 }
174
175 pub async fn skill_registry(&self) -> Option<Arc<SkillRegistry>> {
177 self.skill_registry.read().await.clone()
178 }
179
180 pub async fn set_session_skill_registry(
182 &self,
183 session_id: impl Into<String>,
184 registry: Arc<SkillRegistry>,
185 ) {
186 self.session_skill_registries
187 .write()
188 .await
189 .insert(session_id.into(), registry);
190 }
191
192 pub async fn session_skill_registry(&self, session_id: &str) -> Option<Arc<SkillRegistry>> {
194 self.session_skill_registries
195 .read()
196 .await
197 .get(session_id)
198 .cloned()
199 }
200
201 pub async fn set_memory_store(&self, store: Arc<dyn MemoryStore>) {
208 *self.memory_store.write().await = Some(store);
209 }
210
211 pub async fn memory_store(&self) -> Option<Arc<dyn MemoryStore>> {
213 self.memory_store.read().await.clone()
214 }
215
216 pub async fn set_mcp_manager(&self, manager: Arc<McpManager>) {
221 let all_tools = manager.get_all_tools().await;
222 let mut by_server: HashMap<String, Vec<crate::mcp::McpTool>> = HashMap::new();
223 for (server, tool) in all_tools {
224 by_server.entry(server).or_default().push(tool);
225 }
226 for (server_name, tools) in by_server {
227 for tool in
228 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&manager))
229 {
230 self.tool_executor.register_dynamic_tool(tool);
231 }
232 }
233 *self.mcp_manager.write().await = Some(manager);
234 }
235
236 pub async fn add_mcp_server(&self, config: crate::mcp::McpServerConfig) -> Result<()> {
241 let manager = {
242 let guard = self.mcp_manager.read().await;
243 match guard.clone() {
244 Some(m) => m,
245 None => {
246 drop(guard);
247 let m = Arc::new(McpManager::new());
248 *self.mcp_manager.write().await = Some(Arc::clone(&m));
249 m
250 }
251 }
252 };
253 let name = config.name.clone();
254 manager.register_server(config).await;
255 manager.connect(&name).await?;
256 let tools = manager.get_server_tools(&name).await;
257 for tool in crate::mcp::tools::create_mcp_tools(&name, tools, Arc::clone(&manager)) {
258 self.tool_executor.register_dynamic_tool(tool);
259 }
260 Ok(())
261 }
262
263 pub async fn remove_mcp_server(&self, name: &str) -> Result<()> {
267 let guard = self.mcp_manager.read().await;
268 if let Some(ref manager) = *guard {
269 manager.disconnect(name).await?;
270 }
271 self.tool_executor
272 .unregister_tools_by_prefix(&format!("mcp__{name}__"));
273 Ok(())
274 }
275
276 pub async fn mcp_status(
278 &self,
279 ) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
280 let guard = self.mcp_manager.read().await;
281 match guard.as_ref() {
282 Some(m) => {
283 let m = Arc::clone(m);
284 drop(guard);
285 m.get_status().await
286 }
287 None => std::collections::HashMap::new(),
288 }
289 }
290
291 pub async fn set_document_parser_registry(&self, registry: Arc<DocumentParserRegistry>) {
293 *self.document_parser_registry.write().await = Some(registry);
294 }
295
296 pub async fn document_parser_registry(&self) -> Option<Arc<DocumentParserRegistry>> {
298 self.document_parser_registry.read().await.clone()
299 }
300
301 pub async fn restore_session_by_id(&self, session_id: &str) -> Result<()> {
306 {
308 let sessions = self.sessions.read().await;
309 if sessions.contains_key(session_id) {
310 return Ok(());
311 }
312 }
313
314 let stores = self.stores.read().await;
315 for (backend, store) in stores.iter() {
316 match store.load(session_id).await {
317 Ok(Some(data)) => {
318 {
319 let mut storage_types = self.session_storage_types.write().await;
320 storage_types.insert(data.id.clone(), backend.clone());
321 }
322 self.restore_session(data).await?;
323 return Ok(());
324 }
325 Ok(None) => continue,
326 Err(e) => {
327 tracing::warn!(
328 "Failed to load session {} from {:?}: {}",
329 session_id,
330 backend,
331 e
332 );
333 continue;
334 }
335 }
336 }
337
338 Err(anyhow::anyhow!(
339 "Session {} not found in any store",
340 session_id
341 ))
342 }
343
344 pub async fn load_all_sessions(&mut self) -> Result<usize> {
346 let stores = self.stores.read().await;
347 let mut loaded = 0;
348
349 for (backend, store) in stores.iter() {
350 let session_ids = match store.list().await {
351 Ok(ids) => ids,
352 Err(e) => {
353 tracing::warn!("Failed to list sessions from {:?} store: {}", backend, e);
354 continue;
355 }
356 };
357
358 for id in session_ids {
359 match store.load(&id).await {
360 Ok(Some(data)) => {
361 {
363 let mut storage_types = self.session_storage_types.write().await;
364 storage_types.insert(data.id.clone(), backend.clone());
365 }
366
367 if let Err(e) = self.restore_session(data).await {
368 tracing::warn!("Failed to restore session {}: {}", id, e);
369 } else {
370 loaded += 1;
371 }
372 }
373 Ok(None) => {
374 tracing::warn!("Session {} not found in store", id);
375 }
376 Err(e) => {
377 tracing::warn!("Failed to load session {}: {}", id, e);
378 }
379 }
380 }
381 }
382
383 tracing::info!("Loaded {} sessions from store", loaded);
384 Ok(loaded)
385 }
386
387 async fn restore_session(&self, data: SessionData) -> Result<()> {
389 let tools = self.tool_executor.definitions();
390 let mut session = Session::new(data.id.clone(), data.config.clone(), tools).await?;
391
392 session.restore_from_data(&data);
394
395 if let Some(llm_config) = &data.llm_config {
397 let mut configs = self.llm_configs.write().await;
398 configs.insert(data.id.clone(), llm_config.clone());
399 }
400
401 let mut sessions = self.sessions.write().await;
402 sessions.insert(data.id.clone(), Arc::new(RwLock::new(session)));
403
404 tracing::info!("Restored session: {}", data.id);
405 Ok(())
406 }
407
408 async fn save_session(&self, session_id: &str) -> Result<()> {
410 let storage_type = {
412 let storage_types = self.session_storage_types.read().await;
413 storage_types.get(session_id).cloned()
414 };
415
416 let Some(storage_type) = storage_type else {
417 return Ok(());
419 };
420
421 if storage_type == crate::config::StorageBackend::Memory {
423 return Ok(());
424 }
425
426 let stores = self.stores.read().await;
428 let Some(store) = stores.get(&storage_type) else {
429 tracing::warn!("No store available for storage type: {:?}", storage_type);
430 return Ok(());
431 };
432
433 let session_lock = self.get_session(session_id).await?;
434 let session = session_lock.read().await;
435
436 let llm_config = {
438 let configs = self.llm_configs.read().await;
439 configs.get(session_id).cloned()
440 };
441
442 let data = session.to_session_data(llm_config);
443 store.save(&data).await?;
444
445 tracing::debug!("Saved session: {}", session_id);
446 Ok(())
447 }
448
449 async fn persist_or_warn(&self, session_id: &str, operation: &str) {
453 if let Err(e) = self.save_session(session_id).await {
454 tracing::warn!(
455 "Failed to persist session {} after {}: {}",
456 session_id,
457 operation,
458 e
459 );
460 if let Ok(session_lock) = self.get_session(session_id).await {
462 let session = session_lock.read().await;
463 let _ = session.event_tx().send(AgentEvent::PersistenceFailed {
464 session_id: session_id.to_string(),
465 operation: operation.to_string(),
466 error: e.to_string(),
467 });
468 }
469 }
470 }
471
472 fn persist_in_background(&self, session_id: &str, operation: &str) {
477 let mgr = self.clone();
478 let sid = session_id.to_string();
479 let op = operation.to_string();
480 tokio::spawn(async move {
481 mgr.persist_or_warn(&sid, &op).await;
482 });
483 }
484
485 pub async fn create_session(&self, id: String, config: SessionConfig) -> Result<String> {
487 tracing::info!(name: "a3s.session.create", session_id = %id, "Creating session");
488
489 {
491 let mut storage_types = self.session_storage_types.write().await;
492 storage_types.insert(id.clone(), config.storage_type.clone());
493 }
494
495 let tools = self.tool_executor.definitions();
497 let mut session = Session::new(id.clone(), config, tools).await?;
498
499 session.start_queue().await?;
501
502 if session.config.max_context_length > 0 {
504 session.context_usage.max_tokens = session.config.max_context_length as usize;
505 }
506
507 {
508 let mut sessions = self.sessions.write().await;
509 sessions.insert(id.clone(), Arc::new(RwLock::new(session)));
510 }
511
512 self.persist_in_background(&id, "create");
514
515 tracing::info!("Created session: {}", id);
516 Ok(id)
517 }
518
519 pub async fn destroy_session(&self, id: &str) -> Result<()> {
521 tracing::info!(name: "a3s.session.destroy", session_id = %id, "Destroying session");
522
523 let storage_type = {
525 let storage_types = self.session_storage_types.read().await;
526 storage_types.get(id).cloned()
527 };
528
529 {
530 let mut sessions = self.sessions.write().await;
531 sessions.remove(id);
532 }
533
534 {
536 let mut configs = self.llm_configs.write().await;
537 configs.remove(id);
538 }
539
540 {
541 let mut registries = self.session_skill_registries.write().await;
542 registries.remove(id);
543 }
544
545 {
547 let mut storage_types = self.session_storage_types.write().await;
548 storage_types.remove(id);
549 }
550
551 if let Some(storage_type) = storage_type {
553 if storage_type != crate::config::StorageBackend::Memory {
554 let stores = self.stores.read().await;
555 if let Some(store) = stores.get(&storage_type) {
556 if let Err(e) = store.delete(id).await {
557 tracing::warn!("Failed to delete session {} from store: {}", id, e);
558 }
559 }
560 }
561 }
562
563 tracing::info!("Destroyed session: {}", id);
564 Ok(())
565 }
566
567 pub async fn get_session(&self, id: &str) -> Result<Arc<RwLock<Session>>> {
569 let sessions = self.sessions.read().await;
570 sessions
571 .get(id)
572 .cloned()
573 .context(format!("Session not found: {}", id))
574 }
575
576 pub async fn create_child_session(
581 &self,
582 parent_id: &str,
583 child_id: String,
584 mut config: SessionConfig,
585 ) -> Result<String> {
586 let parent_lock = self.get_session(parent_id).await?;
588 let parent_llm_client = {
589 let parent = parent_lock.read().await;
590
591 if config.confirmation_policy.is_none() {
593 let parent_policy = parent.confirmation_manager.policy().await;
594 config.confirmation_policy = Some(parent_policy);
595 }
596
597 parent.llm_client.clone()
598 };
599
600 config.parent_id = Some(parent_id.to_string());
602
603 let tools = self.tool_executor.definitions();
605 let mut session = Session::new(child_id.clone(), config, tools).await?;
606
607 if session.llm_client.is_none() {
609 let default_llm = self.llm_client.read().await.clone();
610 session.llm_client = parent_llm_client.or(default_llm);
611 }
612
613 session.start_queue().await?;
615
616 if session.config.max_context_length > 0 {
618 session.context_usage.max_tokens = session.config.max_context_length as usize;
619 }
620
621 {
622 let mut sessions = self.sessions.write().await;
623 sessions.insert(child_id.clone(), Arc::new(RwLock::new(session)));
624 }
625
626 self.persist_in_background(&child_id, "create_child");
628
629 tracing::info!(
630 "Created child session: {} (parent: {})",
631 child_id,
632 parent_id
633 );
634 Ok(child_id)
635 }
636
637 pub async fn get_child_sessions(&self, parent_id: &str) -> Vec<String> {
639 let sessions = self.sessions.read().await;
640 let mut children = Vec::new();
641
642 for (id, session_lock) in sessions.iter() {
643 let session = session_lock.read().await;
644 if session.parent_id.as_deref() == Some(parent_id) {
645 children.push(id.clone());
646 }
647 }
648
649 children
650 }
651
652 pub async fn is_child_session(&self, session_id: &str) -> Result<bool> {
654 let session_lock = self.get_session(session_id).await?;
655 let session = session_lock.read().await;
656 Ok(session.is_child_session())
657 }
658
659 pub async fn generate(&self, session_id: &str, prompt: &str) -> Result<AgentResult> {
661 let session_lock = self.get_session(session_id).await?;
662
663 {
665 let session = session_lock.read().await;
666 if session.state == SessionState::Paused {
667 anyhow::bail!(
668 "Session {} is paused. Call Resume before generating.",
669 session_id
670 );
671 }
672 }
673
674 let (
676 history,
677 system,
678 tools,
679 session_llm_client,
680 permission_checker,
681 confirmation_manager,
682 context_providers,
683 session_workspace,
684 tool_metrics,
685 hook_engine,
686 planning_enabled,
687 goal_tracking,
688 ) = {
689 let session = session_lock.read().await;
690 (
691 session.messages.clone(),
692 session.system().map(String::from),
693 session.tools.clone(),
694 session.llm_client.clone(),
695 session.permission_checker.clone(),
696 session.confirmation_manager.clone(),
697 session.context_providers.clone(),
698 session.config.workspace.clone(),
699 session.tool_metrics.clone(),
700 session.config.hook_engine.clone(),
701 session.config.planning_enabled,
702 session.config.goal_tracking,
703 )
704 };
705
706 let llm_client = if let Some(client) = session_llm_client {
708 client
709 } else if let Some(client) = self.llm_client.read().await.clone() {
710 client
711 } else {
712 anyhow::bail!(
713 "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
714 session_id
715 );
716 };
717
718 let tool_context = if session_workspace.is_empty() {
720 crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
721 .with_session_id(session_id)
722 } else {
723 crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
724 .with_session_id(session_id)
725 };
726 let tool_context = if let Some(registry) = self.document_parser_registry().await {
727 tool_context.with_document_parsers(registry)
728 } else {
729 tool_context
730 };
731
732 let skill_registry = match self.session_skill_registry(session_id).await {
734 Some(registry) => Some(registry),
735 None => self.skill_registry.read().await.clone(),
736 };
737 let system = if let Some(ref registry) = skill_registry {
738 let skill_prompt = registry.to_system_prompt();
739 if skill_prompt.is_empty() {
740 system
741 } else {
742 match system {
743 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
744 None => Some(skill_prompt),
745 }
746 }
747 } else {
748 system
749 };
750
751 let effective_prompt = if let Some(ref registry) = skill_registry {
753 let skill_content = registry.match_skills(prompt);
754 if skill_content.is_empty() {
755 prompt.to_string()
756 } else {
757 format!("{}\n\n---\n\n{}", skill_content, prompt)
758 }
759 } else {
760 prompt.to_string()
761 };
762
763 let memory = self
765 .memory_store
766 .read()
767 .await
768 .as_ref()
769 .map(|store| Arc::new(AgentMemory::new(store.clone())));
770
771 let config = AgentConfig {
773 prompt_slots: match system {
774 Some(s) => SystemPromptSlots::from_legacy(s),
775 None => SystemPromptSlots::default(),
776 },
777 tools,
778 max_tool_rounds: 50,
779 permission_checker: Some(permission_checker),
780 confirmation_manager: Some(confirmation_manager),
781 context_providers,
782 planning_enabled,
783 goal_tracking,
784 hook_engine,
785 skill_registry,
786 memory,
787 ..AgentConfig::default()
788 };
789
790 let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
791 .with_tool_metrics(tool_metrics);
792
793 let result = agent
795 .execute_with_session(&history, &effective_prompt, Some(session_id), None, None)
796 .await?;
797
798 {
800 let mut session = session_lock.write().await;
801 session.messages = result.messages.clone();
802 session.update_usage(&result.usage);
803 }
804
805 self.persist_in_background(session_id, "generate");
807
808 if let Err(e) = self.maybe_auto_compact(session_id).await {
810 tracing::warn!("Auto-compact failed for session {}: {}", session_id, e);
811 }
812
813 Ok(result)
814 }
815
816 pub async fn generate_streaming(
818 &self,
819 session_id: &str,
820 prompt: &str,
821 ) -> Result<(
822 mpsc::Receiver<AgentEvent>,
823 tokio::task::JoinHandle<Result<AgentResult>>,
824 tokio_util::sync::CancellationToken,
825 )> {
826 let session_lock = self.get_session(session_id).await?;
827
828 {
830 let session = session_lock.read().await;
831 if session.state == SessionState::Paused {
832 anyhow::bail!(
833 "Session {} is paused. Call Resume before generating.",
834 session_id
835 );
836 }
837 }
838
839 let (
841 history,
842 system,
843 tools,
844 session_llm_client,
845 permission_checker,
846 confirmation_manager,
847 context_providers,
848 session_workspace,
849 tool_metrics,
850 hook_engine,
851 planning_enabled,
852 goal_tracking,
853 ) = {
854 let session = session_lock.read().await;
855 (
856 session.messages.clone(),
857 session.system().map(String::from),
858 session.tools.clone(),
859 session.llm_client.clone(),
860 session.permission_checker.clone(),
861 session.confirmation_manager.clone(),
862 session.context_providers.clone(),
863 session.config.workspace.clone(),
864 session.tool_metrics.clone(),
865 session.config.hook_engine.clone(),
866 session.config.planning_enabled,
867 session.config.goal_tracking,
868 )
869 };
870
871 let llm_client = if let Some(client) = session_llm_client {
873 client
874 } else if let Some(client) = self.llm_client.read().await.clone() {
875 client
876 } else {
877 anyhow::bail!(
878 "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
879 session_id
880 );
881 };
882
883 let tool_context = if session_workspace.is_empty() {
885 crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
886 .with_session_id(session_id)
887 } else {
888 crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
889 .with_session_id(session_id)
890 };
891 let tool_context = if let Some(registry) = self.document_parser_registry().await {
892 tool_context.with_document_parsers(registry)
893 } else {
894 tool_context
895 };
896
897 let skill_registry = match self.session_skill_registry(session_id).await {
899 Some(registry) => Some(registry),
900 None => self.skill_registry.read().await.clone(),
901 };
902 let system = if let Some(ref registry) = skill_registry {
903 let skill_prompt = registry.to_system_prompt();
904 if skill_prompt.is_empty() {
905 system
906 } else {
907 match system {
908 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
909 None => Some(skill_prompt),
910 }
911 }
912 } else {
913 system
914 };
915
916 let effective_prompt = if let Some(ref registry) = skill_registry {
918 let skill_content = registry.match_skills(prompt);
919 if skill_content.is_empty() {
920 prompt.to_string()
921 } else {
922 format!("{}\n\n---\n\n{}", skill_content, prompt)
923 }
924 } else {
925 prompt.to_string()
926 };
927
928 let memory = self
930 .memory_store
931 .read()
932 .await
933 .as_ref()
934 .map(|store| Arc::new(AgentMemory::new(store.clone())));
935
936 let config = AgentConfig {
938 prompt_slots: match system {
939 Some(s) => SystemPromptSlots::from_legacy(s),
940 None => SystemPromptSlots::default(),
941 },
942 tools,
943 max_tool_rounds: 50,
944 permission_checker: Some(permission_checker),
945 confirmation_manager: Some(confirmation_manager),
946 context_providers,
947 planning_enabled,
948 goal_tracking,
949 hook_engine,
950 skill_registry,
951 memory,
952 ..AgentConfig::default()
953 };
954
955 let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
956 .with_tool_metrics(tool_metrics);
957
958 let (rx, handle, cancel_token) =
960 agent.execute_streaming(&history, &effective_prompt).await?;
961
962 let cancel_token_clone = cancel_token.clone();
964 {
965 let mut ops = self.ongoing_operations.write().await;
966 ops.insert(session_id.to_string(), cancel_token);
967 }
968
969 let session_lock_clone = session_lock.clone();
971 let original_handle = handle;
972 let stores = self.stores.clone();
973 let session_storage_types = self.session_storage_types.clone();
974 let llm_configs = self.llm_configs.clone();
975 let session_id_owned = session_id.to_string();
976 let ongoing_operations = self.ongoing_operations.clone();
977 let session_manager = self.clone();
978
979 let wrapped_handle = tokio::spawn(async move {
980 let result = original_handle.await??;
981
982 {
984 let mut ops = ongoing_operations.write().await;
985 ops.remove(&session_id_owned);
986 }
987
988 {
990 let mut session = session_lock_clone.write().await;
991 session.messages = result.messages.clone();
992 session.update_usage(&result.usage);
993 }
994
995 let storage_type = {
997 let storage_types = session_storage_types.read().await;
998 storage_types.get(&session_id_owned).cloned()
999 };
1000
1001 if let Some(storage_type) = storage_type {
1002 if storage_type != crate::config::StorageBackend::Memory {
1003 let stores_guard = stores.read().await;
1004 if let Some(store) = stores_guard.get(&storage_type) {
1005 let session = session_lock_clone.read().await;
1006 let llm_config = {
1007 let configs = llm_configs.read().await;
1008 configs.get(&session_id_owned).cloned()
1009 };
1010 let data = session.to_session_data(llm_config);
1011 if let Err(e) = store.save(&data).await {
1012 tracing::warn!(
1013 "Failed to persist session {} after streaming: {}",
1014 session_id_owned,
1015 e
1016 );
1017 }
1018 }
1019 }
1020 }
1021
1022 if let Err(e) = session_manager.maybe_auto_compact(&session_id_owned).await {
1024 tracing::warn!(
1025 "Auto-compact failed for session {}: {}",
1026 session_id_owned,
1027 e
1028 );
1029 }
1030
1031 Ok(result)
1032 });
1033
1034 Ok((rx, wrapped_handle, cancel_token_clone))
1035 }
1036
1037 pub async fn context_usage(&self, session_id: &str) -> Result<ContextUsage> {
1039 let session_lock = self.get_session(session_id).await?;
1040 let session = session_lock.read().await;
1041 Ok(session.context_usage.clone())
1042 }
1043
1044 pub async fn history(&self, session_id: &str) -> Result<Vec<Message>> {
1046 let session_lock = self.get_session(session_id).await?;
1047 let session = session_lock.read().await;
1048 Ok(session.messages.clone())
1049 }
1050
1051 pub async fn clear(&self, session_id: &str) -> Result<()> {
1053 {
1054 let session_lock = self.get_session(session_id).await?;
1055 let mut session = session_lock.write().await;
1056 session.clear();
1057 }
1058
1059 self.persist_in_background(session_id, "clear");
1061
1062 Ok(())
1063 }
1064
1065 pub async fn compact(&self, session_id: &str) -> Result<()> {
1067 tracing::info!(name: "a3s.session.compact", session_id = %session_id, "Compacting session context");
1068
1069 {
1070 let session_lock = self.get_session(session_id).await?;
1071 let mut session = session_lock.write().await;
1072
1073 let llm_client = if let Some(client) = &session.llm_client {
1075 client.clone()
1076 } else if let Some(client) = self.llm_client.read().await.clone() {
1077 client
1078 } else {
1079 tracing::warn!("No LLM client configured for compaction, using simple truncation");
1081 let keep_messages = 20;
1082 if session.messages.len() > keep_messages {
1083 let len = session.messages.len();
1084 session.messages = session.messages.split_off(len - keep_messages);
1085 }
1086 drop(session);
1088 self.persist_in_background(session_id, "compact");
1089 return Ok(());
1090 };
1091
1092 session.compact(&llm_client).await?;
1093 }
1094
1095 self.persist_in_background(session_id, "compact");
1097
1098 Ok(())
1099 }
1100
1101 pub async fn maybe_auto_compact(&self, session_id: &str) -> Result<bool> {
1108 let (should_compact, percent_before, messages_before) = {
1109 let session_lock = self.get_session(session_id).await?;
1110 let session = session_lock.read().await;
1111
1112 if !session.config.auto_compact {
1113 return Ok(false);
1114 }
1115
1116 let threshold = session.config.auto_compact_threshold;
1117 let percent = session.context_usage.percent;
1118 let msg_count = session.messages.len();
1119
1120 tracing::debug!(
1121 "Auto-compact check for session {}: percent={:.2}%, threshold={:.2}%, messages={}",
1122 session_id,
1123 percent * 100.0,
1124 threshold * 100.0,
1125 msg_count,
1126 );
1127
1128 (percent >= threshold, percent, msg_count)
1129 };
1130
1131 if !should_compact {
1132 return Ok(false);
1133 }
1134
1135 tracing::info!(
1136 name: "a3s.session.auto_compact",
1137 session_id = %session_id,
1138 percent_before = %format!("{:.1}%", percent_before * 100.0),
1139 messages_before = %messages_before,
1140 "Auto-compacting session due to high context usage"
1141 );
1142
1143 self.compact(session_id).await?;
1145
1146 let messages_after = {
1148 let session_lock = self.get_session(session_id).await?;
1149 let session = session_lock.read().await;
1150 session.messages.len()
1151 };
1152
1153 let event = AgentEvent::ContextCompacted {
1155 session_id: session_id.to_string(),
1156 before_messages: messages_before,
1157 after_messages: messages_after,
1158 percent_before,
1159 };
1160
1161 if let Ok(session_lock) = self.get_session(session_id).await {
1163 let session = session_lock.read().await;
1164 let _ = session.event_tx.send(event);
1165 }
1166
1167 tracing::info!(
1168 name: "a3s.session.auto_compact.done",
1169 session_id = %session_id,
1170 messages_before = %messages_before,
1171 messages_after = %messages_after,
1172 "Auto-compaction complete"
1173 );
1174
1175 Ok(true)
1176 }
1177
1178 pub async fn get_llm_for_session(
1182 &self,
1183 session_id: &str,
1184 ) -> Result<Option<Arc<dyn LlmClient>>> {
1185 let session_lock = self.get_session(session_id).await?;
1186 let session = session_lock.read().await;
1187
1188 if let Some(client) = &session.llm_client {
1189 return Ok(Some(client.clone()));
1190 }
1191
1192 Ok(self.llm_client.read().await.clone())
1193 }
1194
1195 pub async fn btw_with_context(
1201 &self,
1202 session_id: &str,
1203 question: &str,
1204 runtime_context: Option<&str>,
1205 ) -> Result<crate::agent_api::BtwResult> {
1206 let question = question.trim();
1207 if question.is_empty() {
1208 anyhow::bail!("btw: question cannot be empty");
1209 }
1210
1211 let session_lock = self.get_session(session_id).await?;
1212 let (history_snapshot, session_runtime) = {
1213 let session = session_lock.read().await;
1214 let mut sections = Vec::new();
1215
1216 let pending_confirmations = session.confirmation_manager.pending_confirmations().await;
1217 if !pending_confirmations.is_empty() {
1218 let mut lines = pending_confirmations
1219 .into_iter()
1220 .map(|pending| {
1221 let arg_summary = Self::compact_json_value(&pending.args);
1222 if arg_summary.is_empty() {
1223 format!(
1224 "- {} [{}] remaining={}ms",
1225 pending.tool_name, pending.tool_id, pending.remaining_ms
1226 )
1227 } else {
1228 format!(
1229 "- {} [{}] remaining={}ms {}",
1230 pending.tool_name,
1231 pending.tool_id,
1232 pending.remaining_ms,
1233 arg_summary
1234 )
1235 }
1236 })
1237 .collect::<Vec<_>>();
1238 lines.sort();
1239 sections.push(format!("[pending confirmations]\n{}", lines.join("\n")));
1240 }
1241
1242 let stats = session.command_queue.stats().await;
1243 if stats.total_active > 0 || stats.total_pending > 0 || stats.external_pending > 0 {
1244 let mut lines = vec![format!(
1245 "active={}, pending={}, external_pending={}",
1246 stats.total_active, stats.total_pending, stats.external_pending
1247 )];
1248 let mut lanes = stats
1249 .lanes
1250 .into_values()
1251 .filter(|lane| lane.active > 0 || lane.pending > 0)
1252 .map(|lane| {
1253 format!(
1254 "- {:?}: active={}, pending={}, handler={:?}",
1255 lane.lane, lane.active, lane.pending, lane.handler_mode
1256 )
1257 })
1258 .collect::<Vec<_>>();
1259 lanes.sort();
1260 lines.extend(lanes);
1261 sections.push(format!("[session queue]\n{}", lines.join("\n")));
1262 }
1263
1264 let external_tasks = session.command_queue.pending_external_tasks().await;
1265 if !external_tasks.is_empty() {
1266 let mut lines = external_tasks
1267 .into_iter()
1268 .take(6)
1269 .map(|task| {
1270 let payload_summary = Self::compact_json_value(&task.payload);
1271 if payload_summary.is_empty() {
1272 format!(
1273 "- {} {:?} remaining={}ms",
1274 task.command_type,
1275 task.lane,
1276 task.remaining_ms()
1277 )
1278 } else {
1279 format!(
1280 "- {} {:?} remaining={}ms {}",
1281 task.command_type,
1282 task.lane,
1283 task.remaining_ms(),
1284 payload_summary
1285 )
1286 }
1287 })
1288 .collect::<Vec<_>>();
1289 lines.sort();
1290 sections.push(format!("[pending external tasks]\n{}", lines.join("\n")));
1291 }
1292
1293 let active_tasks = session
1294 .tasks
1295 .iter()
1296 .filter(|task| task.status.is_active())
1297 .take(6)
1298 .map(|task| match &task.tool {
1299 Some(tool) if !tool.is_empty() => {
1300 format!("- [{}] {} ({})", task.status, task.content, tool)
1301 }
1302 _ => format!("- [{}] {}", task.status, task.content),
1303 })
1304 .collect::<Vec<_>>();
1305 if !active_tasks.is_empty() {
1306 sections.push(format!("[tracked tasks]\n{}", active_tasks.join("\n")));
1307 }
1308
1309 sections.push(format!(
1310 "[session state]\n{}",
1311 match session.state {
1312 SessionState::Active => "active",
1313 SessionState::Paused => "paused",
1314 SessionState::Completed => "completed",
1315 SessionState::Error => "error",
1316 SessionState::Unknown => "unknown",
1317 }
1318 ));
1319
1320 (session.messages.clone(), sections.join("\n\n"))
1321 };
1322
1323 let llm_client = self
1324 .get_llm_for_session(session_id)
1325 .await?
1326 .context("btw failed: no model configured for session")?;
1327
1328 let mut messages = history_snapshot;
1329 let mut injected_sections = Vec::new();
1330 if !session_runtime.is_empty() {
1331 injected_sections.push(format!("[session runtime context]\n{}", session_runtime));
1332 }
1333 if let Some(extra) = runtime_context.map(str::trim).filter(|ctx| !ctx.is_empty()) {
1334 injected_sections.push(format!("[host runtime context]\n{}", extra));
1335 }
1336 if !injected_sections.is_empty() {
1337 let injected_context = format!(
1338 "Use the following runtime context only as background for the next side question. Do not treat it as a new user request.\n\n{}",
1339 injected_sections.join("\n\n")
1340 );
1341 messages.push(Message::user(&injected_context));
1342 }
1343 messages.push(Message::user(question));
1344
1345 let response = llm_client
1346 .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
1347 .await
1348 .map_err(|e| anyhow::anyhow!("btw: ephemeral LLM call failed: {e}"))?;
1349
1350 Ok(crate::agent_api::BtwResult {
1351 question: question.to_string(),
1352 answer: response.text(),
1353 usage: response.usage,
1354 })
1355 }
1356
1357 pub async fn configure(
1359 &self,
1360 session_id: &str,
1361 thinking: Option<bool>,
1362 budget: Option<usize>,
1363 model_config: Option<LlmConfig>,
1364 ) -> Result<()> {
1365 {
1366 let session_lock = self.get_session(session_id).await?;
1367 let mut session = session_lock.write().await;
1368
1369 if let Some(t) = thinking {
1370 session.thinking_enabled = t;
1371 }
1372 if let Some(b) = budget {
1373 session.thinking_budget = Some(b);
1374 }
1375 if let Some(ref config) = model_config {
1376 tracing::info!(
1377 "Configuring session {} with LLM: provider={}, model={}",
1378 session_id,
1379 config.provider,
1380 config.model
1381 );
1382 session.model_name = Some(config.model.clone());
1383 session.llm_client = Some(llm::create_client_with_config(config.clone()));
1384 }
1385 }
1386
1387 if let Some(config) = model_config {
1389 let llm_config_data = LlmConfigData {
1390 provider: config.provider,
1391 model: config.model,
1392 api_key: None, base_url: config.base_url,
1394 };
1395 let mut configs = self.llm_configs.write().await;
1396 configs.insert(session_id.to_string(), llm_config_data);
1397 }
1398
1399 self.persist_in_background(session_id, "configure");
1401
1402 Ok(())
1403 }
1404
1405 pub async fn set_system_prompt(
1407 &self,
1408 session_id: &str,
1409 system_prompt: Option<String>,
1410 ) -> Result<()> {
1411 {
1412 let session_lock = self.get_session(session_id).await?;
1413 let mut session = session_lock.write().await;
1414 session.config.system_prompt = system_prompt;
1415 }
1416
1417 self.persist_in_background(session_id, "set_system_prompt");
1418 Ok(())
1419 }
1420
1421 pub async fn session_count(&self) -> usize {
1423 let sessions = self.sessions.read().await;
1424 sessions.len()
1425 }
1426
1427 pub async fn store_health(&self) -> Vec<(String, Result<()>)> {
1429 let stores = self.stores.read().await;
1430 let mut results = Vec::new();
1431 for (_, store) in stores.iter() {
1432 let name = store.backend_name().to_string();
1433 let result = store.health_check().await;
1434 results.push((name, result));
1435 }
1436 results
1437 }
1438
1439 pub fn list_tools(&self) -> Vec<crate::llm::ToolDefinition> {
1441 self.tool_executor.definitions()
1442 }
1443
1444 pub async fn pause_session(&self, session_id: &str) -> Result<bool> {
1446 let paused = {
1447 let session_lock = self.get_session(session_id).await?;
1448 let mut session = session_lock.write().await;
1449 session.pause()
1450 };
1451
1452 if paused {
1453 self.persist_in_background(session_id, "pause");
1454 }
1455
1456 Ok(paused)
1457 }
1458
1459 pub async fn resume_session(&self, session_id: &str) -> Result<bool> {
1461 let resumed = {
1462 let session_lock = self.get_session(session_id).await?;
1463 let mut session = session_lock.write().await;
1464 session.resume()
1465 };
1466
1467 if resumed {
1468 self.persist_in_background(session_id, "resume");
1469 }
1470
1471 Ok(resumed)
1472 }
1473
1474 pub async fn cancel_operation(&self, session_id: &str) -> Result<bool> {
1478 let session_lock = self.get_session(session_id).await?;
1480 let cancelled_confirmations = {
1481 let session = session_lock.read().await;
1482 session.confirmation_manager.cancel_all().await
1483 };
1484
1485 if cancelled_confirmations > 0 {
1486 tracing::info!(
1487 "Cancelled {} pending confirmations for session {}",
1488 cancelled_confirmations,
1489 session_id
1490 );
1491 }
1492
1493 let cancel_token = {
1495 let mut ops = self.ongoing_operations.write().await;
1496 ops.remove(session_id)
1497 };
1498
1499 if let Some(token) = cancel_token {
1500 token.cancel();
1501 tracing::info!("Cancelled ongoing operation for session {}", session_id);
1502 Ok(true)
1503 } else if cancelled_confirmations > 0 {
1504 Ok(true)
1506 } else {
1507 tracing::debug!("No ongoing operation to cancel for session {}", session_id);
1508 Ok(false)
1509 }
1510 }
1511
1512 pub async fn get_all_sessions(&self) -> Vec<Arc<RwLock<Session>>> {
1514 let sessions = self.sessions.read().await;
1515 sessions.values().cloned().collect()
1516 }
1517
1518 pub fn tool_executor(&self) -> &Arc<ToolExecutor> {
1520 &self.tool_executor
1521 }
1522
1523 pub async fn confirm_tool(
1525 &self,
1526 session_id: &str,
1527 tool_id: &str,
1528 approved: bool,
1529 reason: Option<String>,
1530 ) -> Result<bool> {
1531 let session_lock = self.get_session(session_id).await?;
1532 let session = session_lock.read().await;
1533 session
1534 .confirmation_manager
1535 .confirm(tool_id, approved, reason)
1536 .await
1537 .map_err(|e| anyhow::anyhow!(e))
1538 }
1539
1540 pub async fn set_confirmation_policy(
1542 &self,
1543 session_id: &str,
1544 policy: ConfirmationPolicy,
1545 ) -> Result<ConfirmationPolicy> {
1546 {
1547 let session_lock = self.get_session(session_id).await?;
1548 let session = session_lock.read().await;
1549 session.set_confirmation_policy(policy.clone()).await;
1550 }
1551
1552 {
1554 let session_lock = self.get_session(session_id).await?;
1555 let mut session = session_lock.write().await;
1556 session.config.confirmation_policy = Some(policy.clone());
1557 }
1558
1559 self.persist_in_background(session_id, "set_confirmation_policy");
1561
1562 Ok(policy)
1563 }
1564
1565 pub async fn set_permission_policy(
1567 &self,
1568 session_id: &str,
1569 policy: PermissionPolicy,
1570 ) -> Result<PermissionPolicy> {
1571 {
1572 let session_lock = self.get_session(session_id).await?;
1573 let mut session = session_lock.write().await;
1574 session.set_permission_policy(policy.clone());
1575 }
1576
1577 self.persist_in_background(session_id, "set_permission_policy");
1578
1579 Ok(policy)
1580 }
1581
1582 pub async fn get_confirmation_policy(&self, session_id: &str) -> Result<ConfirmationPolicy> {
1584 let session_lock = self.get_session(session_id).await?;
1585 let session = session_lock.read().await;
1586 Ok(session.confirmation_policy().await)
1587 }
1588}