1use super::{ContextUsage, Session, SessionConfig, SessionState};
4use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
5use crate::hitl::ConfirmationPolicy;
6use crate::llm::{self, LlmClient, LlmConfig, Message};
7use crate::store::{FileSessionStore, LlmConfigData, SessionData, SessionStore};
8use crate::tools::ToolExecutor;
9use anyhow::{Context, Result};
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::{mpsc, RwLock};
13
14#[derive(Clone)]
16pub struct SessionManager {
17 pub(crate) sessions: Arc<RwLock<HashMap<String, Arc<RwLock<Session>>>>>,
18 pub(crate) llm_client: Option<Arc<dyn LlmClient>>,
19 pub(crate) tool_executor: Arc<ToolExecutor>,
20 pub(crate) stores: Arc<RwLock<HashMap<crate::config::StorageBackend, Arc<dyn SessionStore>>>>,
22 pub(crate) session_storage_types: Arc<RwLock<HashMap<String, crate::config::StorageBackend>>>,
24 pub(crate) llm_configs: Arc<RwLock<HashMap<String, LlmConfigData>>>,
26 pub(crate) ongoing_operations: Arc<RwLock<HashMap<String, tokio::task::AbortHandle>>>,
28}
29
30impl SessionManager {
31 pub fn new(llm_client: Option<Arc<dyn LlmClient>>, tool_executor: Arc<ToolExecutor>) -> Self {
33 Self {
34 sessions: Arc::new(RwLock::new(HashMap::new())),
35 llm_client,
36 tool_executor,
37 stores: Arc::new(RwLock::new(HashMap::new())),
38 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
39 llm_configs: Arc::new(RwLock::new(HashMap::new())),
40 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
41 }
42 }
43
44 pub async fn with_persistence<P: AsRef<std::path::Path>>(
48 llm_client: Option<Arc<dyn LlmClient>>,
49 tool_executor: Arc<ToolExecutor>,
50 sessions_dir: P,
51 ) -> Result<Self> {
52 let store = FileSessionStore::new(sessions_dir).await?;
53 let mut stores = HashMap::new();
54 stores.insert(
55 crate::config::StorageBackend::File,
56 Arc::new(store) as Arc<dyn SessionStore>,
57 );
58
59 let manager = Self {
60 sessions: Arc::new(RwLock::new(HashMap::new())),
61 llm_client,
62 tool_executor,
63 stores: Arc::new(RwLock::new(stores)),
64 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
65 llm_configs: Arc::new(RwLock::new(HashMap::new())),
66 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
67 };
68
69 Ok(manager)
70 }
71
72 pub fn with_store(
77 llm_client: Option<Arc<dyn LlmClient>>,
78 tool_executor: Arc<ToolExecutor>,
79 store: Arc<dyn SessionStore>,
80 backend: crate::config::StorageBackend,
81 ) -> Self {
82 let mut stores = HashMap::new();
83 stores.insert(backend, store);
84
85 Self {
86 sessions: Arc::new(RwLock::new(HashMap::new())),
87 llm_client,
88 tool_executor,
89 stores: Arc::new(RwLock::new(stores)),
90 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
91 llm_configs: Arc::new(RwLock::new(HashMap::new())),
92 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
93 }
94 }
95
96 pub async fn restore_session_by_id(&self, session_id: &str) -> Result<()> {
101 {
103 let sessions = self.sessions.read().await;
104 if sessions.contains_key(session_id) {
105 return Ok(());
106 }
107 }
108
109 let stores = self.stores.read().await;
110 for (backend, store) in stores.iter() {
111 match store.load(session_id).await {
112 Ok(Some(data)) => {
113 {
114 let mut storage_types = self.session_storage_types.write().await;
115 storage_types.insert(data.id.clone(), backend.clone());
116 }
117 self.restore_session(data).await?;
118 return Ok(());
119 }
120 Ok(None) => continue,
121 Err(e) => {
122 tracing::warn!(
123 "Failed to load session {} from {:?}: {}",
124 session_id,
125 backend,
126 e
127 );
128 continue;
129 }
130 }
131 }
132
133 Err(anyhow::anyhow!(
134 "Session {} not found in any store",
135 session_id
136 ))
137 }
138
139 pub async fn load_all_sessions(&mut self) -> Result<usize> {
141 let stores = self.stores.read().await;
142 let mut loaded = 0;
143
144 for (backend, store) in stores.iter() {
145 let session_ids = match store.list().await {
146 Ok(ids) => ids,
147 Err(e) => {
148 tracing::warn!("Failed to list sessions from {:?} store: {}", backend, e);
149 continue;
150 }
151 };
152
153 for id in session_ids {
154 match store.load(&id).await {
155 Ok(Some(data)) => {
156 {
158 let mut storage_types = self.session_storage_types.write().await;
159 storage_types.insert(data.id.clone(), backend.clone());
160 }
161
162 if let Err(e) = self.restore_session(data).await {
163 tracing::warn!("Failed to restore session {}: {}", id, e);
164 } else {
165 loaded += 1;
166 }
167 }
168 Ok(None) => {
169 tracing::warn!("Session {} not found in store", id);
170 }
171 Err(e) => {
172 tracing::warn!("Failed to load session {}: {}", id, e);
173 }
174 }
175 }
176 }
177
178 tracing::info!("Loaded {} sessions from store", loaded);
179 Ok(loaded)
180 }
181
182 async fn restore_session(&self, data: SessionData) -> Result<()> {
184 let tools = self.tool_executor.definitions();
185 let mut session = Session::new(data.id.clone(), data.config.clone(), tools).await?;
186
187 session.restore_from_data(&data);
189
190 if let Some(llm_config) = &data.llm_config {
192 let mut configs = self.llm_configs.write().await;
193 configs.insert(data.id.clone(), llm_config.clone());
194 }
195
196 let mut sessions = self.sessions.write().await;
197 sessions.insert(data.id.clone(), Arc::new(RwLock::new(session)));
198
199 tracing::info!("Restored session: {}", data.id);
200 Ok(())
201 }
202
203 async fn save_session(&self, session_id: &str) -> Result<()> {
205 let storage_type = {
207 let storage_types = self.session_storage_types.read().await;
208 storage_types.get(session_id).cloned()
209 };
210
211 let Some(storage_type) = storage_type else {
212 return Ok(());
214 };
215
216 if storage_type == crate::config::StorageBackend::Memory {
218 return Ok(());
219 }
220
221 let stores = self.stores.read().await;
223 let Some(store) = stores.get(&storage_type) else {
224 tracing::warn!("No store available for storage type: {:?}", storage_type);
225 return Ok(());
226 };
227
228 let session_lock = self.get_session(session_id).await?;
229 let session = session_lock.read().await;
230
231 let llm_config = {
233 let configs = self.llm_configs.read().await;
234 configs.get(session_id).cloned()
235 };
236
237 let data = session.to_session_data(llm_config);
238 store.save(&data).await?;
239
240 tracing::debug!("Saved session: {}", session_id);
241 Ok(())
242 }
243
244 async fn persist_or_warn(&self, session_id: &str, operation: &str) {
248 if let Err(e) = self.save_session(session_id).await {
249 tracing::warn!(
250 "Failed to persist session {} after {}: {}",
251 session_id,
252 operation,
253 e
254 );
255 if let Ok(session_lock) = self.get_session(session_id).await {
257 let session = session_lock.read().await;
258 let _ = session.event_tx().send(AgentEvent::PersistenceFailed {
259 session_id: session_id.to_string(),
260 operation: operation.to_string(),
261 error: e.to_string(),
262 });
263 }
264 }
265 }
266
267 fn persist_in_background(&self, session_id: &str, operation: &str) {
272 let mgr = self.clone();
273 let sid = session_id.to_string();
274 let op = operation.to_string();
275 tokio::spawn(async move {
276 mgr.persist_or_warn(&sid, &op).await;
277 });
278 }
279
280 pub async fn create_session(&self, id: String, config: SessionConfig) -> Result<String> {
282 tracing::info!(name: "a3s.session.create", session_id = %id, "Creating session");
283
284 {
286 let mut storage_types = self.session_storage_types.write().await;
287 storage_types.insert(id.clone(), config.storage_type.clone());
288 }
289
290 let tools = self.tool_executor.definitions();
292 let mut session = Session::new(id.clone(), config, tools).await?;
293
294 session.start_queue().await?;
296
297 if session.config.max_context_length > 0 {
299 session.context_usage.max_tokens = session.config.max_context_length as usize;
300 }
301
302 {
303 let mut sessions = self.sessions.write().await;
304 sessions.insert(id.clone(), Arc::new(RwLock::new(session)));
305 }
306
307 self.persist_in_background(&id, "create");
309
310 tracing::info!("Created session: {}", id);
311 Ok(id)
312 }
313
314 pub async fn destroy_session(&self, id: &str) -> Result<()> {
316 tracing::info!(name: "a3s.session.destroy", session_id = %id, "Destroying session");
317
318 let storage_type = {
320 let storage_types = self.session_storage_types.read().await;
321 storage_types.get(id).cloned()
322 };
323
324 {
325 let mut sessions = self.sessions.write().await;
326 sessions.remove(id);
327 }
328
329 {
331 let mut configs = self.llm_configs.write().await;
332 configs.remove(id);
333 }
334
335 {
337 let mut storage_types = self.session_storage_types.write().await;
338 storage_types.remove(id);
339 }
340
341 if let Some(storage_type) = storage_type {
343 if storage_type != crate::config::StorageBackend::Memory {
344 let stores = self.stores.read().await;
345 if let Some(store) = stores.get(&storage_type) {
346 if let Err(e) = store.delete(id).await {
347 tracing::warn!("Failed to delete session {} from store: {}", id, e);
348 }
349 }
350 }
351 }
352
353 tracing::info!("Destroyed session: {}", id);
354 Ok(())
355 }
356
357 pub async fn get_session(&self, id: &str) -> Result<Arc<RwLock<Session>>> {
359 let sessions = self.sessions.read().await;
360 sessions
361 .get(id)
362 .cloned()
363 .context(format!("Session not found: {}", id))
364 }
365
366 pub async fn create_child_session(
371 &self,
372 parent_id: &str,
373 child_id: String,
374 mut config: SessionConfig,
375 ) -> Result<String> {
376 let parent_lock = self.get_session(parent_id).await?;
378 let parent_llm_client = {
379 let parent = parent_lock.read().await;
380
381 if config.confirmation_policy.is_none() {
383 let parent_policy = parent.confirmation_manager.policy().await;
384 config.confirmation_policy = Some(parent_policy);
385 }
386
387 parent.llm_client.clone()
388 };
389
390 config.parent_id = Some(parent_id.to_string());
392
393 let tools = self.tool_executor.definitions();
395 let mut session = Session::new(child_id.clone(), config, tools).await?;
396
397 if session.llm_client.is_none() {
399 session.llm_client = parent_llm_client.or_else(|| self.llm_client.clone());
400 }
401
402 session.start_queue().await?;
404
405 if session.config.max_context_length > 0 {
407 session.context_usage.max_tokens = session.config.max_context_length as usize;
408 }
409
410 {
411 let mut sessions = self.sessions.write().await;
412 sessions.insert(child_id.clone(), Arc::new(RwLock::new(session)));
413 }
414
415 self.persist_in_background(&child_id, "create_child");
417
418 tracing::info!(
419 "Created child session: {} (parent: {})",
420 child_id,
421 parent_id
422 );
423 Ok(child_id)
424 }
425
426 pub async fn get_child_sessions(&self, parent_id: &str) -> Vec<String> {
428 let sessions = self.sessions.read().await;
429 let mut children = Vec::new();
430
431 for (id, session_lock) in sessions.iter() {
432 let session = session_lock.read().await;
433 if session.parent_id.as_deref() == Some(parent_id) {
434 children.push(id.clone());
435 }
436 }
437
438 children
439 }
440
441 pub async fn is_child_session(&self, session_id: &str) -> Result<bool> {
443 let session_lock = self.get_session(session_id).await?;
444 let session = session_lock.read().await;
445 Ok(session.is_child_session())
446 }
447
448 pub async fn generate(&self, session_id: &str, prompt: &str) -> Result<AgentResult> {
450 let session_lock = self.get_session(session_id).await?;
451
452 {
454 let session = session_lock.read().await;
455 if session.state == SessionState::Paused {
456 anyhow::bail!(
457 "Session {} is paused. Call Resume before generating.",
458 session_id
459 );
460 }
461 }
462
463 let (
465 history,
466 system,
467 tools,
468 session_llm_client,
469 permission_checker,
470 confirmation_manager,
471 context_providers,
472 session_workspace,
473 tool_metrics,
474 hook_engine,
475 planning_enabled,
476 goal_tracking,
477 ) = {
478 let session = session_lock.read().await;
479 (
480 session.messages.clone(),
481 session.system().map(String::from),
482 session.tools.clone(),
483 session.llm_client.clone(),
484 session.permission_checker.clone(),
485 session.confirmation_manager.clone(),
486 session.context_providers.clone(),
487 session.config.workspace.clone(),
488 session.tool_metrics.clone(),
489 session.config.hook_engine.clone(),
490 session.config.planning_enabled,
491 session.config.goal_tracking,
492 )
493 };
494
495 let llm_client = if let Some(client) = session_llm_client {
497 client
498 } else if let Some(client) = &self.llm_client {
499 client.clone()
500 } else {
501 anyhow::bail!(
502 "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
503 session_id
504 );
505 };
506
507 let tool_context = if session_workspace.is_empty() {
509 crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
510 .with_session_id(session_id)
511 } else {
512 crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
513 .with_session_id(session_id)
514 };
515
516 let config = AgentConfig {
518 system_prompt: system,
519 tools,
520 max_tool_rounds: 50,
521 permission_checker: Some(permission_checker),
522 confirmation_manager: Some(confirmation_manager),
523 context_providers,
524 planning_enabled,
525 goal_tracking,
526 hook_engine,
527 skill_registry: None,
528 };
529
530 let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
531 .with_tool_metrics(tool_metrics);
532
533 let result = agent
535 .execute_with_session(&history, prompt, Some(session_id), None)
536 .await?;
537
538 {
540 let mut session = session_lock.write().await;
541 session.messages = result.messages.clone();
542 session.update_usage(&result.usage);
543 }
544
545 self.persist_in_background(session_id, "generate");
547
548 if let Err(e) = self.maybe_auto_compact(session_id).await {
550 tracing::warn!("Auto-compact failed for session {}: {}", session_id, e);
551 }
552
553 Ok(result)
554 }
555
556 pub async fn generate_streaming(
558 &self,
559 session_id: &str,
560 prompt: &str,
561 ) -> Result<(
562 mpsc::Receiver<AgentEvent>,
563 tokio::task::JoinHandle<Result<AgentResult>>,
564 )> {
565 let session_lock = self.get_session(session_id).await?;
566
567 {
569 let session = session_lock.read().await;
570 if session.state == SessionState::Paused {
571 anyhow::bail!(
572 "Session {} is paused. Call Resume before generating.",
573 session_id
574 );
575 }
576 }
577
578 let (
580 history,
581 system,
582 tools,
583 session_llm_client,
584 permission_checker,
585 confirmation_manager,
586 context_providers,
587 session_workspace,
588 tool_metrics,
589 hook_engine,
590 planning_enabled,
591 goal_tracking,
592 ) = {
593 let session = session_lock.read().await;
594 (
595 session.messages.clone(),
596 session.system().map(String::from),
597 session.tools.clone(),
598 session.llm_client.clone(),
599 session.permission_checker.clone(),
600 session.confirmation_manager.clone(),
601 session.context_providers.clone(),
602 session.config.workspace.clone(),
603 session.tool_metrics.clone(),
604 session.config.hook_engine.clone(),
605 session.config.planning_enabled,
606 session.config.goal_tracking,
607 )
608 };
609
610 let llm_client = if let Some(client) = session_llm_client {
612 client
613 } else if let Some(client) = &self.llm_client {
614 client.clone()
615 } else {
616 anyhow::bail!(
617 "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
618 session_id
619 );
620 };
621
622 let tool_context = if session_workspace.is_empty() {
624 crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
625 .with_session_id(session_id)
626 } else {
627 crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
628 .with_session_id(session_id)
629 };
630
631 let config = AgentConfig {
633 system_prompt: system,
634 tools,
635 max_tool_rounds: 50,
636 permission_checker: Some(permission_checker),
637 confirmation_manager: Some(confirmation_manager),
638 context_providers,
639 planning_enabled,
640 goal_tracking,
641 hook_engine,
642 skill_registry: None,
643 };
644
645 let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
646 .with_tool_metrics(tool_metrics);
647
648 let (rx, handle) = agent.execute_streaming(&history, prompt).await?;
650
651 let abort_handle = handle.abort_handle();
653 {
654 let mut ops = self.ongoing_operations.write().await;
655 ops.insert(session_id.to_string(), abort_handle);
656 }
657
658 let session_lock_clone = session_lock.clone();
660 let original_handle = handle;
661 let stores = self.stores.clone();
662 let session_storage_types = self.session_storage_types.clone();
663 let llm_configs = self.llm_configs.clone();
664 let session_id_owned = session_id.to_string();
665 let ongoing_operations = self.ongoing_operations.clone();
666 let session_manager = self.clone();
667
668 let wrapped_handle = tokio::spawn(async move {
669 let result = original_handle.await??;
670
671 {
673 let mut ops = ongoing_operations.write().await;
674 ops.remove(&session_id_owned);
675 }
676
677 {
679 let mut session = session_lock_clone.write().await;
680 session.messages = result.messages.clone();
681 session.update_usage(&result.usage);
682 }
683
684 let storage_type = {
686 let storage_types = session_storage_types.read().await;
687 storage_types.get(&session_id_owned).cloned()
688 };
689
690 if let Some(storage_type) = storage_type {
691 if storage_type != crate::config::StorageBackend::Memory {
692 let stores_guard = stores.read().await;
693 if let Some(store) = stores_guard.get(&storage_type) {
694 let session = session_lock_clone.read().await;
695 let llm_config = {
696 let configs = llm_configs.read().await;
697 configs.get(&session_id_owned).cloned()
698 };
699 let data = session.to_session_data(llm_config);
700 if let Err(e) = store.save(&data).await {
701 tracing::warn!(
702 "Failed to persist session {} after streaming: {}",
703 session_id_owned,
704 e
705 );
706 }
707 }
708 }
709 }
710
711 if let Err(e) = session_manager.maybe_auto_compact(&session_id_owned).await {
713 tracing::warn!(
714 "Auto-compact failed for session {}: {}",
715 session_id_owned,
716 e
717 );
718 }
719
720 Ok(result)
721 });
722
723 Ok((rx, wrapped_handle))
724 }
725
726 pub async fn context_usage(&self, session_id: &str) -> Result<ContextUsage> {
728 let session_lock = self.get_session(session_id).await?;
729 let session = session_lock.read().await;
730 Ok(session.context_usage.clone())
731 }
732
733 pub async fn history(&self, session_id: &str) -> Result<Vec<Message>> {
735 let session_lock = self.get_session(session_id).await?;
736 let session = session_lock.read().await;
737 Ok(session.messages.clone())
738 }
739
740 pub async fn clear(&self, session_id: &str) -> Result<()> {
742 {
743 let session_lock = self.get_session(session_id).await?;
744 let mut session = session_lock.write().await;
745 session.clear();
746 }
747
748 self.persist_in_background(session_id, "clear");
750
751 Ok(())
752 }
753
754 pub async fn compact(&self, session_id: &str) -> Result<()> {
756 tracing::info!(name: "a3s.session.compact", session_id = %session_id, "Compacting session context");
757
758 {
759 let session_lock = self.get_session(session_id).await?;
760 let mut session = session_lock.write().await;
761
762 let llm_client = if let Some(client) = &session.llm_client {
764 client.clone()
765 } else if let Some(client) = &self.llm_client {
766 client.clone()
767 } else {
768 tracing::warn!("No LLM client configured for compaction, using simple truncation");
770 let keep_messages = 20;
771 if session.messages.len() > keep_messages {
772 let len = session.messages.len();
773 session.messages = session.messages.split_off(len - keep_messages);
774 }
775 drop(session);
777 self.persist_in_background(session_id, "compact");
778 return Ok(());
779 };
780
781 session.compact(&llm_client).await?;
782 }
783
784 self.persist_in_background(session_id, "compact");
786
787 Ok(())
788 }
789
790 pub async fn maybe_auto_compact(&self, session_id: &str) -> Result<bool> {
797 let (should_compact, percent_before, messages_before) = {
798 let session_lock = self.get_session(session_id).await?;
799 let session = session_lock.read().await;
800
801 if !session.config.auto_compact {
802 return Ok(false);
803 }
804
805 let threshold = session.config.auto_compact_threshold;
806 let percent = session.context_usage.percent;
807 let msg_count = session.messages.len();
808
809 tracing::debug!(
810 "Auto-compact check for session {}: percent={:.2}%, threshold={:.2}%, messages={}",
811 session_id,
812 percent * 100.0,
813 threshold * 100.0,
814 msg_count,
815 );
816
817 (percent >= threshold, percent, msg_count)
818 };
819
820 if !should_compact {
821 return Ok(false);
822 }
823
824 tracing::info!(
825 name: "a3s.session.auto_compact",
826 session_id = %session_id,
827 percent_before = %format!("{:.1}%", percent_before * 100.0),
828 messages_before = %messages_before,
829 "Auto-compacting session due to high context usage"
830 );
831
832 self.compact(session_id).await?;
834
835 let messages_after = {
837 let session_lock = self.get_session(session_id).await?;
838 let session = session_lock.read().await;
839 session.messages.len()
840 };
841
842 let event = AgentEvent::ContextCompacted {
844 session_id: session_id.to_string(),
845 before_messages: messages_before,
846 after_messages: messages_after,
847 percent_before,
848 };
849
850 if let Ok(session_lock) = self.get_session(session_id).await {
852 let session = session_lock.read().await;
853 let _ = session.event_tx.send(event);
854 }
855
856 tracing::info!(
857 name: "a3s.session.auto_compact.done",
858 session_id = %session_id,
859 messages_before = %messages_before,
860 messages_after = %messages_after,
861 "Auto-compaction complete"
862 );
863
864 Ok(true)
865 }
866
867 pub async fn get_llm_for_session(
871 &self,
872 session_id: &str,
873 ) -> Result<Option<Arc<dyn LlmClient>>> {
874 let session_lock = self.get_session(session_id).await?;
875 let session = session_lock.read().await;
876
877 if let Some(client) = &session.llm_client {
878 return Ok(Some(client.clone()));
879 }
880
881 Ok(self.llm_client.clone())
882 }
883
884 pub async fn configure(
886 &self,
887 session_id: &str,
888 thinking: Option<bool>,
889 budget: Option<usize>,
890 model_config: Option<LlmConfig>,
891 ) -> Result<()> {
892 {
893 let session_lock = self.get_session(session_id).await?;
894 let mut session = session_lock.write().await;
895
896 if let Some(t) = thinking {
897 session.thinking_enabled = t;
898 }
899 if let Some(b) = budget {
900 session.thinking_budget = Some(b);
901 }
902 if let Some(ref config) = model_config {
903 tracing::info!(
904 "Configuring session {} with LLM: provider={}, model={}",
905 session_id,
906 config.provider,
907 config.model
908 );
909 session.model_name = Some(config.model.clone());
910 session.llm_client = Some(llm::create_client_with_config(config.clone()));
911 }
912 }
913
914 if let Some(config) = model_config {
916 let llm_config_data = LlmConfigData {
917 provider: config.provider,
918 model: config.model,
919 api_key: None, base_url: config.base_url,
921 };
922 let mut configs = self.llm_configs.write().await;
923 configs.insert(session_id.to_string(), llm_config_data);
924 }
925
926 self.persist_in_background(session_id, "configure");
928
929 Ok(())
930 }
931
932 pub async fn session_count(&self) -> usize {
934 let sessions = self.sessions.read().await;
935 sessions.len()
936 }
937
938 pub async fn store_health(&self) -> Vec<(String, Result<()>)> {
940 let stores = self.stores.read().await;
941 let mut results = Vec::new();
942 for (_, store) in stores.iter() {
943 let name = store.backend_name().to_string();
944 let result = store.health_check().await;
945 results.push((name, result));
946 }
947 results
948 }
949
950 pub fn list_tools(&self) -> Vec<crate::llm::ToolDefinition> {
952 self.tool_executor.definitions()
953 }
954
955 pub async fn pause_session(&self, session_id: &str) -> Result<bool> {
957 let paused = {
958 let session_lock = self.get_session(session_id).await?;
959 let mut session = session_lock.write().await;
960 session.pause()
961 };
962
963 if paused {
964 self.persist_in_background(session_id, "pause");
965 }
966
967 Ok(paused)
968 }
969
970 pub async fn resume_session(&self, session_id: &str) -> Result<bool> {
972 let resumed = {
973 let session_lock = self.get_session(session_id).await?;
974 let mut session = session_lock.write().await;
975 session.resume()
976 };
977
978 if resumed {
979 self.persist_in_background(session_id, "resume");
980 }
981
982 Ok(resumed)
983 }
984
985 pub async fn cancel_operation(&self, session_id: &str) -> Result<bool> {
989 let session_lock = self.get_session(session_id).await?;
991 let cancelled_confirmations = {
992 let session = session_lock.read().await;
993 session.confirmation_manager.cancel_all().await
994 };
995
996 if cancelled_confirmations > 0 {
997 tracing::info!(
998 "Cancelled {} pending confirmations for session {}",
999 cancelled_confirmations,
1000 session_id
1001 );
1002 }
1003
1004 let abort_handle = {
1006 let mut ops = self.ongoing_operations.write().await;
1007 ops.remove(session_id)
1008 };
1009
1010 if let Some(handle) = abort_handle {
1011 handle.abort();
1012 tracing::info!("Cancelled ongoing operation for session {}", session_id);
1013 Ok(true)
1014 } else if cancelled_confirmations > 0 {
1015 Ok(true)
1017 } else {
1018 tracing::debug!("No ongoing operation to cancel for session {}", session_id);
1019 Ok(false)
1020 }
1021 }
1022
1023 pub async fn get_all_sessions(&self) -> Vec<Arc<RwLock<Session>>> {
1025 let sessions = self.sessions.read().await;
1026 sessions.values().cloned().collect()
1027 }
1028
1029 pub fn tool_executor(&self) -> &Arc<ToolExecutor> {
1031 &self.tool_executor
1032 }
1033
1034 pub async fn confirm_tool(
1036 &self,
1037 session_id: &str,
1038 tool_id: &str,
1039 approved: bool,
1040 reason: Option<String>,
1041 ) -> Result<bool> {
1042 let session_lock = self.get_session(session_id).await?;
1043 let session = session_lock.read().await;
1044 session
1045 .confirmation_manager
1046 .confirm(tool_id, approved, reason)
1047 .await
1048 .map_err(|e| anyhow::anyhow!(e))
1049 }
1050
1051 pub async fn set_confirmation_policy(
1053 &self,
1054 session_id: &str,
1055 policy: ConfirmationPolicy,
1056 ) -> Result<ConfirmationPolicy> {
1057 {
1058 let session_lock = self.get_session(session_id).await?;
1059 let session = session_lock.read().await;
1060 session.set_confirmation_policy(policy.clone()).await;
1061 }
1062
1063 {
1065 let session_lock = self.get_session(session_id).await?;
1066 let mut session = session_lock.write().await;
1067 session.config.confirmation_policy = Some(policy.clone());
1068 }
1069
1070 self.persist_in_background(session_id, "set_confirmation_policy");
1072
1073 Ok(policy)
1074 }
1075
1076 pub async fn get_confirmation_policy(&self, session_id: &str) -> Result<ConfirmationPolicy> {
1078 let session_lock = self.get_session(session_id).await?;
1079 let session = session_lock.read().await;
1080 Ok(session.confirmation_policy().await)
1081 }
1082}