1use super::{ContextUsage, Session, SessionConfig, SessionState};
4use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
5use crate::hitl::ConfirmationPolicy;
6use crate::llm::{self, LlmClient, LlmConfig, Message};
7use crate::memory::AgentMemory;
8use crate::prompts::SystemPromptSlots;
9use crate::skills::SkillRegistry;
10use crate::store::{FileSessionStore, LlmConfigData, SessionData, SessionStore};
11use crate::tools::ToolExecutor;
12use a3s_memory::MemoryStore;
13use anyhow::{Context, Result};
14use std::collections::HashMap;
15use std::sync::Arc;
16use tokio::sync::{mpsc, RwLock};
17
/// Owns the in-memory sessions together with the shared resources they use.
///
/// Every field is wrapped in an `Arc`, so the derived `Clone` yields a handle
/// that shares the same underlying state as the original.
#[derive(Clone)]
pub struct SessionManager {
    /// Live sessions, keyed by session id.
    pub(crate) sessions: Arc<RwLock<HashMap<String, Arc<RwLock<Session>>>>>,
    /// Manager-wide default LLM client, used when a session has no client of its own.
    pub(crate) llm_client: Arc<RwLock<Option<Arc<dyn LlmClient>>>>,
    /// Tool executor shared by all sessions.
    pub(crate) tool_executor: Arc<ToolExecutor>,
    /// Session stores, keyed by the storage backend they serve.
    pub(crate) stores: Arc<RwLock<HashMap<crate::config::StorageBackend, Arc<dyn SessionStore>>>>,
    /// Storage backend recorded for each session id.
    pub(crate) session_storage_types: Arc<RwLock<HashMap<String, crate::config::StorageBackend>>>,
    /// Per-session LLM configuration that is saved alongside the session data.
    pub(crate) llm_configs: Arc<RwLock<HashMap<String, LlmConfigData>>>,
    /// Abort handles of in-flight streaming operations, keyed by session id.
    pub(crate) ongoing_operations: Arc<RwLock<HashMap<String, tokio::task::AbortHandle>>>,
    /// Optional skill registry consulted when generating.
    pub(crate) skill_registry: Arc<RwLock<Option<Arc<SkillRegistry>>>>,
    /// Optional memory store handed to agents when generating.
    pub(crate) memory_store: Arc<RwLock<Option<Arc<dyn MemoryStore>>>>,
}
41
42impl SessionManager {
43 pub fn new(llm_client: Option<Arc<dyn LlmClient>>, tool_executor: Arc<ToolExecutor>) -> Self {
45 Self {
46 sessions: Arc::new(RwLock::new(HashMap::new())),
47 llm_client: Arc::new(RwLock::new(llm_client)),
48 tool_executor,
49 stores: Arc::new(RwLock::new(HashMap::new())),
50 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
51 llm_configs: Arc::new(RwLock::new(HashMap::new())),
52 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
53 skill_registry: Arc::new(RwLock::new(None)),
54 memory_store: Arc::new(RwLock::new(None)),
55 }
56 }
57
58 pub async fn with_persistence<P: AsRef<std::path::Path>>(
62 llm_client: Option<Arc<dyn LlmClient>>,
63 tool_executor: Arc<ToolExecutor>,
64 sessions_dir: P,
65 ) -> Result<Self> {
66 let store = FileSessionStore::new(sessions_dir).await?;
67 let mut stores = HashMap::new();
68 stores.insert(
69 crate::config::StorageBackend::File,
70 Arc::new(store) as Arc<dyn SessionStore>,
71 );
72
73 let manager = Self {
74 sessions: Arc::new(RwLock::new(HashMap::new())),
75 llm_client: Arc::new(RwLock::new(llm_client)),
76 tool_executor,
77 stores: Arc::new(RwLock::new(stores)),
78 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
79 llm_configs: Arc::new(RwLock::new(HashMap::new())),
80 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
81 skill_registry: Arc::new(RwLock::new(None)),
82 memory_store: Arc::new(RwLock::new(None)),
83 };
84
85 Ok(manager)
86 }
87
88 pub fn with_store(
93 llm_client: Option<Arc<dyn LlmClient>>,
94 tool_executor: Arc<ToolExecutor>,
95 store: Arc<dyn SessionStore>,
96 backend: crate::config::StorageBackend,
97 ) -> Self {
98 let mut stores = HashMap::new();
99 stores.insert(backend, store);
100
101 Self {
102 sessions: Arc::new(RwLock::new(HashMap::new())),
103 llm_client: Arc::new(RwLock::new(llm_client)),
104 tool_executor,
105 stores: Arc::new(RwLock::new(stores)),
106 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
107 llm_configs: Arc::new(RwLock::new(HashMap::new())),
108 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
109 skill_registry: Arc::new(RwLock::new(None)),
110 memory_store: Arc::new(RwLock::new(None)),
111 }
112 }
113
114 pub async fn set_default_llm(&self, client: Option<Arc<dyn LlmClient>>) {
119 *self.llm_client.write().await = client;
120 }
121
122 pub async fn set_skill_registry(
129 &self,
130 registry: Arc<SkillRegistry>,
131 skills_dir: std::path::PathBuf,
132 ) {
133 let manage_tool = crate::skills::ManageSkillTool::new(registry.clone(), skills_dir);
135 self.tool_executor
136 .register_dynamic_tool(Arc::new(manage_tool));
137
138 *self.skill_registry.write().await = Some(registry);
139 }
140
141 pub async fn skill_registry(&self) -> Option<Arc<SkillRegistry>> {
143 self.skill_registry.read().await.clone()
144 }
145
146 pub async fn set_memory_store(&self, store: Arc<dyn MemoryStore>) {
153 *self.memory_store.write().await = Some(store);
154 }
155
156 pub async fn memory_store(&self) -> Option<Arc<dyn MemoryStore>> {
158 self.memory_store.read().await.clone()
159 }
160
161 pub async fn restore_session_by_id(&self, session_id: &str) -> Result<()> {
166 {
168 let sessions = self.sessions.read().await;
169 if sessions.contains_key(session_id) {
170 return Ok(());
171 }
172 }
173
174 let stores = self.stores.read().await;
175 for (backend, store) in stores.iter() {
176 match store.load(session_id).await {
177 Ok(Some(data)) => {
178 {
179 let mut storage_types = self.session_storage_types.write().await;
180 storage_types.insert(data.id.clone(), backend.clone());
181 }
182 self.restore_session(data).await?;
183 return Ok(());
184 }
185 Ok(None) => continue,
186 Err(e) => {
187 tracing::warn!(
188 "Failed to load session {} from {:?}: {}",
189 session_id,
190 backend,
191 e
192 );
193 continue;
194 }
195 }
196 }
197
198 Err(anyhow::anyhow!(
199 "Session {} not found in any store",
200 session_id
201 ))
202 }
203
204 pub async fn load_all_sessions(&mut self) -> Result<usize> {
206 let stores = self.stores.read().await;
207 let mut loaded = 0;
208
209 for (backend, store) in stores.iter() {
210 let session_ids = match store.list().await {
211 Ok(ids) => ids,
212 Err(e) => {
213 tracing::warn!("Failed to list sessions from {:?} store: {}", backend, e);
214 continue;
215 }
216 };
217
218 for id in session_ids {
219 match store.load(&id).await {
220 Ok(Some(data)) => {
221 {
223 let mut storage_types = self.session_storage_types.write().await;
224 storage_types.insert(data.id.clone(), backend.clone());
225 }
226
227 if let Err(e) = self.restore_session(data).await {
228 tracing::warn!("Failed to restore session {}: {}", id, e);
229 } else {
230 loaded += 1;
231 }
232 }
233 Ok(None) => {
234 tracing::warn!("Session {} not found in store", id);
235 }
236 Err(e) => {
237 tracing::warn!("Failed to load session {}: {}", id, e);
238 }
239 }
240 }
241 }
242
243 tracing::info!("Loaded {} sessions from store", loaded);
244 Ok(loaded)
245 }
246
247 async fn restore_session(&self, data: SessionData) -> Result<()> {
249 let tools = self.tool_executor.definitions();
250 let mut session = Session::new(data.id.clone(), data.config.clone(), tools).await?;
251
252 session.restore_from_data(&data);
254
255 if let Some(llm_config) = &data.llm_config {
257 let mut configs = self.llm_configs.write().await;
258 configs.insert(data.id.clone(), llm_config.clone());
259 }
260
261 let mut sessions = self.sessions.write().await;
262 sessions.insert(data.id.clone(), Arc::new(RwLock::new(session)));
263
264 tracing::info!("Restored session: {}", data.id);
265 Ok(())
266 }
267
268 async fn save_session(&self, session_id: &str) -> Result<()> {
270 let storage_type = {
272 let storage_types = self.session_storage_types.read().await;
273 storage_types.get(session_id).cloned()
274 };
275
276 let Some(storage_type) = storage_type else {
277 return Ok(());
279 };
280
281 if storage_type == crate::config::StorageBackend::Memory {
283 return Ok(());
284 }
285
286 let stores = self.stores.read().await;
288 let Some(store) = stores.get(&storage_type) else {
289 tracing::warn!("No store available for storage type: {:?}", storage_type);
290 return Ok(());
291 };
292
293 let session_lock = self.get_session(session_id).await?;
294 let session = session_lock.read().await;
295
296 let llm_config = {
298 let configs = self.llm_configs.read().await;
299 configs.get(session_id).cloned()
300 };
301
302 let data = session.to_session_data(llm_config);
303 store.save(&data).await?;
304
305 tracing::debug!("Saved session: {}", session_id);
306 Ok(())
307 }
308
309 async fn persist_or_warn(&self, session_id: &str, operation: &str) {
313 if let Err(e) = self.save_session(session_id).await {
314 tracing::warn!(
315 "Failed to persist session {} after {}: {}",
316 session_id,
317 operation,
318 e
319 );
320 if let Ok(session_lock) = self.get_session(session_id).await {
322 let session = session_lock.read().await;
323 let _ = session.event_tx().send(AgentEvent::PersistenceFailed {
324 session_id: session_id.to_string(),
325 operation: operation.to_string(),
326 error: e.to_string(),
327 });
328 }
329 }
330 }
331
332 fn persist_in_background(&self, session_id: &str, operation: &str) {
337 let mgr = self.clone();
338 let sid = session_id.to_string();
339 let op = operation.to_string();
340 tokio::spawn(async move {
341 mgr.persist_or_warn(&sid, &op).await;
342 });
343 }
344
345 pub async fn create_session(&self, id: String, config: SessionConfig) -> Result<String> {
347 tracing::info!(name: "a3s.session.create", session_id = %id, "Creating session");
348
349 {
351 let mut storage_types = self.session_storage_types.write().await;
352 storage_types.insert(id.clone(), config.storage_type.clone());
353 }
354
355 let tools = self.tool_executor.definitions();
357 let mut session = Session::new(id.clone(), config, tools).await?;
358
359 session.start_queue().await?;
361
362 if session.config.max_context_length > 0 {
364 session.context_usage.max_tokens = session.config.max_context_length as usize;
365 }
366
367 {
368 let mut sessions = self.sessions.write().await;
369 sessions.insert(id.clone(), Arc::new(RwLock::new(session)));
370 }
371
372 self.persist_in_background(&id, "create");
374
375 tracing::info!("Created session: {}", id);
376 Ok(id)
377 }
378
379 pub async fn destroy_session(&self, id: &str) -> Result<()> {
381 tracing::info!(name: "a3s.session.destroy", session_id = %id, "Destroying session");
382
383 let storage_type = {
385 let storage_types = self.session_storage_types.read().await;
386 storage_types.get(id).cloned()
387 };
388
389 {
390 let mut sessions = self.sessions.write().await;
391 sessions.remove(id);
392 }
393
394 {
396 let mut configs = self.llm_configs.write().await;
397 configs.remove(id);
398 }
399
400 {
402 let mut storage_types = self.session_storage_types.write().await;
403 storage_types.remove(id);
404 }
405
406 if let Some(storage_type) = storage_type {
408 if storage_type != crate::config::StorageBackend::Memory {
409 let stores = self.stores.read().await;
410 if let Some(store) = stores.get(&storage_type) {
411 if let Err(e) = store.delete(id).await {
412 tracing::warn!("Failed to delete session {} from store: {}", id, e);
413 }
414 }
415 }
416 }
417
418 tracing::info!("Destroyed session: {}", id);
419 Ok(())
420 }
421
422 pub async fn get_session(&self, id: &str) -> Result<Arc<RwLock<Session>>> {
424 let sessions = self.sessions.read().await;
425 sessions
426 .get(id)
427 .cloned()
428 .context(format!("Session not found: {}", id))
429 }
430
431 pub async fn create_child_session(
436 &self,
437 parent_id: &str,
438 child_id: String,
439 mut config: SessionConfig,
440 ) -> Result<String> {
441 let parent_lock = self.get_session(parent_id).await?;
443 let parent_llm_client = {
444 let parent = parent_lock.read().await;
445
446 if config.confirmation_policy.is_none() {
448 let parent_policy = parent.confirmation_manager.policy().await;
449 config.confirmation_policy = Some(parent_policy);
450 }
451
452 parent.llm_client.clone()
453 };
454
455 config.parent_id = Some(parent_id.to_string());
457
458 let tools = self.tool_executor.definitions();
460 let mut session = Session::new(child_id.clone(), config, tools).await?;
461
462 if session.llm_client.is_none() {
464 let default_llm = self.llm_client.read().await.clone();
465 session.llm_client = parent_llm_client.or(default_llm);
466 }
467
468 session.start_queue().await?;
470
471 if session.config.max_context_length > 0 {
473 session.context_usage.max_tokens = session.config.max_context_length as usize;
474 }
475
476 {
477 let mut sessions = self.sessions.write().await;
478 sessions.insert(child_id.clone(), Arc::new(RwLock::new(session)));
479 }
480
481 self.persist_in_background(&child_id, "create_child");
483
484 tracing::info!(
485 "Created child session: {} (parent: {})",
486 child_id,
487 parent_id
488 );
489 Ok(child_id)
490 }
491
492 pub async fn get_child_sessions(&self, parent_id: &str) -> Vec<String> {
494 let sessions = self.sessions.read().await;
495 let mut children = Vec::new();
496
497 for (id, session_lock) in sessions.iter() {
498 let session = session_lock.read().await;
499 if session.parent_id.as_deref() == Some(parent_id) {
500 children.push(id.clone());
501 }
502 }
503
504 children
505 }
506
507 pub async fn is_child_session(&self, session_id: &str) -> Result<bool> {
509 let session_lock = self.get_session(session_id).await?;
510 let session = session_lock.read().await;
511 Ok(session.is_child_session())
512 }
513
514 pub async fn generate(&self, session_id: &str, prompt: &str) -> Result<AgentResult> {
516 let session_lock = self.get_session(session_id).await?;
517
518 {
520 let session = session_lock.read().await;
521 if session.state == SessionState::Paused {
522 anyhow::bail!(
523 "Session {} is paused. Call Resume before generating.",
524 session_id
525 );
526 }
527 }
528
529 let (
531 history,
532 system,
533 tools,
534 session_llm_client,
535 permission_checker,
536 confirmation_manager,
537 context_providers,
538 session_workspace,
539 tool_metrics,
540 hook_engine,
541 planning_enabled,
542 goal_tracking,
543 ) = {
544 let session = session_lock.read().await;
545 (
546 session.messages.clone(),
547 session.system().map(String::from),
548 session.tools.clone(),
549 session.llm_client.clone(),
550 session.permission_checker.clone(),
551 session.confirmation_manager.clone(),
552 session.context_providers.clone(),
553 session.config.workspace.clone(),
554 session.tool_metrics.clone(),
555 session.config.hook_engine.clone(),
556 session.config.planning_enabled,
557 session.config.goal_tracking,
558 )
559 };
560
561 let llm_client = if let Some(client) = session_llm_client {
563 client
564 } else if let Some(client) = self.llm_client.read().await.clone() {
565 client
566 } else {
567 anyhow::bail!(
568 "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
569 session_id
570 );
571 };
572
573 let tool_context = if session_workspace.is_empty() {
575 crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
576 .with_session_id(session_id)
577 } else {
578 crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
579 .with_session_id(session_id)
580 };
581
582 let skill_registry = self.skill_registry.read().await.clone();
584 let system = if let Some(ref registry) = skill_registry {
585 let skill_prompt = registry.to_system_prompt();
586 if skill_prompt.is_empty() {
587 system
588 } else {
589 match system {
590 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
591 None => Some(skill_prompt),
592 }
593 }
594 } else {
595 system
596 };
597
598 let effective_prompt = if let Some(ref registry) = skill_registry {
600 let skill_content = registry.match_skills(prompt);
601 if skill_content.is_empty() {
602 prompt.to_string()
603 } else {
604 format!("{}\n\n---\n\n{}", skill_content, prompt)
605 }
606 } else {
607 prompt.to_string()
608 };
609
610 let memory = self
612 .memory_store
613 .read()
614 .await
615 .as_ref()
616 .map(|store| Arc::new(AgentMemory::new(store.clone())));
617
618 let config = AgentConfig {
620 prompt_slots: match system {
621 Some(s) => SystemPromptSlots::from_legacy(s),
622 None => SystemPromptSlots::default(),
623 },
624 tools,
625 max_tool_rounds: 50,
626 permission_checker: Some(permission_checker),
627 confirmation_manager: Some(confirmation_manager),
628 context_providers,
629 planning_enabled,
630 goal_tracking,
631 hook_engine,
632 skill_registry,
633 memory,
634 ..AgentConfig::default()
635 };
636
637 let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
638 .with_tool_metrics(tool_metrics);
639
640 let result = agent
642 .execute_with_session(&history, &effective_prompt, Some(session_id), None)
643 .await?;
644
645 {
647 let mut session = session_lock.write().await;
648 session.messages = result.messages.clone();
649 session.update_usage(&result.usage);
650 }
651
652 self.persist_in_background(session_id, "generate");
654
655 if let Err(e) = self.maybe_auto_compact(session_id).await {
657 tracing::warn!("Auto-compact failed for session {}: {}", session_id, e);
658 }
659
660 Ok(result)
661 }
662
663 pub async fn generate_streaming(
665 &self,
666 session_id: &str,
667 prompt: &str,
668 ) -> Result<(
669 mpsc::Receiver<AgentEvent>,
670 tokio::task::JoinHandle<Result<AgentResult>>,
671 )> {
672 let session_lock = self.get_session(session_id).await?;
673
674 {
676 let session = session_lock.read().await;
677 if session.state == SessionState::Paused {
678 anyhow::bail!(
679 "Session {} is paused. Call Resume before generating.",
680 session_id
681 );
682 }
683 }
684
685 let (
687 history,
688 system,
689 tools,
690 session_llm_client,
691 permission_checker,
692 confirmation_manager,
693 context_providers,
694 session_workspace,
695 tool_metrics,
696 hook_engine,
697 planning_enabled,
698 goal_tracking,
699 ) = {
700 let session = session_lock.read().await;
701 (
702 session.messages.clone(),
703 session.system().map(String::from),
704 session.tools.clone(),
705 session.llm_client.clone(),
706 session.permission_checker.clone(),
707 session.confirmation_manager.clone(),
708 session.context_providers.clone(),
709 session.config.workspace.clone(),
710 session.tool_metrics.clone(),
711 session.config.hook_engine.clone(),
712 session.config.planning_enabled,
713 session.config.goal_tracking,
714 )
715 };
716
717 let llm_client = if let Some(client) = session_llm_client {
719 client
720 } else if let Some(client) = self.llm_client.read().await.clone() {
721 client
722 } else {
723 anyhow::bail!(
724 "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
725 session_id
726 );
727 };
728
729 let tool_context = if session_workspace.is_empty() {
731 crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
732 .with_session_id(session_id)
733 } else {
734 crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
735 .with_session_id(session_id)
736 };
737
738 let skill_registry = self.skill_registry.read().await.clone();
740 let system = if let Some(ref registry) = skill_registry {
741 let skill_prompt = registry.to_system_prompt();
742 if skill_prompt.is_empty() {
743 system
744 } else {
745 match system {
746 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
747 None => Some(skill_prompt),
748 }
749 }
750 } else {
751 system
752 };
753
754 let effective_prompt = if let Some(ref registry) = skill_registry {
756 let skill_content = registry.match_skills(prompt);
757 if skill_content.is_empty() {
758 prompt.to_string()
759 } else {
760 format!("{}\n\n---\n\n{}", skill_content, prompt)
761 }
762 } else {
763 prompt.to_string()
764 };
765
766 let memory = self
768 .memory_store
769 .read()
770 .await
771 .as_ref()
772 .map(|store| Arc::new(AgentMemory::new(store.clone())));
773
774 let config = AgentConfig {
776 prompt_slots: match system {
777 Some(s) => SystemPromptSlots::from_legacy(s),
778 None => SystemPromptSlots::default(),
779 },
780 tools,
781 max_tool_rounds: 50,
782 permission_checker: Some(permission_checker),
783 confirmation_manager: Some(confirmation_manager),
784 context_providers,
785 planning_enabled,
786 goal_tracking,
787 hook_engine,
788 skill_registry,
789 memory,
790 ..AgentConfig::default()
791 };
792
793 let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
794 .with_tool_metrics(tool_metrics);
795
796 let (rx, handle) = agent.execute_streaming(&history, &effective_prompt).await?;
798
799 let abort_handle = handle.abort_handle();
801 {
802 let mut ops = self.ongoing_operations.write().await;
803 ops.insert(session_id.to_string(), abort_handle);
804 }
805
806 let session_lock_clone = session_lock.clone();
808 let original_handle = handle;
809 let stores = self.stores.clone();
810 let session_storage_types = self.session_storage_types.clone();
811 let llm_configs = self.llm_configs.clone();
812 let session_id_owned = session_id.to_string();
813 let ongoing_operations = self.ongoing_operations.clone();
814 let session_manager = self.clone();
815
816 let wrapped_handle = tokio::spawn(async move {
817 let result = original_handle.await??;
818
819 {
821 let mut ops = ongoing_operations.write().await;
822 ops.remove(&session_id_owned);
823 }
824
825 {
827 let mut session = session_lock_clone.write().await;
828 session.messages = result.messages.clone();
829 session.update_usage(&result.usage);
830 }
831
832 let storage_type = {
834 let storage_types = session_storage_types.read().await;
835 storage_types.get(&session_id_owned).cloned()
836 };
837
838 if let Some(storage_type) = storage_type {
839 if storage_type != crate::config::StorageBackend::Memory {
840 let stores_guard = stores.read().await;
841 if let Some(store) = stores_guard.get(&storage_type) {
842 let session = session_lock_clone.read().await;
843 let llm_config = {
844 let configs = llm_configs.read().await;
845 configs.get(&session_id_owned).cloned()
846 };
847 let data = session.to_session_data(llm_config);
848 if let Err(e) = store.save(&data).await {
849 tracing::warn!(
850 "Failed to persist session {} after streaming: {}",
851 session_id_owned,
852 e
853 );
854 }
855 }
856 }
857 }
858
859 if let Err(e) = session_manager.maybe_auto_compact(&session_id_owned).await {
861 tracing::warn!(
862 "Auto-compact failed for session {}: {}",
863 session_id_owned,
864 e
865 );
866 }
867
868 Ok(result)
869 });
870
871 Ok((rx, wrapped_handle))
872 }
873
874 pub async fn context_usage(&self, session_id: &str) -> Result<ContextUsage> {
876 let session_lock = self.get_session(session_id).await?;
877 let session = session_lock.read().await;
878 Ok(session.context_usage.clone())
879 }
880
881 pub async fn history(&self, session_id: &str) -> Result<Vec<Message>> {
883 let session_lock = self.get_session(session_id).await?;
884 let session = session_lock.read().await;
885 Ok(session.messages.clone())
886 }
887
888 pub async fn clear(&self, session_id: &str) -> Result<()> {
890 {
891 let session_lock = self.get_session(session_id).await?;
892 let mut session = session_lock.write().await;
893 session.clear();
894 }
895
896 self.persist_in_background(session_id, "clear");
898
899 Ok(())
900 }
901
902 pub async fn compact(&self, session_id: &str) -> Result<()> {
904 tracing::info!(name: "a3s.session.compact", session_id = %session_id, "Compacting session context");
905
906 {
907 let session_lock = self.get_session(session_id).await?;
908 let mut session = session_lock.write().await;
909
910 let llm_client = if let Some(client) = &session.llm_client {
912 client.clone()
913 } else if let Some(client) = self.llm_client.read().await.clone() {
914 client
915 } else {
916 tracing::warn!("No LLM client configured for compaction, using simple truncation");
918 let keep_messages = 20;
919 if session.messages.len() > keep_messages {
920 let len = session.messages.len();
921 session.messages = session.messages.split_off(len - keep_messages);
922 }
923 drop(session);
925 self.persist_in_background(session_id, "compact");
926 return Ok(());
927 };
928
929 session.compact(&llm_client).await?;
930 }
931
932 self.persist_in_background(session_id, "compact");
934
935 Ok(())
936 }
937
938 pub async fn maybe_auto_compact(&self, session_id: &str) -> Result<bool> {
945 let (should_compact, percent_before, messages_before) = {
946 let session_lock = self.get_session(session_id).await?;
947 let session = session_lock.read().await;
948
949 if !session.config.auto_compact {
950 return Ok(false);
951 }
952
953 let threshold = session.config.auto_compact_threshold;
954 let percent = session.context_usage.percent;
955 let msg_count = session.messages.len();
956
957 tracing::debug!(
958 "Auto-compact check for session {}: percent={:.2}%, threshold={:.2}%, messages={}",
959 session_id,
960 percent * 100.0,
961 threshold * 100.0,
962 msg_count,
963 );
964
965 (percent >= threshold, percent, msg_count)
966 };
967
968 if !should_compact {
969 return Ok(false);
970 }
971
972 tracing::info!(
973 name: "a3s.session.auto_compact",
974 session_id = %session_id,
975 percent_before = %format!("{:.1}%", percent_before * 100.0),
976 messages_before = %messages_before,
977 "Auto-compacting session due to high context usage"
978 );
979
980 self.compact(session_id).await?;
982
983 let messages_after = {
985 let session_lock = self.get_session(session_id).await?;
986 let session = session_lock.read().await;
987 session.messages.len()
988 };
989
990 let event = AgentEvent::ContextCompacted {
992 session_id: session_id.to_string(),
993 before_messages: messages_before,
994 after_messages: messages_after,
995 percent_before,
996 };
997
998 if let Ok(session_lock) = self.get_session(session_id).await {
1000 let session = session_lock.read().await;
1001 let _ = session.event_tx.send(event);
1002 }
1003
1004 tracing::info!(
1005 name: "a3s.session.auto_compact.done",
1006 session_id = %session_id,
1007 messages_before = %messages_before,
1008 messages_after = %messages_after,
1009 "Auto-compaction complete"
1010 );
1011
1012 Ok(true)
1013 }
1014
1015 pub async fn get_llm_for_session(
1019 &self,
1020 session_id: &str,
1021 ) -> Result<Option<Arc<dyn LlmClient>>> {
1022 let session_lock = self.get_session(session_id).await?;
1023 let session = session_lock.read().await;
1024
1025 if let Some(client) = &session.llm_client {
1026 return Ok(Some(client.clone()));
1027 }
1028
1029 Ok(self.llm_client.read().await.clone())
1030 }
1031
1032 pub async fn configure(
1034 &self,
1035 session_id: &str,
1036 thinking: Option<bool>,
1037 budget: Option<usize>,
1038 model_config: Option<LlmConfig>,
1039 ) -> Result<()> {
1040 {
1041 let session_lock = self.get_session(session_id).await?;
1042 let mut session = session_lock.write().await;
1043
1044 if let Some(t) = thinking {
1045 session.thinking_enabled = t;
1046 }
1047 if let Some(b) = budget {
1048 session.thinking_budget = Some(b);
1049 }
1050 if let Some(ref config) = model_config {
1051 tracing::info!(
1052 "Configuring session {} with LLM: provider={}, model={}",
1053 session_id,
1054 config.provider,
1055 config.model
1056 );
1057 session.model_name = Some(config.model.clone());
1058 session.llm_client = Some(llm::create_client_with_config(config.clone()));
1059 }
1060 }
1061
1062 if let Some(config) = model_config {
1064 let llm_config_data = LlmConfigData {
1065 provider: config.provider,
1066 model: config.model,
1067 api_key: None, base_url: config.base_url,
1069 };
1070 let mut configs = self.llm_configs.write().await;
1071 configs.insert(session_id.to_string(), llm_config_data);
1072 }
1073
1074 self.persist_in_background(session_id, "configure");
1076
1077 Ok(())
1078 }
1079
1080 pub async fn session_count(&self) -> usize {
1082 let sessions = self.sessions.read().await;
1083 sessions.len()
1084 }
1085
1086 pub async fn store_health(&self) -> Vec<(String, Result<()>)> {
1088 let stores = self.stores.read().await;
1089 let mut results = Vec::new();
1090 for (_, store) in stores.iter() {
1091 let name = store.backend_name().to_string();
1092 let result = store.health_check().await;
1093 results.push((name, result));
1094 }
1095 results
1096 }
1097
1098 pub fn list_tools(&self) -> Vec<crate::llm::ToolDefinition> {
1100 self.tool_executor.definitions()
1101 }
1102
1103 pub async fn pause_session(&self, session_id: &str) -> Result<bool> {
1105 let paused = {
1106 let session_lock = self.get_session(session_id).await?;
1107 let mut session = session_lock.write().await;
1108 session.pause()
1109 };
1110
1111 if paused {
1112 self.persist_in_background(session_id, "pause");
1113 }
1114
1115 Ok(paused)
1116 }
1117
1118 pub async fn resume_session(&self, session_id: &str) -> Result<bool> {
1120 let resumed = {
1121 let session_lock = self.get_session(session_id).await?;
1122 let mut session = session_lock.write().await;
1123 session.resume()
1124 };
1125
1126 if resumed {
1127 self.persist_in_background(session_id, "resume");
1128 }
1129
1130 Ok(resumed)
1131 }
1132
1133 pub async fn cancel_operation(&self, session_id: &str) -> Result<bool> {
1137 let session_lock = self.get_session(session_id).await?;
1139 let cancelled_confirmations = {
1140 let session = session_lock.read().await;
1141 session.confirmation_manager.cancel_all().await
1142 };
1143
1144 if cancelled_confirmations > 0 {
1145 tracing::info!(
1146 "Cancelled {} pending confirmations for session {}",
1147 cancelled_confirmations,
1148 session_id
1149 );
1150 }
1151
1152 let abort_handle = {
1154 let mut ops = self.ongoing_operations.write().await;
1155 ops.remove(session_id)
1156 };
1157
1158 if let Some(handle) = abort_handle {
1159 handle.abort();
1160 tracing::info!("Cancelled ongoing operation for session {}", session_id);
1161 Ok(true)
1162 } else if cancelled_confirmations > 0 {
1163 Ok(true)
1165 } else {
1166 tracing::debug!("No ongoing operation to cancel for session {}", session_id);
1167 Ok(false)
1168 }
1169 }
1170
1171 pub async fn get_all_sessions(&self) -> Vec<Arc<RwLock<Session>>> {
1173 let sessions = self.sessions.read().await;
1174 sessions.values().cloned().collect()
1175 }
1176
1177 pub fn tool_executor(&self) -> &Arc<ToolExecutor> {
1179 &self.tool_executor
1180 }
1181
1182 pub async fn confirm_tool(
1184 &self,
1185 session_id: &str,
1186 tool_id: &str,
1187 approved: bool,
1188 reason: Option<String>,
1189 ) -> Result<bool> {
1190 let session_lock = self.get_session(session_id).await?;
1191 let session = session_lock.read().await;
1192 session
1193 .confirmation_manager
1194 .confirm(tool_id, approved, reason)
1195 .await
1196 .map_err(|e| anyhow::anyhow!(e))
1197 }
1198
1199 pub async fn set_confirmation_policy(
1201 &self,
1202 session_id: &str,
1203 policy: ConfirmationPolicy,
1204 ) -> Result<ConfirmationPolicy> {
1205 {
1206 let session_lock = self.get_session(session_id).await?;
1207 let session = session_lock.read().await;
1208 session.set_confirmation_policy(policy.clone()).await;
1209 }
1210
1211 {
1213 let session_lock = self.get_session(session_id).await?;
1214 let mut session = session_lock.write().await;
1215 session.config.confirmation_policy = Some(policy.clone());
1216 }
1217
1218 self.persist_in_background(session_id, "set_confirmation_policy");
1220
1221 Ok(policy)
1222 }
1223
1224 pub async fn get_confirmation_policy(&self, session_id: &str) -> Result<ConfirmationPolicy> {
1226 let session_lock = self.get_session(session_id).await?;
1227 let session = session_lock.read().await;
1228 Ok(session.confirmation_policy().await)
1229 }
1230}