1use super::{ContextUsage, Session, SessionConfig, SessionState};
4use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
5use crate::hitl::ConfirmationPolicy;
6use crate::llm::{self, ContentBlock, LlmClient, LlmConfig, Message};
7use crate::permissions::{PermissionDecision, PermissionPolicy};
8use crate::planning::Task;
9use crate::store::{FileSessionStore, LlmConfigData, SessionData, SessionStore};
10use crate::tools::ToolExecutor;
11use anyhow::{Context, Result};
12use std::collections::HashMap;
13use std::sync::Arc;
14use tokio::sync::{mpsc, RwLock};
15
16#[derive(Clone)]
18pub struct SessionManager {
19 pub(crate) sessions: Arc<RwLock<HashMap<String, Arc<RwLock<Session>>>>>,
20 pub(crate) llm_client: Option<Arc<dyn LlmClient>>,
21 pub(crate) tool_executor: Arc<ToolExecutor>,
22 pub(crate) stores: Arc<RwLock<HashMap<crate::config::StorageBackend, Arc<dyn SessionStore>>>>,
24 pub(crate) session_storage_types: Arc<RwLock<HashMap<String, crate::config::StorageBackend>>>,
26 pub(crate) llm_configs: Arc<RwLock<HashMap<String, LlmConfigData>>>,
28 pub(crate) ongoing_operations: Arc<RwLock<HashMap<String, tokio::task::AbortHandle>>>,
30}
31
32impl SessionManager {
33 pub fn new(llm_client: Option<Arc<dyn LlmClient>>, tool_executor: Arc<ToolExecutor>) -> Self {
35 Self {
36 sessions: Arc::new(RwLock::new(HashMap::new())),
37 llm_client,
38 tool_executor,
39 stores: Arc::new(RwLock::new(HashMap::new())),
40 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
41 llm_configs: Arc::new(RwLock::new(HashMap::new())),
42 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
43 }
44 }
45
46 pub async fn with_persistence<P: AsRef<std::path::Path>>(
50 llm_client: Option<Arc<dyn LlmClient>>,
51 tool_executor: Arc<ToolExecutor>,
52 sessions_dir: P,
53 ) -> Result<Self> {
54 let store = FileSessionStore::new(sessions_dir).await?;
55 let mut stores = HashMap::new();
56 stores.insert(
57 crate::config::StorageBackend::File,
58 Arc::new(store) as Arc<dyn SessionStore>,
59 );
60
61 let manager = Self {
62 sessions: Arc::new(RwLock::new(HashMap::new())),
63 llm_client,
64 tool_executor,
65 stores: Arc::new(RwLock::new(stores)),
66 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
67 llm_configs: Arc::new(RwLock::new(HashMap::new())),
68 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
69 };
70
71 Ok(manager)
72 }
73
74 pub fn with_store(
79 llm_client: Option<Arc<dyn LlmClient>>,
80 tool_executor: Arc<ToolExecutor>,
81 store: Arc<dyn SessionStore>,
82 backend: crate::config::StorageBackend,
83 ) -> Self {
84 let mut stores = HashMap::new();
85 stores.insert(backend, store);
86
87 Self {
88 sessions: Arc::new(RwLock::new(HashMap::new())),
89 llm_client,
90 tool_executor,
91 stores: Arc::new(RwLock::new(stores)),
92 session_storage_types: Arc::new(RwLock::new(HashMap::new())),
93 llm_configs: Arc::new(RwLock::new(HashMap::new())),
94 ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
95 }
96 }
97
98 pub async fn restore_session_by_id(&self, session_id: &str) -> Result<()> {
103 {
105 let sessions = self.sessions.read().await;
106 if sessions.contains_key(session_id) {
107 return Ok(());
108 }
109 }
110
111 let stores = self.stores.read().await;
112 for (backend, store) in stores.iter() {
113 match store.load(session_id).await {
114 Ok(Some(data)) => {
115 {
116 let mut storage_types = self.session_storage_types.write().await;
117 storage_types.insert(data.id.clone(), backend.clone());
118 }
119 self.restore_session(data).await?;
120 return Ok(());
121 }
122 Ok(None) => continue,
123 Err(e) => {
124 tracing::warn!(
125 "Failed to load session {} from {:?}: {}",
126 session_id,
127 backend,
128 e
129 );
130 continue;
131 }
132 }
133 }
134
135 Err(anyhow::anyhow!(
136 "Session {} not found in any store",
137 session_id
138 ))
139 }
140
141 pub async fn load_all_sessions(&mut self) -> Result<usize> {
143 let stores = self.stores.read().await;
144 let mut loaded = 0;
145
146 for (backend, store) in stores.iter() {
147 let session_ids = match store.list().await {
148 Ok(ids) => ids,
149 Err(e) => {
150 tracing::warn!("Failed to list sessions from {:?} store: {}", backend, e);
151 continue;
152 }
153 };
154
155 for id in session_ids {
156 match store.load(&id).await {
157 Ok(Some(data)) => {
158 {
160 let mut storage_types = self.session_storage_types.write().await;
161 storage_types.insert(data.id.clone(), backend.clone());
162 }
163
164 if let Err(e) = self.restore_session(data).await {
165 tracing::warn!("Failed to restore session {}: {}", id, e);
166 } else {
167 loaded += 1;
168 }
169 }
170 Ok(None) => {
171 tracing::warn!("Session {} not found in store", id);
172 }
173 Err(e) => {
174 tracing::warn!("Failed to load session {}: {}", id, e);
175 }
176 }
177 }
178 }
179
180 tracing::info!("Loaded {} sessions from store", loaded);
181 Ok(loaded)
182 }
183
184 async fn restore_session(&self, data: SessionData) -> Result<()> {
186 let tools = self.tool_executor.definitions();
187 let mut session = Session::new(data.id.clone(), data.config.clone(), tools).await?;
188
189 session.restore_from_data(&data);
191
192 if let Some(llm_config) = &data.llm_config {
194 let mut configs = self.llm_configs.write().await;
195 configs.insert(data.id.clone(), llm_config.clone());
196 }
197
198 let mut sessions = self.sessions.write().await;
199 sessions.insert(data.id.clone(), Arc::new(RwLock::new(session)));
200
201 tracing::info!("Restored session: {}", data.id);
202 Ok(())
203 }
204
205 async fn save_session(&self, session_id: &str) -> Result<()> {
207 let storage_type = {
209 let storage_types = self.session_storage_types.read().await;
210 storage_types.get(session_id).cloned()
211 };
212
213 let Some(storage_type) = storage_type else {
214 return Ok(());
216 };
217
218 if storage_type == crate::config::StorageBackend::Memory {
220 return Ok(());
221 }
222
223 let stores = self.stores.read().await;
225 let Some(store) = stores.get(&storage_type) else {
226 tracing::warn!("No store available for storage type: {:?}", storage_type);
227 return Ok(());
228 };
229
230 let session_lock = self.get_session(session_id).await?;
231 let session = session_lock.read().await;
232
233 let llm_config = {
235 let configs = self.llm_configs.read().await;
236 configs.get(session_id).cloned()
237 };
238
239 let data = session.to_session_data(llm_config);
240 store.save(&data).await?;
241
242 tracing::debug!("Saved session: {}", session_id);
243 Ok(())
244 }
245
246 async fn persist_or_warn(&self, session_id: &str, operation: &str) {
250 if let Err(e) = self.save_session(session_id).await {
251 tracing::warn!(
252 "Failed to persist session {} after {}: {}",
253 session_id,
254 operation,
255 e
256 );
257 if let Ok(session_lock) = self.get_session(session_id).await {
259 let session = session_lock.read().await;
260 let _ = session.event_tx().send(AgentEvent::PersistenceFailed {
261 session_id: session_id.to_string(),
262 operation: operation.to_string(),
263 error: e.to_string(),
264 });
265 }
266 }
267 }
268
269 fn persist_in_background(&self, session_id: &str, operation: &str) {
274 let mgr = self.clone();
275 let sid = session_id.to_string();
276 let op = operation.to_string();
277 tokio::spawn(async move {
278 mgr.persist_or_warn(&sid, &op).await;
279 });
280 }
281
282 pub async fn create_session(&self, id: String, config: SessionConfig) -> Result<String> {
284 tracing::info!(name: "a3s.session.create", session_id = %id, "Creating session");
285
286 {
288 let mut storage_types = self.session_storage_types.write().await;
289 storage_types.insert(id.clone(), config.storage_type.clone());
290 }
291
292 let tools = self.tool_executor.definitions();
294 let mut session = Session::new(id.clone(), config, tools).await?;
295
296 session.start_queue().await?;
298
299 if session.config.max_context_length > 0 {
301 session.context_usage.max_tokens = session.config.max_context_length as usize;
302 }
303
304 {
305 let mut sessions = self.sessions.write().await;
306 sessions.insert(id.clone(), Arc::new(RwLock::new(session)));
307 }
308
309 self.persist_in_background(&id, "create");
311
312 tracing::info!("Created session: {}", id);
313 Ok(id)
314 }
315
316 pub async fn destroy_session(&self, id: &str) -> Result<()> {
318 tracing::info!(name: "a3s.session.destroy", session_id = %id, "Destroying session");
319
320 let storage_type = {
322 let storage_types = self.session_storage_types.read().await;
323 storage_types.get(id).cloned()
324 };
325
326 {
327 let mut sessions = self.sessions.write().await;
328 sessions.remove(id);
329 }
330
331 {
333 let mut configs = self.llm_configs.write().await;
334 configs.remove(id);
335 }
336
337 {
339 let mut storage_types = self.session_storage_types.write().await;
340 storage_types.remove(id);
341 }
342
343 if let Some(storage_type) = storage_type {
345 if storage_type != crate::config::StorageBackend::Memory {
346 let stores = self.stores.read().await;
347 if let Some(store) = stores.get(&storage_type) {
348 if let Err(e) = store.delete(id).await {
349 tracing::warn!("Failed to delete session {} from store: {}", id, e);
350 }
351 }
352 }
353 }
354
355 tracing::info!("Destroyed session: {}", id);
356 Ok(())
357 }
358
359 pub async fn get_session(&self, id: &str) -> Result<Arc<RwLock<Session>>> {
361 let sessions = self.sessions.read().await;
362 sessions
363 .get(id)
364 .cloned()
365 .context(format!("Session not found: {}", id))
366 }
367
368 pub async fn create_child_session(
373 &self,
374 parent_id: &str,
375 child_id: String,
376 mut config: SessionConfig,
377 ) -> Result<String> {
378 let parent_lock = self.get_session(parent_id).await?;
380 let parent_llm_client = {
381 let parent = parent_lock.read().await;
382
383 if config.confirmation_policy.is_none() {
385 let parent_policy = parent.confirmation_manager.policy().await;
386 config.confirmation_policy = Some(parent_policy);
387 }
388
389 parent.llm_client.clone()
390 };
391
392 config.parent_id = Some(parent_id.to_string());
394
395 let tools = self.tool_executor.definitions();
397 let mut session = Session::new(child_id.clone(), config, tools).await?;
398
399 if session.llm_client.is_none() {
401 session.llm_client = parent_llm_client.or_else(|| self.llm_client.clone());
402 }
403
404 session.start_queue().await?;
406
407 if session.config.max_context_length > 0 {
409 session.context_usage.max_tokens = session.config.max_context_length as usize;
410 }
411
412 {
413 let mut sessions = self.sessions.write().await;
414 sessions.insert(child_id.clone(), Arc::new(RwLock::new(session)));
415 }
416
417 self.persist_in_background(&child_id, "create_child");
419
420 tracing::info!(
421 "Created child session: {} (parent: {})",
422 child_id,
423 parent_id
424 );
425 Ok(child_id)
426 }
427
428 pub async fn get_child_sessions(&self, parent_id: &str) -> Vec<String> {
430 let sessions = self.sessions.read().await;
431 let mut children = Vec::new();
432
433 for (id, session_lock) in sessions.iter() {
434 let session = session_lock.read().await;
435 if session.parent_id.as_deref() == Some(parent_id) {
436 children.push(id.clone());
437 }
438 }
439
440 children
441 }
442
443 pub async fn is_child_session(&self, session_id: &str) -> Result<bool> {
445 let session_lock = self.get_session(session_id).await?;
446 let session = session_lock.read().await;
447 Ok(session.is_child_session())
448 }
449
450 pub async fn generate(&self, session_id: &str, prompt: &str) -> Result<AgentResult> {
452 let session_lock = self.get_session(session_id).await?;
453
454 {
456 let session = session_lock.read().await;
457 if session.state == SessionState::Paused {
458 anyhow::bail!(
459 "Session {} is paused. Call Resume before generating.",
460 session_id
461 );
462 }
463 }
464
465 let (
467 history,
468 system,
469 tools,
470 session_llm_client,
471 permission_policy,
472 confirmation_manager,
473 context_providers,
474 session_workspace,
475 tool_metrics,
476 hook_engine,
477 planning_enabled,
478 goal_tracking,
479 loaded_skills,
480 ) = {
481 let session = session_lock.read().await;
482 (
483 session.messages.clone(),
484 session.system().map(String::from),
485 session.tools.clone(),
486 session.llm_client.clone(),
487 session.permission_policy.clone(),
488 session.confirmation_manager.clone(),
489 session.context_providers.clone(),
490 session.config.workspace.clone(),
491 session.tool_metrics.clone(),
492 session.config.hook_engine.clone(),
493 session.config.planning_enabled,
494 session.config.goal_tracking,
495 session.loaded_skills.clone(),
496 )
497 };
498
499 let llm_client = if let Some(client) = session_llm_client {
501 client
502 } else if let Some(client) = &self.llm_client {
503 client.clone()
504 } else {
505 anyhow::bail!(
506 "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
507 session_id
508 );
509 };
510
511 let tool_context = if session_workspace.is_empty() {
513 crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
514 .with_session_id(session_id)
515 } else {
516 crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
517 .with_session_id(session_id)
518 };
519
520 let config = AgentConfig {
522 system_prompt: system,
523 tools,
524 max_tool_rounds: 50,
525 permission_policy: Some(permission_policy),
526 confirmation_manager: Some(confirmation_manager),
527 context_providers,
528 planning_enabled,
529 goal_tracking,
530 skill_tool_filters: loaded_skills,
531 hook_engine,
532 };
533
534 let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
535 .with_tool_metrics(tool_metrics);
536
537 let result = agent
539 .execute_with_session(&history, prompt, Some(session_id), None)
540 .await?;
541
542 {
544 let mut session = session_lock.write().await;
545 session.messages = result.messages.clone();
546 session.update_usage(&result.usage);
547 }
548
549 self.persist_in_background(session_id, "generate");
551
552 if let Err(e) = self.maybe_auto_compact(session_id).await {
554 tracing::warn!("Auto-compact failed for session {}: {}", session_id, e);
555 }
556
557 Ok(result)
558 }
559
560 pub async fn generate_streaming(
562 &self,
563 session_id: &str,
564 prompt: &str,
565 ) -> Result<(
566 mpsc::Receiver<AgentEvent>,
567 tokio::task::JoinHandle<Result<AgentResult>>,
568 )> {
569 let session_lock = self.get_session(session_id).await?;
570
571 {
573 let session = session_lock.read().await;
574 if session.state == SessionState::Paused {
575 anyhow::bail!(
576 "Session {} is paused. Call Resume before generating.",
577 session_id
578 );
579 }
580 }
581
582 let (
584 history,
585 system,
586 tools,
587 session_llm_client,
588 permission_policy,
589 confirmation_manager,
590 context_providers,
591 session_workspace,
592 tool_metrics,
593 hook_engine,
594 planning_enabled,
595 goal_tracking,
596 loaded_skills,
597 ) = {
598 let session = session_lock.read().await;
599 (
600 session.messages.clone(),
601 session.system().map(String::from),
602 session.tools.clone(),
603 session.llm_client.clone(),
604 session.permission_policy.clone(),
605 session.confirmation_manager.clone(),
606 session.context_providers.clone(),
607 session.config.workspace.clone(),
608 session.tool_metrics.clone(),
609 session.config.hook_engine.clone(),
610 session.config.planning_enabled,
611 session.config.goal_tracking,
612 session.loaded_skills.clone(),
613 )
614 };
615
616 let llm_client = if let Some(client) = session_llm_client {
618 client
619 } else if let Some(client) = &self.llm_client {
620 client.clone()
621 } else {
622 anyhow::bail!(
623 "LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
624 session_id
625 );
626 };
627
628 let tool_context = if session_workspace.is_empty() {
630 crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
631 .with_session_id(session_id)
632 } else {
633 crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
634 .with_session_id(session_id)
635 };
636
637 let config = AgentConfig {
639 system_prompt: system,
640 tools,
641 max_tool_rounds: 50,
642 permission_policy: Some(permission_policy),
643 confirmation_manager: Some(confirmation_manager),
644 context_providers,
645 planning_enabled,
646 goal_tracking,
647 skill_tool_filters: loaded_skills,
648 hook_engine,
649 };
650
651 let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
652 .with_tool_metrics(tool_metrics);
653
654 let (rx, handle) = agent.execute_streaming(&history, prompt).await?;
656
657 let abort_handle = handle.abort_handle();
659 {
660 let mut ops = self.ongoing_operations.write().await;
661 ops.insert(session_id.to_string(), abort_handle);
662 }
663
664 let session_lock_clone = session_lock.clone();
666 let original_handle = handle;
667 let stores = self.stores.clone();
668 let session_storage_types = self.session_storage_types.clone();
669 let llm_configs = self.llm_configs.clone();
670 let session_id_owned = session_id.to_string();
671 let ongoing_operations = self.ongoing_operations.clone();
672 let session_manager = self.clone();
673
674 let wrapped_handle = tokio::spawn(async move {
675 let result = original_handle.await??;
676
677 {
679 let mut ops = ongoing_operations.write().await;
680 ops.remove(&session_id_owned);
681 }
682
683 {
685 let mut session = session_lock_clone.write().await;
686 session.messages = result.messages.clone();
687 session.update_usage(&result.usage);
688 }
689
690 let storage_type = {
692 let storage_types = session_storage_types.read().await;
693 storage_types.get(&session_id_owned).cloned()
694 };
695
696 if let Some(storage_type) = storage_type {
697 if storage_type != crate::config::StorageBackend::Memory {
698 let stores_guard = stores.read().await;
699 if let Some(store) = stores_guard.get(&storage_type) {
700 let session = session_lock_clone.read().await;
701 let llm_config = {
702 let configs = llm_configs.read().await;
703 configs.get(&session_id_owned).cloned()
704 };
705 let data = session.to_session_data(llm_config);
706 if let Err(e) = store.save(&data).await {
707 tracing::warn!(
708 "Failed to persist session {} after streaming: {}",
709 session_id_owned,
710 e
711 );
712 }
713 }
714 }
715 }
716
717 if let Err(e) = session_manager.maybe_auto_compact(&session_id_owned).await {
719 tracing::warn!(
720 "Auto-compact failed for session {}: {}",
721 session_id_owned,
722 e
723 );
724 }
725
726 Ok(result)
727 });
728
729 Ok((rx, wrapped_handle))
730 }
731
732 pub async fn context_usage(&self, session_id: &str) -> Result<ContextUsage> {
734 let session_lock = self.get_session(session_id).await?;
735 let session = session_lock.read().await;
736 Ok(session.context_usage.clone())
737 }
738
739 pub async fn history(&self, session_id: &str) -> Result<Vec<Message>> {
741 let session_lock = self.get_session(session_id).await?;
742 let session = session_lock.read().await;
743 Ok(session.messages.clone())
744 }
745
746 pub async fn clear(&self, session_id: &str) -> Result<()> {
748 {
749 let session_lock = self.get_session(session_id).await?;
750 let mut session = session_lock.write().await;
751 session.clear();
752 }
753
754 self.persist_in_background(session_id, "clear");
756
757 Ok(())
758 }
759
760 pub async fn compact(&self, session_id: &str) -> Result<()> {
762 tracing::info!(name: "a3s.session.compact", session_id = %session_id, "Compacting session context");
763
764 {
765 let session_lock = self.get_session(session_id).await?;
766 let mut session = session_lock.write().await;
767
768 let llm_client = if let Some(client) = &session.llm_client {
770 client.clone()
771 } else if let Some(client) = &self.llm_client {
772 client.clone()
773 } else {
774 tracing::warn!("No LLM client configured for compaction, using simple truncation");
776 let keep_messages = 20;
777 if session.messages.len() > keep_messages {
778 let len = session.messages.len();
779 session.messages = session.messages.split_off(len - keep_messages);
780 }
781 drop(session);
783 self.persist_in_background(session_id, "compact");
784 return Ok(());
785 };
786
787 session.compact(&llm_client).await?;
788 }
789
790 self.persist_in_background(session_id, "compact");
792
793 Ok(())
794 }
795
796 pub async fn maybe_auto_compact(&self, session_id: &str) -> Result<bool> {
803 let (should_compact, percent_before, messages_before) = {
804 let session_lock = self.get_session(session_id).await?;
805 let session = session_lock.read().await;
806
807 if !session.config.auto_compact {
808 return Ok(false);
809 }
810
811 let threshold = session.config.auto_compact_threshold;
812 let percent = session.context_usage.percent;
813 let msg_count = session.messages.len();
814
815 tracing::debug!(
816 "Auto-compact check for session {}: percent={:.2}%, threshold={:.2}%, messages={}",
817 session_id,
818 percent * 100.0,
819 threshold * 100.0,
820 msg_count,
821 );
822
823 (percent >= threshold, percent, msg_count)
824 };
825
826 if !should_compact {
827 return Ok(false);
828 }
829
830 tracing::info!(
831 name: "a3s.session.auto_compact",
832 session_id = %session_id,
833 percent_before = %format!("{:.1}%", percent_before * 100.0),
834 messages_before = %messages_before,
835 "Auto-compacting session due to high context usage"
836 );
837
838 self.compact(session_id).await?;
840
841 let messages_after = {
843 let session_lock = self.get_session(session_id).await?;
844 let session = session_lock.read().await;
845 session.messages.len()
846 };
847
848 let event = AgentEvent::ContextCompacted {
850 session_id: session_id.to_string(),
851 before_messages: messages_before,
852 after_messages: messages_after,
853 percent_before,
854 };
855
856 if let Ok(session_lock) = self.get_session(session_id).await {
858 let session = session_lock.read().await;
859 let _ = session.event_tx.send(event);
860 }
861
862 tracing::info!(
863 name: "a3s.session.auto_compact.done",
864 session_id = %session_id,
865 messages_before = %messages_before,
866 messages_after = %messages_after,
867 "Auto-compaction complete"
868 );
869
870 Ok(true)
871 }
872
873 pub async fn get_llm_for_session(
877 &self,
878 session_id: &str,
879 ) -> Result<Option<Arc<dyn LlmClient>>> {
880 let session_lock = self.get_session(session_id).await?;
881 let session = session_lock.read().await;
882
883 if let Some(client) = &session.llm_client {
884 return Ok(Some(client.clone()));
885 }
886
887 Ok(self.llm_client.clone())
888 }
889
890 pub async fn configure(
892 &self,
893 session_id: &str,
894 thinking: Option<bool>,
895 budget: Option<usize>,
896 model_config: Option<LlmConfig>,
897 ) -> Result<()> {
898 {
899 let session_lock = self.get_session(session_id).await?;
900 let mut session = session_lock.write().await;
901
902 if let Some(t) = thinking {
903 session.thinking_enabled = t;
904 }
905 if let Some(b) = budget {
906 session.thinking_budget = Some(b);
907 }
908 if let Some(ref config) = model_config {
909 tracing::info!(
910 "Configuring session {} with LLM: provider={}, model={}",
911 session_id,
912 config.provider,
913 config.model
914 );
915 session.model_name = Some(config.model.clone());
916 session.llm_client = Some(llm::create_client_with_config(config.clone()));
917 }
918 }
919
920 if let Some(config) = model_config {
922 let llm_config_data = LlmConfigData {
923 provider: config.provider,
924 model: config.model,
925 api_key: None, base_url: config.base_url,
927 };
928 let mut configs = self.llm_configs.write().await;
929 configs.insert(session_id.to_string(), llm_config_data);
930 }
931
932 self.persist_in_background(session_id, "configure");
934
935 Ok(())
936 }
937
938 pub async fn session_count(&self) -> usize {
940 let sessions = self.sessions.read().await;
941 sessions.len()
942 }
943
944 pub async fn store_health(&self) -> Vec<(String, Result<()>)> {
946 let stores = self.stores.read().await;
947 let mut results = Vec::new();
948 for (_, store) in stores.iter() {
949 let name = store.backend_name().to_string();
950 let result = store.health_check().await;
951 results.push((name, result));
952 }
953 results
954 }
955
956 pub fn list_tools(&self) -> Vec<crate::llm::ToolDefinition> {
958 self.tool_executor.definitions()
959 }
960
961 pub async fn pause_session(&self, session_id: &str) -> Result<bool> {
963 let paused = {
964 let session_lock = self.get_session(session_id).await?;
965 let mut session = session_lock.write().await;
966 session.pause()
967 };
968
969 if paused {
970 self.persist_in_background(session_id, "pause");
971 }
972
973 Ok(paused)
974 }
975
976 pub async fn resume_session(&self, session_id: &str) -> Result<bool> {
978 let resumed = {
979 let session_lock = self.get_session(session_id).await?;
980 let mut session = session_lock.write().await;
981 session.resume()
982 };
983
984 if resumed {
985 self.persist_in_background(session_id, "resume");
986 }
987
988 Ok(resumed)
989 }
990
991 pub async fn cancel_operation(&self, session_id: &str) -> Result<bool> {
995 let session_lock = self.get_session(session_id).await?;
997 let cancelled_confirmations = {
998 let session = session_lock.read().await;
999 session.confirmation_manager.cancel_all().await
1000 };
1001
1002 if cancelled_confirmations > 0 {
1003 tracing::info!(
1004 "Cancelled {} pending confirmations for session {}",
1005 cancelled_confirmations,
1006 session_id
1007 );
1008 }
1009
1010 let abort_handle = {
1012 let mut ops = self.ongoing_operations.write().await;
1013 ops.remove(session_id)
1014 };
1015
1016 if let Some(handle) = abort_handle {
1017 handle.abort();
1018 tracing::info!("Cancelled ongoing operation for session {}", session_id);
1019 Ok(true)
1020 } else if cancelled_confirmations > 0 {
1021 Ok(true)
1023 } else {
1024 tracing::debug!("No ongoing operation to cancel for session {}", session_id);
1025 Ok(false)
1026 }
1027 }
1028
1029 pub async fn get_all_sessions(&self) -> Vec<Arc<RwLock<Session>>> {
1031 let sessions = self.sessions.read().await;
1032 sessions.values().cloned().collect()
1033 }
1034
1035 pub fn tool_executor(&self) -> &Arc<ToolExecutor> {
1037 &self.tool_executor
1038 }
1039
1040 pub async fn confirm_tool(
1042 &self,
1043 session_id: &str,
1044 tool_id: &str,
1045 approved: bool,
1046 reason: Option<String>,
1047 ) -> Result<bool> {
1048 let session_lock = self.get_session(session_id).await?;
1049 let session = session_lock.read().await;
1050 session
1051 .confirmation_manager
1052 .confirm(tool_id, approved, reason)
1053 .await
1054 .map_err(|e| anyhow::anyhow!(e))
1055 }
1056
1057 pub async fn set_confirmation_policy(
1059 &self,
1060 session_id: &str,
1061 policy: ConfirmationPolicy,
1062 ) -> Result<ConfirmationPolicy> {
1063 {
1064 let session_lock = self.get_session(session_id).await?;
1065 let session = session_lock.read().await;
1066 session.set_confirmation_policy(policy.clone()).await;
1067 }
1068
1069 {
1071 let session_lock = self.get_session(session_id).await?;
1072 let mut session = session_lock.write().await;
1073 session.config.confirmation_policy = Some(policy.clone());
1074 }
1075
1076 self.persist_in_background(session_id, "set_confirmation_policy");
1078
1079 Ok(policy)
1080 }
1081
1082 pub async fn get_confirmation_policy(&self, session_id: &str) -> Result<ConfirmationPolicy> {
1084 let session_lock = self.get_session(session_id).await?;
1085 let session = session_lock.read().await;
1086 Ok(session.confirmation_policy().await)
1087 }
1088
1089 pub async fn set_permission_policy(
1091 &self,
1092 session_id: &str,
1093 policy: PermissionPolicy,
1094 ) -> Result<PermissionPolicy> {
1095 {
1096 let session_lock = self.get_session(session_id).await?;
1097 let session = session_lock.read().await;
1098 session.set_permission_policy(policy.clone()).await;
1099 }
1100
1101 {
1103 let session_lock = self.get_session(session_id).await?;
1104 let mut session = session_lock.write().await;
1105 session.config.permission_policy = Some(policy.clone());
1106 }
1107
1108 self.persist_in_background(session_id, "set_permission_policy");
1110
1111 Ok(policy)
1112 }
1113
1114 pub async fn get_permission_policy(&self, session_id: &str) -> Result<PermissionPolicy> {
1116 let session_lock = self.get_session(session_id).await?;
1117 let session = session_lock.read().await;
1118 Ok(session.permission_policy().await)
1119 }
1120
1121 pub async fn check_permission(
1123 &self,
1124 session_id: &str,
1125 tool_name: &str,
1126 args: &serde_json::Value,
1127 ) -> Result<PermissionDecision> {
1128 let session_lock = self.get_session(session_id).await?;
1129 let session = session_lock.read().await;
1130 Ok(session.check_permission(tool_name, args).await)
1131 }
1132
1133 pub async fn add_permission_rule(
1135 &self,
1136 session_id: &str,
1137 rule_type: &str,
1138 rule: &str,
1139 ) -> Result<()> {
1140 let session_lock = self.get_session(session_id).await?;
1141 let session = session_lock.read().await;
1142 match rule_type {
1143 "allow" => session.add_allow_rule(rule).await,
1144 "deny" => session.add_deny_rule(rule).await,
1145 "ask" => session.add_ask_rule(rule).await,
1146 _ => anyhow::bail!("Unknown rule type: {}", rule_type),
1147 }
1148 Ok(())
1149 }
1150
1151 pub async fn add_context_provider(
1153 &self,
1154 session_id: &str,
1155 provider: Arc<dyn crate::context::ContextProvider>,
1156 ) -> Result<()> {
1157 let session_lock = self.get_session(session_id).await?;
1158 let mut session = session_lock.write().await;
1159 session.add_context_provider(provider);
1160 Ok(())
1161 }
1162
1163 pub async fn remove_context_provider(&self, session_id: &str, name: &str) -> Result<bool> {
1165 let session_lock = self.get_session(session_id).await?;
1166 let mut session = session_lock.write().await;
1167 Ok(session.remove_context_provider(name))
1168 }
1169
1170 pub async fn list_context_providers(&self, session_id: &str) -> Result<Vec<String>> {
1172 let session_lock = self.get_session(session_id).await?;
1173 let session = session_lock.read().await;
1174 Ok(session.context_provider_names())
1175 }
1176
1177 pub async fn set_lane_handler(
1179 &self,
1180 session_id: &str,
1181 lane: crate::hitl::SessionLane,
1182 config: crate::queue::LaneHandlerConfig,
1183 ) -> Result<()> {
1184 let session_lock = self.get_session(session_id).await?;
1185 let session = session_lock.read().await;
1186 session.set_lane_handler(lane, config).await;
1187 Ok(())
1188 }
1189
1190 pub async fn get_lane_handler(
1192 &self,
1193 session_id: &str,
1194 lane: crate::hitl::SessionLane,
1195 ) -> Result<crate::queue::LaneHandlerConfig> {
1196 let session_lock = self.get_session(session_id).await?;
1197 let session = session_lock.read().await;
1198 Ok(session.get_lane_handler(lane).await)
1199 }
1200
1201 pub async fn complete_external_task(
1203 &self,
1204 session_id: &str,
1205 task_id: &str,
1206 result: crate::queue::ExternalTaskResult,
1207 ) -> Result<bool> {
1208 let session_lock = self.get_session(session_id).await?;
1209 let session = session_lock.read().await;
1210 Ok(session.complete_external_task(task_id, result).await)
1211 }
1212
1213 pub async fn pending_external_tasks(
1215 &self,
1216 session_id: &str,
1217 ) -> Result<Vec<crate::queue::ExternalTask>> {
1218 let session_lock = self.get_session(session_id).await?;
1219 let session = session_lock.read().await;
1220 Ok(session.pending_external_tasks().await)
1221 }
1222
1223 pub async fn get_tasks(&self, session_id: &str) -> Result<Vec<Task>> {
1229 let session_lock = self.get_session(session_id).await?;
1230 let session = session_lock.read().await;
1231 Ok(session.get_tasks().to_vec())
1232 }
1233
1234 pub async fn set_tasks(&self, session_id: &str, tasks: Vec<Task>) -> Result<Vec<Task>> {
1236 {
1237 let session_lock = self.get_session(session_id).await?;
1238 let mut session = session_lock.write().await;
1239 session.set_tasks(tasks);
1240 }
1241
1242 self.persist_in_background(session_id, "task_update");
1244
1245 self.get_tasks(session_id).await
1247 }
1248
1249 pub async fn fork_session(
1262 &self,
1263 source_id: &str,
1264 new_id: String,
1265 new_name: Option<String>,
1266 ) -> Result<String> {
1267 tracing::info!(
1268 name: "a3s.session.fork",
1269 source_id = %source_id,
1270 new_id = %new_id,
1271 "Forking session"
1272 );
1273
1274 let (
1276 source_config,
1277 source_messages,
1278 source_usage,
1279 source_cost,
1280 source_model_name,
1281 source_thinking_enabled,
1282 source_thinking_budget,
1283 source_tasks,
1284 source_context_usage,
1285 ) = {
1286 let session_lock = self
1287 .get_session(source_id)
1288 .await
1289 .context(format!("Source session '{}' not found for fork", source_id))?;
1290 let session = session_lock.read().await;
1291 (
1292 session.config.clone(),
1293 session.messages.clone(),
1294 session.total_usage.clone(),
1295 session.total_cost,
1296 session.model_name.clone(),
1297 session.thinking_enabled,
1298 session.thinking_budget,
1299 session.tasks.clone(),
1300 session.context_usage.clone(),
1301 )
1302 };
1303
1304 let source_llm_config = {
1306 let configs = self.llm_configs.read().await;
1307 configs.get(source_id).cloned()
1308 };
1309
1310 let mut forked_config = source_config;
1312 if let Some(name) = new_name {
1313 forked_config.name = name;
1314 } else {
1315 forked_config.name = format!("{} (fork)", forked_config.name);
1316 }
1317 forked_config.parent_id = Some(source_id.to_string());
1318
1319 let tools = self.tool_executor.definitions();
1321 let mut new_session = Session::new(new_id.clone(), forked_config, tools).await?;
1322
1323 new_session.messages = source_messages;
1325 new_session.total_usage = source_usage;
1326 new_session.total_cost = source_cost;
1327 new_session.model_name = source_model_name;
1328 new_session.thinking_enabled = source_thinking_enabled;
1329 new_session.thinking_budget = source_thinking_budget;
1330 new_session.tasks = source_tasks;
1331 new_session.context_usage = source_context_usage;
1332
1333 new_session.start_queue().await?;
1335
1336 {
1338 let mut storage_types = self.session_storage_types.write().await;
1339 storage_types.insert(new_id.clone(), new_session.config.storage_type.clone());
1340 }
1341
1342 if let Some(llm_config) = source_llm_config {
1344 let mut configs = self.llm_configs.write().await;
1345 configs.insert(new_id.clone(), llm_config);
1346 }
1347
1348 {
1350 let mut sessions = self.sessions.write().await;
1351 sessions.insert(new_id.clone(), Arc::new(RwLock::new(new_session)));
1352 }
1353
1354 self.persist_in_background(&new_id, "fork");
1356
1357 tracing::info!(
1358 "Forked session '{}' -> '{}' with parent_id set",
1359 source_id,
1360 new_id,
1361 );
1362
1363 Ok(new_id)
1364 }
1365
1366 pub async fn generate_title(&self, session_id: &str) -> Result<Option<String>> {
1374 tracing::info!(
1375 name: "a3s.session.generate_title",
1376 session_id = %session_id,
1377 "Generating session title"
1378 );
1379
1380 let messages = {
1382 let session_lock = self.get_session(session_id).await?;
1383 let session = session_lock.read().await;
1384
1385 if session.messages.is_empty() {
1386 return Ok(None);
1387 }
1388
1389 session.messages.iter().take(4).cloned().collect::<Vec<_>>()
1391 };
1392
1393 let llm_client = self.get_llm_for_session(session_id).await?;
1395 let Some(client) = llm_client else {
1396 tracing::debug!("No LLM client available for title generation");
1397 return Ok(None);
1398 };
1399
1400 let mut conversation_summary = String::new();
1402 for msg in &messages {
1403 let role = &msg.role;
1404 for block in &msg.content {
1405 if let ContentBlock::Text { text } = block {
1406 let truncated = if text.len() > 200 {
1408 format!("{}...", &text[..200])
1409 } else {
1410 text.clone()
1411 };
1412 conversation_summary.push_str(&format!("{}: {}\n", role, truncated));
1413 }
1414 }
1415 }
1416
1417 if conversation_summary.is_empty() {
1418 return Ok(None);
1419 }
1420
1421 let title_prompt = Message::user(&crate::prompts::render(
1423 crate::prompts::TITLE_GENERATE,
1424 &[("conversation", &conversation_summary)],
1425 ));
1426
1427 let response = client
1428 .complete(&[title_prompt], None, &[])
1429 .await
1430 .context("Failed to generate session title")?;
1431
1432 let title = response
1434 .message
1435 .content
1436 .iter()
1437 .find_map(|block| {
1438 if let ContentBlock::Text { text } = block {
1439 Some(text.trim().to_string())
1440 } else {
1441 None
1442 }
1443 })
1444 .unwrap_or_default();
1445
1446 if title.is_empty() {
1447 return Ok(None);
1448 }
1449
1450 let title = if title.len() > 80 {
1452 format!("{}...", &title[..77])
1453 } else {
1454 title
1455 };
1456
1457 {
1459 let session_lock = self.get_session(session_id).await?;
1460 let mut session = session_lock.write().await;
1461 session.config.name = title.clone();
1462 session.touch();
1463 }
1464
1465 self.persist_in_background(session_id, "title_generation");
1467
1468 tracing::info!("Generated title for session '{}': '{}'", session_id, title);
1469 Ok(Some(title))
1470 }
1471}