1use super::{ContextUsage, Session, SessionConfig, SessionState};
4use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
5use crate::hitl::ConfirmationPolicy;
6use crate::llm::{self, LlmClient, LlmConfig, Message};
7use crate::store::{FileSessionStore, LlmConfigData, SessionData, SessionStore};
8use crate::tools::ToolExecutor;
9use anyhow::{Context, Result};
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::{mpsc, RwLock};
13
/// Owns the in-memory sessions and the bookkeeping needed to persist them.
///
/// Every piece of mutable state sits behind an `Arc`, so clones of the manager
/// (for example the ones moved into background tasks) share the same state.
#[derive(Clone)]
pub struct SessionManager {
    /// Live sessions keyed by session id.
    pub(crate) sessions: Arc<RwLock<HashMap<String, Arc<RwLock<Session>>>>>,
    /// Manager-wide LLM client, used when a session has no client of its own.
    pub(crate) llm_client: Option<Arc<dyn LlmClient>>,
    /// Executor that supplies tool definitions and the default workspace.
    pub(crate) tool_executor: Arc<ToolExecutor>,
    /// Available session stores keyed by storage backend.
    pub(crate) stores: Arc<RwLock<HashMap<crate::config::StorageBackend, Arc<dyn SessionStore>>>>,
    /// Storage backend recorded for each session, keyed by session id.
    pub(crate) session_storage_types: Arc<RwLock<HashMap<String, crate::config::StorageBackend>>>,
    /// Per-session LLM configuration that is saved together with the session.
    pub(crate) llm_configs: Arc<RwLock<HashMap<String, LlmConfigData>>>,
    /// Abort handles of in-flight streaming generations, keyed by session id.
    pub(crate) ongoing_operations: Arc<RwLock<HashMap<String, tokio::task::AbortHandle>>>,
}
29
30impl SessionManager {
31 pub fn new(llm_client: Option<Arc<dyn LlmClient>>, tool_executor: Arc<ToolExecutor>) -> Self {
33 Self {
34 sessions: Arc::new(RwLock::new(HashMap::new())),
35 llm_client,
36 tool_executor,
37 stores: Arc::new(RwLock::new(HashMap::new())),
38 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
39 llm_configs: Arc::new(RwLock::new(HashMap::new())),
40 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
41 }
42 }
43
44 pub async fn with_persistence<P: AsRef<std::path::Path>>(
48 llm_client: Option<Arc<dyn LlmClient>>,
49 tool_executor: Arc<ToolExecutor>,
50 sessions_dir: P,
51 ) -> Result<Self> {
52 let store = FileSessionStore::new(sessions_dir).await?;
53 let mut stores = HashMap::new();
54 stores.insert(
55 crate::config::StorageBackend::File,
56 Arc::new(store) as Arc<dyn SessionStore>,
57 );
58
59 let manager = Self {
60 sessions: Arc::new(RwLock::new(HashMap::new())),
61 llm_client,
62 tool_executor,
63 stores: Arc::new(RwLock::new(stores)),
64 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
65 llm_configs: Arc::new(RwLock::new(HashMap::new())),
66 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
67 };
68
69 Ok(manager)
70 }
71
72 pub fn with_store(
77 llm_client: Option<Arc<dyn LlmClient>>,
78 tool_executor: Arc<ToolExecutor>,
79 store: Arc<dyn SessionStore>,
80 backend: crate::config::StorageBackend,
81 ) -> Self {
82 let mut stores = HashMap::new();
83 stores.insert(backend, store);
84
85 Self {
86 sessions: Arc::new(RwLock::new(HashMap::new())),
87 llm_client,
88 tool_executor,
89 stores: Arc::new(RwLock::new(stores)),
90 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
91 llm_configs: Arc::new(RwLock::new(HashMap::new())),
92 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
93 }
94 }
95
96 pub async fn restore_session_by_id(&self, session_id: &str) -> Result<()> {
101 {
103 let sessions = self.sessions.read().await;
104 if sessions.contains_key(session_id) {
105 return Ok(());
106 }
107 }
108
109 let stores = self.stores.read().await;
110 for (backend, store) in stores.iter() {
111 match store.load(session_id).await {
112 Ok(Some(data)) => {
113 {
114 let mut storage_types = self.session_storage_types.write().await;
115 storage_types.insert(data.id.clone(), backend.clone());
116 }
117 self.restore_session(data).await?;
118 return Ok(());
119 }
120 Ok(None) => continue,
121 Err(e) => {
122 tracing::warn!(
123 "Failed to load session {} from {:?}: {}",
124 session_id,
125 backend,
126 e
127 );
128 continue;
129 }
130 }
131 }
132
133 Err(anyhow::anyhow!(
134 "Session {} not found in any store",
135 session_id
136 ))
137 }
138
139 pub async fn load_all_sessions(&mut self) -> Result<usize> {
141 let stores = self.stores.read().await;
142 let mut loaded = 0;
143
144 for (backend, store) in stores.iter() {
145 let session_ids = match store.list().await {
146 Ok(ids) => ids,
147 Err(e) => {
148 tracing::warn!("Failed to list sessions from {:?} store: {}", backend, e);
149 continue;
150 }
151 };
152
153 for id in session_ids {
154 match store.load(&id).await {
155 Ok(Some(data)) => {
156 {
158 let mut storage_types = self.session_storage_types.write().await;
159 storage_types.insert(data.id.clone(), backend.clone());
160 }
161
162 if let Err(e) = self.restore_session(data).await {
163 tracing::warn!("Failed to restore session {}: {}", id, e);
164 } else {
165 loaded += 1;
166 }
167 }
168 Ok(None) => {
169 tracing::warn!("Session {} not found in store", id);
170 }
171 Err(e) => {
172 tracing::warn!("Failed to load session {}: {}", id, e);
173 }
174 }
175 }
176 }
177
178 tracing::info!("Loaded {} sessions from store", loaded);
179 Ok(loaded)
180 }
181
182 async fn restore_session(&self, data: SessionData) -> Result<()> {
184 let tools = self.tool_executor.definitions();
185 let mut session = Session::new(data.id.clone(), data.config.clone(), tools).await?;
186
187 session.restore_from_data(&data);
189
190 if let Some(llm_config) = &data.llm_config {
192 let mut configs = self.llm_configs.write().await;
193 configs.insert(data.id.clone(), llm_config.clone());
194 }
195
196 let mut sessions = self.sessions.write().await;
197 sessions.insert(data.id.clone(), Arc::new(RwLock::new(session)));
198
199 tracing::info!("Restored session: {}", data.id);
200 Ok(())
201 }
202
203 async fn save_session(&self, session_id: &str) -> Result<()> {
205 let storage_type = {
207 let storage_types = self.session_storage_types.read().await;
208 storage_types.get(session_id).cloned()
209 };
210
211 let Some(storage_type) = storage_type else {
212 return Ok(());
214 };
215
216 if storage_type == crate::config::StorageBackend::Memory {
218 return Ok(());
219 }
220
221 let stores = self.stores.read().await;
223 let Some(store) = stores.get(&storage_type) else {
224 tracing::warn!("No store available for storage type: {:?}", storage_type);
225 return Ok(());
226 };
227
228 let session_lock = self.get_session(session_id).await?;
229 let session = session_lock.read().await;
230
231 let llm_config = {
233 let configs = self.llm_configs.read().await;
234 configs.get(session_id).cloned()
235 };
236
237 let data = session.to_session_data(llm_config);
238 store.save(&data).await?;
239
240 tracing::debug!("Saved session: {}", session_id);
241 Ok(())
242 }
243
244 async fn persist_or_warn(&self, session_id: &str, operation: &str) {
248 if let Err(e) = self.save_session(session_id).await {
249 tracing::warn!(
250 "Failed to persist session {} after {}: {}",
251 session_id,
252 operation,
253 e
254 );
255 if let Ok(session_lock) = self.get_session(session_id).await {
257 let session = session_lock.read().await;
258 let _ = session.event_tx().send(AgentEvent::PersistenceFailed {
259 session_id: session_id.to_string(),
260 operation: operation.to_string(),
261 error: e.to_string(),
262 });
263 }
264 }
265 }
266
267 fn persist_in_background(&self, session_id: &str, operation: &str) {
272 let mgr = self.clone();
273 let sid = session_id.to_string();
274 let op = operation.to_string();
275 tokio::spawn(async move {
276 mgr.persist_or_warn(&sid, &op).await;
277 });
278 }
279
280 pub async fn create_session(&self, id: String, config: SessionConfig) -> Result<String> {
282 tracing::info!(name: "a3s.session.create", session_id = %id, "Creating session");
283
284 {
286 let mut storage_types = self.session_storage_types.write().await;
287 storage_types.insert(id.clone(), config.storage_type.clone());
288 }
289
290 let tools = self.tool_executor.definitions();
292 let mut session = Session::new(id.clone(), config, tools).await?;
293
294 session.start_queue().await?;
296
297 if session.config.max_context_length > 0 {
299 session.context_usage.max_tokens = session.config.max_context_length as usize;
300 }
301
302 {
303 let mut sessions = self.sessions.write().await;
304 sessions.insert(id.clone(), Arc::new(RwLock::new(session)));
305 }
306
307 self.persist_in_background(&id, "create");
309
310 tracing::info!("Created session: {}", id);
311 Ok(id)
312 }
313
314 pub async fn destroy_session(&self, id: &str) -> Result<()> {
316 tracing::info!(name: "a3s.session.destroy", session_id = %id, "Destroying session");
317
318 let storage_type = {
320 let storage_types = self.session_storage_types.read().await;
321 storage_types.get(id).cloned()
322 };
323
324 {
325 let mut sessions = self.sessions.write().await;
326 sessions.remove(id);
327 }
328
329 {
331 let mut configs = self.llm_configs.write().await;
332 configs.remove(id);
333 }
334
335 {
337 let mut storage_types = self.session_storage_types.write().await;
338 storage_types.remove(id);
339 }
340
341 if let Some(storage_type) = storage_type {
343 if storage_type != crate::config::StorageBackend::Memory {
344 let stores = self.stores.read().await;
345 if let Some(store) = stores.get(&storage_type) {
346 if let Err(e) = store.delete(id).await {
347 tracing::warn!("Failed to delete session {} from store: {}", id, e);
348 }
349 }
350 }
351 }
352
353 tracing::info!("Destroyed session: {}", id);
354 Ok(())
355 }
356
357 pub async fn get_session(&self, id: &str) -> Result<Arc<RwLock<Session>>> {
359 let sessions = self.sessions.read().await;
360 sessions
361 .get(id)
362 .cloned()
363 .context(format!("Session not found: {}", id))
364 }
365
366 pub async fn create_child_session(
371 &self,
372 parent_id: &str,
373 child_id: String,
374 mut config: SessionConfig,
375 ) -> Result<String> {
376 let parent_lock = self.get_session(parent_id).await?;
378 let parent_llm_client = {
379 let parent = parent_lock.read().await;
380
381 if config.confirmation_policy.is_none() {
383 let parent_policy = parent.confirmation_manager.policy().await;
384 config.confirmation_policy = Some(parent_policy);
385 }
386
387 parent.llm_client.clone()
388 };
389
390 config.parent_id = Some(parent_id.to_string());
392
393 let tools = self.tool_executor.definitions();
395 let mut session = Session::new(child_id.clone(), config, tools).await?;
396
397 if session.llm_client.is_none() {
399 session.llm_client = parent_llm_client.or_else(|| self.llm_client.clone());
400 }
401
402 session.start_queue().await?;
404
405 if session.config.max_context_length > 0 {
407 session.context_usage.max_tokens = session.config.max_context_length as usize;
408 }
409
410 {
411 let mut sessions = self.sessions.write().await;
412 sessions.insert(child_id.clone(), Arc::new(RwLock::new(session)));
413 }
414
415 self.persist_in_background(&child_id, "create_child");
417
418 tracing::info!(
419 "Created child session: {} (parent: {})",
420 child_id,
421 parent_id
422 );
423 Ok(child_id)
424 }
425
426 pub async fn get_child_sessions(&self, parent_id: &str) -> Vec<String> {
428 let sessions = self.sessions.read().await;
429 let mut children = Vec::new();
430
431 for (id, session_lock) in sessions.iter() {
432 let session = session_lock.read().await;
433 if session.parent_id.as_deref() == Some(parent_id) {
434 children.push(id.clone());
435 }
436 }
437
438 children
439 }
440
441 pub async fn is_child_session(&self, session_id: &str) -> Result<bool> {
443 let session_lock = self.get_session(session_id).await?;
444 let session = session_lock.read().await;
445 Ok(session.is_child_session())
446 }
447
448 pub async fn generate(&self, session_id: &str, prompt: &str) -> Result<AgentResult> {
450 let session_lock = self.get_session(session_id).await?;
451
452 {
454 let session = session_lock.read().await;
455 if session.state == SessionState::Paused {
456 anyhow::bail!(
457 "Session {} is paused. Call Resume before generating.",
458 session_id
459 );
460 }
461 }
462
463 let (
465 history,
466 system,
467 tools,
468 session_llm_client,
469 permission_checker,
470 confirmation_manager,
471 context_providers,
472 session_workspace,
473 tool_metrics,
474 hook_engine,
475 planning_enabled,
476 goal_tracking,
477 ) = {
478 let session = session_lock.read().await;
479 (
480 session.messages.clone(),
481 session.system().map(String::from),
482 session.tools.clone(),
483 session.llm_client.clone(),
484 session.permission_checker.clone(),
485 session.confirmation_manager.clone(),
486 session.context_providers.clone(),
487 session.config.workspace.clone(),
488 session.tool_metrics.clone(),
489 session.config.hook_engine.clone(),
490 session.config.planning_enabled,
491 session.config.goal_tracking,
492 )
493 };
494
495 let llm_client = if let Some(client) = session_llm_client {
497 client
498 } else if let Some(client) = &self.llm_client {
499 client.clone()
500 } else {
501 anyhow::bail!(
502 "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
503 session_id
504 );
505 };
506
507 let tool_context = if session_workspace.is_empty() {
509 crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
510 .with_session_id(session_id)
511 } else {
512 crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
513 .with_session_id(session_id)
514 };
515
516 let config = AgentConfig {
518 system_prompt: system,
519 tools,
520 max_tool_rounds: 50,
521 permission_checker: Some(permission_checker),
522 confirmation_manager: Some(confirmation_manager),
523 context_providers,
524 planning_enabled,
525 goal_tracking,
526 hook_engine,
527 skill_registry: None,
528 ..AgentConfig::default()
529 };
530
531 let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
532 .with_tool_metrics(tool_metrics);
533
534 let result = agent
536 .execute_with_session(&history, prompt, Some(session_id), None)
537 .await?;
538
539 {
541 let mut session = session_lock.write().await;
542 session.messages = result.messages.clone();
543 session.update_usage(&result.usage);
544 }
545
546 self.persist_in_background(session_id, "generate");
548
549 if let Err(e) = self.maybe_auto_compact(session_id).await {
551 tracing::warn!("Auto-compact failed for session {}: {}", session_id, e);
552 }
553
554 Ok(result)
555 }
556
557 pub async fn generate_streaming(
559 &self,
560 session_id: &str,
561 prompt: &str,
562 ) -> Result<(
563 mpsc::Receiver<AgentEvent>,
564 tokio::task::JoinHandle<Result<AgentResult>>,
565 )> {
566 let session_lock = self.get_session(session_id).await?;
567
568 {
570 let session = session_lock.read().await;
571 if session.state == SessionState::Paused {
572 anyhow::bail!(
573 "Session {} is paused. Call Resume before generating.",
574 session_id
575 );
576 }
577 }
578
579 let (
581 history,
582 system,
583 tools,
584 session_llm_client,
585 permission_checker,
586 confirmation_manager,
587 context_providers,
588 session_workspace,
589 tool_metrics,
590 hook_engine,
591 planning_enabled,
592 goal_tracking,
593 ) = {
594 let session = session_lock.read().await;
595 (
596 session.messages.clone(),
597 session.system().map(String::from),
598 session.tools.clone(),
599 session.llm_client.clone(),
600 session.permission_checker.clone(),
601 session.confirmation_manager.clone(),
602 session.context_providers.clone(),
603 session.config.workspace.clone(),
604 session.tool_metrics.clone(),
605 session.config.hook_engine.clone(),
606 session.config.planning_enabled,
607 session.config.goal_tracking,
608 )
609 };
610
611 let llm_client = if let Some(client) = session_llm_client {
613 client
614 } else if let Some(client) = &self.llm_client {
615 client.clone()
616 } else {
617 anyhow::bail!(
618 "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
619 session_id
620 );
621 };
622
623 let tool_context = if session_workspace.is_empty() {
625 crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
626 .with_session_id(session_id)
627 } else {
628 crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
629 .with_session_id(session_id)
630 };
631
632 let config = AgentConfig {
634 system_prompt: system,
635 tools,
636 max_tool_rounds: 50,
637 permission_checker: Some(permission_checker),
638 confirmation_manager: Some(confirmation_manager),
639 context_providers,
640 planning_enabled,
641 goal_tracking,
642 hook_engine,
643 skill_registry: None,
644 ..AgentConfig::default()
645 };
646
647 let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
648 .with_tool_metrics(tool_metrics);
649
650 let (rx, handle) = agent.execute_streaming(&history, prompt).await?;
652
653 let abort_handle = handle.abort_handle();
655 {
656 let mut ops = self.ongoing_operations.write().await;
657 ops.insert(session_id.to_string(), abort_handle);
658 }
659
660 let session_lock_clone = session_lock.clone();
662 let original_handle = handle;
663 let stores = self.stores.clone();
664 let session_storage_types = self.session_storage_types.clone();
665 let llm_configs = self.llm_configs.clone();
666 let session_id_owned = session_id.to_string();
667 let ongoing_operations = self.ongoing_operations.clone();
668 let session_manager = self.clone();
669
670 let wrapped_handle = tokio::spawn(async move {
671 let result = original_handle.await??;
672
673 {
675 let mut ops = ongoing_operations.write().await;
676 ops.remove(&session_id_owned);
677 }
678
679 {
681 let mut session = session_lock_clone.write().await;
682 session.messages = result.messages.clone();
683 session.update_usage(&result.usage);
684 }
685
686 let storage_type = {
688 let storage_types = session_storage_types.read().await;
689 storage_types.get(&session_id_owned).cloned()
690 };
691
692 if let Some(storage_type) = storage_type {
693 if storage_type != crate::config::StorageBackend::Memory {
694 let stores_guard = stores.read().await;
695 if let Some(store) = stores_guard.get(&storage_type) {
696 let session = session_lock_clone.read().await;
697 let llm_config = {
698 let configs = llm_configs.read().await;
699 configs.get(&session_id_owned).cloned()
700 };
701 let data = session.to_session_data(llm_config);
702 if let Err(e) = store.save(&data).await {
703 tracing::warn!(
704 "Failed to persist session {} after streaming: {}",
705 session_id_owned,
706 e
707 );
708 }
709 }
710 }
711 }
712
713 if let Err(e) = session_manager.maybe_auto_compact(&session_id_owned).await {
715 tracing::warn!(
716 "Auto-compact failed for session {}: {}",
717 session_id_owned,
718 e
719 );
720 }
721
722 Ok(result)
723 });
724
725 Ok((rx, wrapped_handle))
726 }
727
728 pub async fn context_usage(&self, session_id: &str) -> Result<ContextUsage> {
730 let session_lock = self.get_session(session_id).await?;
731 let session = session_lock.read().await;
732 Ok(session.context_usage.clone())
733 }
734
735 pub async fn history(&self, session_id: &str) -> Result<Vec<Message>> {
737 let session_lock = self.get_session(session_id).await?;
738 let session = session_lock.read().await;
739 Ok(session.messages.clone())
740 }
741
742 pub async fn clear(&self, session_id: &str) -> Result<()> {
744 {
745 let session_lock = self.get_session(session_id).await?;
746 let mut session = session_lock.write().await;
747 session.clear();
748 }
749
750 self.persist_in_background(session_id, "clear");
752
753 Ok(())
754 }
755
756 pub async fn compact(&self, session_id: &str) -> Result<()> {
758 tracing::info!(name: "a3s.session.compact", session_id = %session_id, "Compacting session context");
759
760 {
761 let session_lock = self.get_session(session_id).await?;
762 let mut session = session_lock.write().await;
763
764 let llm_client = if let Some(client) = &session.llm_client {
766 client.clone()
767 } else if let Some(client) = &self.llm_client {
768 client.clone()
769 } else {
770 tracing::warn!("No LLM client configured for compaction, using simple truncation");
772 let keep_messages = 20;
773 if session.messages.len() > keep_messages {
774 let len = session.messages.len();
775 session.messages = session.messages.split_off(len - keep_messages);
776 }
777 drop(session);
779 self.persist_in_background(session_id, "compact");
780 return Ok(());
781 };
782
783 session.compact(&llm_client).await?;
784 }
785
786 self.persist_in_background(session_id, "compact");
788
789 Ok(())
790 }
791
792 pub async fn maybe_auto_compact(&self, session_id: &str) -> Result<bool> {
799 let (should_compact, percent_before, messages_before) = {
800 let session_lock = self.get_session(session_id).await?;
801 let session = session_lock.read().await;
802
803 if !session.config.auto_compact {
804 return Ok(false);
805 }
806
807 let threshold = session.config.auto_compact_threshold;
808 let percent = session.context_usage.percent;
809 let msg_count = session.messages.len();
810
811 tracing::debug!(
812 "Auto-compact check for session {}: percent={:.2}%, threshold={:.2}%, messages={}",
813 session_id,
814 percent * 100.0,
815 threshold * 100.0,
816 msg_count,
817 );
818
819 (percent >= threshold, percent, msg_count)
820 };
821
822 if !should_compact {
823 return Ok(false);
824 }
825
826 tracing::info!(
827 name: "a3s.session.auto_compact",
828 session_id = %session_id,
829 percent_before = %format!("{:.1}%", percent_before * 100.0),
830 messages_before = %messages_before,
831 "Auto-compacting session due to high context usage"
832 );
833
834 self.compact(session_id).await?;
836
837 let messages_after = {
839 let session_lock = self.get_session(session_id).await?;
840 let session = session_lock.read().await;
841 session.messages.len()
842 };
843
844 let event = AgentEvent::ContextCompacted {
846 session_id: session_id.to_string(),
847 before_messages: messages_before,
848 after_messages: messages_after,
849 percent_before,
850 };
851
852 if let Ok(session_lock) = self.get_session(session_id).await {
854 let session = session_lock.read().await;
855 let _ = session.event_tx.send(event);
856 }
857
858 tracing::info!(
859 name: "a3s.session.auto_compact.done",
860 session_id = %session_id,
861 messages_before = %messages_before,
862 messages_after = %messages_after,
863 "Auto-compaction complete"
864 );
865
866 Ok(true)
867 }
868
869 pub async fn get_llm_for_session(
873 &self,
874 session_id: &str,
875 ) -> Result<Option<Arc<dyn LlmClient>>> {
876 let session_lock = self.get_session(session_id).await?;
877 let session = session_lock.read().await;
878
879 if let Some(client) = &session.llm_client {
880 return Ok(Some(client.clone()));
881 }
882
883 Ok(self.llm_client.clone())
884 }
885
886 pub async fn configure(
888 &self,
889 session_id: &str,
890 thinking: Option<bool>,
891 budget: Option<usize>,
892 model_config: Option<LlmConfig>,
893 ) -> Result<()> {
894 {
895 let session_lock = self.get_session(session_id).await?;
896 let mut session = session_lock.write().await;
897
898 if let Some(t) = thinking {
899 session.thinking_enabled = t;
900 }
901 if let Some(b) = budget {
902 session.thinking_budget = Some(b);
903 }
904 if let Some(ref config) = model_config {
905 tracing::info!(
906 "Configuring session {} with LLM: provider={}, model={}",
907 session_id,
908 config.provider,
909 config.model
910 );
911 session.model_name = Some(config.model.clone());
912 session.llm_client = Some(llm::create_client_with_config(config.clone()));
913 }
914 }
915
916 if let Some(config) = model_config {
918 let llm_config_data = LlmConfigData {
919 provider: config.provider,
920 model: config.model,
921 api_key: None, base_url: config.base_url,
923 };
924 let mut configs = self.llm_configs.write().await;
925 configs.insert(session_id.to_string(), llm_config_data);
926 }
927
928 self.persist_in_background(session_id, "configure");
930
931 Ok(())
932 }
933
934 pub async fn session_count(&self) -> usize {
936 let sessions = self.sessions.read().await;
937 sessions.len()
938 }
939
940 pub async fn store_health(&self) -> Vec<(String, Result<()>)> {
942 let stores = self.stores.read().await;
943 let mut results = Vec::new();
944 for (_, store) in stores.iter() {
945 let name = store.backend_name().to_string();
946 let result = store.health_check().await;
947 results.push((name, result));
948 }
949 results
950 }
951
952 pub fn list_tools(&self) -> Vec<crate::llm::ToolDefinition> {
954 self.tool_executor.definitions()
955 }
956
957 pub async fn pause_session(&self, session_id: &str) -> Result<bool> {
959 let paused = {
960 let session_lock = self.get_session(session_id).await?;
961 let mut session = session_lock.write().await;
962 session.pause()
963 };
964
965 if paused {
966 self.persist_in_background(session_id, "pause");
967 }
968
969 Ok(paused)
970 }
971
972 pub async fn resume_session(&self, session_id: &str) -> Result<bool> {
974 let resumed = {
975 let session_lock = self.get_session(session_id).await?;
976 let mut session = session_lock.write().await;
977 session.resume()
978 };
979
980 if resumed {
981 self.persist_in_background(session_id, "resume");
982 }
983
984 Ok(resumed)
985 }
986
987 pub async fn cancel_operation(&self, session_id: &str) -> Result<bool> {
991 let session_lock = self.get_session(session_id).await?;
993 let cancelled_confirmations = {
994 let session = session_lock.read().await;
995 session.confirmation_manager.cancel_all().await
996 };
997
998 if cancelled_confirmations > 0 {
999 tracing::info!(
1000 "Cancelled {} pending confirmations for session {}",
1001 cancelled_confirmations,
1002 session_id
1003 );
1004 }
1005
1006 let abort_handle = {
1008 let mut ops = self.ongoing_operations.write().await;
1009 ops.remove(session_id)
1010 };
1011
1012 if let Some(handle) = abort_handle {
1013 handle.abort();
1014 tracing::info!("Cancelled ongoing operation for session {}", session_id);
1015 Ok(true)
1016 } else if cancelled_confirmations > 0 {
1017 Ok(true)
1019 } else {
1020 tracing::debug!("No ongoing operation to cancel for session {}", session_id);
1021 Ok(false)
1022 }
1023 }
1024
1025 pub async fn get_all_sessions(&self) -> Vec<Arc<RwLock<Session>>> {
1027 let sessions = self.sessions.read().await;
1028 sessions.values().cloned().collect()
1029 }
1030
1031 pub fn tool_executor(&self) -> &Arc<ToolExecutor> {
1033 &self.tool_executor
1034 }
1035
1036 pub async fn confirm_tool(
1038 &self,
1039 session_id: &str,
1040 tool_id: &str,
1041 approved: bool,
1042 reason: Option<String>,
1043 ) -> Result<bool> {
1044 let session_lock = self.get_session(session_id).await?;
1045 let session = session_lock.read().await;
1046 session
1047 .confirmation_manager
1048 .confirm(tool_id, approved, reason)
1049 .await
1050 .map_err(|e| anyhow::anyhow!(e))
1051 }
1052
1053 pub async fn set_confirmation_policy(
1055 &self,
1056 session_id: &str,
1057 policy: ConfirmationPolicy,
1058 ) -> Result<ConfirmationPolicy> {
1059 {
1060 let session_lock = self.get_session(session_id).await?;
1061 let session = session_lock.read().await;
1062 session.set_confirmation_policy(policy.clone()).await;
1063 }
1064
1065 {
1067 let session_lock = self.get_session(session_id).await?;
1068 let mut session = session_lock.write().await;
1069 session.config.confirmation_policy = Some(policy.clone());
1070 }
1071
1072 self.persist_in_background(session_id, "set_confirmation_policy");
1074
1075 Ok(policy)
1076 }
1077
1078 pub async fn get_confirmation_policy(&self, session_id: &str) -> Result<ConfirmationPolicy> {
1080 let session_lock = self.get_session(session_id).await?;
1081 let session = session_lock.read().await;
1082 Ok(session.confirmation_policy().await)
1083 }
1084}