1use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
20use crate::commands::{CommandContext, CommandRegistry};
21use crate::config::CodeConfig;
22use crate::error::{read_or_recover, write_or_recover, Result};
23use crate::llm::{LlmClient, Message};
24use crate::prompts::SystemPromptSlots;
25use crate::queue::{
26 ExternalTask, ExternalTaskResult, LaneHandlerConfig, SessionLane, SessionQueueConfig,
27 SessionQueueStats,
28};
29use crate::session_lane_queue::SessionLaneQueue;
30use crate::tools::{ToolContext, ToolExecutor};
31use a3s_lane::{DeadLetter, MetricsSnapshot};
32use a3s_memory::{FileMemoryStore, MemoryStore};
33use anyhow::Context;
34use std::path::{Path, PathBuf};
35use std::sync::{Arc, RwLock};
36use tokio::sync::{broadcast, mpsc};
37use tokio::task::JoinHandle;
38
39fn safe_canonicalize(path: &Path) -> PathBuf {
42 match std::fs::canonicalize(path) {
43 Ok(p) => strip_unc_prefix(p),
44 Err(_) => path.to_path_buf(),
45 }
46}
47
/// Removes the Windows verbatim prefix (`\\?\`) from `path`.
///
/// * `\\?\C:\dir` becomes `C:\dir`.
/// * `\\?\UNC\server\share\dir` becomes `\\server\share\dir`. Stripping only
///   `\\?\` here would leave `UNC\server\share\dir`, which no longer names the
///   network share.
///
/// On non-Windows targets the path is returned unchanged. On Windows the path
/// is inspected through `to_string_lossy`, so a rewritten path containing
/// invalid Unicode will carry replacement characters.
fn strip_unc_prefix(path: PathBuf) -> PathBuf {
    #[cfg(windows)]
    {
        let s = path.to_string_lossy();
        // The verbatim-UNC form must be checked first: it also starts with `\\?\`.
        if let Some(share) = s.strip_prefix(r"\\?\UNC\") {
            return PathBuf::from(format!(r"\\{share}"));
        }
        if let Some(stripped) = s.strip_prefix(r"\\?\") {
            return PathBuf::from(stripped);
        }
    }
    path
}
60
/// Result record for a tool call: which tool, what it printed, and how it exited.
#[derive(Clone, Debug)]
pub struct ToolCallResult {
    /// Tool name.
    pub name: String,
    /// Output text of the call.
    pub output: String,
    /// Exit code of the call. NOTE(review): presumably `0` means success —
    /// confirm against the code that fills this in.
    pub exit_code: i32,
}
72
/// Per-session overrides consumed when an `Agent` builds an `AgentSession`.
///
/// Every field defaults to "unset" (`None`, empty, or `false`); unset values
/// fall back to the agent-level configuration where the session builder
/// defines such a fallback.
#[derive(Clone, Default)]
pub struct SessionOptions {
    /// Model override in `provider/model` form (split on the first `/`).
    pub model: Option<String>,
    /// Extra directories scanned for agent definitions, in addition to the
    /// directories listed in the agent's config.
    pub agent_dirs: Vec<PathBuf>,
    /// When set, a `SessionLaneQueue` is created for the session.
    pub queue_config: Option<SessionQueueConfig>,
    /// Security provider forwarded into the session's `AgentConfig`.
    pub security_provider: Option<Arc<dyn crate::security::SecurityProvider>>,
    /// Context providers forwarded into the session's `AgentConfig`.
    pub context_providers: Vec<Arc<dyn crate::context::ContextProvider>>,
    /// Confirmation provider forwarded into the session's `AgentConfig`.
    pub confirmation_manager: Option<Arc<dyn crate::hitl::ConfirmationProvider>>,
    /// Permission checker forwarded into the session's `AgentConfig`.
    pub permission_checker: Option<Arc<dyn crate::permissions::PermissionChecker>>,
    /// Forwarded to `AgentConfig::planning_enabled`.
    pub planning_enabled: bool,
    /// Forwarded to `AgentConfig::goal_tracking`.
    pub goal_tracking: bool,
    /// Directories whose skills are loaded into the session's skill registry.
    pub skill_dirs: Vec<PathBuf>,
    /// Skills registered on top of a fork of the agent-level registry.
    pub skill_registry: Option<Arc<crate::skills::SkillRegistry>>,
    /// Explicit memory store; takes precedence over `file_memory_dir`.
    pub memory_store: Option<Arc<dyn MemoryStore>>,
    /// Directory for a `FileMemoryStore`, used only when `memory_store` is unset.
    pub(crate) file_memory_dir: Option<PathBuf>,
    /// Explicit session store; when unset the config's `sessions_dir` is used
    /// if present.
    pub session_store: Option<Arc<dyn crate::store::SessionStore>>,
    /// Explicit session id; a random UUID is generated when unset.
    pub session_id: Option<String>,
    /// When `true`, the session is saved after each send that uses the
    /// internal history.
    pub auto_save: bool,
    /// Overrides `AgentConfig::max_parse_retries`.
    pub max_parse_retries: Option<u32>,
    /// Overrides `AgentConfig::tool_timeout_ms` (milliseconds, per the name).
    pub tool_timeout_ms: Option<u64>,
    /// Overrides `AgentConfig::circuit_breaker_threshold`.
    pub circuit_breaker_threshold: Option<u32>,
    /// Sandbox configuration; only honoured when the `sandbox` Cargo feature
    /// is enabled, otherwise a warning is logged.
    pub sandbox_config: Option<crate::sandbox::SandboxConfig>,
    /// Forwarded to `AgentConfig::auto_compact`.
    pub auto_compact: bool,
    /// Overrides the auto-compact threshold; the builder method clamps it to
    /// `0.0..=1.0`.
    pub auto_compact_threshold: Option<f32>,
    /// Overrides `AgentConfig::continuation_enabled`.
    pub continuation_enabled: Option<bool>,
    /// Overrides `AgentConfig::max_continuation_turns`.
    pub max_continuation_turns: Option<u32>,
    /// Session-level MCP manager.
    pub mcp_manager: Option<Arc<crate::mcp::manager::McpManager>>,
    /// LLM temperature override; applied only together with `model`.
    pub temperature: Option<f32>,
    /// LLM thinking-budget override; applied only together with `model`.
    pub thinking_budget: Option<usize>,
    /// Overrides `AgentConfig::max_tool_rounds`.
    pub max_tool_rounds: Option<usize>,
    /// System prompt slots; falls back to the agent-level slots when unset.
    pub prompt_slots: Option<SystemPromptSlots>,
}
166
167impl std::fmt::Debug for SessionOptions {
168 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169 f.debug_struct("SessionOptions")
170 .field("model", &self.model)
171 .field("agent_dirs", &self.agent_dirs)
172 .field("skill_dirs", &self.skill_dirs)
173 .field("queue_config", &self.queue_config)
174 .field("security_provider", &self.security_provider.is_some())
175 .field("context_providers", &self.context_providers.len())
176 .field("confirmation_manager", &self.confirmation_manager.is_some())
177 .field("permission_checker", &self.permission_checker.is_some())
178 .field("planning_enabled", &self.planning_enabled)
179 .field("goal_tracking", &self.goal_tracking)
180 .field(
181 "skill_registry",
182 &self
183 .skill_registry
184 .as_ref()
185 .map(|r| format!("{} skills", r.len())),
186 )
187 .field("memory_store", &self.memory_store.is_some())
188 .field("session_store", &self.session_store.is_some())
189 .field("session_id", &self.session_id)
190 .field("auto_save", &self.auto_save)
191 .field("max_parse_retries", &self.max_parse_retries)
192 .field("tool_timeout_ms", &self.tool_timeout_ms)
193 .field("circuit_breaker_threshold", &self.circuit_breaker_threshold)
194 .field("sandbox_config", &self.sandbox_config)
195 .field("auto_compact", &self.auto_compact)
196 .field("auto_compact_threshold", &self.auto_compact_threshold)
197 .field("continuation_enabled", &self.continuation_enabled)
198 .field("max_continuation_turns", &self.max_continuation_turns)
199 .field("mcp_manager", &self.mcp_manager.is_some())
200 .field("temperature", &self.temperature)
201 .field("thinking_budget", &self.thinking_budget)
202 .field("max_tool_rounds", &self.max_tool_rounds)
203 .field("prompt_slots", &self.prompt_slots.is_some())
204 .finish()
205 }
206}
207
208impl SessionOptions {
209 pub fn new() -> Self {
210 Self::default()
211 }
212
213 pub fn with_model(mut self, model: impl Into<String>) -> Self {
214 self.model = Some(model.into());
215 self
216 }
217
218 pub fn with_agent_dir(mut self, dir: impl Into<PathBuf>) -> Self {
219 self.agent_dirs.push(dir.into());
220 self
221 }
222
223 pub fn with_queue_config(mut self, config: SessionQueueConfig) -> Self {
224 self.queue_config = Some(config);
225 self
226 }
227
228 pub fn with_default_security(mut self) -> Self {
230 self.security_provider = Some(Arc::new(crate::security::DefaultSecurityProvider::new()));
231 self
232 }
233
234 pub fn with_security_provider(
236 mut self,
237 provider: Arc<dyn crate::security::SecurityProvider>,
238 ) -> Self {
239 self.security_provider = Some(provider);
240 self
241 }
242
243 pub fn with_fs_context(mut self, root_path: impl Into<PathBuf>) -> Self {
245 let config = crate::context::FileSystemContextConfig::new(root_path);
246 self.context_providers
247 .push(Arc::new(crate::context::FileSystemContextProvider::new(
248 config,
249 )));
250 self
251 }
252
253 pub fn with_context_provider(
255 mut self,
256 provider: Arc<dyn crate::context::ContextProvider>,
257 ) -> Self {
258 self.context_providers.push(provider);
259 self
260 }
261
262 pub fn with_confirmation_manager(
264 mut self,
265 manager: Arc<dyn crate::hitl::ConfirmationProvider>,
266 ) -> Self {
267 self.confirmation_manager = Some(manager);
268 self
269 }
270
271 pub fn with_permission_checker(
273 mut self,
274 checker: Arc<dyn crate::permissions::PermissionChecker>,
275 ) -> Self {
276 self.permission_checker = Some(checker);
277 self
278 }
279
280 pub fn with_permissive_policy(self) -> Self {
287 self.with_permission_checker(Arc::new(crate::permissions::PermissionPolicy::permissive()))
288 }
289
290 pub fn with_planning(mut self, enabled: bool) -> Self {
292 self.planning_enabled = enabled;
293 self
294 }
295
296 pub fn with_goal_tracking(mut self, enabled: bool) -> Self {
298 self.goal_tracking = enabled;
299 self
300 }
301
302 pub fn with_builtin_skills(mut self) -> Self {
304 self.skill_registry = Some(Arc::new(crate::skills::SkillRegistry::with_builtins()));
305 self
306 }
307
308 pub fn with_skill_registry(mut self, registry: Arc<crate::skills::SkillRegistry>) -> Self {
310 self.skill_registry = Some(registry);
311 self
312 }
313
314 pub fn with_skill_dirs(mut self, dirs: impl IntoIterator<Item = impl Into<PathBuf>>) -> Self {
317 self.skill_dirs.extend(dirs.into_iter().map(Into::into));
318 self
319 }
320
321 pub fn with_skills_from_dir(mut self, dir: impl AsRef<std::path::Path>) -> Self {
323 let registry = self
324 .skill_registry
325 .unwrap_or_else(|| Arc::new(crate::skills::SkillRegistry::new()));
326 if let Err(e) = registry.load_from_dir(&dir) {
327 tracing::warn!(
328 dir = %dir.as_ref().display(),
329 error = %e,
330 "Failed to load skills from directory — continuing without them"
331 );
332 }
333 self.skill_registry = Some(registry);
334 self
335 }
336
337 pub fn with_memory(mut self, store: Arc<dyn MemoryStore>) -> Self {
339 self.memory_store = Some(store);
340 self
341 }
342
343 pub fn with_file_memory(mut self, dir: impl Into<PathBuf>) -> Self {
349 self.file_memory_dir = Some(dir.into());
350 self
351 }
352
353 pub fn with_session_store(mut self, store: Arc<dyn crate::store::SessionStore>) -> Self {
355 self.session_store = Some(store);
356 self
357 }
358
359 pub fn with_file_session_store(mut self, dir: impl Into<PathBuf>) -> Self {
361 let dir = dir.into();
362 match tokio::runtime::Handle::try_current() {
363 Ok(handle) => {
364 match tokio::task::block_in_place(|| {
365 handle.block_on(crate::store::FileSessionStore::new(dir))
366 }) {
367 Ok(store) => {
368 self.session_store =
369 Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>);
370 }
371 Err(e) => {
372 tracing::warn!("Failed to create file session store: {}", e);
373 }
374 }
375 }
376 Err(_) => {
377 tracing::warn!(
378 "No async runtime available for file session store — persistence disabled"
379 );
380 }
381 }
382 self
383 }
384
385 pub fn with_session_id(mut self, id: impl Into<String>) -> Self {
387 self.session_id = Some(id.into());
388 self
389 }
390
391 pub fn with_auto_save(mut self, enabled: bool) -> Self {
393 self.auto_save = enabled;
394 self
395 }
396
397 pub fn with_parse_retries(mut self, max: u32) -> Self {
403 self.max_parse_retries = Some(max);
404 self
405 }
406
407 pub fn with_tool_timeout(mut self, timeout_ms: u64) -> Self {
413 self.tool_timeout_ms = Some(timeout_ms);
414 self
415 }
416
417 pub fn with_circuit_breaker(mut self, threshold: u32) -> Self {
423 self.circuit_breaker_threshold = Some(threshold);
424 self
425 }
426
427 pub fn with_resilience_defaults(self) -> Self {
433 self.with_parse_retries(2)
434 .with_tool_timeout(120_000)
435 .with_circuit_breaker(3)
436 }
437
438 pub fn with_sandbox(mut self, config: crate::sandbox::SandboxConfig) -> Self {
457 self.sandbox_config = Some(config);
458 self
459 }
460
461 pub fn with_auto_compact(mut self, enabled: bool) -> Self {
466 self.auto_compact = enabled;
467 self
468 }
469
470 pub fn with_auto_compact_threshold(mut self, threshold: f32) -> Self {
472 self.auto_compact_threshold = Some(threshold.clamp(0.0, 1.0));
473 self
474 }
475
476 pub fn with_continuation(mut self, enabled: bool) -> Self {
481 self.continuation_enabled = Some(enabled);
482 self
483 }
484
485 pub fn with_max_continuation_turns(mut self, turns: u32) -> Self {
487 self.max_continuation_turns = Some(turns);
488 self
489 }
490
491 pub fn with_mcp(mut self, manager: Arc<crate::mcp::manager::McpManager>) -> Self {
496 self.mcp_manager = Some(manager);
497 self
498 }
499
500 pub fn with_temperature(mut self, temperature: f32) -> Self {
501 self.temperature = Some(temperature);
502 self
503 }
504
505 pub fn with_thinking_budget(mut self, budget: usize) -> Self {
506 self.thinking_budget = Some(budget);
507 self
508 }
509
510 pub fn with_max_tool_rounds(mut self, rounds: usize) -> Self {
515 self.max_tool_rounds = Some(rounds);
516 self
517 }
518
519 pub fn with_prompt_slots(mut self, slots: SystemPromptSlots) -> Self {
524 self.prompt_slots = Some(slots);
525 self
526 }
527}
528
/// Entry point that owns the shared configuration and hands out sessions.
pub struct Agent {
    /// Default LLM client, built from the config's default LLM settings and
    /// reused by sessions that do not override the model.
    llm_client: Arc<dyn LlmClient>,
    /// The loaded code configuration (providers, directories, MCP servers, …).
    code_config: CodeConfig,
    /// Base agent configuration; cloned and specialised for every session.
    config: AgentConfig,
    /// MCP manager for the servers declared in the config; `None` when the
    /// config lists no servers.
    global_mcp: Option<Arc<crate::mcp::manager::McpManager>>,
    /// Cached `(server, tool)` list from `global_mcp`; replaced by
    /// `refresh_mcp_tools`.
    global_mcp_tools: std::sync::Mutex<Vec<(String, crate::mcp::McpTool)>>,
}
547
548impl std::fmt::Debug for Agent {
549 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
550 f.debug_struct("Agent").finish()
551 }
552}
553
554impl Agent {
555 pub async fn new(config_source: impl Into<String>) -> Result<Self> {
559 let source = config_source.into();
560
561 let expanded = if let Some(rest) = source.strip_prefix("~/") {
563 let home = std::env::var_os("HOME").or_else(|| std::env::var_os("USERPROFILE"));
564 if let Some(home) = home {
565 PathBuf::from(home).join(rest).display().to_string()
566 } else {
567 source.clone()
568 }
569 } else {
570 source.clone()
571 };
572
573 let path = Path::new(&expanded);
574
575 let config = if path.extension().is_some() && path.exists() {
576 CodeConfig::from_file(path)
577 .with_context(|| format!("Failed to load config: {}", path.display()))?
578 } else {
579 CodeConfig::from_hcl(&source).context("Failed to parse config as HCL string")?
581 };
582
583 Self::from_config(config).await
584 }
585
586 pub async fn create(config_source: impl Into<String>) -> Result<Self> {
591 Self::new(config_source).await
592 }
593
594 pub async fn from_config(config: CodeConfig) -> Result<Self> {
596 let llm_config = config
597 .default_llm_config()
598 .context("default_model must be set in 'provider/model' format with a valid API key")?;
599 let llm_client = crate::llm::create_client_with_config(llm_config);
600
601 let agent_config = AgentConfig {
602 max_tool_rounds: config
603 .max_tool_rounds
604 .unwrap_or(AgentConfig::default().max_tool_rounds),
605 ..AgentConfig::default()
606 };
607
608 let (global_mcp, global_mcp_tools) = if config.mcp_servers.is_empty() {
610 (None, vec![])
611 } else {
612 let manager = Arc::new(crate::mcp::manager::McpManager::new());
613 for server in &config.mcp_servers {
614 if !server.enabled {
615 continue;
616 }
617 manager.register_server(server.clone()).await;
618 if let Err(e) = manager.connect(&server.name).await {
619 tracing::warn!(
620 server = %server.name,
621 error = %e,
622 "Failed to connect to MCP server — skipping"
623 );
624 }
625 }
626 let tools = manager.get_all_tools().await;
628 (Some(manager), tools)
629 };
630
631 let mut agent = Agent {
632 llm_client,
633 code_config: config,
634 config: agent_config,
635 global_mcp,
636 global_mcp_tools: std::sync::Mutex::new(global_mcp_tools),
637 };
638
639 let registry = Arc::new(crate::skills::SkillRegistry::with_builtins());
641 for dir in &agent.code_config.skill_dirs.clone() {
642 if let Err(e) = registry.load_from_dir(dir) {
643 tracing::warn!(
644 dir = %dir.display(),
645 error = %e,
646 "Failed to load skills from directory — skipping"
647 );
648 }
649 }
650 agent.config.skill_registry = Some(registry);
651
652 Ok(agent)
653 }
654
655 pub async fn refresh_mcp_tools(&self) -> Result<()> {
663 if let Some(ref mcp) = self.global_mcp {
664 let fresh = mcp.get_all_tools().await;
665 *self
666 .global_mcp_tools
667 .lock()
668 .expect("global_mcp_tools lock poisoned") = fresh;
669 }
670 Ok(())
671 }
672
673 pub fn session(
678 &self,
679 workspace: impl Into<String>,
680 options: Option<SessionOptions>,
681 ) -> Result<AgentSession> {
682 let opts = options.unwrap_or_default();
683
684 let llm_client = if let Some(ref model) = opts.model {
685 let (provider_name, model_id) = model
686 .split_once('/')
687 .context("model format must be 'provider/model' (e.g., 'openai/gpt-4o')")?;
688
689 let mut llm_config = self
690 .code_config
691 .llm_config(provider_name, model_id)
692 .with_context(|| {
693 format!("provider '{provider_name}' or model '{model_id}' not found in config")
694 })?;
695
696 if let Some(temp) = opts.temperature {
697 llm_config = llm_config.with_temperature(temp);
698 }
699 if let Some(budget) = opts.thinking_budget {
700 llm_config = llm_config.with_thinking_budget(budget);
701 }
702
703 crate::llm::create_client_with_config(llm_config)
704 } else {
705 if opts.temperature.is_some() || opts.thinking_budget.is_some() {
706 tracing::warn!(
707 "temperature/thinking_budget set without model override — these will be ignored. \
708 Use with_model() to apply LLM parameter overrides."
709 );
710 }
711 self.llm_client.clone()
712 };
713
714 let merged_opts = match (&self.global_mcp, &opts.mcp_manager) {
717 (Some(global), Some(session)) => {
718 let global = Arc::clone(global);
719 let session_mgr = Arc::clone(session);
720 match tokio::runtime::Handle::try_current() {
721 Ok(handle) => {
722 let global_for_merge = Arc::clone(&global);
723 tokio::task::block_in_place(|| {
724 handle.block_on(async move {
725 for config in session_mgr.all_configs().await {
726 let name = config.name.clone();
727 global_for_merge.register_server(config).await;
728 if let Err(e) = global_for_merge.connect(&name).await {
729 tracing::warn!(
730 server = %name,
731 error = %e,
732 "Failed to connect session-level MCP server — skipping"
733 );
734 }
735 }
736 })
737 });
738 }
739 Err(_) => {
740 tracing::warn!(
741 "No async runtime available to merge session-level MCP servers \
742 into global manager — session MCP servers will not be available"
743 );
744 }
745 }
746 SessionOptions {
747 mcp_manager: Some(Arc::clone(&global)),
748 ..opts
749 }
750 }
751 (Some(global), None) => SessionOptions {
752 mcp_manager: Some(Arc::clone(global)),
753 ..opts
754 },
755 _ => opts,
756 };
757
758 self.build_session(workspace.into(), llm_client, &merged_opts)
759 }
760
761 pub fn session_for_agent(
776 &self,
777 workspace: impl Into<String>,
778 def: &crate::subagent::AgentDefinition,
779 extra: Option<SessionOptions>,
780 ) -> Result<AgentSession> {
781 let mut opts = extra.unwrap_or_default();
782
783 if opts.permission_checker.is_none()
785 && (!def.permissions.allow.is_empty() || !def.permissions.deny.is_empty())
786 {
787 opts.permission_checker = Some(Arc::new(def.permissions.clone()));
788 }
789
790 if opts.max_tool_rounds.is_none() {
792 if let Some(steps) = def.max_steps {
793 opts.max_tool_rounds = Some(steps);
794 }
795 }
796
797 if opts.model.is_none() {
799 if let Some(ref m) = def.model {
800 let provider = m.provider.as_deref().unwrap_or("anthropic");
801 opts.model = Some(format!("{}/{}", provider, m.model));
802 }
803 }
804
805 if opts.prompt_slots.is_none() {
807 if let Some(ref prompt) = def.prompt {
808 opts.prompt_slots = Some(crate::prompts::SystemPromptSlots {
809 extra: Some(prompt.clone()),
810 ..Default::default()
811 });
812 }
813 }
814
815 self.session(workspace, Some(opts))
816 }
817
818 pub fn resume_session(
826 &self,
827 session_id: &str,
828 options: SessionOptions,
829 ) -> Result<AgentSession> {
830 let store = options.session_store.as_ref().ok_or_else(|| {
831 crate::error::CodeError::Session(
832 "resume_session requires a session_store in SessionOptions".to_string(),
833 )
834 })?;
835
836 let data = match tokio::runtime::Handle::try_current() {
838 Ok(handle) => tokio::task::block_in_place(|| handle.block_on(store.load(session_id)))
839 .map_err(|e| {
840 crate::error::CodeError::Session(format!(
841 "Failed to load session {}: {}",
842 session_id, e
843 ))
844 })?,
845 Err(_) => {
846 return Err(crate::error::CodeError::Session(
847 "No async runtime available for session resume".to_string(),
848 ))
849 }
850 };
851
852 let data = data.ok_or_else(|| {
853 crate::error::CodeError::Session(format!("Session not found: {}", session_id))
854 })?;
855
856 let mut opts = options;
858 opts.session_id = Some(data.id.clone());
859
860 let llm_client = if let Some(ref model) = opts.model {
861 let (provider_name, model_id) = model
862 .split_once('/')
863 .context("model format must be 'provider/model'")?;
864 let llm_config = self
865 .code_config
866 .llm_config(provider_name, model_id)
867 .with_context(|| {
868 format!("provider '{provider_name}' or model '{model_id}' not found")
869 })?;
870 crate::llm::create_client_with_config(llm_config)
871 } else {
872 self.llm_client.clone()
873 };
874
875 let session = self.build_session(data.config.workspace.clone(), llm_client, &opts)?;
876
877 *write_or_recover(&session.history) = data.messages;
879
880 Ok(session)
881 }
882
883 fn build_session(
884 &self,
885 workspace: String,
886 llm_client: Arc<dyn LlmClient>,
887 opts: &SessionOptions,
888 ) -> Result<AgentSession> {
889 let canonical = safe_canonicalize(Path::new(&workspace));
890
891 let tool_executor = Arc::new(ToolExecutor::new(canonical.display().to_string()));
892
893 {
897 use crate::subagent::{load_agents_from_dir, AgentRegistry};
898 use crate::tools::register_task_with_mcp;
899 let agent_registry = AgentRegistry::new();
900 for dir in self
901 .code_config
902 .agent_dirs
903 .iter()
904 .chain(opts.agent_dirs.iter())
905 {
906 for agent in load_agents_from_dir(dir) {
907 agent_registry.register(agent);
908 }
909 }
910 register_task_with_mcp(
911 tool_executor.registry(),
912 Arc::clone(&llm_client),
913 Arc::new(agent_registry),
914 canonical.display().to_string(),
915 opts.mcp_manager.clone(),
916 );
917 }
918
919 if let Some(ref mcp) = opts.mcp_manager {
922 let all_tools: Vec<(String, crate::mcp::McpTool)> = if std::ptr::eq(
925 Arc::as_ptr(mcp),
926 self.global_mcp
927 .as_ref()
928 .map(Arc::as_ptr)
929 .unwrap_or(std::ptr::null()),
930 ) {
931 self.global_mcp_tools
933 .lock()
934 .expect("global_mcp_tools lock poisoned")
935 .clone()
936 } else {
937 match tokio::runtime::Handle::try_current() {
939 Ok(handle) => {
940 tokio::task::block_in_place(|| handle.block_on(mcp.get_all_tools()))
941 }
942 Err(_) => {
943 tracing::warn!(
944 "No async runtime available for session-level MCP tools — \
945 MCP tools will not be registered"
946 );
947 vec![]
948 }
949 }
950 };
951
952 let mut by_server: std::collections::HashMap<String, Vec<crate::mcp::McpTool>> =
953 std::collections::HashMap::new();
954 for (server, tool) in all_tools {
955 by_server.entry(server).or_default().push(tool);
956 }
957 for (server_name, tools) in by_server {
958 for tool in
959 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(mcp))
960 {
961 tool_executor.register_dynamic_tool(tool);
962 }
963 }
964 }
965
966 let tool_defs = tool_executor.definitions();
967
968 let mut prompt_slots = opts
970 .prompt_slots
971 .clone()
972 .unwrap_or_else(|| self.config.prompt_slots.clone());
973
974 let base_registry = self
978 .config
979 .skill_registry
980 .as_deref()
981 .map(|r| r.fork())
982 .unwrap_or_else(crate::skills::SkillRegistry::with_builtins);
983 if let Some(ref r) = opts.skill_registry {
985 for skill in r.all() {
986 base_registry.register_unchecked(skill);
987 }
988 }
989 for dir in &opts.skill_dirs {
991 if let Err(e) = base_registry.load_from_dir(dir) {
992 tracing::warn!(
993 dir = %dir.display(),
994 error = %e,
995 "Failed to load session skill dir — skipping"
996 );
997 }
998 }
999 let effective_registry = Arc::new(base_registry);
1000
1001 let skill_prompt = effective_registry.to_system_prompt();
1003 if !skill_prompt.is_empty() {
1004 prompt_slots.extra = match prompt_slots.extra {
1005 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
1006 None => Some(skill_prompt),
1007 };
1008 }
1009
1010 let mut init_warning: Option<String> = None;
1012 let memory = {
1013 let store = if let Some(ref store) = opts.memory_store {
1014 Some(Arc::clone(store))
1015 } else if let Some(ref dir) = opts.file_memory_dir {
1016 match tokio::runtime::Handle::try_current() {
1017 Ok(handle) => {
1018 let dir = dir.clone();
1019 match tokio::task::block_in_place(|| {
1020 handle.block_on(FileMemoryStore::new(dir))
1021 }) {
1022 Ok(store) => Some(Arc::new(store) as Arc<dyn MemoryStore>),
1023 Err(e) => {
1024 let msg = format!("Failed to create file memory store: {}", e);
1025 tracing::warn!("{}", msg);
1026 init_warning = Some(msg);
1027 None
1028 }
1029 }
1030 }
1031 Err(_) => {
1032 let msg =
1033 "No async runtime available for file memory store — memory disabled"
1034 .to_string();
1035 tracing::warn!("{}", msg);
1036 init_warning = Some(msg);
1037 None
1038 }
1039 }
1040 } else {
1041 None
1042 };
1043 store.map(|s| Arc::new(crate::memory::AgentMemory::new(s)))
1044 };
1045
1046 let base = self.config.clone();
1047 let config = AgentConfig {
1048 prompt_slots,
1049 tools: tool_defs,
1050 security_provider: opts.security_provider.clone(),
1051 permission_checker: opts.permission_checker.clone(),
1052 confirmation_manager: opts.confirmation_manager.clone(),
1053 context_providers: opts.context_providers.clone(),
1054 planning_enabled: opts.planning_enabled,
1055 goal_tracking: opts.goal_tracking,
1056 skill_registry: Some(effective_registry),
1057 max_parse_retries: opts.max_parse_retries.unwrap_or(base.max_parse_retries),
1058 tool_timeout_ms: opts.tool_timeout_ms.or(base.tool_timeout_ms),
1059 circuit_breaker_threshold: opts
1060 .circuit_breaker_threshold
1061 .unwrap_or(base.circuit_breaker_threshold),
1062 auto_compact: opts.auto_compact,
1063 auto_compact_threshold: opts
1064 .auto_compact_threshold
1065 .unwrap_or(crate::session::DEFAULT_AUTO_COMPACT_THRESHOLD),
1066 max_context_tokens: base.max_context_tokens,
1067 llm_client: Some(Arc::clone(&llm_client)),
1068 memory: memory.clone(),
1069 continuation_enabled: opts
1070 .continuation_enabled
1071 .unwrap_or(base.continuation_enabled),
1072 max_continuation_turns: opts
1073 .max_continuation_turns
1074 .unwrap_or(base.max_continuation_turns),
1075 max_tool_rounds: opts.max_tool_rounds.unwrap_or(base.max_tool_rounds),
1076 ..base
1077 };
1078
1079 let (agent_event_tx, _) = broadcast::channel::<crate::agent::AgentEvent>(256);
1082 let command_queue = if let Some(ref queue_config) = opts.queue_config {
1083 let session_id = uuid::Uuid::new_v4().to_string();
1084 let rt = tokio::runtime::Handle::try_current();
1085
1086 match rt {
1087 Ok(handle) => {
1088 let queue = tokio::task::block_in_place(|| {
1090 handle.block_on(SessionLaneQueue::new(
1091 &session_id,
1092 queue_config.clone(),
1093 agent_event_tx.clone(),
1094 ))
1095 });
1096 match queue {
1097 Ok(q) => {
1098 let q = Arc::new(q);
1100 let q2 = Arc::clone(&q);
1101 tokio::task::block_in_place(|| {
1102 handle.block_on(async { q2.start().await.ok() })
1103 });
1104 Some(q)
1105 }
1106 Err(e) => {
1107 tracing::warn!("Failed to create session lane queue: {}", e);
1108 None
1109 }
1110 }
1111 }
1112 Err(_) => {
1113 tracing::warn!(
1114 "No async runtime available for queue creation — queue disabled"
1115 );
1116 None
1117 }
1118 }
1119 } else {
1120 None
1121 };
1122
1123 let mut tool_context = ToolContext::new(canonical.clone());
1125 if let Some(ref search_config) = self.code_config.search {
1126 tool_context = tool_context.with_search_config(search_config.clone());
1127 }
1128 tool_context = tool_context.with_agent_event_tx(agent_event_tx);
1129
1130 #[cfg(feature = "sandbox")]
1132 if let Some(ref sandbox_cfg) = opts.sandbox_config {
1133 let handle: Arc<dyn crate::sandbox::BashSandbox> =
1134 Arc::new(crate::sandbox::BoxSandboxHandle::new(
1135 sandbox_cfg.clone(),
1136 canonical.display().to_string(),
1137 ));
1138 tool_executor.registry().set_sandbox(Arc::clone(&handle));
1141 tool_context = tool_context.with_sandbox(handle);
1142 }
1143 #[cfg(not(feature = "sandbox"))]
1144 if opts.sandbox_config.is_some() {
1145 tracing::warn!(
1146 "sandbox_config is set but the `sandbox` Cargo feature is not enabled \
1147 — bash commands will run locally"
1148 );
1149 }
1150
1151 let session_id = opts
1152 .session_id
1153 .clone()
1154 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1155
1156 let session_store = if opts.session_store.is_some() {
1158 opts.session_store.clone()
1159 } else if let Some(ref dir) = self.code_config.sessions_dir {
1160 match tokio::runtime::Handle::try_current() {
1161 Ok(handle) => {
1162 let dir = dir.clone();
1163 match tokio::task::block_in_place(|| {
1164 handle.block_on(crate::store::FileSessionStore::new(dir))
1165 }) {
1166 Ok(store) => Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>),
1167 Err(e) => {
1168 tracing::warn!(
1169 "Failed to create session store from sessions_dir: {}",
1170 e
1171 );
1172 None
1173 }
1174 }
1175 }
1176 Err(_) => {
1177 tracing::warn!(
1178 "No async runtime for sessions_dir store — persistence disabled"
1179 );
1180 None
1181 }
1182 }
1183 } else {
1184 None
1185 };
1186
1187 Ok(AgentSession {
1188 llm_client,
1189 tool_executor,
1190 tool_context,
1191 memory: config.memory.clone(),
1192 config,
1193 workspace: canonical,
1194 session_id,
1195 history: RwLock::new(Vec::new()),
1196 command_queue,
1197 session_store,
1198 auto_save: opts.auto_save,
1199 hook_engine: Arc::new(crate::hooks::HookEngine::new()),
1200 init_warning,
1201 command_registry: CommandRegistry::new(),
1202 model_name: opts
1203 .model
1204 .clone()
1205 .or_else(|| self.code_config.default_model.clone())
1206 .unwrap_or_else(|| "unknown".to_string()),
1207 mcp_manager: opts
1208 .mcp_manager
1209 .clone()
1210 .or_else(|| self.global_mcp.clone())
1211 .unwrap_or_else(|| Arc::new(crate::mcp::manager::McpManager::new())),
1212 })
1213 }
1214}
1215
/// A conversation bound to one workspace, with its own tools, history and
/// optional persistence.
pub struct AgentSession {
    /// LLM client handed to every agent loop built by this session.
    llm_client: Arc<dyn LlmClient>,
    /// Executor holding the session's tools; its definitions are re-read each
    /// time an agent loop is built.
    tool_executor: Arc<ToolExecutor>,
    /// Tool context cloned into every agent loop.
    tool_context: ToolContext,
    /// Session-level agent configuration; cloned for every agent loop.
    config: AgentConfig,
    /// Workspace path (canonicalized when the session was built, if possible).
    workspace: PathBuf,
    /// Identifier of this session.
    session_id: String,
    /// Internal conversation history, used when a call passes no explicit
    /// history and replaced with the resulting messages afterwards.
    history: RwLock<Vec<Message>>,
    /// Optional lane queue attached to each agent loop.
    command_queue: Option<Arc<SessionLaneQueue>>,
    /// Optional agent memory (same value as `config.memory`).
    memory: Option<Arc<crate::memory::AgentMemory>>,
    /// Optional store used to persist the session.
    session_store: Option<Arc<dyn crate::store::SessionStore>>,
    /// When `true`, the session is saved after sends that use the internal
    /// history.
    auto_save: bool,
    /// Hook engine installed into each agent loop's configuration.
    hook_engine: Arc<crate::hooks::HookEngine>,
    /// Warning recorded while building the session (e.g. memory store could
    /// not be created); logged by `send`.
    init_warning: Option<String>,
    /// Registry of slash commands understood by this session.
    command_registry: CommandRegistry,
    /// Model name reported in the command context.
    model_name: String,
    /// MCP manager associated with this session.
    mcp_manager: Arc<crate::mcp::manager::McpManager>,
}
1253
1254impl std::fmt::Debug for AgentSession {
1255 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1256 f.debug_struct("AgentSession")
1257 .field("session_id", &self.session_id)
1258 .field("workspace", &self.workspace.display().to_string())
1259 .field("auto_save", &self.auto_save)
1260 .finish()
1261 }
1262}
1263
1264impl AgentSession {
1265 fn build_agent_loop(&self) -> AgentLoop {
1269 let mut config = self.config.clone();
1270 config.hook_engine =
1271 Some(Arc::clone(&self.hook_engine) as Arc<dyn crate::hooks::HookExecutor>);
1272 config.tools = self.tool_executor.definitions();
1276 let mut agent_loop = AgentLoop::new(
1277 self.llm_client.clone(),
1278 self.tool_executor.clone(),
1279 self.tool_context.clone(),
1280 config,
1281 );
1282 if let Some(ref queue) = self.command_queue {
1283 agent_loop = agent_loop.with_queue(Arc::clone(queue));
1284 }
1285 agent_loop
1286 }
1287
1288 fn build_command_context(&self) -> CommandContext {
1290 let history = read_or_recover(&self.history);
1291
1292 let tool_names: Vec<String> = self.config.tools.iter().map(|t| t.name.clone()).collect();
1294
1295 let mut mcp_map: std::collections::HashMap<String, usize> =
1297 std::collections::HashMap::new();
1298 for name in &tool_names {
1299 if let Some(rest) = name.strip_prefix("mcp__") {
1300 if let Some((server, _)) = rest.split_once("__") {
1301 *mcp_map.entry(server.to_string()).or_default() += 1;
1302 }
1303 }
1304 }
1305 let mut mcp_servers: Vec<(String, usize)> = mcp_map.into_iter().collect();
1306 mcp_servers.sort_by(|a, b| a.0.cmp(&b.0));
1307
1308 CommandContext {
1309 session_id: self.session_id.clone(),
1310 workspace: self.workspace.display().to_string(),
1311 model: self.model_name.clone(),
1312 history_len: history.len(),
1313 total_tokens: 0,
1314 total_cost: 0.0,
1315 tool_names,
1316 mcp_servers,
1317 }
1318 }
1319
1320 pub fn command_registry(&self) -> &CommandRegistry {
1322 &self.command_registry
1323 }
1324
1325 pub fn register_command(&mut self, cmd: Arc<dyn crate::commands::SlashCommand>) {
1327 self.command_registry.register(cmd);
1328 }
1329
1330 pub async fn send(&self, prompt: &str, history: Option<&[Message]>) -> Result<AgentResult> {
1339 if CommandRegistry::is_command(prompt) {
1341 let ctx = self.build_command_context();
1342 if let Some(output) = self.command_registry.dispatch(prompt, &ctx) {
1343 return Ok(AgentResult {
1344 text: output.text,
1345 messages: history
1346 .map(|h| h.to_vec())
1347 .unwrap_or_else(|| read_or_recover(&self.history).clone()),
1348 tool_calls_count: 0,
1349 usage: crate::llm::TokenUsage::default(),
1350 });
1351 }
1352 }
1353
1354 if let Some(ref w) = self.init_warning {
1355 tracing::warn!(session_id = %self.session_id, "Session init warning: {}", w);
1356 }
1357 let agent_loop = self.build_agent_loop();
1358
1359 let use_internal = history.is_none();
1360 let effective_history = match history {
1361 Some(h) => h.to_vec(),
1362 None => read_or_recover(&self.history).clone(),
1363 };
1364
1365 let result = agent_loop.execute(&effective_history, prompt, None).await?;
1366
1367 if use_internal {
1370 *write_or_recover(&self.history) = result.messages.clone();
1371
1372 if self.auto_save {
1374 if let Err(e) = self.save().await {
1375 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
1376 }
1377 }
1378 }
1379
1380 Ok(result)
1381 }
1382
1383 pub async fn send_with_attachments(
1388 &self,
1389 prompt: &str,
1390 attachments: &[crate::llm::Attachment],
1391 history: Option<&[Message]>,
1392 ) -> Result<AgentResult> {
1393 let use_internal = history.is_none();
1397 let mut effective_history = match history {
1398 Some(h) => h.to_vec(),
1399 None => read_or_recover(&self.history).clone(),
1400 };
1401 effective_history.push(Message::user_with_attachments(prompt, attachments));
1402
1403 let agent_loop = self.build_agent_loop();
1404 let result = agent_loop
1405 .execute_from_messages(effective_history, None, None)
1406 .await?;
1407
1408 if use_internal {
1409 *write_or_recover(&self.history) = result.messages.clone();
1410 if self.auto_save {
1411 if let Err(e) = self.save().await {
1412 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
1413 }
1414 }
1415 }
1416
1417 Ok(result)
1418 }
1419
1420 pub async fn stream_with_attachments(
1425 &self,
1426 prompt: &str,
1427 attachments: &[crate::llm::Attachment],
1428 history: Option<&[Message]>,
1429 ) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
1430 let (tx, rx) = mpsc::channel(256);
1431 let mut effective_history = match history {
1432 Some(h) => h.to_vec(),
1433 None => read_or_recover(&self.history).clone(),
1434 };
1435 effective_history.push(Message::user_with_attachments(prompt, attachments));
1436
1437 let agent_loop = self.build_agent_loop();
1438 let handle = tokio::spawn(async move {
1439 let _ = agent_loop
1440 .execute_from_messages(effective_history, None, Some(tx))
1441 .await;
1442 });
1443
1444 Ok((rx, handle))
1445 }
1446
1447 pub async fn stream(
1457 &self,
1458 prompt: &str,
1459 history: Option<&[Message]>,
1460 ) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
1461 if CommandRegistry::is_command(prompt) {
1463 let ctx = self.build_command_context();
1464 if let Some(output) = self.command_registry.dispatch(prompt, &ctx) {
1465 let (tx, rx) = mpsc::channel(256);
1466 let handle = tokio::spawn(async move {
1467 let _ = tx
1468 .send(AgentEvent::TextDelta {
1469 text: output.text.clone(),
1470 })
1471 .await;
1472 let _ = tx
1473 .send(AgentEvent::End {
1474 text: output.text.clone(),
1475 usage: crate::llm::TokenUsage::default(),
1476 })
1477 .await;
1478 });
1479 return Ok((rx, handle));
1480 }
1481 }
1482
1483 let (tx, rx) = mpsc::channel(256);
1484 let agent_loop = self.build_agent_loop();
1485 let effective_history = match history {
1486 Some(h) => h.to_vec(),
1487 None => read_or_recover(&self.history).clone(),
1488 };
1489 let prompt = prompt.to_string();
1490
1491 let handle = tokio::spawn(async move {
1492 let _ = agent_loop
1493 .execute(&effective_history, &prompt, Some(tx))
1494 .await;
1495 });
1496
1497 Ok((rx, handle))
1498 }
1499
1500 pub fn history(&self) -> Vec<Message> {
1502 read_or_recover(&self.history).clone()
1503 }
1504
1505 pub fn memory(&self) -> Option<&Arc<crate::memory::AgentMemory>> {
1507 self.memory.as_ref()
1508 }
1509
1510 pub fn id(&self) -> &str {
1512 &self.session_id
1513 }
1514
1515 pub fn workspace(&self) -> &std::path::Path {
1517 &self.workspace
1518 }
1519
1520 pub fn init_warning(&self) -> Option<&str> {
1522 self.init_warning.as_deref()
1523 }
1524
1525 pub fn session_id(&self) -> &str {
1527 &self.session_id
1528 }
1529
1530 pub fn tool_definitions(&self) -> Vec<crate::llm::ToolDefinition> {
1536 self.tool_executor.definitions()
1537 }
1538
1539 pub fn tool_names(&self) -> Vec<String> {
1545 self.tool_executor
1546 .definitions()
1547 .into_iter()
1548 .map(|t| t.name)
1549 .collect()
1550 }
1551
1552 pub fn register_hook(&self, hook: crate::hooks::Hook) {
1558 self.hook_engine.register(hook);
1559 }
1560
1561 pub fn unregister_hook(&self, hook_id: &str) -> Option<crate::hooks::Hook> {
1563 self.hook_engine.unregister(hook_id)
1564 }
1565
1566 pub fn register_hook_handler(
1568 &self,
1569 hook_id: &str,
1570 handler: Arc<dyn crate::hooks::HookHandler>,
1571 ) {
1572 self.hook_engine.register_handler(hook_id, handler);
1573 }
1574
1575 pub fn unregister_hook_handler(&self, hook_id: &str) {
1577 self.hook_engine.unregister_handler(hook_id);
1578 }
1579
1580 pub fn hook_count(&self) -> usize {
1582 self.hook_engine.hook_count()
1583 }
1584
1585 pub async fn save(&self) -> Result<()> {
1589 let store = match &self.session_store {
1590 Some(s) => s,
1591 None => return Ok(()),
1592 };
1593
1594 let history = read_or_recover(&self.history).clone();
1595 let now = chrono::Utc::now().timestamp();
1596
1597 let data = crate::store::SessionData {
1598 id: self.session_id.clone(),
1599 config: crate::session::SessionConfig {
1600 name: String::new(),
1601 workspace: self.workspace.display().to_string(),
1602 system_prompt: Some(self.config.prompt_slots.build()),
1603 max_context_length: 200_000,
1604 auto_compact: false,
1605 auto_compact_threshold: crate::session::DEFAULT_AUTO_COMPACT_THRESHOLD,
1606 storage_type: crate::config::StorageBackend::File,
1607 queue_config: None,
1608 confirmation_policy: None,
1609 permission_policy: None,
1610 parent_id: None,
1611 security_config: None,
1612 hook_engine: None,
1613 planning_enabled: self.config.planning_enabled,
1614 goal_tracking: self.config.goal_tracking,
1615 },
1616 state: crate::session::SessionState::Active,
1617 messages: history,
1618 context_usage: crate::session::ContextUsage::default(),
1619 total_usage: crate::llm::TokenUsage::default(),
1620 total_cost: 0.0,
1621 model_name: None,
1622 cost_records: Vec::new(),
1623 tool_names: crate::store::SessionData::tool_names_from_definitions(&self.config.tools),
1624 thinking_enabled: false,
1625 thinking_budget: None,
1626 created_at: now,
1627 updated_at: now,
1628 llm_config: None,
1629 tasks: Vec::new(),
1630 parent_id: None,
1631 };
1632
1633 store.save(&data).await?;
1634 tracing::debug!("Session {} saved", self.session_id);
1635 Ok(())
1636 }
1637
1638 pub async fn read_file(&self, path: &str) -> Result<String> {
1640 let args = serde_json::json!({ "file_path": path });
1641 let result = self.tool_executor.execute("read", &args).await?;
1642 Ok(result.output)
1643 }
1644
1645 pub async fn bash(&self, command: &str) -> Result<String> {
1650 let args = serde_json::json!({ "command": command });
1651 let result = self
1652 .tool_executor
1653 .execute_with_context("bash", &args, &self.tool_context)
1654 .await?;
1655 Ok(result.output)
1656 }
1657
1658 pub async fn glob(&self, pattern: &str) -> Result<Vec<String>> {
1660 let args = serde_json::json!({ "pattern": pattern });
1661 let result = self.tool_executor.execute("glob", &args).await?;
1662 let files: Vec<String> = result
1663 .output
1664 .lines()
1665 .filter(|l| !l.is_empty())
1666 .map(|l| l.to_string())
1667 .collect();
1668 Ok(files)
1669 }
1670
1671 pub async fn grep(&self, pattern: &str) -> Result<String> {
1673 let args = serde_json::json!({ "pattern": pattern });
1674 let result = self.tool_executor.execute("grep", &args).await?;
1675 Ok(result.output)
1676 }
1677
1678 pub async fn tool(&self, name: &str, args: serde_json::Value) -> Result<ToolCallResult> {
1680 let result = self.tool_executor.execute(name, &args).await?;
1681 Ok(ToolCallResult {
1682 name: name.to_string(),
1683 output: result.output,
1684 exit_code: result.exit_code,
1685 })
1686 }
1687
1688 pub fn has_queue(&self) -> bool {
1694 self.command_queue.is_some()
1695 }
1696
1697 pub async fn set_lane_handler(&self, lane: SessionLane, config: LaneHandlerConfig) {
1701 if let Some(ref queue) = self.command_queue {
1702 queue.set_lane_handler(lane, config).await;
1703 }
1704 }
1705
1706 pub async fn complete_external_task(&self, task_id: &str, result: ExternalTaskResult) -> bool {
1710 if let Some(ref queue) = self.command_queue {
1711 queue.complete_external_task(task_id, result).await
1712 } else {
1713 false
1714 }
1715 }
1716
1717 pub async fn pending_external_tasks(&self) -> Vec<ExternalTask> {
1719 if let Some(ref queue) = self.command_queue {
1720 queue.pending_external_tasks().await
1721 } else {
1722 Vec::new()
1723 }
1724 }
1725
1726 pub async fn queue_stats(&self) -> SessionQueueStats {
1728 if let Some(ref queue) = self.command_queue {
1729 queue.stats().await
1730 } else {
1731 SessionQueueStats::default()
1732 }
1733 }
1734
1735 pub async fn queue_metrics(&self) -> Option<MetricsSnapshot> {
1737 if let Some(ref queue) = self.command_queue {
1738 queue.metrics_snapshot().await
1739 } else {
1740 None
1741 }
1742 }
1743
1744 pub async fn submit(
1750 &self,
1751 lane: SessionLane,
1752 command: Box<dyn crate::queue::SessionCommand>,
1753 ) -> anyhow::Result<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>> {
1754 let queue = self
1755 .command_queue
1756 .as_ref()
1757 .ok_or_else(|| anyhow::anyhow!("No queue configured on this session"))?;
1758 Ok(queue.submit(lane, command).await)
1759 }
1760
1761 pub async fn submit_batch(
1768 &self,
1769 lane: SessionLane,
1770 commands: Vec<Box<dyn crate::queue::SessionCommand>>,
1771 ) -> anyhow::Result<Vec<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>>>
1772 {
1773 let queue = self
1774 .command_queue
1775 .as_ref()
1776 .ok_or_else(|| anyhow::anyhow!("No queue configured on this session"))?;
1777 Ok(queue.submit_batch(lane, commands).await)
1778 }
1779
1780 pub async fn dead_letters(&self) -> Vec<DeadLetter> {
1782 if let Some(ref queue) = self.command_queue {
1783 queue.dead_letters().await
1784 } else {
1785 Vec::new()
1786 }
1787 }
1788
1789 pub async fn add_mcp_server(
1800 &self,
1801 config: crate::mcp::McpServerConfig,
1802 ) -> crate::error::Result<usize> {
1803 let server_name = config.name.clone();
1804 self.mcp_manager.register_server(config).await;
1805 self.mcp_manager.connect(&server_name).await.map_err(|e| {
1806 crate::error::CodeError::Tool {
1807 tool: server_name.clone(),
1808 message: format!("Failed to connect MCP server: {}", e),
1809 }
1810 })?;
1811
1812 let tools = self.mcp_manager.get_server_tools(&server_name).await;
1813 let count = tools.len();
1814
1815 for tool in
1816 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&self.mcp_manager))
1817 {
1818 self.tool_executor.register_dynamic_tool(tool);
1819 }
1820
1821 tracing::info!(
1822 session_id = %self.session_id,
1823 server = server_name,
1824 tools = count,
1825 "MCP server added to live session"
1826 );
1827
1828 Ok(count)
1829 }
1830
1831 pub async fn remove_mcp_server(&self, server_name: &str) -> crate::error::Result<()> {
1836 self.tool_executor
1837 .unregister_tools_by_prefix(&format!("mcp__{server_name}__"));
1838 self.mcp_manager
1839 .disconnect(server_name)
1840 .await
1841 .map_err(|e| crate::error::CodeError::Tool {
1842 tool: server_name.to_string(),
1843 message: format!("Failed to disconnect MCP server: {}", e),
1844 })?;
1845 tracing::info!(
1846 session_id = %self.session_id,
1847 server = server_name,
1848 "MCP server removed from live session"
1849 );
1850 Ok(())
1851 }
1852
1853 pub async fn mcp_status(
1855 &self,
1856 ) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
1857 self.mcp_manager.get_status().await
1858 }
1859}
1860
1861#[cfg(test)]
1866mod tests {
1867 use super::*;
1868 use crate::config::{ModelConfig, ModelModalities, ProviderConfig};
1869 use crate::store::SessionStore;
1870
1871 #[tokio::test]
1872 async fn test_session_submit_no_queue_returns_err() {
1873 let agent = Agent::from_config(test_config()).await.unwrap();
1874 let session = agent.session(".", None).unwrap();
1875 struct Noop;
1876 #[async_trait::async_trait]
1877 impl crate::queue::SessionCommand for Noop {
1878 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
1879 Ok(serde_json::json!(null))
1880 }
1881 fn command_type(&self) -> &str {
1882 "noop"
1883 }
1884 }
1885 let result: anyhow::Result<
1886 tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>,
1887 > = session.submit(SessionLane::Query, Box::new(Noop)).await;
1888 assert!(result.is_err());
1889 assert!(result.unwrap_err().to_string().contains("No queue"));
1890 }
1891
1892 #[tokio::test]
1893 async fn test_session_submit_batch_no_queue_returns_err() {
1894 let agent = Agent::from_config(test_config()).await.unwrap();
1895 let session = agent.session(".", None).unwrap();
1896 struct Noop;
1897 #[async_trait::async_trait]
1898 impl crate::queue::SessionCommand for Noop {
1899 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
1900 Ok(serde_json::json!(null))
1901 }
1902 fn command_type(&self) -> &str {
1903 "noop"
1904 }
1905 }
1906 let cmds: Vec<Box<dyn crate::queue::SessionCommand>> = vec![Box::new(Noop)];
1907 let result: anyhow::Result<
1908 Vec<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>>,
1909 > = session.submit_batch(SessionLane::Query, cmds).await;
1910 assert!(result.is_err());
1911 assert!(result.unwrap_err().to_string().contains("No queue"));
1912 }
1913
1914 fn test_config() -> CodeConfig {
1915 CodeConfig {
1916 default_model: Some("anthropic/claude-sonnet-4-20250514".to_string()),
1917 providers: vec![
1918 ProviderConfig {
1919 name: "anthropic".to_string(),
1920 api_key: Some("test-key".to_string()),
1921 base_url: None,
1922 models: vec![ModelConfig {
1923 id: "claude-sonnet-4-20250514".to_string(),
1924 name: "Claude Sonnet 4".to_string(),
1925 family: "claude-sonnet".to_string(),
1926 api_key: None,
1927 base_url: None,
1928 attachment: false,
1929 reasoning: false,
1930 tool_call: true,
1931 temperature: true,
1932 release_date: None,
1933 modalities: ModelModalities::default(),
1934 cost: Default::default(),
1935 limit: Default::default(),
1936 }],
1937 },
1938 ProviderConfig {
1939 name: "openai".to_string(),
1940 api_key: Some("test-openai-key".to_string()),
1941 base_url: None,
1942 models: vec![ModelConfig {
1943 id: "gpt-4o".to_string(),
1944 name: "GPT-4o".to_string(),
1945 family: "gpt-4".to_string(),
1946 api_key: None,
1947 base_url: None,
1948 attachment: false,
1949 reasoning: false,
1950 tool_call: true,
1951 temperature: true,
1952 release_date: None,
1953 modalities: ModelModalities::default(),
1954 cost: Default::default(),
1955 limit: Default::default(),
1956 }],
1957 },
1958 ],
1959 ..Default::default()
1960 }
1961 }
1962
1963 #[tokio::test]
1964 async fn test_from_config() {
1965 let agent = Agent::from_config(test_config()).await;
1966 assert!(agent.is_ok());
1967 }
1968
1969 #[tokio::test]
1970 async fn test_session_default() {
1971 let agent = Agent::from_config(test_config()).await.unwrap();
1972 let session = agent.session("/tmp/test-workspace", None);
1973 assert!(session.is_ok());
1974 let debug = format!("{:?}", session.unwrap());
1975 assert!(debug.contains("AgentSession"));
1976 }
1977
1978 #[tokio::test]
1979 async fn test_session_with_model_override() {
1980 let agent = Agent::from_config(test_config()).await.unwrap();
1981 let opts = SessionOptions::new().with_model("openai/gpt-4o");
1982 let session = agent.session("/tmp/test-workspace", Some(opts));
1983 assert!(session.is_ok());
1984 }
1985
1986 #[tokio::test]
1987 async fn test_session_with_invalid_model_format() {
1988 let agent = Agent::from_config(test_config()).await.unwrap();
1989 let opts = SessionOptions::new().with_model("gpt-4o");
1990 let session = agent.session("/tmp/test-workspace", Some(opts));
1991 assert!(session.is_err());
1992 }
1993
1994 #[tokio::test]
1995 async fn test_session_with_model_not_found() {
1996 let agent = Agent::from_config(test_config()).await.unwrap();
1997 let opts = SessionOptions::new().with_model("openai/nonexistent");
1998 let session = agent.session("/tmp/test-workspace", Some(opts));
1999 assert!(session.is_err());
2000 }
2001
2002 #[tokio::test]
2003 async fn test_new_with_hcl_string() {
2004 let hcl = r#"
2005 default_model = "anthropic/claude-sonnet-4-20250514"
2006 providers {
2007 name = "anthropic"
2008 api_key = "test-key"
2009 models {
2010 id = "claude-sonnet-4-20250514"
2011 name = "Claude Sonnet 4"
2012 }
2013 }
2014 "#;
2015 let agent = Agent::new(hcl).await;
2016 assert!(agent.is_ok());
2017 }
2018
2019 #[tokio::test]
2020 async fn test_create_alias_hcl() {
2021 let hcl = r#"
2022 default_model = "anthropic/claude-sonnet-4-20250514"
2023 providers {
2024 name = "anthropic"
2025 api_key = "test-key"
2026 models {
2027 id = "claude-sonnet-4-20250514"
2028 name = "Claude Sonnet 4"
2029 }
2030 }
2031 "#;
2032 let agent = Agent::create(hcl).await;
2033 assert!(agent.is_ok());
2034 }
2035
2036 #[tokio::test]
2037 async fn test_create_and_new_produce_same_result() {
2038 let hcl = r#"
2039 default_model = "anthropic/claude-sonnet-4-20250514"
2040 providers {
2041 name = "anthropic"
2042 api_key = "test-key"
2043 models {
2044 id = "claude-sonnet-4-20250514"
2045 name = "Claude Sonnet 4"
2046 }
2047 }
2048 "#;
2049 let agent_new = Agent::new(hcl).await;
2050 let agent_create = Agent::create(hcl).await;
2051 assert!(agent_new.is_ok());
2052 assert!(agent_create.is_ok());
2053
2054 let session_new = agent_new.unwrap().session("/tmp/test-ws-new", None);
2056 let session_create = agent_create.unwrap().session("/tmp/test-ws-create", None);
2057 assert!(session_new.is_ok());
2058 assert!(session_create.is_ok());
2059 }
2060
2061 #[test]
2062 fn test_from_config_requires_default_model() {
2063 let rt = tokio::runtime::Runtime::new().unwrap();
2064 let config = CodeConfig {
2065 providers: vec![ProviderConfig {
2066 name: "anthropic".to_string(),
2067 api_key: Some("test-key".to_string()),
2068 base_url: None,
2069 models: vec![],
2070 }],
2071 ..Default::default()
2072 };
2073 let result = rt.block_on(Agent::from_config(config));
2074 assert!(result.is_err());
2075 }
2076
2077 #[tokio::test]
2078 async fn test_history_empty_on_new_session() {
2079 let agent = Agent::from_config(test_config()).await.unwrap();
2080 let session = agent.session("/tmp/test-workspace", None).unwrap();
2081 assert!(session.history().is_empty());
2082 }
2083
2084 #[tokio::test]
2085 async fn test_session_options_with_agent_dir() {
2086 let opts = SessionOptions::new()
2087 .with_agent_dir("/tmp/agents")
2088 .with_agent_dir("/tmp/more-agents");
2089 assert_eq!(opts.agent_dirs.len(), 2);
2090 assert_eq!(opts.agent_dirs[0], PathBuf::from("/tmp/agents"));
2091 assert_eq!(opts.agent_dirs[1], PathBuf::from("/tmp/more-agents"));
2092 }
2093
2094 #[test]
2099 fn test_session_options_with_queue_config() {
2100 let qc = SessionQueueConfig::default().with_lane_features();
2101 let opts = SessionOptions::new().with_queue_config(qc.clone());
2102 assert!(opts.queue_config.is_some());
2103
2104 let config = opts.queue_config.unwrap();
2105 assert!(config.enable_dlq);
2106 assert!(config.enable_metrics);
2107 assert!(config.enable_alerts);
2108 assert_eq!(config.default_timeout_ms, Some(60_000));
2109 }
2110
2111 #[tokio::test(flavor = "multi_thread")]
2112 async fn test_session_with_queue_config() {
2113 let agent = Agent::from_config(test_config()).await.unwrap();
2114 let qc = SessionQueueConfig::default();
2115 let opts = SessionOptions::new().with_queue_config(qc);
2116 let session = agent.session("/tmp/test-workspace-queue", Some(opts));
2117 assert!(session.is_ok());
2118 let session = session.unwrap();
2119 assert!(session.has_queue());
2120 }
2121
2122 #[tokio::test]
2123 async fn test_session_without_queue_config() {
2124 let agent = Agent::from_config(test_config()).await.unwrap();
2125 let session = agent.session("/tmp/test-workspace-noqueue", None).unwrap();
2126 assert!(!session.has_queue());
2127 }
2128
2129 #[tokio::test]
2130 async fn test_session_queue_stats_without_queue() {
2131 let agent = Agent::from_config(test_config()).await.unwrap();
2132 let session = agent.session("/tmp/test-workspace-stats", None).unwrap();
2133 let stats = session.queue_stats().await;
2134 assert_eq!(stats.total_pending, 0);
2136 assert_eq!(stats.total_active, 0);
2137 }
2138
2139 #[tokio::test(flavor = "multi_thread")]
2140 async fn test_session_queue_stats_with_queue() {
2141 let agent = Agent::from_config(test_config()).await.unwrap();
2142 let qc = SessionQueueConfig::default();
2143 let opts = SessionOptions::new().with_queue_config(qc);
2144 let session = agent
2145 .session("/tmp/test-workspace-qstats", Some(opts))
2146 .unwrap();
2147 let stats = session.queue_stats().await;
2148 assert_eq!(stats.total_pending, 0);
2150 assert_eq!(stats.total_active, 0);
2151 }
2152
2153 #[tokio::test(flavor = "multi_thread")]
2154 async fn test_session_pending_external_tasks_empty() {
2155 let agent = Agent::from_config(test_config()).await.unwrap();
2156 let qc = SessionQueueConfig::default();
2157 let opts = SessionOptions::new().with_queue_config(qc);
2158 let session = agent
2159 .session("/tmp/test-workspace-ext", Some(opts))
2160 .unwrap();
2161 let tasks = session.pending_external_tasks().await;
2162 assert!(tasks.is_empty());
2163 }
2164
2165 #[tokio::test(flavor = "multi_thread")]
2166 async fn test_session_dead_letters_empty() {
2167 let agent = Agent::from_config(test_config()).await.unwrap();
2168 let qc = SessionQueueConfig::default().with_dlq(Some(100));
2169 let opts = SessionOptions::new().with_queue_config(qc);
2170 let session = agent
2171 .session("/tmp/test-workspace-dlq", Some(opts))
2172 .unwrap();
2173 let dead = session.dead_letters().await;
2174 assert!(dead.is_empty());
2175 }
2176
2177 #[tokio::test(flavor = "multi_thread")]
2178 async fn test_session_queue_metrics_disabled() {
2179 let agent = Agent::from_config(test_config()).await.unwrap();
2180 let qc = SessionQueueConfig::default();
2182 let opts = SessionOptions::new().with_queue_config(qc);
2183 let session = agent
2184 .session("/tmp/test-workspace-nomet", Some(opts))
2185 .unwrap();
2186 let metrics = session.queue_metrics().await;
2187 assert!(metrics.is_none());
2188 }
2189
2190 #[tokio::test(flavor = "multi_thread")]
2191 async fn test_session_queue_metrics_enabled() {
2192 let agent = Agent::from_config(test_config()).await.unwrap();
2193 let qc = SessionQueueConfig::default().with_metrics();
2194 let opts = SessionOptions::new().with_queue_config(qc);
2195 let session = agent
2196 .session("/tmp/test-workspace-met", Some(opts))
2197 .unwrap();
2198 let metrics = session.queue_metrics().await;
2199 assert!(metrics.is_some());
2200 }
2201
2202 #[tokio::test(flavor = "multi_thread")]
2203 async fn test_session_set_lane_handler() {
2204 let agent = Agent::from_config(test_config()).await.unwrap();
2205 let qc = SessionQueueConfig::default();
2206 let opts = SessionOptions::new().with_queue_config(qc);
2207 let session = agent
2208 .session("/tmp/test-workspace-handler", Some(opts))
2209 .unwrap();
2210
2211 session
2213 .set_lane_handler(
2214 SessionLane::Execute,
2215 LaneHandlerConfig {
2216 mode: crate::queue::TaskHandlerMode::External,
2217 timeout_ms: 30_000,
2218 },
2219 )
2220 .await;
2221
2222 }
2225
2226 #[tokio::test(flavor = "multi_thread")]
2231 async fn test_session_has_id() {
2232 let agent = Agent::from_config(test_config()).await.unwrap();
2233 let session = agent.session("/tmp/test-ws-id", None).unwrap();
2234 assert!(!session.session_id().is_empty());
2236 assert_eq!(session.session_id().len(), 36); }
2238
2239 #[tokio::test(flavor = "multi_thread")]
2240 async fn test_session_explicit_id() {
2241 let agent = Agent::from_config(test_config()).await.unwrap();
2242 let opts = SessionOptions::new().with_session_id("my-session-42");
2243 let session = agent.session("/tmp/test-ws-eid", Some(opts)).unwrap();
2244 assert_eq!(session.session_id(), "my-session-42");
2245 }
2246
2247 #[tokio::test(flavor = "multi_thread")]
2248 async fn test_session_save_no_store() {
2249 let agent = Agent::from_config(test_config()).await.unwrap();
2250 let session = agent.session("/tmp/test-ws-save", None).unwrap();
2251 session.save().await.unwrap();
2253 }
2254
2255 #[tokio::test(flavor = "multi_thread")]
2256 async fn test_session_save_and_load() {
2257 let store = Arc::new(crate::store::MemorySessionStore::new());
2258 let agent = Agent::from_config(test_config()).await.unwrap();
2259
2260 let opts = SessionOptions::new()
2261 .with_session_store(store.clone())
2262 .with_session_id("persist-test");
2263 let session = agent.session("/tmp/test-ws-persist", Some(opts)).unwrap();
2264
2265 session.save().await.unwrap();
2267
2268 assert!(store.exists("persist-test").await.unwrap());
2270
2271 let data = store.load("persist-test").await.unwrap().unwrap();
2272 assert_eq!(data.id, "persist-test");
2273 assert!(data.messages.is_empty());
2274 }
2275
2276 #[tokio::test(flavor = "multi_thread")]
2277 async fn test_session_save_with_history() {
2278 let store = Arc::new(crate::store::MemorySessionStore::new());
2279 let agent = Agent::from_config(test_config()).await.unwrap();
2280
2281 let opts = SessionOptions::new()
2282 .with_session_store(store.clone())
2283 .with_session_id("history-test");
2284 let session = agent.session("/tmp/test-ws-hist", Some(opts)).unwrap();
2285
2286 {
2288 let mut h = session.history.write().unwrap();
2289 h.push(Message::user("Hello"));
2290 h.push(Message::user("How are you?"));
2291 }
2292
2293 session.save().await.unwrap();
2294
2295 let data = store.load("history-test").await.unwrap().unwrap();
2296 assert_eq!(data.messages.len(), 2);
2297 }
2298
2299 #[tokio::test(flavor = "multi_thread")]
2300 async fn test_resume_session() {
2301 let store = Arc::new(crate::store::MemorySessionStore::new());
2302 let agent = Agent::from_config(test_config()).await.unwrap();
2303
2304 let opts = SessionOptions::new()
2306 .with_session_store(store.clone())
2307 .with_session_id("resume-test");
2308 let session = agent.session("/tmp/test-ws-resume", Some(opts)).unwrap();
2309 {
2310 let mut h = session.history.write().unwrap();
2311 h.push(Message::user("What is Rust?"));
2312 h.push(Message::user("Tell me more"));
2313 }
2314 session.save().await.unwrap();
2315
2316 let opts2 = SessionOptions::new().with_session_store(store.clone());
2318 let resumed = agent.resume_session("resume-test", opts2).unwrap();
2319
2320 assert_eq!(resumed.session_id(), "resume-test");
2321 let history = resumed.history();
2322 assert_eq!(history.len(), 2);
2323 assert_eq!(history[0].text(), "What is Rust?");
2324 }
2325
2326 #[tokio::test(flavor = "multi_thread")]
2327 async fn test_resume_session_not_found() {
2328 let store = Arc::new(crate::store::MemorySessionStore::new());
2329 let agent = Agent::from_config(test_config()).await.unwrap();
2330
2331 let opts = SessionOptions::new().with_session_store(store.clone());
2332 let result = agent.resume_session("nonexistent", opts);
2333 assert!(result.is_err());
2334 assert!(result.unwrap_err().to_string().contains("not found"));
2335 }
2336
2337 #[tokio::test(flavor = "multi_thread")]
2338 async fn test_resume_session_no_store() {
2339 let agent = Agent::from_config(test_config()).await.unwrap();
2340 let opts = SessionOptions::new();
2341 let result = agent.resume_session("any-id", opts);
2342 assert!(result.is_err());
2343 assert!(result.unwrap_err().to_string().contains("session_store"));
2344 }
2345
2346 #[tokio::test(flavor = "multi_thread")]
2347 async fn test_file_session_store_persistence() {
2348 let dir = tempfile::TempDir::new().unwrap();
2349 let store = Arc::new(
2350 crate::store::FileSessionStore::new(dir.path())
2351 .await
2352 .unwrap(),
2353 );
2354 let agent = Agent::from_config(test_config()).await.unwrap();
2355
2356 let opts = SessionOptions::new()
2358 .with_session_store(store.clone())
2359 .with_session_id("file-persist");
2360 let session = agent
2361 .session("/tmp/test-ws-file-persist", Some(opts))
2362 .unwrap();
2363 {
2364 let mut h = session.history.write().unwrap();
2365 h.push(Message::user("test message"));
2366 }
2367 session.save().await.unwrap();
2368
2369 let store2 = Arc::new(
2371 crate::store::FileSessionStore::new(dir.path())
2372 .await
2373 .unwrap(),
2374 );
2375 let data = store2.load("file-persist").await.unwrap().unwrap();
2376 assert_eq!(data.messages.len(), 1);
2377 }
2378
2379 #[tokio::test(flavor = "multi_thread")]
2380 async fn test_session_options_builders() {
2381 let opts = SessionOptions::new()
2382 .with_session_id("test-id")
2383 .with_auto_save(true);
2384 assert_eq!(opts.session_id, Some("test-id".to_string()));
2385 assert!(opts.auto_save);
2386 }
2387
2388 #[test]
2393 fn test_session_options_with_sandbox_sets_config() {
2394 use crate::sandbox::SandboxConfig;
2395 let cfg = SandboxConfig {
2396 image: "ubuntu:22.04".into(),
2397 memory_mb: 1024,
2398 ..SandboxConfig::default()
2399 };
2400 let opts = SessionOptions::new().with_sandbox(cfg);
2401 assert!(opts.sandbox_config.is_some());
2402 let sc = opts.sandbox_config.unwrap();
2403 assert_eq!(sc.image, "ubuntu:22.04");
2404 assert_eq!(sc.memory_mb, 1024);
2405 }
2406
2407 #[test]
2408 fn test_session_options_default_has_no_sandbox() {
2409 let opts = SessionOptions::default();
2410 assert!(opts.sandbox_config.is_none());
2411 }
2412
2413 #[tokio::test]
2414 async fn test_session_debug_includes_sandbox_config() {
2415 use crate::sandbox::SandboxConfig;
2416 let opts = SessionOptions::new().with_sandbox(SandboxConfig::default());
2417 let debug = format!("{:?}", opts);
2418 assert!(debug.contains("sandbox_config"));
2419 }
2420
2421 #[tokio::test]
2422 async fn test_session_build_with_sandbox_config_no_feature_warn() {
2423 let agent = Agent::from_config(test_config()).await.unwrap();
2426 let opts = SessionOptions::new().with_sandbox(crate::sandbox::SandboxConfig::default());
2427 let session = agent.session("/tmp/test-sandbox-session", Some(opts));
2429 assert!(session.is_ok());
2430 }
2431
2432 #[tokio::test(flavor = "multi_thread")]
2437 async fn test_session_with_memory_store() {
2438 use a3s_memory::InMemoryStore;
2439 let store = Arc::new(InMemoryStore::new());
2440 let agent = Agent::from_config(test_config()).await.unwrap();
2441 let opts = SessionOptions::new().with_memory(store);
2442 let session = agent.session("/tmp/test-ws-memory", Some(opts)).unwrap();
2443 assert!(session.memory().is_some());
2444 }
2445
2446 #[tokio::test(flavor = "multi_thread")]
2447 async fn test_session_without_memory_store() {
2448 let agent = Agent::from_config(test_config()).await.unwrap();
2449 let session = agent.session("/tmp/test-ws-no-memory", None).unwrap();
2450 assert!(session.memory().is_none());
2451 }
2452
2453 #[tokio::test(flavor = "multi_thread")]
2454 async fn test_session_memory_wired_into_config() {
2455 use a3s_memory::InMemoryStore;
2456 let store = Arc::new(InMemoryStore::new());
2457 let agent = Agent::from_config(test_config()).await.unwrap();
2458 let opts = SessionOptions::new().with_memory(store);
2459 let session = agent
2460 .session("/tmp/test-ws-mem-config", Some(opts))
2461 .unwrap();
2462 assert!(session.memory().is_some());
2464 }
2465
2466 #[tokio::test(flavor = "multi_thread")]
2467 async fn test_session_with_file_memory() {
2468 let dir = tempfile::TempDir::new().unwrap();
2469 let agent = Agent::from_config(test_config()).await.unwrap();
2470 let opts = SessionOptions::new().with_file_memory(dir.path());
2471 let session = agent.session("/tmp/test-ws-file-mem", Some(opts)).unwrap();
2472 assert!(session.memory().is_some());
2473 }
2474
2475 #[tokio::test(flavor = "multi_thread")]
2476 async fn test_memory_remember_and_recall() {
2477 use a3s_memory::InMemoryStore;
2478 let store = Arc::new(InMemoryStore::new());
2479 let agent = Agent::from_config(test_config()).await.unwrap();
2480 let opts = SessionOptions::new().with_memory(store);
2481 let session = agent
2482 .session("/tmp/test-ws-mem-recall", Some(opts))
2483 .unwrap();
2484
2485 let memory = session.memory().unwrap();
2486 memory
2487 .remember_success("write a file", &["write".to_string()], "done")
2488 .await
2489 .unwrap();
2490
2491 let results = memory.recall_similar("write", 5).await.unwrap();
2492 assert!(!results.is_empty());
2493 let stats = memory.stats().await.unwrap();
2494 assert_eq!(stats.long_term_count, 1);
2495 }
2496
2497 #[tokio::test(flavor = "multi_thread")]
2502 async fn test_session_tool_timeout_configured() {
2503 let agent = Agent::from_config(test_config()).await.unwrap();
2504 let opts = SessionOptions::new().with_tool_timeout(5000);
2505 let session = agent.session("/tmp/test-ws-timeout", Some(opts)).unwrap();
2506 assert!(!session.id().is_empty());
2507 }
2508
2509 #[tokio::test(flavor = "multi_thread")]
2514 async fn test_session_without_queue_builds_ok() {
2515 let agent = Agent::from_config(test_config()).await.unwrap();
2516 let session = agent.session("/tmp/test-ws-no-queue", None).unwrap();
2517 assert!(!session.id().is_empty());
2518 }
2519
2520 #[tokio::test(flavor = "multi_thread")]
2525 async fn test_concurrent_history_reads() {
2526 let agent = Agent::from_config(test_config()).await.unwrap();
2527 let session = Arc::new(agent.session("/tmp/test-ws-concurrent", None).unwrap());
2528
2529 let handles: Vec<_> = (0..10)
2530 .map(|_| {
2531 let s = Arc::clone(&session);
2532 tokio::spawn(async move { s.history().len() })
2533 })
2534 .collect();
2535
2536 for h in handles {
2537 h.await.unwrap();
2538 }
2539 }
2540
2541 #[tokio::test(flavor = "multi_thread")]
2546 async fn test_session_no_init_warning_without_file_memory() {
2547 let agent = Agent::from_config(test_config()).await.unwrap();
2548 let session = agent.session("/tmp/test-ws-no-warn", None).unwrap();
2549 assert!(session.init_warning().is_none());
2550 }
2551
2552 #[tokio::test(flavor = "multi_thread")]
2553 async fn test_session_with_mcp_manager_builds_ok() {
2554 use crate::mcp::manager::McpManager;
2555 let mcp = Arc::new(McpManager::new());
2556 let agent = Agent::from_config(test_config()).await.unwrap();
2557 let opts = SessionOptions::new().with_mcp(mcp);
2558 let session = agent.session("/tmp/test-ws-mcp", Some(opts)).unwrap();
2560 assert!(!session.id().is_empty());
2561 }
2562
/// Compile-time check that `SessionCommand` is importable from the crate
/// root and usable as a trait object; there is nothing to assert at runtime.
#[test]
fn test_session_command_is_pub() {
    use crate::SessionCommand;
    use std::marker::PhantomData;

    // Naming the boxed trait object type is the whole test: it only compiles
    // if the trait is reachable through `crate::SessionCommand`.
    let _marker: PhantomData<Box<dyn SessionCommand>> = PhantomData;
}
2569
2570 #[tokio::test(flavor = "multi_thread")]
2571 async fn test_session_submit_with_queue_executes() {
2572 let agent = Agent::from_config(test_config()).await.unwrap();
2573 let qc = SessionQueueConfig::default();
2574 let opts = SessionOptions::new().with_queue_config(qc);
2575 let session = agent
2576 .session("/tmp/test-ws-submit-exec", Some(opts))
2577 .unwrap();
2578
2579 struct Echo(serde_json::Value);
2580 #[async_trait::async_trait]
2581 impl crate::queue::SessionCommand for Echo {
2582 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
2583 Ok(self.0.clone())
2584 }
2585 fn command_type(&self) -> &str {
2586 "echo"
2587 }
2588 }
2589
2590 let rx = session
2591 .submit(
2592 SessionLane::Query,
2593 Box::new(Echo(serde_json::json!({"ok": true}))),
2594 )
2595 .await
2596 .expect("submit should succeed with queue configured");
2597
2598 let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx)
2599 .await
2600 .expect("timed out waiting for command result")
2601 .expect("channel closed before result")
2602 .expect("command returned an error");
2603
2604 assert_eq!(result["ok"], true);
2605 }
2606}