1use super::{ContextUsage, Session, SessionConfig, SessionState};
4use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
5use crate::document_parser::DocumentParserRegistry;
6use crate::hitl::ConfirmationPolicy;
7use crate::llm::{self, LlmClient, LlmConfig, Message};
8use crate::mcp::McpManager;
9use crate::memory::AgentMemory;
10use crate::permissions::PermissionPolicy;
11use crate::prompts::SystemPromptSlots;
12use crate::skills::SkillRegistry;
13use crate::store::{FileSessionStore, LlmConfigData, SessionData, SessionStore};
14use crate::text::truncate_utf8;
15use crate::tools::ToolExecutor;
16use a3s_memory::MemoryStore;
17use anyhow::{Context, Result};
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::{mpsc, RwLock};
21
22#[derive(Clone)]
24pub struct SessionManager {
25 pub(crate) sessions: Arc<RwLock<HashMap<String, Arc<RwLock<Session>>>>>,
26 pub(crate) llm_client: Arc<RwLock<Option<Arc<dyn LlmClient>>>>,
29 pub(crate) tool_executor: Arc<ToolExecutor>,
30 pub(crate) stores: Arc<RwLock<HashMap<crate::config::StorageBackend, Arc<dyn SessionStore>>>>,
32 pub(crate) session_storage_types: Arc<RwLock<HashMap<String, crate::config::StorageBackend>>>,
34 pub(crate) llm_configs: Arc<RwLock<HashMap<String, LlmConfigData>>>,
36 pub(crate) ongoing_operations:
38 Arc<RwLock<HashMap<String, tokio_util::sync::CancellationToken>>>,
39 pub(crate) skill_registry: Arc<RwLock<Option<Arc<SkillRegistry>>>>,
41 pub(crate) session_skill_registries: Arc<RwLock<HashMap<String, Arc<SkillRegistry>>>>,
43 pub(crate) memory_store: Arc<RwLock<Option<Arc<dyn MemoryStore>>>>,
47 pub(crate) mcp_manager: Arc<RwLock<Option<Arc<McpManager>>>>,
50 pub(crate) document_parser_registry: Arc<RwLock<Option<Arc<DocumentParserRegistry>>>>,
52}
53
54impl SessionManager {
55 fn compact_json_value(value: &serde_json::Value) -> String {
56 let raw = match value {
57 serde_json::Value::Null => String::new(),
58 serde_json::Value::String(s) => s.clone(),
59 _ => serde_json::to_string(value).unwrap_or_default(),
60 };
61 let compact = raw.split_whitespace().collect::<Vec<_>>().join(" ");
62 if compact.len() > 180 {
63 format!("{}...", truncate_utf8(&compact, 180))
64 } else {
65 compact
66 }
67 }
68
69 pub fn new(llm_client: Option<Arc<dyn LlmClient>>, tool_executor: Arc<ToolExecutor>) -> Self {
71 Self {
72 sessions: Arc::new(RwLock::new(HashMap::new())),
73 llm_client: Arc::new(RwLock::new(llm_client)),
74 tool_executor,
75 stores: Arc::new(RwLock::new(HashMap::new())),
76 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
77 llm_configs: Arc::new(RwLock::new(HashMap::new())),
78 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
79 skill_registry: Arc::new(RwLock::new(None)),
80 session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
81 memory_store: Arc::new(RwLock::new(None)),
82 mcp_manager: Arc::new(RwLock::new(None)),
83 document_parser_registry: Arc::new(RwLock::new(Some(Arc::new(
84 crate::document_registry_factory::build_document_parser_registry(
85 crate::config::DocumentParserConfig::default(),
86 None,
87 ),
88 )))),
89 }
90 }
91
92 pub async fn with_persistence<P: AsRef<std::path::Path>>(
96 llm_client: Option<Arc<dyn LlmClient>>,
97 tool_executor: Arc<ToolExecutor>,
98 sessions_dir: P,
99 ) -> Result<Self> {
100 let store = FileSessionStore::new(sessions_dir).await?;
101 let mut stores = HashMap::new();
102 stores.insert(
103 crate::config::StorageBackend::File,
104 Arc::new(store) as Arc<dyn SessionStore>,
105 );
106
107 let manager = Self {
108 sessions: Arc::new(RwLock::new(HashMap::new())),
109 llm_client: Arc::new(RwLock::new(llm_client)),
110 tool_executor,
111 stores: Arc::new(RwLock::new(stores)),
112 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
113 llm_configs: Arc::new(RwLock::new(HashMap::new())),
114 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
115 skill_registry: Arc::new(RwLock::new(None)),
116 session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
117 memory_store: Arc::new(RwLock::new(None)),
118 mcp_manager: Arc::new(RwLock::new(None)),
119 document_parser_registry: Arc::new(RwLock::new(Some(Arc::new(
120 crate::document_registry_factory::build_document_parser_registry(
121 crate::config::DocumentParserConfig::default(),
122 None,
123 ),
124 )))),
125 };
126
127 Ok(manager)
128 }
129
130 pub fn with_store(
135 llm_client: Option<Arc<dyn LlmClient>>,
136 tool_executor: Arc<ToolExecutor>,
137 store: Arc<dyn SessionStore>,
138 backend: crate::config::StorageBackend,
139 ) -> Self {
140 let mut stores = HashMap::new();
141 stores.insert(backend, store);
142
143 Self {
144 sessions: Arc::new(RwLock::new(HashMap::new())),
145 llm_client: Arc::new(RwLock::new(llm_client)),
146 tool_executor,
147 stores: Arc::new(RwLock::new(stores)),
148 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
149 llm_configs: Arc::new(RwLock::new(HashMap::new())),
150 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
151 skill_registry: Arc::new(RwLock::new(None)),
152 session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
153 memory_store: Arc::new(RwLock::new(None)),
154 mcp_manager: Arc::new(RwLock::new(None)),
155 document_parser_registry: Arc::new(RwLock::new(Some(Arc::new(
156 crate::document_registry_factory::build_document_parser_registry(
157 crate::config::DocumentParserConfig::default(),
158 None,
159 ),
160 )))),
161 }
162 }
163
164 pub async fn set_default_llm(&self, client: Option<Arc<dyn LlmClient>>) {
169 *self.llm_client.write().await = client;
170 }
171
172 pub async fn set_skill_registry(
179 &self,
180 registry: Arc<SkillRegistry>,
181 skills_dir: std::path::PathBuf,
182 ) {
183 let manage_tool = crate::skills::ManageSkillTool::new(registry.clone(), skills_dir);
185 self.tool_executor
186 .register_dynamic_tool(Arc::new(manage_tool));
187
188 *self.skill_registry.write().await = Some(registry);
189 }
190
191 pub async fn skill_registry(&self) -> Option<Arc<SkillRegistry>> {
193 self.skill_registry.read().await.clone()
194 }
195
196 pub async fn set_session_skill_registry(
198 &self,
199 session_id: impl Into<String>,
200 registry: Arc<SkillRegistry>,
201 ) {
202 self.session_skill_registries
203 .write()
204 .await
205 .insert(session_id.into(), registry);
206 }
207
208 pub async fn session_skill_registry(&self, session_id: &str) -> Option<Arc<SkillRegistry>> {
210 self.session_skill_registries
211 .read()
212 .await
213 .get(session_id)
214 .cloned()
215 }
216
217 pub async fn set_memory_store(&self, store: Arc<dyn MemoryStore>) {
224 *self.memory_store.write().await = Some(store);
225 }
226
227 pub async fn memory_store(&self) -> Option<Arc<dyn MemoryStore>> {
229 self.memory_store.read().await.clone()
230 }
231
232 pub async fn set_mcp_manager(&self, manager: Arc<McpManager>) {
237 let all_tools = manager.get_all_tools().await;
238 let mut by_server: HashMap<String, Vec<crate::mcp::McpTool>> = HashMap::new();
239 for (server, tool) in all_tools {
240 by_server.entry(server).or_default().push(tool);
241 }
242 for (server_name, tools) in by_server {
243 for tool in
244 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&manager))
245 {
246 self.tool_executor.register_dynamic_tool(tool);
247 }
248 }
249 *self.mcp_manager.write().await = Some(manager);
250 }
251
252 pub async fn add_mcp_server(&self, config: crate::mcp::McpServerConfig) -> Result<()> {
257 let manager = {
258 let guard = self.mcp_manager.read().await;
259 match guard.clone() {
260 Some(m) => m,
261 None => {
262 drop(guard);
263 let m = Arc::new(McpManager::new());
264 *self.mcp_manager.write().await = Some(Arc::clone(&m));
265 m
266 }
267 }
268 };
269 let name = config.name.clone();
270 manager.register_server(config).await;
271 manager.connect(&name).await?;
272 let tools = manager.get_server_tools(&name).await;
273 for tool in crate::mcp::tools::create_mcp_tools(&name, tools, Arc::clone(&manager)) {
274 self.tool_executor.register_dynamic_tool(tool);
275 }
276 Ok(())
277 }
278
279 pub async fn remove_mcp_server(&self, name: &str) -> Result<()> {
283 let guard = self.mcp_manager.read().await;
284 if let Some(ref manager) = *guard {
285 manager.disconnect(name).await?;
286 }
287 self.tool_executor
288 .unregister_tools_by_prefix(&format!("mcp__{name}__"));
289 Ok(())
290 }
291
292 pub async fn mcp_status(
294 &self,
295 ) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
296 let guard = self.mcp_manager.read().await;
297 match guard.as_ref() {
298 Some(m) => {
299 let m = Arc::clone(m);
300 drop(guard);
301 m.get_status().await
302 }
303 None => std::collections::HashMap::new(),
304 }
305 }
306
307 pub async fn set_document_parser_registry(&self, registry: Arc<DocumentParserRegistry>) {
309 *self.document_parser_registry.write().await = Some(registry);
310 }
311
312 pub async fn document_parser_registry(&self) -> Option<Arc<DocumentParserRegistry>> {
314 self.document_parser_registry.read().await.clone()
315 }
316
317 pub async fn restore_session_by_id(&self, session_id: &str) -> Result<()> {
322 {
324 let sessions = self.sessions.read().await;
325 if sessions.contains_key(session_id) {
326 return Ok(());
327 }
328 }
329
330 let stores = self.stores.read().await;
331 for (backend, store) in stores.iter() {
332 match store.load(session_id).await {
333 Ok(Some(data)) => {
334 {
335 let mut storage_types = self.session_storage_types.write().await;
336 storage_types.insert(data.id.clone(), backend.clone());
337 }
338 self.restore_session(data).await?;
339 return Ok(());
340 }
341 Ok(None) => continue,
342 Err(e) => {
343 tracing::warn!(
344 "Failed to load session {} from {:?}: {}",
345 session_id,
346 backend,
347 e
348 );
349 continue;
350 }
351 }
352 }
353
354 Err(anyhow::anyhow!(
355 "Session {} not found in any store",
356 session_id
357 ))
358 }
359
360 pub async fn load_all_sessions(&mut self) -> Result<usize> {
362 let stores = self.stores.read().await;
363 let mut loaded = 0;
364
365 for (backend, store) in stores.iter() {
366 let session_ids = match store.list().await {
367 Ok(ids) => ids,
368 Err(e) => {
369 tracing::warn!("Failed to list sessions from {:?} store: {}", backend, e);
370 continue;
371 }
372 };
373
374 for id in session_ids {
375 match store.load(&id).await {
376 Ok(Some(data)) => {
377 {
379 let mut storage_types = self.session_storage_types.write().await;
380 storage_types.insert(data.id.clone(), backend.clone());
381 }
382
383 if let Err(e) = self.restore_session(data).await {
384 tracing::warn!("Failed to restore session {}: {}", id, e);
385 } else {
386 loaded += 1;
387 }
388 }
389 Ok(None) => {
390 tracing::warn!("Session {} not found in store", id);
391 }
392 Err(e) => {
393 tracing::warn!("Failed to load session {}: {}", id, e);
394 }
395 }
396 }
397 }
398
399 tracing::info!("Loaded {} sessions from store", loaded);
400 Ok(loaded)
401 }
402
403 async fn restore_session(&self, data: SessionData) -> Result<()> {
405 let tools = self.tool_executor.definitions();
406 let mut session = Session::new(data.id.clone(), data.config.clone(), tools).await?;
407
408 session.restore_from_data(&data);
410
411 if let Some(llm_config) = &data.llm_config {
413 let mut configs = self.llm_configs.write().await;
414 configs.insert(data.id.clone(), llm_config.clone());
415 }
416
417 let mut sessions = self.sessions.write().await;
418 sessions.insert(data.id.clone(), Arc::new(RwLock::new(session)));
419
420 tracing::info!("Restored session: {}", data.id);
421 Ok(())
422 }
423
424 async fn save_session(&self, session_id: &str) -> Result<()> {
426 let storage_type = {
428 let storage_types = self.session_storage_types.read().await;
429 storage_types.get(session_id).cloned()
430 };
431
432 let Some(storage_type) = storage_type else {
433 return Ok(());
435 };
436
437 if storage_type == crate::config::StorageBackend::Memory {
439 return Ok(());
440 }
441
442 let stores = self.stores.read().await;
444 let Some(store) = stores.get(&storage_type) else {
445 tracing::warn!("No store available for storage type: {:?}", storage_type);
446 return Ok(());
447 };
448
449 let session_lock = self.get_session(session_id).await?;
450 let session = session_lock.read().await;
451
452 let llm_config = {
454 let configs = self.llm_configs.read().await;
455 configs.get(session_id).cloned()
456 };
457
458 let data = session.to_session_data(llm_config);
459 store.save(&data).await?;
460
461 tracing::debug!("Saved session: {}", session_id);
462 Ok(())
463 }
464
465 async fn persist_or_warn(&self, session_id: &str, operation: &str) {
469 if let Err(e) = self.save_session(session_id).await {
470 tracing::warn!(
471 "Failed to persist session {} after {}: {}",
472 session_id,
473 operation,
474 e
475 );
476 if let Ok(session_lock) = self.get_session(session_id).await {
478 let session = session_lock.read().await;
479 let _ = session.event_tx().send(AgentEvent::PersistenceFailed {
480 session_id: session_id.to_string(),
481 operation: operation.to_string(),
482 error: e.to_string(),
483 });
484 }
485 }
486 }
487
488 fn persist_in_background(&self, session_id: &str, operation: &str) {
493 let mgr = self.clone();
494 let sid = session_id.to_string();
495 let op = operation.to_string();
496 tokio::spawn(async move {
497 mgr.persist_or_warn(&sid, &op).await;
498 });
499 }
500
501 pub async fn create_session(&self, id: String, config: SessionConfig) -> Result<String> {
503 tracing::info!(name: "a3s.session.create", session_id = %id, "Creating session");
504
505 {
507 let mut storage_types = self.session_storage_types.write().await;
508 storage_types.insert(id.clone(), config.storage_type.clone());
509 }
510
511 let tools = self.tool_executor.definitions();
513 let mut session = Session::new(id.clone(), config, tools).await?;
514
515 session.start_queue().await?;
517
518 if session.config.max_context_length > 0 {
520 session.context_usage.max_tokens = session.config.max_context_length as usize;
521 }
522
523 {
524 let mut sessions = self.sessions.write().await;
525 sessions.insert(id.clone(), Arc::new(RwLock::new(session)));
526 }
527
528 self.persist_in_background(&id, "create");
530
531 tracing::info!("Created session: {}", id);
532 Ok(id)
533 }
534
535 pub async fn destroy_session(&self, id: &str) -> Result<()> {
537 tracing::info!(name: "a3s.session.destroy", session_id = %id, "Destroying session");
538
539 let storage_type = {
541 let storage_types = self.session_storage_types.read().await;
542 storage_types.get(id).cloned()
543 };
544
545 {
546 let mut sessions = self.sessions.write().await;
547 sessions.remove(id);
548 }
549
550 {
552 let mut configs = self.llm_configs.write().await;
553 configs.remove(id);
554 }
555
556 {
557 let mut registries = self.session_skill_registries.write().await;
558 registries.remove(id);
559 }
560
561 {
563 let mut storage_types = self.session_storage_types.write().await;
564 storage_types.remove(id);
565 }
566
567 if let Some(storage_type) = storage_type {
569 if storage_type != crate::config::StorageBackend::Memory {
570 let stores = self.stores.read().await;
571 if let Some(store) = stores.get(&storage_type) {
572 if let Err(e) = store.delete(id).await {
573 tracing::warn!("Failed to delete session {} from store: {}", id, e);
574 }
575 }
576 }
577 }
578
579 tracing::info!("Destroyed session: {}", id);
580 Ok(())
581 }
582
583 pub async fn get_session(&self, id: &str) -> Result<Arc<RwLock<Session>>> {
585 let sessions = self.sessions.read().await;
586 sessions
587 .get(id)
588 .cloned()
589 .context(format!("Session not found: {}", id))
590 }
591
592 pub async fn create_child_session(
597 &self,
598 parent_id: &str,
599 child_id: String,
600 mut config: SessionConfig,
601 ) -> Result<String> {
602 let parent_lock = self.get_session(parent_id).await?;
604 let parent_llm_client = {
605 let parent = parent_lock.read().await;
606
607 if config.confirmation_policy.is_none() {
609 let parent_policy = parent.confirmation_manager.policy().await;
610 config.confirmation_policy = Some(parent_policy);
611 }
612
613 parent.llm_client.clone()
614 };
615
616 config.parent_id = Some(parent_id.to_string());
618
619 let tools = self.tool_executor.definitions();
621 let mut session = Session::new(child_id.clone(), config, tools).await?;
622
623 if session.llm_client.is_none() {
625 let default_llm = self.llm_client.read().await.clone();
626 session.llm_client = parent_llm_client.or(default_llm);
627 }
628
629 session.start_queue().await?;
631
632 if session.config.max_context_length > 0 {
634 session.context_usage.max_tokens = session.config.max_context_length as usize;
635 }
636
637 {
638 let mut sessions = self.sessions.write().await;
639 sessions.insert(child_id.clone(), Arc::new(RwLock::new(session)));
640 }
641
642 self.persist_in_background(&child_id, "create_child");
644
645 tracing::info!(
646 "Created child session: {} (parent: {})",
647 child_id,
648 parent_id
649 );
650 Ok(child_id)
651 }
652
653 pub async fn get_child_sessions(&self, parent_id: &str) -> Vec<String> {
655 let sessions = self.sessions.read().await;
656 let mut children = Vec::new();
657
658 for (id, session_lock) in sessions.iter() {
659 let session = session_lock.read().await;
660 if session.parent_id.as_deref() == Some(parent_id) {
661 children.push(id.clone());
662 }
663 }
664
665 children
666 }
667
668 pub async fn is_child_session(&self, session_id: &str) -> Result<bool> {
670 let session_lock = self.get_session(session_id).await?;
671 let session = session_lock.read().await;
672 Ok(session.is_child_session())
673 }
674
675 pub async fn generate(&self, session_id: &str, prompt: &str) -> Result<AgentResult> {
677 let session_lock = self.get_session(session_id).await?;
678
679 {
681 let session = session_lock.read().await;
682 if session.state == SessionState::Paused {
683 anyhow::bail!(
684 "Session {} is paused. Call Resume before generating.",
685 session_id
686 );
687 }
688 }
689
690 let (
692 history,
693 system,
694 tools,
695 session_llm_client,
696 permission_checker,
697 confirmation_manager,
698 context_providers,
699 session_workspace,
700 tool_metrics,
701 hook_engine,
702 planning_mode,
703 goal_tracking,
704 ) = {
705 let session = session_lock.read().await;
706 (
707 session.messages.clone(),
708 session.system().map(String::from),
709 session.tools.clone(),
710 session.llm_client.clone(),
711 session.permission_checker.clone(),
712 session.confirmation_manager.clone(),
713 session.context_providers.clone(),
714 session.config.workspace.clone(),
715 session.tool_metrics.clone(),
716 session.config.hook_engine.clone(),
717 session.config.planning_mode,
718 session.config.goal_tracking,
719 )
720 };
721
722 let llm_client = if let Some(client) = session_llm_client {
724 client
725 } else if let Some(client) = self.llm_client.read().await.clone() {
726 client
727 } else {
728 anyhow::bail!(
729 "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
730 session_id
731 );
732 };
733
734 let tool_context = if session_workspace.is_empty() {
736 crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
737 .with_session_id(session_id)
738 } else {
739 crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
740 .with_session_id(session_id)
741 };
742 let tool_context = if let Some(registry) = self.document_parser_registry().await {
743 tool_context.with_document_parsers(registry)
744 } else {
745 tool_context
746 };
747 let tool_context = tool_context.with_document_pipeline(Arc::new(
748 crate::document_pipeline_defaults::build_default_document_pipeline_registry(),
749 ));
750 let tool_context = if let Some(command_env) = self.tool_executor.command_env() {
751 tool_context.with_command_env(command_env)
752 } else {
753 tool_context
754 };
755
756 let skill_registry = match self.session_skill_registry(session_id).await {
758 Some(registry) => Some(registry),
759 None => self.skill_registry.read().await.clone(),
760 };
761 let system = if let Some(ref registry) = skill_registry {
762 let skill_prompt = registry.to_system_prompt();
763 if skill_prompt.is_empty() {
764 system
765 } else {
766 match system {
767 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
768 None => Some(skill_prompt),
769 }
770 }
771 } else {
772 system
773 };
774
775 let effective_prompt = if let Some(ref registry) = skill_registry {
777 let skill_content = registry.match_skills(prompt);
778 if skill_content.is_empty() {
779 prompt.to_string()
780 } else {
781 format!("{}\n\n---\n\n{}", skill_content, prompt)
782 }
783 } else {
784 prompt.to_string()
785 };
786
787 let memory = self
789 .memory_store
790 .read()
791 .await
792 .as_ref()
793 .map(|store| Arc::new(AgentMemory::new(store.clone())));
794
795 let config = AgentConfig {
797 prompt_slots: match system {
798 Some(s) => SystemPromptSlots::from_legacy(s),
799 None => SystemPromptSlots::default(),
800 },
801 tools,
802 max_tool_rounds: 50,
803 permission_checker: Some(permission_checker),
804 confirmation_manager: Some(confirmation_manager),
805 context_providers,
806 planning_mode,
807 goal_tracking,
808 hook_engine,
809 skill_registry,
810 memory,
811 ..AgentConfig::default()
812 };
813
814 let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
815 .with_tool_metrics(tool_metrics);
816
817 let result = agent
819 .execute_with_session(&history, &effective_prompt, Some(session_id), None, None)
820 .await?;
821
822 {
824 let mut session = session_lock.write().await;
825 session.messages = result.messages.clone();
826 session.update_usage(&result.usage);
827 }
828
829 self.persist_in_background(session_id, "generate");
831
832 if let Err(e) = self.maybe_auto_compact(session_id).await {
834 tracing::warn!("Auto-compact failed for session {}: {}", session_id, e);
835 }
836
837 Ok(result)
838 }
839
840 pub async fn generate_streaming(
842 &self,
843 session_id: &str,
844 prompt: &str,
845 ) -> Result<(
846 mpsc::Receiver<AgentEvent>,
847 tokio::task::JoinHandle<Result<AgentResult>>,
848 tokio_util::sync::CancellationToken,
849 )> {
850 let session_lock = self.get_session(session_id).await?;
851
852 {
854 let session = session_lock.read().await;
855 if session.state == SessionState::Paused {
856 anyhow::bail!(
857 "Session {} is paused. Call Resume before generating.",
858 session_id
859 );
860 }
861 }
862
863 let (
865 history,
866 system,
867 tools,
868 session_llm_client,
869 permission_checker,
870 confirmation_manager,
871 context_providers,
872 session_workspace,
873 tool_metrics,
874 hook_engine,
875 planning_mode,
876 goal_tracking,
877 ) = {
878 let session = session_lock.read().await;
879 (
880 session.messages.clone(),
881 session.system().map(String::from),
882 session.tools.clone(),
883 session.llm_client.clone(),
884 session.permission_checker.clone(),
885 session.confirmation_manager.clone(),
886 session.context_providers.clone(),
887 session.config.workspace.clone(),
888 session.tool_metrics.clone(),
889 session.config.hook_engine.clone(),
890 session.config.planning_mode,
891 session.config.goal_tracking,
892 )
893 };
894
895 let llm_client = if let Some(client) = session_llm_client {
897 client
898 } else if let Some(client) = self.llm_client.read().await.clone() {
899 client
900 } else {
901 anyhow::bail!(
902 "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
903 session_id
904 );
905 };
906
907 let tool_context = if session_workspace.is_empty() {
909 crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
910 .with_session_id(session_id)
911 } else {
912 crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
913 .with_session_id(session_id)
914 };
915 let tool_context = if let Some(registry) = self.document_parser_registry().await {
916 tool_context.with_document_parsers(registry)
917 } else {
918 tool_context
919 };
920 let tool_context = if let Some(command_env) = self.tool_executor.command_env() {
921 tool_context.with_command_env(command_env)
922 } else {
923 tool_context
924 };
925
926 let skill_registry = match self.session_skill_registry(session_id).await {
928 Some(registry) => Some(registry),
929 None => self.skill_registry.read().await.clone(),
930 };
931 let system = if let Some(ref registry) = skill_registry {
932 let skill_prompt = registry.to_system_prompt();
933 if skill_prompt.is_empty() {
934 system
935 } else {
936 match system {
937 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
938 None => Some(skill_prompt),
939 }
940 }
941 } else {
942 system
943 };
944
945 let effective_prompt = if let Some(ref registry) = skill_registry {
947 let skill_content = registry.match_skills(prompt);
948 if skill_content.is_empty() {
949 prompt.to_string()
950 } else {
951 format!("{}\n\n---\n\n{}", skill_content, prompt)
952 }
953 } else {
954 prompt.to_string()
955 };
956
957 let memory = self
959 .memory_store
960 .read()
961 .await
962 .as_ref()
963 .map(|store| Arc::new(AgentMemory::new(store.clone())));
964
965 let config = AgentConfig {
967 prompt_slots: match system {
968 Some(s) => SystemPromptSlots::from_legacy(s),
969 None => SystemPromptSlots::default(),
970 },
971 tools,
972 max_tool_rounds: 50,
973 permission_checker: Some(permission_checker),
974 confirmation_manager: Some(confirmation_manager),
975 context_providers,
976 planning_mode,
977 goal_tracking,
978 hook_engine,
979 skill_registry,
980 memory,
981 ..AgentConfig::default()
982 };
983
984 let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
985 .with_tool_metrics(tool_metrics);
986
987 let (rx, handle, cancel_token) =
989 agent.execute_streaming(&history, &effective_prompt).await?;
990
991 let cancel_token_clone = cancel_token.clone();
993 {
994 let mut ops = self.ongoing_operations.write().await;
995 ops.insert(session_id.to_string(), cancel_token);
996 }
997
998 let session_lock_clone = session_lock.clone();
1000 let original_handle = handle;
1001 let stores = self.stores.clone();
1002 let session_storage_types = self.session_storage_types.clone();
1003 let llm_configs = self.llm_configs.clone();
1004 let session_id_owned = session_id.to_string();
1005 let ongoing_operations = self.ongoing_operations.clone();
1006 let session_manager = self.clone();
1007
1008 let wrapped_handle = tokio::spawn(async move {
1009 let result = original_handle.await??;
1010
1011 {
1013 let mut ops = ongoing_operations.write().await;
1014 ops.remove(&session_id_owned);
1015 }
1016
1017 {
1019 let mut session = session_lock_clone.write().await;
1020 session.messages = result.messages.clone();
1021 session.update_usage(&result.usage);
1022 }
1023
1024 let storage_type = {
1026 let storage_types = session_storage_types.read().await;
1027 storage_types.get(&session_id_owned).cloned()
1028 };
1029
1030 if let Some(storage_type) = storage_type {
1031 if storage_type != crate::config::StorageBackend::Memory {
1032 let stores_guard = stores.read().await;
1033 if let Some(store) = stores_guard.get(&storage_type) {
1034 let session = session_lock_clone.read().await;
1035 let llm_config = {
1036 let configs = llm_configs.read().await;
1037 configs.get(&session_id_owned).cloned()
1038 };
1039 let data = session.to_session_data(llm_config);
1040 if let Err(e) = store.save(&data).await {
1041 tracing::warn!(
1042 "Failed to persist session {} after streaming: {}",
1043 session_id_owned,
1044 e
1045 );
1046 }
1047 }
1048 }
1049 }
1050
1051 if let Err(e) = session_manager.maybe_auto_compact(&session_id_owned).await {
1053 tracing::warn!(
1054 "Auto-compact failed for session {}: {}",
1055 session_id_owned,
1056 e
1057 );
1058 }
1059
1060 Ok(result)
1061 });
1062
1063 Ok((rx, wrapped_handle, cancel_token_clone))
1064 }
1065
1066 pub async fn context_usage(&self, session_id: &str) -> Result<ContextUsage> {
1068 let session_lock = self.get_session(session_id).await?;
1069 let session = session_lock.read().await;
1070 Ok(session.context_usage.clone())
1071 }
1072
1073 pub async fn history(&self, session_id: &str) -> Result<Vec<Message>> {
1075 let session_lock = self.get_session(session_id).await?;
1076 let session = session_lock.read().await;
1077 Ok(session.messages.clone())
1078 }
1079
1080 pub async fn clear(&self, session_id: &str) -> Result<()> {
1082 {
1083 let session_lock = self.get_session(session_id).await?;
1084 let mut session = session_lock.write().await;
1085 session.clear();
1086 }
1087
1088 self.persist_in_background(session_id, "clear");
1090
1091 Ok(())
1092 }
1093
1094 pub async fn compact(&self, session_id: &str) -> Result<()> {
1096 tracing::info!(name: "a3s.session.compact", session_id = %session_id, "Compacting session context");
1097
1098 {
1099 let session_lock = self.get_session(session_id).await?;
1100 let mut session = session_lock.write().await;
1101
1102 let llm_client = if let Some(client) = &session.llm_client {
1104 client.clone()
1105 } else if let Some(client) = self.llm_client.read().await.clone() {
1106 client
1107 } else {
1108 tracing::warn!("No LLM client configured for compaction, using simple truncation");
1110 let keep_messages = 20;
1111 if session.messages.len() > keep_messages {
1112 let len = session.messages.len();
1113 session.messages = session.messages.split_off(len - keep_messages);
1114 }
1115 drop(session);
1117 self.persist_in_background(session_id, "compact");
1118 return Ok(());
1119 };
1120
1121 session.compact(&llm_client).await?;
1122 }
1123
1124 self.persist_in_background(session_id, "compact");
1126
1127 Ok(())
1128 }
1129
1130 pub async fn maybe_auto_compact(&self, session_id: &str) -> Result<bool> {
1137 let (should_compact, percent_before, messages_before) = {
1138 let session_lock = self.get_session(session_id).await?;
1139 let session = session_lock.read().await;
1140
1141 if !session.config.auto_compact {
1142 return Ok(false);
1143 }
1144
1145 let threshold = session.config.auto_compact_threshold;
1146 let percent = session.context_usage.percent;
1147 let msg_count = session.messages.len();
1148
1149 tracing::debug!(
1150 "Auto-compact check for session {}: percent={:.2}%, threshold={:.2}%, messages={}",
1151 session_id,
1152 percent * 100.0,
1153 threshold * 100.0,
1154 msg_count,
1155 );
1156
1157 (percent >= threshold, percent, msg_count)
1158 };
1159
1160 if !should_compact {
1161 return Ok(false);
1162 }
1163
1164 tracing::info!(
1165 name: "a3s.session.auto_compact",
1166 session_id = %session_id,
1167 percent_before = %format!("{:.1}%", percent_before * 100.0),
1168 messages_before = %messages_before,
1169 "Auto-compacting session due to high context usage"
1170 );
1171
1172 self.compact(session_id).await?;
1174
1175 let messages_after = {
1177 let session_lock = self.get_session(session_id).await?;
1178 let session = session_lock.read().await;
1179 session.messages.len()
1180 };
1181
1182 let event = AgentEvent::ContextCompacted {
1184 session_id: session_id.to_string(),
1185 before_messages: messages_before,
1186 after_messages: messages_after,
1187 percent_before,
1188 };
1189
1190 if let Ok(session_lock) = self.get_session(session_id).await {
1192 let session = session_lock.read().await;
1193 let _ = session.event_tx.send(event);
1194 }
1195
1196 tracing::info!(
1197 name: "a3s.session.auto_compact.done",
1198 session_id = %session_id,
1199 messages_before = %messages_before,
1200 messages_after = %messages_after,
1201 "Auto-compaction complete"
1202 );
1203
1204 Ok(true)
1205 }
1206
1207 pub async fn get_llm_for_session(
1211 &self,
1212 session_id: &str,
1213 ) -> Result<Option<Arc<dyn LlmClient>>> {
1214 let session_lock = self.get_session(session_id).await?;
1215 let session = session_lock.read().await;
1216
1217 if let Some(client) = &session.llm_client {
1218 return Ok(Some(client.clone()));
1219 }
1220
1221 Ok(self.llm_client.read().await.clone())
1222 }
1223
1224 pub async fn btw_with_context(
1230 &self,
1231 session_id: &str,
1232 question: &str,
1233 runtime_context: Option<&str>,
1234 ) -> Result<crate::agent_api::BtwResult> {
1235 let question = question.trim();
1236 if question.is_empty() {
1237 anyhow::bail!("btw: question cannot be empty");
1238 }
1239
1240 let session_lock = self.get_session(session_id).await?;
1241 let (history_snapshot, session_runtime) = {
1242 let session = session_lock.read().await;
1243 let mut sections = Vec::new();
1244
1245 let pending_confirmations = session.confirmation_manager.pending_confirmations().await;
1246 if !pending_confirmations.is_empty() {
1247 let mut lines = pending_confirmations
1248 .into_iter()
1249 .map(|pending| {
1250 let arg_summary = Self::compact_json_value(&pending.args);
1251 if arg_summary.is_empty() {
1252 format!(
1253 "- {} [{}] remaining={}ms",
1254 pending.tool_name, pending.tool_id, pending.remaining_ms
1255 )
1256 } else {
1257 format!(
1258 "- {} [{}] remaining={}ms {}",
1259 pending.tool_name,
1260 pending.tool_id,
1261 pending.remaining_ms,
1262 arg_summary
1263 )
1264 }
1265 })
1266 .collect::<Vec<_>>();
1267 lines.sort();
1268 sections.push(format!("[pending confirmations]\n{}", lines.join("\n")));
1269 }
1270
1271 let stats = session.command_queue.stats().await;
1272 if stats.total_active > 0 || stats.total_pending > 0 || stats.external_pending > 0 {
1273 let mut lines = vec![format!(
1274 "active={}, pending={}, external_pending={}",
1275 stats.total_active, stats.total_pending, stats.external_pending
1276 )];
1277 let mut lanes = stats
1278 .lanes
1279 .into_values()
1280 .filter(|lane| lane.active > 0 || lane.pending > 0)
1281 .map(|lane| {
1282 format!(
1283 "- {:?}: active={}, pending={}, handler={:?}",
1284 lane.lane, lane.active, lane.pending, lane.handler_mode
1285 )
1286 })
1287 .collect::<Vec<_>>();
1288 lanes.sort();
1289 lines.extend(lanes);
1290 sections.push(format!("[session queue]\n{}", lines.join("\n")));
1291 }
1292
1293 let external_tasks = session.command_queue.pending_external_tasks().await;
1294 if !external_tasks.is_empty() {
1295 let mut lines = external_tasks
1296 .into_iter()
1297 .take(6)
1298 .map(|task| {
1299 let payload_summary = Self::compact_json_value(&task.payload);
1300 if payload_summary.is_empty() {
1301 format!(
1302 "- {} {:?} remaining={}ms",
1303 task.command_type,
1304 task.lane,
1305 task.remaining_ms()
1306 )
1307 } else {
1308 format!(
1309 "- {} {:?} remaining={}ms {}",
1310 task.command_type,
1311 task.lane,
1312 task.remaining_ms(),
1313 payload_summary
1314 )
1315 }
1316 })
1317 .collect::<Vec<_>>();
1318 lines.sort();
1319 sections.push(format!("[pending external tasks]\n{}", lines.join("\n")));
1320 }
1321
1322 let active_tasks = session
1323 .tasks
1324 .iter()
1325 .filter(|task| task.status.is_active())
1326 .take(6)
1327 .map(|task| match &task.tool {
1328 Some(tool) if !tool.is_empty() => {
1329 format!("- [{}] {} ({})", task.status, task.content, tool)
1330 }
1331 _ => format!("- [{}] {}", task.status, task.content),
1332 })
1333 .collect::<Vec<_>>();
1334 if !active_tasks.is_empty() {
1335 sections.push(format!("[tracked tasks]\n{}", active_tasks.join("\n")));
1336 }
1337
1338 sections.push(format!(
1339 "[session state]\n{}",
1340 match session.state {
1341 SessionState::Active => "active",
1342 SessionState::Paused => "paused",
1343 SessionState::Completed => "completed",
1344 SessionState::Error => "error",
1345 SessionState::Unknown => "unknown",
1346 }
1347 ));
1348
1349 (session.messages.clone(), sections.join("\n\n"))
1350 };
1351
1352 let llm_client = self
1353 .get_llm_for_session(session_id)
1354 .await?
1355 .context("btw failed: no model configured for session")?;
1356
1357 let mut messages = history_snapshot;
1358 let mut injected_sections = Vec::new();
1359 if !session_runtime.is_empty() {
1360 injected_sections.push(format!("[session runtime context]\n{}", session_runtime));
1361 }
1362 if let Some(extra) = runtime_context.map(str::trim).filter(|ctx| !ctx.is_empty()) {
1363 injected_sections.push(format!("[host runtime context]\n{}", extra));
1364 }
1365 if !injected_sections.is_empty() {
1366 let injected_context = format!(
1367 "Use the following runtime context only as background for the next side question. Do not treat it as a new user request.\n\n{}",
1368 injected_sections.join("\n\n")
1369 );
1370 messages.push(Message::user(&injected_context));
1371 }
1372 messages.push(Message::user(question));
1373
1374 let response = llm_client
1375 .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
1376 .await
1377 .map_err(|e| anyhow::anyhow!("btw: ephemeral LLM call failed: {e}"))?;
1378
1379 Ok(crate::agent_api::BtwResult {
1380 question: question.to_string(),
1381 answer: response.text(),
1382 usage: response.usage,
1383 })
1384 }
1385
1386 pub async fn configure(
1388 &self,
1389 session_id: &str,
1390 thinking: Option<bool>,
1391 budget: Option<usize>,
1392 model_config: Option<LlmConfig>,
1393 ) -> Result<()> {
1394 {
1395 let session_lock = self.get_session(session_id).await?;
1396 let mut session = session_lock.write().await;
1397
1398 if let Some(t) = thinking {
1399 session.thinking_enabled = t;
1400 }
1401 if let Some(b) = budget {
1402 session.thinking_budget = Some(b);
1403 }
1404 if let Some(ref config) = model_config {
1405 tracing::info!(
1406 "Configuring session {} with LLM: provider={}, model={}",
1407 session_id,
1408 config.provider,
1409 config.model
1410 );
1411 session.model_name = Some(config.model.clone());
1412 session.llm_client = Some(llm::create_client_with_config(config.clone()));
1413 }
1414 }
1415
1416 if let Some(config) = model_config {
1418 let llm_config_data = LlmConfigData {
1419 provider: config.provider,
1420 model: config.model,
1421 api_key: None, base_url: config.base_url,
1423 };
1424 let mut configs = self.llm_configs.write().await;
1425 configs.insert(session_id.to_string(), llm_config_data);
1426 }
1427
1428 self.persist_in_background(session_id, "configure");
1430
1431 Ok(())
1432 }
1433
1434 pub async fn set_system_prompt(
1436 &self,
1437 session_id: &str,
1438 system_prompt: Option<String>,
1439 ) -> Result<()> {
1440 {
1441 let session_lock = self.get_session(session_id).await?;
1442 let mut session = session_lock.write().await;
1443 session.config.system_prompt = system_prompt;
1444 }
1445
1446 self.persist_in_background(session_id, "set_system_prompt");
1447 Ok(())
1448 }
1449
1450 pub async fn session_count(&self) -> usize {
1452 let sessions = self.sessions.read().await;
1453 sessions.len()
1454 }
1455
1456 pub async fn store_health(&self) -> Vec<(String, Result<()>)> {
1458 let stores = self.stores.read().await;
1459 let mut results = Vec::new();
1460 for (_, store) in stores.iter() {
1461 let name = store.backend_name().to_string();
1462 let result = store.health_check().await;
1463 results.push((name, result));
1464 }
1465 results
1466 }
1467
1468 pub fn list_tools(&self) -> Vec<crate::llm::ToolDefinition> {
1470 self.tool_executor.definitions()
1471 }
1472
1473 pub async fn pause_session(&self, session_id: &str) -> Result<bool> {
1475 let paused = {
1476 let session_lock = self.get_session(session_id).await?;
1477 let mut session = session_lock.write().await;
1478 session.pause()
1479 };
1480
1481 if paused {
1482 self.persist_in_background(session_id, "pause");
1483 }
1484
1485 Ok(paused)
1486 }
1487
1488 pub async fn resume_session(&self, session_id: &str) -> Result<bool> {
1490 let resumed = {
1491 let session_lock = self.get_session(session_id).await?;
1492 let mut session = session_lock.write().await;
1493 session.resume()
1494 };
1495
1496 if resumed {
1497 self.persist_in_background(session_id, "resume");
1498 }
1499
1500 Ok(resumed)
1501 }
1502
1503 pub async fn cancel_operation(&self, session_id: &str) -> Result<bool> {
1507 let session_lock = self.get_session(session_id).await?;
1509 let cancelled_confirmations = {
1510 let session = session_lock.read().await;
1511 session.confirmation_manager.cancel_all().await
1512 };
1513
1514 if cancelled_confirmations > 0 {
1515 tracing::info!(
1516 "Cancelled {} pending confirmations for session {}",
1517 cancelled_confirmations,
1518 session_id
1519 );
1520 }
1521
1522 let cancel_token = {
1524 let mut ops = self.ongoing_operations.write().await;
1525 ops.remove(session_id)
1526 };
1527
1528 if let Some(token) = cancel_token {
1529 token.cancel();
1530 tracing::info!("Cancelled ongoing operation for session {}", session_id);
1531 Ok(true)
1532 } else if cancelled_confirmations > 0 {
1533 Ok(true)
1535 } else {
1536 tracing::debug!("No ongoing operation to cancel for session {}", session_id);
1537 Ok(false)
1538 }
1539 }
1540
1541 pub async fn get_all_sessions(&self) -> Vec<Arc<RwLock<Session>>> {
1543 let sessions = self.sessions.read().await;
1544 sessions.values().cloned().collect()
1545 }
1546
1547 pub fn tool_executor(&self) -> &Arc<ToolExecutor> {
1549 &self.tool_executor
1550 }
1551
1552 pub async fn confirm_tool(
1554 &self,
1555 session_id: &str,
1556 tool_id: &str,
1557 approved: bool,
1558 reason: Option<String>,
1559 ) -> Result<bool> {
1560 let session_lock = self.get_session(session_id).await?;
1561 let session = session_lock.read().await;
1562 session
1563 .confirmation_manager
1564 .confirm(tool_id, approved, reason)
1565 .await
1566 .map_err(|e| anyhow::anyhow!(e))
1567 }
1568
1569 pub async fn set_confirmation_policy(
1571 &self,
1572 session_id: &str,
1573 policy: ConfirmationPolicy,
1574 ) -> Result<ConfirmationPolicy> {
1575 {
1576 let session_lock = self.get_session(session_id).await?;
1577 let session = session_lock.read().await;
1578 session.set_confirmation_policy(policy.clone()).await;
1579 }
1580
1581 {
1583 let session_lock = self.get_session(session_id).await?;
1584 let mut session = session_lock.write().await;
1585 session.config.confirmation_policy = Some(policy.clone());
1586 }
1587
1588 self.persist_in_background(session_id, "set_confirmation_policy");
1590
1591 Ok(policy)
1592 }
1593
1594 pub async fn set_permission_policy(
1596 &self,
1597 session_id: &str,
1598 policy: PermissionPolicy,
1599 ) -> Result<PermissionPolicy> {
1600 {
1601 let session_lock = self.get_session(session_id).await?;
1602 let mut session = session_lock.write().await;
1603 session.set_permission_policy(policy.clone());
1604 }
1605
1606 self.persist_in_background(session_id, "set_permission_policy");
1607
1608 Ok(policy)
1609 }
1610
1611 pub async fn get_confirmation_policy(&self, session_id: &str) -> Result<ConfirmationPolicy> {
1613 let session_lock = self.get_session(session_id).await?;
1614 let session = session_lock.read().await;
1615 Ok(session.confirmation_policy().await)
1616 }
1617}