1use super::{ContextUsage, Session, SessionConfig, SessionState};
4use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
5use crate::hitl::ConfirmationPolicy;
6use crate::llm::{self, LlmClient, LlmConfig, Message};
7use crate::mcp::McpManager;
8use crate::memory::AgentMemory;
9use crate::permissions::PermissionPolicy;
10use crate::prompts::SystemPromptSlots;
11use crate::skills::SkillRegistry;
12use crate::store::{FileSessionStore, LlmConfigData, SessionData, SessionStore};
13use crate::text::truncate_utf8;
14use crate::tools::ToolExecutor;
15use crate::DocumentParserRegistry;
16use a3s_memory::MemoryStore;
17use anyhow::{Context, Result};
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::{mpsc, RwLock};
21
/// Owns every live session together with the shared services sessions draw on
/// (default LLM client, tool executor, persistence stores, skills, memory, MCP).
///
/// Every field is wrapped in an `Arc`, so cloning a `SessionManager` is cheap and
/// all clones share the same state; `persist_in_background` relies on this by
/// moving a clone into a spawned task.
#[derive(Clone)]
pub struct SessionManager {
    /// Live sessions keyed by session id.
    pub(crate) sessions: Arc<RwLock<HashMap<String, Arc<RwLock<Session>>>>>,
    /// Manager-wide default LLM client, used when a session has none of its own.
    pub(crate) llm_client: Arc<RwLock<Option<Arc<dyn LlmClient>>>>,
    /// Tool executor shared by all sessions.
    pub(crate) tool_executor: Arc<ToolExecutor>,
    /// Registered persistence stores, keyed by storage backend kind.
    pub(crate) stores: Arc<RwLock<HashMap<crate::config::StorageBackend, Arc<dyn SessionStore>>>>,
    /// Storage backend recorded per session id; `save_session` skips sessions
    /// that have no entry here.
    pub(crate) session_storage_types: Arc<RwLock<HashMap<String, crate::config::StorageBackend>>>,
    /// Per-session LLM configuration, saved alongside the session data.
    pub(crate) llm_configs: Arc<RwLock<HashMap<String, LlmConfigData>>>,
    /// Cancellation tokens of in-flight operations keyed by session id
    /// (inserted by `generate_streaming`).
    pub(crate) ongoing_operations:
        Arc<RwLock<HashMap<String, tokio_util::sync::CancellationToken>>>,
    /// Global skill registry, used when a session has no registry of its own.
    pub(crate) skill_registry: Arc<RwLock<Option<Arc<SkillRegistry>>>>,
    /// Session-scoped skill registries; these take precedence over the global one.
    pub(crate) session_skill_registries: Arc<RwLock<HashMap<String, Arc<SkillRegistry>>>>,
    /// Optional memory store, wrapped in an `AgentMemory` for each generation.
    pub(crate) memory_store: Arc<RwLock<Option<Arc<dyn MemoryStore>>>>,
    /// MCP manager; `add_mcp_server` creates one on demand if unset.
    pub(crate) mcp_manager: Arc<RwLock<Option<Arc<McpManager>>>>,
    /// Document parsers handed to tool contexts; the constructors install a
    /// default registry.
    pub(crate) document_parser_registry: Arc<RwLock<Option<Arc<DocumentParserRegistry>>>>,
}
53
54impl SessionManager {
55 fn compact_json_value(value: &serde_json::Value) -> String {
56 let raw = match value {
57 serde_json::Value::Null => String::new(),
58 serde_json::Value::String(s) => s.clone(),
59 _ => serde_json::to_string(value).unwrap_or_default(),
60 };
61 let compact = raw.split_whitespace().collect::<Vec<_>>().join(" ");
62 if compact.len() > 180 {
63 format!("{}...", truncate_utf8(&compact, 180))
64 } else {
65 compact
66 }
67 }
68
69 pub fn new(llm_client: Option<Arc<dyn LlmClient>>, tool_executor: Arc<ToolExecutor>) -> Self {
71 Self {
72 sessions: Arc::new(RwLock::new(HashMap::new())),
73 llm_client: Arc::new(RwLock::new(llm_client)),
74 tool_executor,
75 stores: Arc::new(RwLock::new(HashMap::new())),
76 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
77 llm_configs: Arc::new(RwLock::new(HashMap::new())),
78 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
79 skill_registry: Arc::new(RwLock::new(None)),
80 session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
81 memory_store: Arc::new(RwLock::new(None)),
82 mcp_manager: Arc::new(RwLock::new(None)),
83 document_parser_registry: Arc::new(RwLock::new(Some(Arc::new(
84 DocumentParserRegistry::new(),
85 )))),
86 }
87 }
88
89 pub async fn with_persistence<P: AsRef<std::path::Path>>(
93 llm_client: Option<Arc<dyn LlmClient>>,
94 tool_executor: Arc<ToolExecutor>,
95 sessions_dir: P,
96 ) -> Result<Self> {
97 let store = FileSessionStore::new(sessions_dir).await?;
98 let mut stores = HashMap::new();
99 stores.insert(
100 crate::config::StorageBackend::File,
101 Arc::new(store) as Arc<dyn SessionStore>,
102 );
103
104 let manager = Self {
105 sessions: Arc::new(RwLock::new(HashMap::new())),
106 llm_client: Arc::new(RwLock::new(llm_client)),
107 tool_executor,
108 stores: Arc::new(RwLock::new(stores)),
109 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
110 llm_configs: Arc::new(RwLock::new(HashMap::new())),
111 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
112 skill_registry: Arc::new(RwLock::new(None)),
113 session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
114 memory_store: Arc::new(RwLock::new(None)),
115 mcp_manager: Arc::new(RwLock::new(None)),
116 document_parser_registry: Arc::new(RwLock::new(Some(Arc::new(
117 DocumentParserRegistry::new(),
118 )))),
119 };
120
121 Ok(manager)
122 }
123
124 pub fn with_store(
129 llm_client: Option<Arc<dyn LlmClient>>,
130 tool_executor: Arc<ToolExecutor>,
131 store: Arc<dyn SessionStore>,
132 backend: crate::config::StorageBackend,
133 ) -> Self {
134 let mut stores = HashMap::new();
135 stores.insert(backend, store);
136
137 Self {
138 sessions: Arc::new(RwLock::new(HashMap::new())),
139 llm_client: Arc::new(RwLock::new(llm_client)),
140 tool_executor,
141 stores: Arc::new(RwLock::new(stores)),
142 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
143 llm_configs: Arc::new(RwLock::new(HashMap::new())),
144 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
145 skill_registry: Arc::new(RwLock::new(None)),
146 session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
147 memory_store: Arc::new(RwLock::new(None)),
148 mcp_manager: Arc::new(RwLock::new(None)),
149 document_parser_registry: Arc::new(RwLock::new(Some(Arc::new(
150 DocumentParserRegistry::new(),
151 )))),
152 }
153 }
154
155 pub async fn set_default_llm(&self, client: Option<Arc<dyn LlmClient>>) {
160 *self.llm_client.write().await = client;
161 }
162
163 pub async fn set_skill_registry(
170 &self,
171 registry: Arc<SkillRegistry>,
172 skills_dir: std::path::PathBuf,
173 ) {
174 let manage_tool = crate::skills::ManageSkillTool::new(registry.clone(), skills_dir);
176 self.tool_executor
177 .register_dynamic_tool(Arc::new(manage_tool));
178
179 *self.skill_registry.write().await = Some(registry);
180 }
181
182 pub async fn skill_registry(&self) -> Option<Arc<SkillRegistry>> {
184 self.skill_registry.read().await.clone()
185 }
186
187 pub async fn set_session_skill_registry(
189 &self,
190 session_id: impl Into<String>,
191 registry: Arc<SkillRegistry>,
192 ) {
193 self.session_skill_registries
194 .write()
195 .await
196 .insert(session_id.into(), registry);
197 }
198
199 pub async fn session_skill_registry(&self, session_id: &str) -> Option<Arc<SkillRegistry>> {
201 self.session_skill_registries
202 .read()
203 .await
204 .get(session_id)
205 .cloned()
206 }
207
208 pub async fn set_memory_store(&self, store: Arc<dyn MemoryStore>) {
215 *self.memory_store.write().await = Some(store);
216 }
217
218 pub async fn memory_store(&self) -> Option<Arc<dyn MemoryStore>> {
220 self.memory_store.read().await.clone()
221 }
222
223 pub async fn set_mcp_manager(&self, manager: Arc<McpManager>) {
228 let all_tools = manager.get_all_tools().await;
229 let mut by_server: HashMap<String, Vec<crate::mcp::McpTool>> = HashMap::new();
230 for (server, tool) in all_tools {
231 by_server.entry(server).or_default().push(tool);
232 }
233 for (server_name, tools) in by_server {
234 for tool in
235 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&manager))
236 {
237 self.tool_executor.register_dynamic_tool(tool);
238 }
239 }
240 *self.mcp_manager.write().await = Some(manager);
241 }
242
243 pub async fn add_mcp_server(&self, config: crate::mcp::McpServerConfig) -> Result<()> {
248 let manager = {
249 let guard = self.mcp_manager.read().await;
250 match guard.clone() {
251 Some(m) => m,
252 None => {
253 drop(guard);
254 let m = Arc::new(McpManager::new());
255 *self.mcp_manager.write().await = Some(Arc::clone(&m));
256 m
257 }
258 }
259 };
260 let name = config.name.clone();
261 manager.register_server(config).await;
262 manager.connect(&name).await?;
263 let tools = manager.get_server_tools(&name).await;
264 for tool in crate::mcp::tools::create_mcp_tools(&name, tools, Arc::clone(&manager)) {
265 self.tool_executor.register_dynamic_tool(tool);
266 }
267 Ok(())
268 }
269
270 pub async fn remove_mcp_server(&self, name: &str) -> Result<()> {
274 let guard = self.mcp_manager.read().await;
275 if let Some(ref manager) = *guard {
276 manager.disconnect(name).await?;
277 }
278 self.tool_executor
279 .unregister_tools_by_prefix(&format!("mcp__{name}__"));
280 Ok(())
281 }
282
283 pub async fn mcp_status(
285 &self,
286 ) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
287 let guard = self.mcp_manager.read().await;
288 match guard.as_ref() {
289 Some(m) => {
290 let m = Arc::clone(m);
291 drop(guard);
292 m.get_status().await
293 }
294 None => std::collections::HashMap::new(),
295 }
296 }
297
298 pub async fn set_document_parser_registry(&self, registry: Arc<DocumentParserRegistry>) {
300 *self.document_parser_registry.write().await = Some(registry);
301 }
302
303 pub async fn document_parser_registry(&self) -> Option<Arc<DocumentParserRegistry>> {
305 self.document_parser_registry.read().await.clone()
306 }
307
308 pub async fn restore_session_by_id(&self, session_id: &str) -> Result<()> {
313 {
315 let sessions = self.sessions.read().await;
316 if sessions.contains_key(session_id) {
317 return Ok(());
318 }
319 }
320
321 let stores = self.stores.read().await;
322 for (backend, store) in stores.iter() {
323 match store.load(session_id).await {
324 Ok(Some(data)) => {
325 {
326 let mut storage_types = self.session_storage_types.write().await;
327 storage_types.insert(data.id.clone(), backend.clone());
328 }
329 self.restore_session(data).await?;
330 return Ok(());
331 }
332 Ok(None) => continue,
333 Err(e) => {
334 tracing::warn!(
335 "Failed to load session {} from {:?}: {}",
336 session_id,
337 backend,
338 e
339 );
340 continue;
341 }
342 }
343 }
344
345 Err(anyhow::anyhow!(
346 "Session {} not found in any store",
347 session_id
348 ))
349 }
350
351 pub async fn load_all_sessions(&mut self) -> Result<usize> {
353 let stores = self.stores.read().await;
354 let mut loaded = 0;
355
356 for (backend, store) in stores.iter() {
357 let session_ids = match store.list().await {
358 Ok(ids) => ids,
359 Err(e) => {
360 tracing::warn!("Failed to list sessions from {:?} store: {}", backend, e);
361 continue;
362 }
363 };
364
365 for id in session_ids {
366 match store.load(&id).await {
367 Ok(Some(data)) => {
368 {
370 let mut storage_types = self.session_storage_types.write().await;
371 storage_types.insert(data.id.clone(), backend.clone());
372 }
373
374 if let Err(e) = self.restore_session(data).await {
375 tracing::warn!("Failed to restore session {}: {}", id, e);
376 } else {
377 loaded += 1;
378 }
379 }
380 Ok(None) => {
381 tracing::warn!("Session {} not found in store", id);
382 }
383 Err(e) => {
384 tracing::warn!("Failed to load session {}: {}", id, e);
385 }
386 }
387 }
388 }
389
390 tracing::info!("Loaded {} sessions from store", loaded);
391 Ok(loaded)
392 }
393
394 async fn restore_session(&self, data: SessionData) -> Result<()> {
396 let tools = self.tool_executor.definitions();
397 let mut session = Session::new(data.id.clone(), data.config.clone(), tools).await?;
398
399 session.restore_from_data(&data);
401
402 if let Some(llm_config) = &data.llm_config {
404 let mut configs = self.llm_configs.write().await;
405 configs.insert(data.id.clone(), llm_config.clone());
406 }
407
408 let mut sessions = self.sessions.write().await;
409 sessions.insert(data.id.clone(), Arc::new(RwLock::new(session)));
410
411 tracing::info!("Restored session: {}", data.id);
412 Ok(())
413 }
414
415 async fn save_session(&self, session_id: &str) -> Result<()> {
417 let storage_type = {
419 let storage_types = self.session_storage_types.read().await;
420 storage_types.get(session_id).cloned()
421 };
422
423 let Some(storage_type) = storage_type else {
424 return Ok(());
426 };
427
428 if storage_type == crate::config::StorageBackend::Memory {
430 return Ok(());
431 }
432
433 let stores = self.stores.read().await;
435 let Some(store) = stores.get(&storage_type) else {
436 tracing::warn!("No store available for storage type: {:?}", storage_type);
437 return Ok(());
438 };
439
440 let session_lock = self.get_session(session_id).await?;
441 let session = session_lock.read().await;
442
443 let llm_config = {
445 let configs = self.llm_configs.read().await;
446 configs.get(session_id).cloned()
447 };
448
449 let data = session.to_session_data(llm_config);
450 store.save(&data).await?;
451
452 tracing::debug!("Saved session: {}", session_id);
453 Ok(())
454 }
455
456 async fn persist_or_warn(&self, session_id: &str, operation: &str) {
460 if let Err(e) = self.save_session(session_id).await {
461 tracing::warn!(
462 "Failed to persist session {} after {}: {}",
463 session_id,
464 operation,
465 e
466 );
467 if let Ok(session_lock) = self.get_session(session_id).await {
469 let session = session_lock.read().await;
470 let _ = session.event_tx().send(AgentEvent::PersistenceFailed {
471 session_id: session_id.to_string(),
472 operation: operation.to_string(),
473 error: e.to_string(),
474 });
475 }
476 }
477 }
478
479 fn persist_in_background(&self, session_id: &str, operation: &str) {
484 let mgr = self.clone();
485 let sid = session_id.to_string();
486 let op = operation.to_string();
487 tokio::spawn(async move {
488 mgr.persist_or_warn(&sid, &op).await;
489 });
490 }
491
492 pub async fn create_session(&self, id: String, config: SessionConfig) -> Result<String> {
494 tracing::info!(name: "a3s.session.create", session_id = %id, "Creating session");
495
496 {
498 let mut storage_types = self.session_storage_types.write().await;
499 storage_types.insert(id.clone(), config.storage_type.clone());
500 }
501
502 let tools = self.tool_executor.definitions();
504 let mut session = Session::new(id.clone(), config, tools).await?;
505
506 session.start_queue().await?;
508
509 if session.config.max_context_length > 0 {
511 session.context_usage.max_tokens = session.config.max_context_length as usize;
512 }
513
514 {
515 let mut sessions = self.sessions.write().await;
516 sessions.insert(id.clone(), Arc::new(RwLock::new(session)));
517 }
518
519 self.persist_in_background(&id, "create");
521
522 tracing::info!("Created session: {}", id);
523 Ok(id)
524 }
525
526 pub async fn destroy_session(&self, id: &str) -> Result<()> {
528 tracing::info!(name: "a3s.session.destroy", session_id = %id, "Destroying session");
529
530 let storage_type = {
532 let storage_types = self.session_storage_types.read().await;
533 storage_types.get(id).cloned()
534 };
535
536 {
537 let mut sessions = self.sessions.write().await;
538 sessions.remove(id);
539 }
540
541 {
543 let mut configs = self.llm_configs.write().await;
544 configs.remove(id);
545 }
546
547 {
548 let mut registries = self.session_skill_registries.write().await;
549 registries.remove(id);
550 }
551
552 {
554 let mut storage_types = self.session_storage_types.write().await;
555 storage_types.remove(id);
556 }
557
558 if let Some(storage_type) = storage_type {
560 if storage_type != crate::config::StorageBackend::Memory {
561 let stores = self.stores.read().await;
562 if let Some(store) = stores.get(&storage_type) {
563 if let Err(e) = store.delete(id).await {
564 tracing::warn!("Failed to delete session {} from store: {}", id, e);
565 }
566 }
567 }
568 }
569
570 tracing::info!("Destroyed session: {}", id);
571 Ok(())
572 }
573
574 pub async fn get_session(&self, id: &str) -> Result<Arc<RwLock<Session>>> {
576 let sessions = self.sessions.read().await;
577 sessions
578 .get(id)
579 .cloned()
580 .context(format!("Session not found: {}", id))
581 }
582
583 pub async fn create_child_session(
588 &self,
589 parent_id: &str,
590 child_id: String,
591 mut config: SessionConfig,
592 ) -> Result<String> {
593 let parent_lock = self.get_session(parent_id).await?;
595 let parent_llm_client = {
596 let parent = parent_lock.read().await;
597
598 if config.confirmation_policy.is_none() {
600 let parent_policy = parent.confirmation_manager.policy().await;
601 config.confirmation_policy = Some(parent_policy);
602 }
603
604 parent.llm_client.clone()
605 };
606
607 config.parent_id = Some(parent_id.to_string());
609
610 let tools = self.tool_executor.definitions();
612 let mut session = Session::new(child_id.clone(), config, tools).await?;
613
614 if session.llm_client.is_none() {
616 let default_llm = self.llm_client.read().await.clone();
617 session.llm_client = parent_llm_client.or(default_llm);
618 }
619
620 session.start_queue().await?;
622
623 if session.config.max_context_length > 0 {
625 session.context_usage.max_tokens = session.config.max_context_length as usize;
626 }
627
628 {
629 let mut sessions = self.sessions.write().await;
630 sessions.insert(child_id.clone(), Arc::new(RwLock::new(session)));
631 }
632
633 self.persist_in_background(&child_id, "create_child");
635
636 tracing::info!(
637 "Created child session: {} (parent: {})",
638 child_id,
639 parent_id
640 );
641 Ok(child_id)
642 }
643
644 pub async fn get_child_sessions(&self, parent_id: &str) -> Vec<String> {
646 let sessions = self.sessions.read().await;
647 let mut children = Vec::new();
648
649 for (id, session_lock) in sessions.iter() {
650 let session = session_lock.read().await;
651 if session.parent_id.as_deref() == Some(parent_id) {
652 children.push(id.clone());
653 }
654 }
655
656 children
657 }
658
659 pub async fn is_child_session(&self, session_id: &str) -> Result<bool> {
661 let session_lock = self.get_session(session_id).await?;
662 let session = session_lock.read().await;
663 Ok(session.is_child_session())
664 }
665
666 pub async fn generate(&self, session_id: &str, prompt: &str) -> Result<AgentResult> {
668 let session_lock = self.get_session(session_id).await?;
669
670 {
672 let session = session_lock.read().await;
673 if session.state == SessionState::Paused {
674 anyhow::bail!(
675 "Session {} is paused. Call Resume before generating.",
676 session_id
677 );
678 }
679 }
680
681 let (
683 history,
684 system,
685 tools,
686 session_llm_client,
687 permission_checker,
688 confirmation_manager,
689 context_providers,
690 session_workspace,
691 tool_metrics,
692 hook_engine,
693 planning_enabled,
694 goal_tracking,
695 ) = {
696 let session = session_lock.read().await;
697 (
698 session.messages.clone(),
699 session.system().map(String::from),
700 session.tools.clone(),
701 session.llm_client.clone(),
702 session.permission_checker.clone(),
703 session.confirmation_manager.clone(),
704 session.context_providers.clone(),
705 session.config.workspace.clone(),
706 session.tool_metrics.clone(),
707 session.config.hook_engine.clone(),
708 session.config.planning_enabled,
709 session.config.goal_tracking,
710 )
711 };
712
713 let llm_client = if let Some(client) = session_llm_client {
715 client
716 } else if let Some(client) = self.llm_client.read().await.clone() {
717 client
718 } else {
719 anyhow::bail!(
720 "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
721 session_id
722 );
723 };
724
725 let tool_context = if session_workspace.is_empty() {
727 crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
728 .with_session_id(session_id)
729 } else {
730 crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
731 .with_session_id(session_id)
732 };
733 let tool_context = if let Some(registry) = self.document_parser_registry().await {
734 tool_context.with_document_parsers(registry)
735 } else {
736 tool_context
737 };
738 let tool_context = if let Some(command_env) = self.tool_executor.command_env() {
739 tool_context.with_command_env(command_env)
740 } else {
741 tool_context
742 };
743
744 let skill_registry = match self.session_skill_registry(session_id).await {
746 Some(registry) => Some(registry),
747 None => self.skill_registry.read().await.clone(),
748 };
749 let system = if let Some(ref registry) = skill_registry {
750 let skill_prompt = registry.to_system_prompt();
751 if skill_prompt.is_empty() {
752 system
753 } else {
754 match system {
755 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
756 None => Some(skill_prompt),
757 }
758 }
759 } else {
760 system
761 };
762
763 let effective_prompt = if let Some(ref registry) = skill_registry {
765 let skill_content = registry.match_skills(prompt);
766 if skill_content.is_empty() {
767 prompt.to_string()
768 } else {
769 format!("{}\n\n---\n\n{}", skill_content, prompt)
770 }
771 } else {
772 prompt.to_string()
773 };
774
775 let memory = self
777 .memory_store
778 .read()
779 .await
780 .as_ref()
781 .map(|store| Arc::new(AgentMemory::new(store.clone())));
782
783 let config = AgentConfig {
785 prompt_slots: match system {
786 Some(s) => SystemPromptSlots::from_legacy(s),
787 None => SystemPromptSlots::default(),
788 },
789 tools,
790 max_tool_rounds: 50,
791 permission_checker: Some(permission_checker),
792 confirmation_manager: Some(confirmation_manager),
793 context_providers,
794 planning_enabled,
795 goal_tracking,
796 hook_engine,
797 skill_registry,
798 memory,
799 ..AgentConfig::default()
800 };
801
802 let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
803 .with_tool_metrics(tool_metrics);
804
805 let result = agent
807 .execute_with_session(&history, &effective_prompt, Some(session_id), None, None)
808 .await?;
809
810 {
812 let mut session = session_lock.write().await;
813 session.messages = result.messages.clone();
814 session.update_usage(&result.usage);
815 }
816
817 self.persist_in_background(session_id, "generate");
819
820 if let Err(e) = self.maybe_auto_compact(session_id).await {
822 tracing::warn!("Auto-compact failed for session {}: {}", session_id, e);
823 }
824
825 Ok(result)
826 }
827
828 pub async fn generate_streaming(
830 &self,
831 session_id: &str,
832 prompt: &str,
833 ) -> Result<(
834 mpsc::Receiver<AgentEvent>,
835 tokio::task::JoinHandle<Result<AgentResult>>,
836 tokio_util::sync::CancellationToken,
837 )> {
838 let session_lock = self.get_session(session_id).await?;
839
840 {
842 let session = session_lock.read().await;
843 if session.state == SessionState::Paused {
844 anyhow::bail!(
845 "Session {} is paused. Call Resume before generating.",
846 session_id
847 );
848 }
849 }
850
851 let (
853 history,
854 system,
855 tools,
856 session_llm_client,
857 permission_checker,
858 confirmation_manager,
859 context_providers,
860 session_workspace,
861 tool_metrics,
862 hook_engine,
863 planning_enabled,
864 goal_tracking,
865 ) = {
866 let session = session_lock.read().await;
867 (
868 session.messages.clone(),
869 session.system().map(String::from),
870 session.tools.clone(),
871 session.llm_client.clone(),
872 session.permission_checker.clone(),
873 session.confirmation_manager.clone(),
874 session.context_providers.clone(),
875 session.config.workspace.clone(),
876 session.tool_metrics.clone(),
877 session.config.hook_engine.clone(),
878 session.config.planning_enabled,
879 session.config.goal_tracking,
880 )
881 };
882
883 let llm_client = if let Some(client) = session_llm_client {
885 client
886 } else if let Some(client) = self.llm_client.read().await.clone() {
887 client
888 } else {
889 anyhow::bail!(
890 "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
891 session_id
892 );
893 };
894
895 let tool_context = if session_workspace.is_empty() {
897 crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
898 .with_session_id(session_id)
899 } else {
900 crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
901 .with_session_id(session_id)
902 };
903 let tool_context = if let Some(registry) = self.document_parser_registry().await {
904 tool_context.with_document_parsers(registry)
905 } else {
906 tool_context
907 };
908 let tool_context = if let Some(command_env) = self.tool_executor.command_env() {
909 tool_context.with_command_env(command_env)
910 } else {
911 tool_context
912 };
913
914 let skill_registry = match self.session_skill_registry(session_id).await {
916 Some(registry) => Some(registry),
917 None => self.skill_registry.read().await.clone(),
918 };
919 let system = if let Some(ref registry) = skill_registry {
920 let skill_prompt = registry.to_system_prompt();
921 if skill_prompt.is_empty() {
922 system
923 } else {
924 match system {
925 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
926 None => Some(skill_prompt),
927 }
928 }
929 } else {
930 system
931 };
932
933 let effective_prompt = if let Some(ref registry) = skill_registry {
935 let skill_content = registry.match_skills(prompt);
936 if skill_content.is_empty() {
937 prompt.to_string()
938 } else {
939 format!("{}\n\n---\n\n{}", skill_content, prompt)
940 }
941 } else {
942 prompt.to_string()
943 };
944
945 let memory = self
947 .memory_store
948 .read()
949 .await
950 .as_ref()
951 .map(|store| Arc::new(AgentMemory::new(store.clone())));
952
953 let config = AgentConfig {
955 prompt_slots: match system {
956 Some(s) => SystemPromptSlots::from_legacy(s),
957 None => SystemPromptSlots::default(),
958 },
959 tools,
960 max_tool_rounds: 50,
961 permission_checker: Some(permission_checker),
962 confirmation_manager: Some(confirmation_manager),
963 context_providers,
964 planning_enabled,
965 goal_tracking,
966 hook_engine,
967 skill_registry,
968 memory,
969 ..AgentConfig::default()
970 };
971
972 let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
973 .with_tool_metrics(tool_metrics);
974
975 let (rx, handle, cancel_token) =
977 agent.execute_streaming(&history, &effective_prompt).await?;
978
979 let cancel_token_clone = cancel_token.clone();
981 {
982 let mut ops = self.ongoing_operations.write().await;
983 ops.insert(session_id.to_string(), cancel_token);
984 }
985
986 let session_lock_clone = session_lock.clone();
988 let original_handle = handle;
989 let stores = self.stores.clone();
990 let session_storage_types = self.session_storage_types.clone();
991 let llm_configs = self.llm_configs.clone();
992 let session_id_owned = session_id.to_string();
993 let ongoing_operations = self.ongoing_operations.clone();
994 let session_manager = self.clone();
995
996 let wrapped_handle = tokio::spawn(async move {
997 let result = original_handle.await??;
998
999 {
1001 let mut ops = ongoing_operations.write().await;
1002 ops.remove(&session_id_owned);
1003 }
1004
1005 {
1007 let mut session = session_lock_clone.write().await;
1008 session.messages = result.messages.clone();
1009 session.update_usage(&result.usage);
1010 }
1011
1012 let storage_type = {
1014 let storage_types = session_storage_types.read().await;
1015 storage_types.get(&session_id_owned).cloned()
1016 };
1017
1018 if let Some(storage_type) = storage_type {
1019 if storage_type != crate::config::StorageBackend::Memory {
1020 let stores_guard = stores.read().await;
1021 if let Some(store) = stores_guard.get(&storage_type) {
1022 let session = session_lock_clone.read().await;
1023 let llm_config = {
1024 let configs = llm_configs.read().await;
1025 configs.get(&session_id_owned).cloned()
1026 };
1027 let data = session.to_session_data(llm_config);
1028 if let Err(e) = store.save(&data).await {
1029 tracing::warn!(
1030 "Failed to persist session {} after streaming: {}",
1031 session_id_owned,
1032 e
1033 );
1034 }
1035 }
1036 }
1037 }
1038
1039 if let Err(e) = session_manager.maybe_auto_compact(&session_id_owned).await {
1041 tracing::warn!(
1042 "Auto-compact failed for session {}: {}",
1043 session_id_owned,
1044 e
1045 );
1046 }
1047
1048 Ok(result)
1049 });
1050
1051 Ok((rx, wrapped_handle, cancel_token_clone))
1052 }
1053
1054 pub async fn context_usage(&self, session_id: &str) -> Result<ContextUsage> {
1056 let session_lock = self.get_session(session_id).await?;
1057 let session = session_lock.read().await;
1058 Ok(session.context_usage.clone())
1059 }
1060
1061 pub async fn history(&self, session_id: &str) -> Result<Vec<Message>> {
1063 let session_lock = self.get_session(session_id).await?;
1064 let session = session_lock.read().await;
1065 Ok(session.messages.clone())
1066 }
1067
1068 pub async fn clear(&self, session_id: &str) -> Result<()> {
1070 {
1071 let session_lock = self.get_session(session_id).await?;
1072 let mut session = session_lock.write().await;
1073 session.clear();
1074 }
1075
1076 self.persist_in_background(session_id, "clear");
1078
1079 Ok(())
1080 }
1081
1082 pub async fn compact(&self, session_id: &str) -> Result<()> {
1084 tracing::info!(name: "a3s.session.compact", session_id = %session_id, "Compacting session context");
1085
1086 {
1087 let session_lock = self.get_session(session_id).await?;
1088 let mut session = session_lock.write().await;
1089
1090 let llm_client = if let Some(client) = &session.llm_client {
1092 client.clone()
1093 } else if let Some(client) = self.llm_client.read().await.clone() {
1094 client
1095 } else {
1096 tracing::warn!("No LLM client configured for compaction, using simple truncation");
1098 let keep_messages = 20;
1099 if session.messages.len() > keep_messages {
1100 let len = session.messages.len();
1101 session.messages = session.messages.split_off(len - keep_messages);
1102 }
1103 drop(session);
1105 self.persist_in_background(session_id, "compact");
1106 return Ok(());
1107 };
1108
1109 session.compact(&llm_client).await?;
1110 }
1111
1112 self.persist_in_background(session_id, "compact");
1114
1115 Ok(())
1116 }
1117
1118 pub async fn maybe_auto_compact(&self, session_id: &str) -> Result<bool> {
1125 let (should_compact, percent_before, messages_before) = {
1126 let session_lock = self.get_session(session_id).await?;
1127 let session = session_lock.read().await;
1128
1129 if !session.config.auto_compact {
1130 return Ok(false);
1131 }
1132
1133 let threshold = session.config.auto_compact_threshold;
1134 let percent = session.context_usage.percent;
1135 let msg_count = session.messages.len();
1136
1137 tracing::debug!(
1138 "Auto-compact check for session {}: percent={:.2}%, threshold={:.2}%, messages={}",
1139 session_id,
1140 percent * 100.0,
1141 threshold * 100.0,
1142 msg_count,
1143 );
1144
1145 (percent >= threshold, percent, msg_count)
1146 };
1147
1148 if !should_compact {
1149 return Ok(false);
1150 }
1151
1152 tracing::info!(
1153 name: "a3s.session.auto_compact",
1154 session_id = %session_id,
1155 percent_before = %format!("{:.1}%", percent_before * 100.0),
1156 messages_before = %messages_before,
1157 "Auto-compacting session due to high context usage"
1158 );
1159
1160 self.compact(session_id).await?;
1162
1163 let messages_after = {
1165 let session_lock = self.get_session(session_id).await?;
1166 let session = session_lock.read().await;
1167 session.messages.len()
1168 };
1169
1170 let event = AgentEvent::ContextCompacted {
1172 session_id: session_id.to_string(),
1173 before_messages: messages_before,
1174 after_messages: messages_after,
1175 percent_before,
1176 };
1177
1178 if let Ok(session_lock) = self.get_session(session_id).await {
1180 let session = session_lock.read().await;
1181 let _ = session.event_tx.send(event);
1182 }
1183
1184 tracing::info!(
1185 name: "a3s.session.auto_compact.done",
1186 session_id = %session_id,
1187 messages_before = %messages_before,
1188 messages_after = %messages_after,
1189 "Auto-compaction complete"
1190 );
1191
1192 Ok(true)
1193 }
1194
1195 pub async fn get_llm_for_session(
1199 &self,
1200 session_id: &str,
1201 ) -> Result<Option<Arc<dyn LlmClient>>> {
1202 let session_lock = self.get_session(session_id).await?;
1203 let session = session_lock.read().await;
1204
1205 if let Some(client) = &session.llm_client {
1206 return Ok(Some(client.clone()));
1207 }
1208
1209 Ok(self.llm_client.read().await.clone())
1210 }
1211
1212 pub async fn btw_with_context(
1218 &self,
1219 session_id: &str,
1220 question: &str,
1221 runtime_context: Option<&str>,
1222 ) -> Result<crate::agent_api::BtwResult> {
1223 let question = question.trim();
1224 if question.is_empty() {
1225 anyhow::bail!("btw: question cannot be empty");
1226 }
1227
1228 let session_lock = self.get_session(session_id).await?;
1229 let (history_snapshot, session_runtime) = {
1230 let session = session_lock.read().await;
1231 let mut sections = Vec::new();
1232
1233 let pending_confirmations = session.confirmation_manager.pending_confirmations().await;
1234 if !pending_confirmations.is_empty() {
1235 let mut lines = pending_confirmations
1236 .into_iter()
1237 .map(|pending| {
1238 let arg_summary = Self::compact_json_value(&pending.args);
1239 if arg_summary.is_empty() {
1240 format!(
1241 "- {} [{}] remaining={}ms",
1242 pending.tool_name, pending.tool_id, pending.remaining_ms
1243 )
1244 } else {
1245 format!(
1246 "- {} [{}] remaining={}ms {}",
1247 pending.tool_name,
1248 pending.tool_id,
1249 pending.remaining_ms,
1250 arg_summary
1251 )
1252 }
1253 })
1254 .collect::<Vec<_>>();
1255 lines.sort();
1256 sections.push(format!("[pending confirmations]\n{}", lines.join("\n")));
1257 }
1258
1259 let stats = session.command_queue.stats().await;
1260 if stats.total_active > 0 || stats.total_pending > 0 || stats.external_pending > 0 {
1261 let mut lines = vec![format!(
1262 "active={}, pending={}, external_pending={}",
1263 stats.total_active, stats.total_pending, stats.external_pending
1264 )];
1265 let mut lanes = stats
1266 .lanes
1267 .into_values()
1268 .filter(|lane| lane.active > 0 || lane.pending > 0)
1269 .map(|lane| {
1270 format!(
1271 "- {:?}: active={}, pending={}, handler={:?}",
1272 lane.lane, lane.active, lane.pending, lane.handler_mode
1273 )
1274 })
1275 .collect::<Vec<_>>();
1276 lanes.sort();
1277 lines.extend(lanes);
1278 sections.push(format!("[session queue]\n{}", lines.join("\n")));
1279 }
1280
1281 let external_tasks = session.command_queue.pending_external_tasks().await;
1282 if !external_tasks.is_empty() {
1283 let mut lines = external_tasks
1284 .into_iter()
1285 .take(6)
1286 .map(|task| {
1287 let payload_summary = Self::compact_json_value(&task.payload);
1288 if payload_summary.is_empty() {
1289 format!(
1290 "- {} {:?} remaining={}ms",
1291 task.command_type,
1292 task.lane,
1293 task.remaining_ms()
1294 )
1295 } else {
1296 format!(
1297 "- {} {:?} remaining={}ms {}",
1298 task.command_type,
1299 task.lane,
1300 task.remaining_ms(),
1301 payload_summary
1302 )
1303 }
1304 })
1305 .collect::<Vec<_>>();
1306 lines.sort();
1307 sections.push(format!("[pending external tasks]\n{}", lines.join("\n")));
1308 }
1309
1310 let active_tasks = session
1311 .tasks
1312 .iter()
1313 .filter(|task| task.status.is_active())
1314 .take(6)
1315 .map(|task| match &task.tool {
1316 Some(tool) if !tool.is_empty() => {
1317 format!("- [{}] {} ({})", task.status, task.content, tool)
1318 }
1319 _ => format!("- [{}] {}", task.status, task.content),
1320 })
1321 .collect::<Vec<_>>();
1322 if !active_tasks.is_empty() {
1323 sections.push(format!("[tracked tasks]\n{}", active_tasks.join("\n")));
1324 }
1325
1326 sections.push(format!(
1327 "[session state]\n{}",
1328 match session.state {
1329 SessionState::Active => "active",
1330 SessionState::Paused => "paused",
1331 SessionState::Completed => "completed",
1332 SessionState::Error => "error",
1333 SessionState::Unknown => "unknown",
1334 }
1335 ));
1336
1337 (session.messages.clone(), sections.join("\n\n"))
1338 };
1339
1340 let llm_client = self
1341 .get_llm_for_session(session_id)
1342 .await?
1343 .context("btw failed: no model configured for session")?;
1344
1345 let mut messages = history_snapshot;
1346 let mut injected_sections = Vec::new();
1347 if !session_runtime.is_empty() {
1348 injected_sections.push(format!("[session runtime context]\n{}", session_runtime));
1349 }
1350 if let Some(extra) = runtime_context.map(str::trim).filter(|ctx| !ctx.is_empty()) {
1351 injected_sections.push(format!("[host runtime context]\n{}", extra));
1352 }
1353 if !injected_sections.is_empty() {
1354 let injected_context = format!(
1355 "Use the following runtime context only as background for the next side question. Do not treat it as a new user request.\n\n{}",
1356 injected_sections.join("\n\n")
1357 );
1358 messages.push(Message::user(&injected_context));
1359 }
1360 messages.push(Message::user(question));
1361
1362 let response = llm_client
1363 .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
1364 .await
1365 .map_err(|e| anyhow::anyhow!("btw: ephemeral LLM call failed: {e}"))?;
1366
1367 Ok(crate::agent_api::BtwResult {
1368 question: question.to_string(),
1369 answer: response.text(),
1370 usage: response.usage,
1371 })
1372 }
1373
1374 pub async fn configure(
1376 &self,
1377 session_id: &str,
1378 thinking: Option<bool>,
1379 budget: Option<usize>,
1380 model_config: Option<LlmConfig>,
1381 ) -> Result<()> {
1382 {
1383 let session_lock = self.get_session(session_id).await?;
1384 let mut session = session_lock.write().await;
1385
1386 if let Some(t) = thinking {
1387 session.thinking_enabled = t;
1388 }
1389 if let Some(b) = budget {
1390 session.thinking_budget = Some(b);
1391 }
1392 if let Some(ref config) = model_config {
1393 tracing::info!(
1394 "Configuring session {} with LLM: provider={}, model={}",
1395 session_id,
1396 config.provider,
1397 config.model
1398 );
1399 session.model_name = Some(config.model.clone());
1400 session.llm_client = Some(llm::create_client_with_config(config.clone()));
1401 }
1402 }
1403
1404 if let Some(config) = model_config {
1406 let llm_config_data = LlmConfigData {
1407 provider: config.provider,
1408 model: config.model,
1409 api_key: None, base_url: config.base_url,
1411 };
1412 let mut configs = self.llm_configs.write().await;
1413 configs.insert(session_id.to_string(), llm_config_data);
1414 }
1415
1416 self.persist_in_background(session_id, "configure");
1418
1419 Ok(())
1420 }
1421
1422 pub async fn set_system_prompt(
1424 &self,
1425 session_id: &str,
1426 system_prompt: Option<String>,
1427 ) -> Result<()> {
1428 {
1429 let session_lock = self.get_session(session_id).await?;
1430 let mut session = session_lock.write().await;
1431 session.config.system_prompt = system_prompt;
1432 }
1433
1434 self.persist_in_background(session_id, "set_system_prompt");
1435 Ok(())
1436 }
1437
1438 pub async fn session_count(&self) -> usize {
1440 let sessions = self.sessions.read().await;
1441 sessions.len()
1442 }
1443
1444 pub async fn store_health(&self) -> Vec<(String, Result<()>)> {
1446 let stores = self.stores.read().await;
1447 let mut results = Vec::new();
1448 for (_, store) in stores.iter() {
1449 let name = store.backend_name().to_string();
1450 let result = store.health_check().await;
1451 results.push((name, result));
1452 }
1453 results
1454 }
1455
1456 pub fn list_tools(&self) -> Vec<crate::llm::ToolDefinition> {
1458 self.tool_executor.definitions()
1459 }
1460
1461 pub async fn pause_session(&self, session_id: &str) -> Result<bool> {
1463 let paused = {
1464 let session_lock = self.get_session(session_id).await?;
1465 let mut session = session_lock.write().await;
1466 session.pause()
1467 };
1468
1469 if paused {
1470 self.persist_in_background(session_id, "pause");
1471 }
1472
1473 Ok(paused)
1474 }
1475
1476 pub async fn resume_session(&self, session_id: &str) -> Result<bool> {
1478 let resumed = {
1479 let session_lock = self.get_session(session_id).await?;
1480 let mut session = session_lock.write().await;
1481 session.resume()
1482 };
1483
1484 if resumed {
1485 self.persist_in_background(session_id, "resume");
1486 }
1487
1488 Ok(resumed)
1489 }
1490
1491 pub async fn cancel_operation(&self, session_id: &str) -> Result<bool> {
1495 let session_lock = self.get_session(session_id).await?;
1497 let cancelled_confirmations = {
1498 let session = session_lock.read().await;
1499 session.confirmation_manager.cancel_all().await
1500 };
1501
1502 if cancelled_confirmations > 0 {
1503 tracing::info!(
1504 "Cancelled {} pending confirmations for session {}",
1505 cancelled_confirmations,
1506 session_id
1507 );
1508 }
1509
1510 let cancel_token = {
1512 let mut ops = self.ongoing_operations.write().await;
1513 ops.remove(session_id)
1514 };
1515
1516 if let Some(token) = cancel_token {
1517 token.cancel();
1518 tracing::info!("Cancelled ongoing operation for session {}", session_id);
1519 Ok(true)
1520 } else if cancelled_confirmations > 0 {
1521 Ok(true)
1523 } else {
1524 tracing::debug!("No ongoing operation to cancel for session {}", session_id);
1525 Ok(false)
1526 }
1527 }
1528
1529 pub async fn get_all_sessions(&self) -> Vec<Arc<RwLock<Session>>> {
1531 let sessions = self.sessions.read().await;
1532 sessions.values().cloned().collect()
1533 }
1534
1535 pub fn tool_executor(&self) -> &Arc<ToolExecutor> {
1537 &self.tool_executor
1538 }
1539
1540 pub async fn confirm_tool(
1542 &self,
1543 session_id: &str,
1544 tool_id: &str,
1545 approved: bool,
1546 reason: Option<String>,
1547 ) -> Result<bool> {
1548 let session_lock = self.get_session(session_id).await?;
1549 let session = session_lock.read().await;
1550 session
1551 .confirmation_manager
1552 .confirm(tool_id, approved, reason)
1553 .await
1554 .map_err(|e| anyhow::anyhow!(e))
1555 }
1556
1557 pub async fn set_confirmation_policy(
1559 &self,
1560 session_id: &str,
1561 policy: ConfirmationPolicy,
1562 ) -> Result<ConfirmationPolicy> {
1563 {
1564 let session_lock = self.get_session(session_id).await?;
1565 let session = session_lock.read().await;
1566 session.set_confirmation_policy(policy.clone()).await;
1567 }
1568
1569 {
1571 let session_lock = self.get_session(session_id).await?;
1572 let mut session = session_lock.write().await;
1573 session.config.confirmation_policy = Some(policy.clone());
1574 }
1575
1576 self.persist_in_background(session_id, "set_confirmation_policy");
1578
1579 Ok(policy)
1580 }
1581
1582 pub async fn set_permission_policy(
1584 &self,
1585 session_id: &str,
1586 policy: PermissionPolicy,
1587 ) -> Result<PermissionPolicy> {
1588 {
1589 let session_lock = self.get_session(session_id).await?;
1590 let mut session = session_lock.write().await;
1591 session.set_permission_policy(policy.clone());
1592 }
1593
1594 self.persist_in_background(session_id, "set_permission_policy");
1595
1596 Ok(policy)
1597 }
1598
1599 pub async fn get_confirmation_policy(&self, session_id: &str) -> Result<ConfirmationPolicy> {
1601 let session_lock = self.get_session(session_id).await?;
1602 let session = session_lock.read().await;
1603 Ok(session.confirmation_policy().await)
1604 }
1605}