1use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
20use crate::commands::{CommandContext, CommandRegistry};
21use crate::config::CodeConfig;
22use crate::error::{read_or_recover, write_or_recover, Result};
23use crate::llm::{LlmClient, Message};
24use crate::prompts::SystemPromptSlots;
25use crate::queue::{
26 ExternalTask, ExternalTaskResult, LaneHandlerConfig, SessionLane, SessionQueueConfig,
27 SessionQueueStats,
28};
29use crate::session_lane_queue::SessionLaneQueue;
30use crate::tools::{ToolContext, ToolExecutor};
31use a3s_lane::{DeadLetter, MetricsSnapshot};
32use a3s_memory::{FileMemoryStore, MemoryStore};
33use anyhow::Context;
34use std::path::{Path, PathBuf};
35use std::sync::{Arc, RwLock};
36use tokio::sync::{broadcast, mpsc};
37use tokio::task::JoinHandle;
38
39fn safe_canonicalize(path: &Path) -> PathBuf {
42 match std::fs::canonicalize(path) {
43 Ok(p) => strip_unc_prefix(p),
44 Err(_) => path.to_path_buf(),
45 }
46}
47
/// Removes the Windows verbatim prefix (`\\?\`) from `path`.
///
/// On Windows:
/// * `\\?\C:\dir` becomes `C:\dir`;
/// * `\\?\UNC\server\share\dir` becomes `\\server\share\dir` (dropping only
///   `\\?\` would leave the relative path `UNC\server\share\dir`);
/// * paths that are not valid UTF-8 are returned untouched rather than being
///   rebuilt from a lossy string conversion.
///
/// On every other platform the path is returned unchanged.
fn strip_unc_prefix(path: PathBuf) -> PathBuf {
    #[cfg(windows)]
    {
        if let Some(text) = path.to_str() {
            // Check the longer UNC form first; it also starts with `\\?\`.
            if let Some(rest) = text.strip_prefix(r"\\?\UNC\") {
                return PathBuf::from(format!(r"\\{rest}"));
            }
            if let Some(rest) = text.strip_prefix(r"\\?\") {
                return PathBuf::from(rest);
            }
        }
    }
    path
}
60
/// Plain record describing the outcome of one tool call.
#[derive(Clone, Debug)]
pub struct ToolCallResult {
    /// Tool name.
    pub name: String,
    /// Textual output captured from the call.
    pub output: String,
    /// Exit code reported for the call.
    pub exit_code: i32,
}
72
73#[derive(Clone, Default)]
79pub struct SessionOptions {
80 pub model: Option<String>,
82 pub agent_dirs: Vec<PathBuf>,
85 pub queue_config: Option<SessionQueueConfig>,
90 pub security_provider: Option<Arc<dyn crate::security::SecurityProvider>>,
92 pub context_providers: Vec<Arc<dyn crate::context::ContextProvider>>,
94 pub confirmation_manager: Option<Arc<dyn crate::hitl::ConfirmationProvider>>,
96 pub permission_checker: Option<Arc<dyn crate::permissions::PermissionChecker>>,
98 pub planning_enabled: bool,
100 pub goal_tracking: bool,
102 pub skill_dirs: Vec<PathBuf>,
105 pub skill_registry: Option<Arc<crate::skills::SkillRegistry>>,
107 pub memory_store: Option<Arc<dyn MemoryStore>>,
109 pub(crate) file_memory_dir: Option<PathBuf>,
111 pub session_store: Option<Arc<dyn crate::store::SessionStore>>,
113 pub session_id: Option<String>,
115 pub auto_save: bool,
117 pub max_parse_retries: Option<u32>,
120 pub tool_timeout_ms: Option<u64>,
123 pub circuit_breaker_threshold: Option<u32>,
127 pub sandbox_config: Option<crate::sandbox::SandboxConfig>,
133 pub auto_compact: bool,
135 pub auto_compact_threshold: Option<f32>,
138 pub continuation_enabled: Option<bool>,
141 pub max_continuation_turns: Option<u32>,
144 pub mcp_manager: Option<Arc<crate::mcp::manager::McpManager>>,
149 pub temperature: Option<f32>,
151 pub thinking_budget: Option<usize>,
153 pub max_tool_rounds: Option<usize>,
159 pub prompt_slots: Option<SystemPromptSlots>,
165}
166
167impl std::fmt::Debug for SessionOptions {
168 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169 f.debug_struct("SessionOptions")
170 .field("model", &self.model)
171 .field("agent_dirs", &self.agent_dirs)
172 .field("skill_dirs", &self.skill_dirs)
173 .field("queue_config", &self.queue_config)
174 .field("security_provider", &self.security_provider.is_some())
175 .field("context_providers", &self.context_providers.len())
176 .field("confirmation_manager", &self.confirmation_manager.is_some())
177 .field("permission_checker", &self.permission_checker.is_some())
178 .field("planning_enabled", &self.planning_enabled)
179 .field("goal_tracking", &self.goal_tracking)
180 .field(
181 "skill_registry",
182 &self
183 .skill_registry
184 .as_ref()
185 .map(|r| format!("{} skills", r.len())),
186 )
187 .field("memory_store", &self.memory_store.is_some())
188 .field("session_store", &self.session_store.is_some())
189 .field("session_id", &self.session_id)
190 .field("auto_save", &self.auto_save)
191 .field("max_parse_retries", &self.max_parse_retries)
192 .field("tool_timeout_ms", &self.tool_timeout_ms)
193 .field("circuit_breaker_threshold", &self.circuit_breaker_threshold)
194 .field("sandbox_config", &self.sandbox_config)
195 .field("auto_compact", &self.auto_compact)
196 .field("auto_compact_threshold", &self.auto_compact_threshold)
197 .field("continuation_enabled", &self.continuation_enabled)
198 .field("max_continuation_turns", &self.max_continuation_turns)
199 .field("mcp_manager", &self.mcp_manager.is_some())
200 .field("temperature", &self.temperature)
201 .field("thinking_budget", &self.thinking_budget)
202 .field("max_tool_rounds", &self.max_tool_rounds)
203 .field("prompt_slots", &self.prompt_slots.is_some())
204 .finish()
205 }
206}
207
208impl SessionOptions {
209 pub fn new() -> Self {
210 Self::default()
211 }
212
213 pub fn with_model(mut self, model: impl Into<String>) -> Self {
214 self.model = Some(model.into());
215 self
216 }
217
218 pub fn with_agent_dir(mut self, dir: impl Into<PathBuf>) -> Self {
219 self.agent_dirs.push(dir.into());
220 self
221 }
222
223 pub fn with_queue_config(mut self, config: SessionQueueConfig) -> Self {
224 self.queue_config = Some(config);
225 self
226 }
227
228 pub fn with_default_security(mut self) -> Self {
230 self.security_provider = Some(Arc::new(crate::security::DefaultSecurityProvider::new()));
231 self
232 }
233
234 pub fn with_security_provider(
236 mut self,
237 provider: Arc<dyn crate::security::SecurityProvider>,
238 ) -> Self {
239 self.security_provider = Some(provider);
240 self
241 }
242
243 pub fn with_fs_context(mut self, root_path: impl Into<PathBuf>) -> Self {
245 let config = crate::context::FileSystemContextConfig::new(root_path);
246 self.context_providers
247 .push(Arc::new(crate::context::FileSystemContextProvider::new(
248 config,
249 )));
250 self
251 }
252
253 pub fn with_context_provider(
255 mut self,
256 provider: Arc<dyn crate::context::ContextProvider>,
257 ) -> Self {
258 self.context_providers.push(provider);
259 self
260 }
261
262 pub fn with_confirmation_manager(
264 mut self,
265 manager: Arc<dyn crate::hitl::ConfirmationProvider>,
266 ) -> Self {
267 self.confirmation_manager = Some(manager);
268 self
269 }
270
271 pub fn with_permission_checker(
273 mut self,
274 checker: Arc<dyn crate::permissions::PermissionChecker>,
275 ) -> Self {
276 self.permission_checker = Some(checker);
277 self
278 }
279
280 pub fn with_permissive_policy(self) -> Self {
287 self.with_permission_checker(Arc::new(crate::permissions::PermissionPolicy::permissive()))
288 }
289
290 pub fn with_planning(mut self, enabled: bool) -> Self {
292 self.planning_enabled = enabled;
293 self
294 }
295
296 pub fn with_goal_tracking(mut self, enabled: bool) -> Self {
298 self.goal_tracking = enabled;
299 self
300 }
301
302 pub fn with_builtin_skills(mut self) -> Self {
304 self.skill_registry = Some(Arc::new(crate::skills::SkillRegistry::with_builtins()));
305 self
306 }
307
308 pub fn with_skill_registry(mut self, registry: Arc<crate::skills::SkillRegistry>) -> Self {
310 self.skill_registry = Some(registry);
311 self
312 }
313
314 pub fn with_skill_dirs(mut self, dirs: impl IntoIterator<Item = impl Into<PathBuf>>) -> Self {
317 self.skill_dirs.extend(dirs.into_iter().map(Into::into));
318 self
319 }
320
321 pub fn with_skills_from_dir(mut self, dir: impl AsRef<std::path::Path>) -> Self {
323 let registry = self
324 .skill_registry
325 .unwrap_or_else(|| Arc::new(crate::skills::SkillRegistry::new()));
326 if let Err(e) = registry.load_from_dir(&dir) {
327 tracing::warn!(
328 dir = %dir.as_ref().display(),
329 error = %e,
330 "Failed to load skills from directory — continuing without them"
331 );
332 }
333 self.skill_registry = Some(registry);
334 self
335 }
336
337 pub fn with_memory(mut self, store: Arc<dyn MemoryStore>) -> Self {
339 self.memory_store = Some(store);
340 self
341 }
342
343 pub fn with_file_memory(mut self, dir: impl Into<PathBuf>) -> Self {
349 self.file_memory_dir = Some(dir.into());
350 self
351 }
352
353 pub fn with_session_store(mut self, store: Arc<dyn crate::store::SessionStore>) -> Self {
355 self.session_store = Some(store);
356 self
357 }
358
359 pub fn with_file_session_store(mut self, dir: impl Into<PathBuf>) -> Self {
361 let dir = dir.into();
362 match tokio::runtime::Handle::try_current() {
363 Ok(handle) => {
364 match tokio::task::block_in_place(|| {
365 handle.block_on(crate::store::FileSessionStore::new(dir))
366 }) {
367 Ok(store) => {
368 self.session_store =
369 Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>);
370 }
371 Err(e) => {
372 tracing::warn!("Failed to create file session store: {}", e);
373 }
374 }
375 }
376 Err(_) => {
377 tracing::warn!(
378 "No async runtime available for file session store — persistence disabled"
379 );
380 }
381 }
382 self
383 }
384
385 pub fn with_session_id(mut self, id: impl Into<String>) -> Self {
387 self.session_id = Some(id.into());
388 self
389 }
390
391 pub fn with_auto_save(mut self, enabled: bool) -> Self {
393 self.auto_save = enabled;
394 self
395 }
396
397 pub fn with_parse_retries(mut self, max: u32) -> Self {
403 self.max_parse_retries = Some(max);
404 self
405 }
406
407 pub fn with_tool_timeout(mut self, timeout_ms: u64) -> Self {
413 self.tool_timeout_ms = Some(timeout_ms);
414 self
415 }
416
417 pub fn with_circuit_breaker(mut self, threshold: u32) -> Self {
423 self.circuit_breaker_threshold = Some(threshold);
424 self
425 }
426
427 pub fn with_resilience_defaults(self) -> Self {
433 self.with_parse_retries(2)
434 .with_tool_timeout(120_000)
435 .with_circuit_breaker(3)
436 }
437
438 pub fn with_sandbox(mut self, config: crate::sandbox::SandboxConfig) -> Self {
457 self.sandbox_config = Some(config);
458 self
459 }
460
461 pub fn with_auto_compact(mut self, enabled: bool) -> Self {
466 self.auto_compact = enabled;
467 self
468 }
469
470 pub fn with_auto_compact_threshold(mut self, threshold: f32) -> Self {
472 self.auto_compact_threshold = Some(threshold.clamp(0.0, 1.0));
473 self
474 }
475
476 pub fn with_continuation(mut self, enabled: bool) -> Self {
481 self.continuation_enabled = Some(enabled);
482 self
483 }
484
485 pub fn with_max_continuation_turns(mut self, turns: u32) -> Self {
487 self.max_continuation_turns = Some(turns);
488 self
489 }
490
491 pub fn with_mcp(mut self, manager: Arc<crate::mcp::manager::McpManager>) -> Self {
496 self.mcp_manager = Some(manager);
497 self
498 }
499
500 pub fn with_temperature(mut self, temperature: f32) -> Self {
501 self.temperature = Some(temperature);
502 self
503 }
504
505 pub fn with_thinking_budget(mut self, budget: usize) -> Self {
506 self.thinking_budget = Some(budget);
507 self
508 }
509
510 pub fn with_max_tool_rounds(mut self, rounds: usize) -> Self {
515 self.max_tool_rounds = Some(rounds);
516 self
517 }
518
519 pub fn with_prompt_slots(mut self, slots: SystemPromptSlots) -> Self {
524 self.prompt_slots = Some(slots);
525 self
526 }
527}
528
529pub struct Agent {
538 llm_client: Arc<dyn LlmClient>,
539 code_config: CodeConfig,
540 config: AgentConfig,
541 global_mcp: Option<Arc<crate::mcp::manager::McpManager>>,
543 global_mcp_tools: std::sync::Mutex<Vec<(String, crate::mcp::McpTool)>>,
546}
547
548impl std::fmt::Debug for Agent {
549 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
550 f.debug_struct("Agent").finish()
551 }
552}
553
554impl Agent {
555 pub async fn new(config_source: impl Into<String>) -> Result<Self> {
559 let source = config_source.into();
560
561 let expanded = if let Some(rest) = source.strip_prefix("~/") {
563 let home = std::env::var_os("HOME").or_else(|| std::env::var_os("USERPROFILE"));
564 if let Some(home) = home {
565 PathBuf::from(home).join(rest).display().to_string()
566 } else {
567 source.clone()
568 }
569 } else {
570 source.clone()
571 };
572
573 let path = Path::new(&expanded);
574
575 let config = if path.extension().is_some() && path.exists() {
576 CodeConfig::from_file(path)
577 .with_context(|| format!("Failed to load config: {}", path.display()))?
578 } else {
579 CodeConfig::from_hcl(&source).context("Failed to parse config as HCL string")?
581 };
582
583 Self::from_config(config).await
584 }
585
586 pub async fn create(config_source: impl Into<String>) -> Result<Self> {
591 Self::new(config_source).await
592 }
593
594 pub async fn from_config(config: CodeConfig) -> Result<Self> {
596 let llm_config = config
597 .default_llm_config()
598 .context("default_model must be set in 'provider/model' format with a valid API key")?;
599 let llm_client = crate::llm::create_client_with_config(llm_config);
600
601 let agent_config = AgentConfig {
602 max_tool_rounds: config
603 .max_tool_rounds
604 .unwrap_or(AgentConfig::default().max_tool_rounds),
605 ..AgentConfig::default()
606 };
607
608 let (global_mcp, global_mcp_tools) = if config.mcp_servers.is_empty() {
610 (None, vec![])
611 } else {
612 let manager = Arc::new(crate::mcp::manager::McpManager::new());
613 for server in &config.mcp_servers {
614 if !server.enabled {
615 continue;
616 }
617 manager.register_server(server.clone()).await;
618 if let Err(e) = manager.connect(&server.name).await {
619 tracing::warn!(
620 server = %server.name,
621 error = %e,
622 "Failed to connect to MCP server — skipping"
623 );
624 }
625 }
626 let tools = manager.get_all_tools().await;
628 (Some(manager), tools)
629 };
630
631 let mut agent = Agent {
632 llm_client,
633 code_config: config,
634 config: agent_config,
635 global_mcp,
636 global_mcp_tools: std::sync::Mutex::new(global_mcp_tools),
637 };
638
639 let registry = Arc::new(crate::skills::SkillRegistry::with_builtins());
641 for dir in &agent.code_config.skill_dirs.clone() {
642 if let Err(e) = registry.load_from_dir(dir) {
643 tracing::warn!(
644 dir = %dir.display(),
645 error = %e,
646 "Failed to load skills from directory — skipping"
647 );
648 }
649 }
650 agent.config.skill_registry = Some(registry);
651
652 Ok(agent)
653 }
654
655 pub async fn refresh_mcp_tools(&self) -> Result<()> {
663 if let Some(ref mcp) = self.global_mcp {
664 let fresh = mcp.get_all_tools().await;
665 *self
666 .global_mcp_tools
667 .lock()
668 .expect("global_mcp_tools lock poisoned") = fresh;
669 }
670 Ok(())
671 }
672
673 pub fn session(
678 &self,
679 workspace: impl Into<String>,
680 options: Option<SessionOptions>,
681 ) -> Result<AgentSession> {
682 let opts = options.unwrap_or_default();
683
684 let llm_client = if let Some(ref model) = opts.model {
685 let (provider_name, model_id) = model
686 .split_once('/')
687 .context("model format must be 'provider/model' (e.g., 'openai/gpt-4o')")?;
688
689 let mut llm_config = self
690 .code_config
691 .llm_config(provider_name, model_id)
692 .with_context(|| {
693 format!("provider '{provider_name}' or model '{model_id}' not found in config")
694 })?;
695
696 if let Some(temp) = opts.temperature {
697 llm_config = llm_config.with_temperature(temp);
698 }
699 if let Some(budget) = opts.thinking_budget {
700 llm_config = llm_config.with_thinking_budget(budget);
701 }
702
703 crate::llm::create_client_with_config(llm_config)
704 } else {
705 if opts.temperature.is_some() || opts.thinking_budget.is_some() {
706 tracing::warn!(
707 "temperature/thinking_budget set without model override — these will be ignored. \
708 Use with_model() to apply LLM parameter overrides."
709 );
710 }
711 self.llm_client.clone()
712 };
713
714 let merged_opts = match (&self.global_mcp, &opts.mcp_manager) {
717 (Some(global), Some(session)) => {
718 let global = Arc::clone(global);
719 let session_mgr = Arc::clone(session);
720 match tokio::runtime::Handle::try_current() {
721 Ok(handle) => {
722 let global_for_merge = Arc::clone(&global);
723 tokio::task::block_in_place(|| {
724 handle.block_on(async move {
725 for config in session_mgr.all_configs().await {
726 let name = config.name.clone();
727 global_for_merge.register_server(config).await;
728 if let Err(e) = global_for_merge.connect(&name).await {
729 tracing::warn!(
730 server = %name,
731 error = %e,
732 "Failed to connect session-level MCP server — skipping"
733 );
734 }
735 }
736 })
737 });
738 }
739 Err(_) => {
740 tracing::warn!(
741 "No async runtime available to merge session-level MCP servers \
742 into global manager — session MCP servers will not be available"
743 );
744 }
745 }
746 SessionOptions {
747 mcp_manager: Some(Arc::clone(&global)),
748 ..opts
749 }
750 }
751 (Some(global), None) => SessionOptions {
752 mcp_manager: Some(Arc::clone(global)),
753 ..opts
754 },
755 _ => opts,
756 };
757
758 self.build_session(workspace.into(), llm_client, &merged_opts)
759 }
760
761 pub fn session_for_agent(
776 &self,
777 workspace: impl Into<String>,
778 def: &crate::subagent::AgentDefinition,
779 extra: Option<SessionOptions>,
780 ) -> Result<AgentSession> {
781 let mut opts = extra.unwrap_or_default();
782
783 if opts.permission_checker.is_none()
785 && (!def.permissions.allow.is_empty() || !def.permissions.deny.is_empty())
786 {
787 opts.permission_checker = Some(Arc::new(def.permissions.clone()));
788 }
789
790 if opts.max_tool_rounds.is_none() {
792 if let Some(steps) = def.max_steps {
793 opts.max_tool_rounds = Some(steps);
794 }
795 }
796
797 if opts.model.is_none() {
799 if let Some(ref m) = def.model {
800 let provider = m.provider.as_deref().unwrap_or("anthropic");
801 opts.model = Some(format!("{}/{}", provider, m.model));
802 }
803 }
804
805 if let Some(ref prompt) = def.prompt {
812 let slots = opts
813 .prompt_slots
814 .get_or_insert_with(crate::prompts::SystemPromptSlots::default);
815 if slots.extra.is_none() {
816 slots.extra = Some(prompt.clone());
817 }
818 }
819
820 self.session(workspace, Some(opts))
821 }
822
823 pub fn resume_session(
831 &self,
832 session_id: &str,
833 options: SessionOptions,
834 ) -> Result<AgentSession> {
835 let store = options.session_store.as_ref().ok_or_else(|| {
836 crate::error::CodeError::Session(
837 "resume_session requires a session_store in SessionOptions".to_string(),
838 )
839 })?;
840
841 let data = match tokio::runtime::Handle::try_current() {
843 Ok(handle) => tokio::task::block_in_place(|| handle.block_on(store.load(session_id)))
844 .map_err(|e| {
845 crate::error::CodeError::Session(format!(
846 "Failed to load session {}: {}",
847 session_id, e
848 ))
849 })?,
850 Err(_) => {
851 return Err(crate::error::CodeError::Session(
852 "No async runtime available for session resume".to_string(),
853 ))
854 }
855 };
856
857 let data = data.ok_or_else(|| {
858 crate::error::CodeError::Session(format!("Session not found: {}", session_id))
859 })?;
860
861 let mut opts = options;
863 opts.session_id = Some(data.id.clone());
864
865 let llm_client = if let Some(ref model) = opts.model {
866 let (provider_name, model_id) = model
867 .split_once('/')
868 .context("model format must be 'provider/model'")?;
869 let llm_config = self
870 .code_config
871 .llm_config(provider_name, model_id)
872 .with_context(|| {
873 format!("provider '{provider_name}' or model '{model_id}' not found")
874 })?;
875 crate::llm::create_client_with_config(llm_config)
876 } else {
877 self.llm_client.clone()
878 };
879
880 let session = self.build_session(data.config.workspace.clone(), llm_client, &opts)?;
881
882 *write_or_recover(&session.history) = data.messages;
884
885 Ok(session)
886 }
887
888 fn build_session(
889 &self,
890 workspace: String,
891 llm_client: Arc<dyn LlmClient>,
892 opts: &SessionOptions,
893 ) -> Result<AgentSession> {
894 let canonical = safe_canonicalize(Path::new(&workspace));
895
896 let tool_executor = Arc::new(ToolExecutor::new(canonical.display().to_string()));
897
898 {
902 use crate::subagent::{load_agents_from_dir, AgentRegistry};
903 use crate::tools::register_task_with_mcp;
904 let agent_registry = AgentRegistry::new();
905 for dir in self
906 .code_config
907 .agent_dirs
908 .iter()
909 .chain(opts.agent_dirs.iter())
910 {
911 for agent in load_agents_from_dir(dir) {
912 agent_registry.register(agent);
913 }
914 }
915 register_task_with_mcp(
916 tool_executor.registry(),
917 Arc::clone(&llm_client),
918 Arc::new(agent_registry),
919 canonical.display().to_string(),
920 opts.mcp_manager.clone(),
921 );
922 }
923
924 if let Some(ref mcp) = opts.mcp_manager {
927 let all_tools: Vec<(String, crate::mcp::McpTool)> = if std::ptr::eq(
930 Arc::as_ptr(mcp),
931 self.global_mcp
932 .as_ref()
933 .map(Arc::as_ptr)
934 .unwrap_or(std::ptr::null()),
935 ) {
936 self.global_mcp_tools
938 .lock()
939 .expect("global_mcp_tools lock poisoned")
940 .clone()
941 } else {
942 match tokio::runtime::Handle::try_current() {
944 Ok(handle) => {
945 tokio::task::block_in_place(|| handle.block_on(mcp.get_all_tools()))
946 }
947 Err(_) => {
948 tracing::warn!(
949 "No async runtime available for session-level MCP tools — \
950 MCP tools will not be registered"
951 );
952 vec![]
953 }
954 }
955 };
956
957 let mut by_server: std::collections::HashMap<String, Vec<crate::mcp::McpTool>> =
958 std::collections::HashMap::new();
959 for (server, tool) in all_tools {
960 by_server.entry(server).or_default().push(tool);
961 }
962 for (server_name, tools) in by_server {
963 for tool in
964 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(mcp))
965 {
966 tool_executor.register_dynamic_tool(tool);
967 }
968 }
969 }
970
971 let tool_defs = tool_executor.definitions();
972
973 let mut prompt_slots = opts
975 .prompt_slots
976 .clone()
977 .unwrap_or_else(|| self.config.prompt_slots.clone());
978
979 let base_registry = self
983 .config
984 .skill_registry
985 .as_deref()
986 .map(|r| r.fork())
987 .unwrap_or_else(crate::skills::SkillRegistry::with_builtins);
988 if let Some(ref r) = opts.skill_registry {
990 for skill in r.all() {
991 base_registry.register_unchecked(skill);
992 }
993 }
994 for dir in &opts.skill_dirs {
996 if let Err(e) = base_registry.load_from_dir(dir) {
997 tracing::warn!(
998 dir = %dir.display(),
999 error = %e,
1000 "Failed to load session skill dir — skipping"
1001 );
1002 }
1003 }
1004 let effective_registry = Arc::new(base_registry);
1005
1006 let skill_prompt = effective_registry.to_system_prompt();
1008 if !skill_prompt.is_empty() {
1009 prompt_slots.extra = match prompt_slots.extra {
1010 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
1011 None => Some(skill_prompt),
1012 };
1013 }
1014
1015 let mut init_warning: Option<String> = None;
1017 let memory = {
1018 let store = if let Some(ref store) = opts.memory_store {
1019 Some(Arc::clone(store))
1020 } else if let Some(ref dir) = opts.file_memory_dir {
1021 match tokio::runtime::Handle::try_current() {
1022 Ok(handle) => {
1023 let dir = dir.clone();
1024 match tokio::task::block_in_place(|| {
1025 handle.block_on(FileMemoryStore::new(dir))
1026 }) {
1027 Ok(store) => Some(Arc::new(store) as Arc<dyn MemoryStore>),
1028 Err(e) => {
1029 let msg = format!("Failed to create file memory store: {}", e);
1030 tracing::warn!("{}", msg);
1031 init_warning = Some(msg);
1032 None
1033 }
1034 }
1035 }
1036 Err(_) => {
1037 let msg =
1038 "No async runtime available for file memory store — memory disabled"
1039 .to_string();
1040 tracing::warn!("{}", msg);
1041 init_warning = Some(msg);
1042 None
1043 }
1044 }
1045 } else {
1046 None
1047 };
1048 store.map(|s| Arc::new(crate::memory::AgentMemory::new(s)))
1049 };
1050
1051 let base = self.config.clone();
1052 let config = AgentConfig {
1053 prompt_slots,
1054 tools: tool_defs,
1055 security_provider: opts.security_provider.clone(),
1056 permission_checker: opts.permission_checker.clone(),
1057 confirmation_manager: opts.confirmation_manager.clone(),
1058 context_providers: opts.context_providers.clone(),
1059 planning_enabled: opts.planning_enabled,
1060 goal_tracking: opts.goal_tracking,
1061 skill_registry: Some(effective_registry),
1062 max_parse_retries: opts.max_parse_retries.unwrap_or(base.max_parse_retries),
1063 tool_timeout_ms: opts.tool_timeout_ms.or(base.tool_timeout_ms),
1064 circuit_breaker_threshold: opts
1065 .circuit_breaker_threshold
1066 .unwrap_or(base.circuit_breaker_threshold),
1067 auto_compact: opts.auto_compact,
1068 auto_compact_threshold: opts
1069 .auto_compact_threshold
1070 .unwrap_or(crate::session::DEFAULT_AUTO_COMPACT_THRESHOLD),
1071 max_context_tokens: base.max_context_tokens,
1072 llm_client: Some(Arc::clone(&llm_client)),
1073 memory: memory.clone(),
1074 continuation_enabled: opts
1075 .continuation_enabled
1076 .unwrap_or(base.continuation_enabled),
1077 max_continuation_turns: opts
1078 .max_continuation_turns
1079 .unwrap_or(base.max_continuation_turns),
1080 max_tool_rounds: opts.max_tool_rounds.unwrap_or(base.max_tool_rounds),
1081 ..base
1082 };
1083
1084 let (agent_event_tx, _) = broadcast::channel::<crate::agent::AgentEvent>(256);
1087 let command_queue = if let Some(ref queue_config) = opts.queue_config {
1088 let session_id = uuid::Uuid::new_v4().to_string();
1089 let rt = tokio::runtime::Handle::try_current();
1090
1091 match rt {
1092 Ok(handle) => {
1093 let queue = tokio::task::block_in_place(|| {
1095 handle.block_on(SessionLaneQueue::new(
1096 &session_id,
1097 queue_config.clone(),
1098 agent_event_tx.clone(),
1099 ))
1100 });
1101 match queue {
1102 Ok(q) => {
1103 let q = Arc::new(q);
1105 let q2 = Arc::clone(&q);
1106 tokio::task::block_in_place(|| {
1107 handle.block_on(async { q2.start().await.ok() })
1108 });
1109 Some(q)
1110 }
1111 Err(e) => {
1112 tracing::warn!("Failed to create session lane queue: {}", e);
1113 None
1114 }
1115 }
1116 }
1117 Err(_) => {
1118 tracing::warn!(
1119 "No async runtime available for queue creation — queue disabled"
1120 );
1121 None
1122 }
1123 }
1124 } else {
1125 None
1126 };
1127
1128 let mut tool_context = ToolContext::new(canonical.clone());
1130 if let Some(ref search_config) = self.code_config.search {
1131 tool_context = tool_context.with_search_config(search_config.clone());
1132 }
1133 tool_context = tool_context.with_agent_event_tx(agent_event_tx);
1134
1135 #[cfg(feature = "sandbox")]
1137 if let Some(ref sandbox_cfg) = opts.sandbox_config {
1138 let handle: Arc<dyn crate::sandbox::BashSandbox> =
1139 Arc::new(crate::sandbox::BoxSandboxHandle::new(
1140 sandbox_cfg.clone(),
1141 canonical.display().to_string(),
1142 ));
1143 tool_executor.registry().set_sandbox(Arc::clone(&handle));
1146 tool_context = tool_context.with_sandbox(handle);
1147 }
1148 #[cfg(not(feature = "sandbox"))]
1149 if opts.sandbox_config.is_some() {
1150 tracing::warn!(
1151 "sandbox_config is set but the `sandbox` Cargo feature is not enabled \
1152 — bash commands will run locally"
1153 );
1154 }
1155
1156 if let Some(ref hook) = config.hook_engine {
1161 tool_context = tool_context.with_sentinel_hook(hook.clone());
1162 }
1163
1164 let session_id = opts
1165 .session_id
1166 .clone()
1167 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1168
1169 let session_store = if opts.session_store.is_some() {
1171 opts.session_store.clone()
1172 } else if let Some(ref dir) = self.code_config.sessions_dir {
1173 match tokio::runtime::Handle::try_current() {
1174 Ok(handle) => {
1175 let dir = dir.clone();
1176 match tokio::task::block_in_place(|| {
1177 handle.block_on(crate::store::FileSessionStore::new(dir))
1178 }) {
1179 Ok(store) => Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>),
1180 Err(e) => {
1181 tracing::warn!(
1182 "Failed to create session store from sessions_dir: {}",
1183 e
1184 );
1185 None
1186 }
1187 }
1188 }
1189 Err(_) => {
1190 tracing::warn!(
1191 "No async runtime for sessions_dir store — persistence disabled"
1192 );
1193 None
1194 }
1195 }
1196 } else {
1197 None
1198 };
1199
1200 Ok(AgentSession {
1201 llm_client,
1202 tool_executor,
1203 tool_context,
1204 memory: config.memory.clone(),
1205 config,
1206 workspace: canonical,
1207 session_id,
1208 history: RwLock::new(Vec::new()),
1209 command_queue,
1210 session_store,
1211 auto_save: opts.auto_save,
1212 hook_engine: Arc::new(crate::hooks::HookEngine::new()),
1213 init_warning,
1214 command_registry: CommandRegistry::new(),
1215 model_name: opts
1216 .model
1217 .clone()
1218 .or_else(|| self.code_config.default_model.clone())
1219 .unwrap_or_else(|| "unknown".to_string()),
1220 mcp_manager: opts
1221 .mcp_manager
1222 .clone()
1223 .or_else(|| self.global_mcp.clone())
1224 .unwrap_or_else(|| Arc::new(crate::mcp::manager::McpManager::new())),
1225 })
1226 }
1227}
1228
1229pub struct AgentSession {
1238 llm_client: Arc<dyn LlmClient>,
1239 tool_executor: Arc<ToolExecutor>,
1240 tool_context: ToolContext,
1241 config: AgentConfig,
1242 workspace: PathBuf,
1243 session_id: String,
1245 history: RwLock<Vec<Message>>,
1247 command_queue: Option<Arc<SessionLaneQueue>>,
1249 memory: Option<Arc<crate::memory::AgentMemory>>,
1251 session_store: Option<Arc<dyn crate::store::SessionStore>>,
1253 auto_save: bool,
1255 hook_engine: Arc<crate::hooks::HookEngine>,
1257 init_warning: Option<String>,
1259 command_registry: CommandRegistry,
1261 model_name: String,
1263 mcp_manager: Arc<crate::mcp::manager::McpManager>,
1265}
1266
1267impl std::fmt::Debug for AgentSession {
1268 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1269 f.debug_struct("AgentSession")
1270 .field("session_id", &self.session_id)
1271 .field("workspace", &self.workspace.display().to_string())
1272 .field("auto_save", &self.auto_save)
1273 .finish()
1274 }
1275}
1276
1277impl AgentSession {
1278 fn build_agent_loop(&self) -> AgentLoop {
1282 let mut config = self.config.clone();
1283 config.hook_engine =
1284 Some(Arc::clone(&self.hook_engine) as Arc<dyn crate::hooks::HookExecutor>);
1285 config.tools = self.tool_executor.definitions();
1289 let mut agent_loop = AgentLoop::new(
1290 self.llm_client.clone(),
1291 self.tool_executor.clone(),
1292 self.tool_context.clone(),
1293 config,
1294 );
1295 if let Some(ref queue) = self.command_queue {
1296 agent_loop = agent_loop.with_queue(Arc::clone(queue));
1297 }
1298 agent_loop
1299 }
1300
1301 fn build_command_context(&self) -> CommandContext {
1303 let history = read_or_recover(&self.history);
1304
1305 let tool_names: Vec<String> = self.config.tools.iter().map(|t| t.name.clone()).collect();
1307
1308 let mut mcp_map: std::collections::HashMap<String, usize> =
1310 std::collections::HashMap::new();
1311 for name in &tool_names {
1312 if let Some(rest) = name.strip_prefix("mcp__") {
1313 if let Some((server, _)) = rest.split_once("__") {
1314 *mcp_map.entry(server.to_string()).or_default() += 1;
1315 }
1316 }
1317 }
1318 let mut mcp_servers: Vec<(String, usize)> = mcp_map.into_iter().collect();
1319 mcp_servers.sort_by(|a, b| a.0.cmp(&b.0));
1320
1321 CommandContext {
1322 session_id: self.session_id.clone(),
1323 workspace: self.workspace.display().to_string(),
1324 model: self.model_name.clone(),
1325 history_len: history.len(),
1326 total_tokens: 0,
1327 total_cost: 0.0,
1328 tool_names,
1329 mcp_servers,
1330 }
1331 }
1332
1333 pub fn command_registry(&self) -> &CommandRegistry {
1335 &self.command_registry
1336 }
1337
1338 pub fn register_command(&mut self, cmd: Arc<dyn crate::commands::SlashCommand>) {
1340 self.command_registry.register(cmd);
1341 }
1342
1343 pub async fn send(&self, prompt: &str, history: Option<&[Message]>) -> Result<AgentResult> {
1352 if CommandRegistry::is_command(prompt) {
1354 let ctx = self.build_command_context();
1355 if let Some(output) = self.command_registry.dispatch(prompt, &ctx) {
1356 return Ok(AgentResult {
1357 text: output.text,
1358 messages: history
1359 .map(|h| h.to_vec())
1360 .unwrap_or_else(|| read_or_recover(&self.history).clone()),
1361 tool_calls_count: 0,
1362 usage: crate::llm::TokenUsage::default(),
1363 });
1364 }
1365 }
1366
1367 if let Some(ref w) = self.init_warning {
1368 tracing::warn!(session_id = %self.session_id, "Session init warning: {}", w);
1369 }
1370 let agent_loop = self.build_agent_loop();
1371
1372 let use_internal = history.is_none();
1373 let effective_history = match history {
1374 Some(h) => h.to_vec(),
1375 None => read_or_recover(&self.history).clone(),
1376 };
1377
1378 let result = agent_loop.execute(&effective_history, prompt, None).await?;
1379
1380 if use_internal {
1383 *write_or_recover(&self.history) = result.messages.clone();
1384
1385 if self.auto_save {
1387 if let Err(e) = self.save().await {
1388 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
1389 }
1390 }
1391 }
1392
1393 Ok(result)
1394 }
1395
1396 pub async fn send_with_attachments(
1401 &self,
1402 prompt: &str,
1403 attachments: &[crate::llm::Attachment],
1404 history: Option<&[Message]>,
1405 ) -> Result<AgentResult> {
1406 let use_internal = history.is_none();
1410 let mut effective_history = match history {
1411 Some(h) => h.to_vec(),
1412 None => read_or_recover(&self.history).clone(),
1413 };
1414 effective_history.push(Message::user_with_attachments(prompt, attachments));
1415
1416 let agent_loop = self.build_agent_loop();
1417 let result = agent_loop
1418 .execute_from_messages(effective_history, None, None)
1419 .await?;
1420
1421 if use_internal {
1422 *write_or_recover(&self.history) = result.messages.clone();
1423 if self.auto_save {
1424 if let Err(e) = self.save().await {
1425 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
1426 }
1427 }
1428 }
1429
1430 Ok(result)
1431 }
1432
1433 pub async fn stream_with_attachments(
1438 &self,
1439 prompt: &str,
1440 attachments: &[crate::llm::Attachment],
1441 history: Option<&[Message]>,
1442 ) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
1443 let (tx, rx) = mpsc::channel(256);
1444 let mut effective_history = match history {
1445 Some(h) => h.to_vec(),
1446 None => read_or_recover(&self.history).clone(),
1447 };
1448 effective_history.push(Message::user_with_attachments(prompt, attachments));
1449
1450 let agent_loop = self.build_agent_loop();
1451 let handle = tokio::spawn(async move {
1452 let _ = agent_loop
1453 .execute_from_messages(effective_history, None, Some(tx))
1454 .await;
1455 });
1456
1457 Ok((rx, handle))
1458 }
1459
1460 pub async fn stream(
1470 &self,
1471 prompt: &str,
1472 history: Option<&[Message]>,
1473 ) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
1474 if CommandRegistry::is_command(prompt) {
1476 let ctx = self.build_command_context();
1477 if let Some(output) = self.command_registry.dispatch(prompt, &ctx) {
1478 let (tx, rx) = mpsc::channel(256);
1479 let handle = tokio::spawn(async move {
1480 let _ = tx
1481 .send(AgentEvent::TextDelta {
1482 text: output.text.clone(),
1483 })
1484 .await;
1485 let _ = tx
1486 .send(AgentEvent::End {
1487 text: output.text.clone(),
1488 usage: crate::llm::TokenUsage::default(),
1489 })
1490 .await;
1491 });
1492 return Ok((rx, handle));
1493 }
1494 }
1495
1496 let (tx, rx) = mpsc::channel(256);
1497 let agent_loop = self.build_agent_loop();
1498 let effective_history = match history {
1499 Some(h) => h.to_vec(),
1500 None => read_or_recover(&self.history).clone(),
1501 };
1502 let prompt = prompt.to_string();
1503
1504 let handle = tokio::spawn(async move {
1505 let _ = agent_loop
1506 .execute(&effective_history, &prompt, Some(tx))
1507 .await;
1508 });
1509
1510 Ok((rx, handle))
1511 }
1512
1513 pub fn history(&self) -> Vec<Message> {
1515 read_or_recover(&self.history).clone()
1516 }
1517
1518 pub fn memory(&self) -> Option<&Arc<crate::memory::AgentMemory>> {
1520 self.memory.as_ref()
1521 }
1522
1523 pub fn id(&self) -> &str {
1525 &self.session_id
1526 }
1527
1528 pub fn workspace(&self) -> &std::path::Path {
1530 &self.workspace
1531 }
1532
1533 pub fn init_warning(&self) -> Option<&str> {
1535 self.init_warning.as_deref()
1536 }
1537
1538 pub fn session_id(&self) -> &str {
1540 &self.session_id
1541 }
1542
1543 pub fn tool_definitions(&self) -> Vec<crate::llm::ToolDefinition> {
1549 self.tool_executor.definitions()
1550 }
1551
1552 pub fn tool_names(&self) -> Vec<String> {
1558 self.tool_executor
1559 .definitions()
1560 .into_iter()
1561 .map(|t| t.name)
1562 .collect()
1563 }
1564
1565 pub fn register_hook(&self, hook: crate::hooks::Hook) {
1571 self.hook_engine.register(hook);
1572 }
1573
1574 pub fn unregister_hook(&self, hook_id: &str) -> Option<crate::hooks::Hook> {
1576 self.hook_engine.unregister(hook_id)
1577 }
1578
1579 pub fn register_hook_handler(
1581 &self,
1582 hook_id: &str,
1583 handler: Arc<dyn crate::hooks::HookHandler>,
1584 ) {
1585 self.hook_engine.register_handler(hook_id, handler);
1586 }
1587
1588 pub fn unregister_hook_handler(&self, hook_id: &str) {
1590 self.hook_engine.unregister_handler(hook_id);
1591 }
1592
1593 pub fn hook_count(&self) -> usize {
1595 self.hook_engine.hook_count()
1596 }
1597
1598 pub async fn save(&self) -> Result<()> {
1602 let store = match &self.session_store {
1603 Some(s) => s,
1604 None => return Ok(()),
1605 };
1606
1607 let history = read_or_recover(&self.history).clone();
1608 let now = chrono::Utc::now().timestamp();
1609
1610 let data = crate::store::SessionData {
1611 id: self.session_id.clone(),
1612 config: crate::session::SessionConfig {
1613 name: String::new(),
1614 workspace: self.workspace.display().to_string(),
1615 system_prompt: Some(self.config.prompt_slots.build()),
1616 max_context_length: 200_000,
1617 auto_compact: false,
1618 auto_compact_threshold: crate::session::DEFAULT_AUTO_COMPACT_THRESHOLD,
1619 storage_type: crate::config::StorageBackend::File,
1620 queue_config: None,
1621 confirmation_policy: None,
1622 permission_policy: None,
1623 parent_id: None,
1624 security_config: None,
1625 hook_engine: None,
1626 planning_enabled: self.config.planning_enabled,
1627 goal_tracking: self.config.goal_tracking,
1628 },
1629 state: crate::session::SessionState::Active,
1630 messages: history,
1631 context_usage: crate::session::ContextUsage::default(),
1632 total_usage: crate::llm::TokenUsage::default(),
1633 total_cost: 0.0,
1634 model_name: None,
1635 cost_records: Vec::new(),
1636 tool_names: crate::store::SessionData::tool_names_from_definitions(&self.config.tools),
1637 thinking_enabled: false,
1638 thinking_budget: None,
1639 created_at: now,
1640 updated_at: now,
1641 llm_config: None,
1642 tasks: Vec::new(),
1643 parent_id: None,
1644 };
1645
1646 store.save(&data).await?;
1647 tracing::debug!("Session {} saved", self.session_id);
1648 Ok(())
1649 }
1650
1651 pub async fn read_file(&self, path: &str) -> Result<String> {
1653 let args = serde_json::json!({ "file_path": path });
1654 let result = self.tool_executor.execute("read", &args).await?;
1655 Ok(result.output)
1656 }
1657
1658 pub async fn bash(&self, command: &str) -> Result<String> {
1663 let args = serde_json::json!({ "command": command });
1664 let result = self
1665 .tool_executor
1666 .execute_with_context("bash", &args, &self.tool_context)
1667 .await?;
1668 Ok(result.output)
1669 }
1670
1671 pub async fn glob(&self, pattern: &str) -> Result<Vec<String>> {
1673 let args = serde_json::json!({ "pattern": pattern });
1674 let result = self.tool_executor.execute("glob", &args).await?;
1675 let files: Vec<String> = result
1676 .output
1677 .lines()
1678 .filter(|l| !l.is_empty())
1679 .map(|l| l.to_string())
1680 .collect();
1681 Ok(files)
1682 }
1683
1684 pub async fn grep(&self, pattern: &str) -> Result<String> {
1686 let args = serde_json::json!({ "pattern": pattern });
1687 let result = self.tool_executor.execute("grep", &args).await?;
1688 Ok(result.output)
1689 }
1690
1691 pub async fn tool(&self, name: &str, args: serde_json::Value) -> Result<ToolCallResult> {
1693 let result = self.tool_executor.execute(name, &args).await?;
1694 Ok(ToolCallResult {
1695 name: name.to_string(),
1696 output: result.output,
1697 exit_code: result.exit_code,
1698 })
1699 }
1700
1701 pub fn has_queue(&self) -> bool {
1707 self.command_queue.is_some()
1708 }
1709
1710 pub async fn set_lane_handler(&self, lane: SessionLane, config: LaneHandlerConfig) {
1714 if let Some(ref queue) = self.command_queue {
1715 queue.set_lane_handler(lane, config).await;
1716 }
1717 }
1718
1719 pub async fn complete_external_task(&self, task_id: &str, result: ExternalTaskResult) -> bool {
1723 if let Some(ref queue) = self.command_queue {
1724 queue.complete_external_task(task_id, result).await
1725 } else {
1726 false
1727 }
1728 }
1729
1730 pub async fn pending_external_tasks(&self) -> Vec<ExternalTask> {
1732 if let Some(ref queue) = self.command_queue {
1733 queue.pending_external_tasks().await
1734 } else {
1735 Vec::new()
1736 }
1737 }
1738
1739 pub async fn queue_stats(&self) -> SessionQueueStats {
1741 if let Some(ref queue) = self.command_queue {
1742 queue.stats().await
1743 } else {
1744 SessionQueueStats::default()
1745 }
1746 }
1747
1748 pub async fn queue_metrics(&self) -> Option<MetricsSnapshot> {
1750 if let Some(ref queue) = self.command_queue {
1751 queue.metrics_snapshot().await
1752 } else {
1753 None
1754 }
1755 }
1756
1757 pub async fn submit(
1763 &self,
1764 lane: SessionLane,
1765 command: Box<dyn crate::queue::SessionCommand>,
1766 ) -> anyhow::Result<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>> {
1767 let queue = self
1768 .command_queue
1769 .as_ref()
1770 .ok_or_else(|| anyhow::anyhow!("No queue configured on this session"))?;
1771 Ok(queue.submit(lane, command).await)
1772 }
1773
1774 pub async fn submit_batch(
1781 &self,
1782 lane: SessionLane,
1783 commands: Vec<Box<dyn crate::queue::SessionCommand>>,
1784 ) -> anyhow::Result<Vec<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>>>
1785 {
1786 let queue = self
1787 .command_queue
1788 .as_ref()
1789 .ok_or_else(|| anyhow::anyhow!("No queue configured on this session"))?;
1790 Ok(queue.submit_batch(lane, commands).await)
1791 }
1792
1793 pub async fn dead_letters(&self) -> Vec<DeadLetter> {
1795 if let Some(ref queue) = self.command_queue {
1796 queue.dead_letters().await
1797 } else {
1798 Vec::new()
1799 }
1800 }
1801
1802 pub async fn add_mcp_server(
1813 &self,
1814 config: crate::mcp::McpServerConfig,
1815 ) -> crate::error::Result<usize> {
1816 let server_name = config.name.clone();
1817 self.mcp_manager.register_server(config).await;
1818 self.mcp_manager.connect(&server_name).await.map_err(|e| {
1819 crate::error::CodeError::Tool {
1820 tool: server_name.clone(),
1821 message: format!("Failed to connect MCP server: {}", e),
1822 }
1823 })?;
1824
1825 let tools = self.mcp_manager.get_server_tools(&server_name).await;
1826 let count = tools.len();
1827
1828 for tool in
1829 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&self.mcp_manager))
1830 {
1831 self.tool_executor.register_dynamic_tool(tool);
1832 }
1833
1834 tracing::info!(
1835 session_id = %self.session_id,
1836 server = server_name,
1837 tools = count,
1838 "MCP server added to live session"
1839 );
1840
1841 Ok(count)
1842 }
1843
1844 pub async fn remove_mcp_server(&self, server_name: &str) -> crate::error::Result<()> {
1849 self.tool_executor
1850 .unregister_tools_by_prefix(&format!("mcp__{server_name}__"));
1851 self.mcp_manager
1852 .disconnect(server_name)
1853 .await
1854 .map_err(|e| crate::error::CodeError::Tool {
1855 tool: server_name.to_string(),
1856 message: format!("Failed to disconnect MCP server: {}", e),
1857 })?;
1858 tracing::info!(
1859 session_id = %self.session_id,
1860 server = server_name,
1861 "MCP server removed from live session"
1862 );
1863 Ok(())
1864 }
1865
1866 pub async fn mcp_status(
1868 &self,
1869 ) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
1870 self.mcp_manager.get_status().await
1871 }
1872}
1873
1874#[cfg(test)]
1879mod tests {
1880 use super::*;
1881 use crate::config::{ModelConfig, ModelModalities, ProviderConfig};
1882 use crate::store::SessionStore;
1883
1884 #[tokio::test]
1885 async fn test_session_submit_no_queue_returns_err() {
1886 let agent = Agent::from_config(test_config()).await.unwrap();
1887 let session = agent.session(".", None).unwrap();
1888 struct Noop;
1889 #[async_trait::async_trait]
1890 impl crate::queue::SessionCommand for Noop {
1891 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
1892 Ok(serde_json::json!(null))
1893 }
1894 fn command_type(&self) -> &str {
1895 "noop"
1896 }
1897 }
1898 let result: anyhow::Result<
1899 tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>,
1900 > = session.submit(SessionLane::Query, Box::new(Noop)).await;
1901 assert!(result.is_err());
1902 assert!(result.unwrap_err().to_string().contains("No queue"));
1903 }
1904
1905 #[tokio::test]
1906 async fn test_session_submit_batch_no_queue_returns_err() {
1907 let agent = Agent::from_config(test_config()).await.unwrap();
1908 let session = agent.session(".", None).unwrap();
1909 struct Noop;
1910 #[async_trait::async_trait]
1911 impl crate::queue::SessionCommand for Noop {
1912 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
1913 Ok(serde_json::json!(null))
1914 }
1915 fn command_type(&self) -> &str {
1916 "noop"
1917 }
1918 }
1919 let cmds: Vec<Box<dyn crate::queue::SessionCommand>> = vec![Box::new(Noop)];
1920 let result: anyhow::Result<
1921 Vec<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>>,
1922 > = session.submit_batch(SessionLane::Query, cmds).await;
1923 assert!(result.is_err());
1924 assert!(result.unwrap_err().to_string().contains("No queue"));
1925 }
1926
1927 fn test_config() -> CodeConfig {
1928 CodeConfig {
1929 default_model: Some("anthropic/claude-sonnet-4-20250514".to_string()),
1930 providers: vec![
1931 ProviderConfig {
1932 name: "anthropic".to_string(),
1933 api_key: Some("test-key".to_string()),
1934 base_url: None,
1935 models: vec![ModelConfig {
1936 id: "claude-sonnet-4-20250514".to_string(),
1937 name: "Claude Sonnet 4".to_string(),
1938 family: "claude-sonnet".to_string(),
1939 api_key: None,
1940 base_url: None,
1941 attachment: false,
1942 reasoning: false,
1943 tool_call: true,
1944 temperature: true,
1945 release_date: None,
1946 modalities: ModelModalities::default(),
1947 cost: Default::default(),
1948 limit: Default::default(),
1949 }],
1950 },
1951 ProviderConfig {
1952 name: "openai".to_string(),
1953 api_key: Some("test-openai-key".to_string()),
1954 base_url: None,
1955 models: vec![ModelConfig {
1956 id: "gpt-4o".to_string(),
1957 name: "GPT-4o".to_string(),
1958 family: "gpt-4".to_string(),
1959 api_key: None,
1960 base_url: None,
1961 attachment: false,
1962 reasoning: false,
1963 tool_call: true,
1964 temperature: true,
1965 release_date: None,
1966 modalities: ModelModalities::default(),
1967 cost: Default::default(),
1968 limit: Default::default(),
1969 }],
1970 },
1971 ],
1972 ..Default::default()
1973 }
1974 }
1975
1976 #[tokio::test]
1977 async fn test_from_config() {
1978 let agent = Agent::from_config(test_config()).await;
1979 assert!(agent.is_ok());
1980 }
1981
1982 #[tokio::test]
1983 async fn test_session_default() {
1984 let agent = Agent::from_config(test_config()).await.unwrap();
1985 let session = agent.session("/tmp/test-workspace", None);
1986 assert!(session.is_ok());
1987 let debug = format!("{:?}", session.unwrap());
1988 assert!(debug.contains("AgentSession"));
1989 }
1990
1991 #[tokio::test]
1992 async fn test_session_with_model_override() {
1993 let agent = Agent::from_config(test_config()).await.unwrap();
1994 let opts = SessionOptions::new().with_model("openai/gpt-4o");
1995 let session = agent.session("/tmp/test-workspace", Some(opts));
1996 assert!(session.is_ok());
1997 }
1998
1999 #[tokio::test]
2000 async fn test_session_with_invalid_model_format() {
2001 let agent = Agent::from_config(test_config()).await.unwrap();
2002 let opts = SessionOptions::new().with_model("gpt-4o");
2003 let session = agent.session("/tmp/test-workspace", Some(opts));
2004 assert!(session.is_err());
2005 }
2006
2007 #[tokio::test]
2008 async fn test_session_with_model_not_found() {
2009 let agent = Agent::from_config(test_config()).await.unwrap();
2010 let opts = SessionOptions::new().with_model("openai/nonexistent");
2011 let session = agent.session("/tmp/test-workspace", Some(opts));
2012 assert!(session.is_err());
2013 }
2014
2015 #[tokio::test]
2016 async fn test_new_with_hcl_string() {
2017 let hcl = r#"
2018 default_model = "anthropic/claude-sonnet-4-20250514"
2019 providers {
2020 name = "anthropic"
2021 api_key = "test-key"
2022 models {
2023 id = "claude-sonnet-4-20250514"
2024 name = "Claude Sonnet 4"
2025 }
2026 }
2027 "#;
2028 let agent = Agent::new(hcl).await;
2029 assert!(agent.is_ok());
2030 }
2031
2032 #[tokio::test]
2033 async fn test_create_alias_hcl() {
2034 let hcl = r#"
2035 default_model = "anthropic/claude-sonnet-4-20250514"
2036 providers {
2037 name = "anthropic"
2038 api_key = "test-key"
2039 models {
2040 id = "claude-sonnet-4-20250514"
2041 name = "Claude Sonnet 4"
2042 }
2043 }
2044 "#;
2045 let agent = Agent::create(hcl).await;
2046 assert!(agent.is_ok());
2047 }
2048
2049 #[tokio::test]
2050 async fn test_create_and_new_produce_same_result() {
2051 let hcl = r#"
2052 default_model = "anthropic/claude-sonnet-4-20250514"
2053 providers {
2054 name = "anthropic"
2055 api_key = "test-key"
2056 models {
2057 id = "claude-sonnet-4-20250514"
2058 name = "Claude Sonnet 4"
2059 }
2060 }
2061 "#;
2062 let agent_new = Agent::new(hcl).await;
2063 let agent_create = Agent::create(hcl).await;
2064 assert!(agent_new.is_ok());
2065 assert!(agent_create.is_ok());
2066
2067 let session_new = agent_new.unwrap().session("/tmp/test-ws-new", None);
2069 let session_create = agent_create.unwrap().session("/tmp/test-ws-create", None);
2070 assert!(session_new.is_ok());
2071 assert!(session_create.is_ok());
2072 }
2073
/// Building an agent from a configuration that sets no default model fails.
#[test]
fn test_from_config_requires_default_model() {
    // Provider present, but no models and no `default_model`.
    let anthropic = ProviderConfig {
        name: "anthropic".to_string(),
        api_key: Some("test-key".to_string()),
        base_url: None,
        models: vec![],
    };
    let config = CodeConfig {
        providers: vec![anthropic],
        ..Default::default()
    };

    let rt = tokio::runtime::Runtime::new().unwrap();
    let outcome = rt.block_on(Agent::from_config(config));
    assert!(outcome.is_err());
}
2089
2090 #[tokio::test]
2091 async fn test_history_empty_on_new_session() {
2092 let agent = Agent::from_config(test_config()).await.unwrap();
2093 let session = agent.session("/tmp/test-workspace", None).unwrap();
2094 assert!(session.history().is_empty());
2095 }
2096
/// `with_agent_dir` appends directories in call order.
///
/// Runs as a plain synchronous test: it only exercises the options builder
/// and awaits nothing, so no async runtime is needed.
#[test]
fn test_session_options_with_agent_dir() {
    let opts = SessionOptions::new()
        .with_agent_dir("/tmp/agents")
        .with_agent_dir("/tmp/more-agents");
    assert_eq!(opts.agent_dirs.len(), 2);
    assert_eq!(opts.agent_dirs[0], PathBuf::from("/tmp/agents"));
    assert_eq!(opts.agent_dirs[1], PathBuf::from("/tmp/more-agents"));
}
2106
/// `with_queue_config` stores the given configuration, and `with_lane_features`
/// switches on DLQ, metrics and alerts with a 60 s default timeout.
#[test]
fn test_session_options_with_queue_config() {
    let qc = SessionQueueConfig::default().with_lane_features();
    // `qc` is not needed afterwards, so it is moved rather than cloned.
    let opts = SessionOptions::new().with_queue_config(qc);
    assert!(opts.queue_config.is_some());

    let config = opts.queue_config.unwrap();
    assert!(config.enable_dlq);
    assert!(config.enable_metrics);
    assert!(config.enable_alerts);
    assert_eq!(config.default_timeout_ms, Some(60_000));
}
2123
2124 #[tokio::test(flavor = "multi_thread")]
2125 async fn test_session_with_queue_config() {
2126 let agent = Agent::from_config(test_config()).await.unwrap();
2127 let qc = SessionQueueConfig::default();
2128 let opts = SessionOptions::new().with_queue_config(qc);
2129 let session = agent.session("/tmp/test-workspace-queue", Some(opts));
2130 assert!(session.is_ok());
2131 let session = session.unwrap();
2132 assert!(session.has_queue());
2133 }
2134
2135 #[tokio::test]
2136 async fn test_session_without_queue_config() {
2137 let agent = Agent::from_config(test_config()).await.unwrap();
2138 let session = agent.session("/tmp/test-workspace-noqueue", None).unwrap();
2139 assert!(!session.has_queue());
2140 }
2141
2142 #[tokio::test]
2143 async fn test_session_queue_stats_without_queue() {
2144 let agent = Agent::from_config(test_config()).await.unwrap();
2145 let session = agent.session("/tmp/test-workspace-stats", None).unwrap();
2146 let stats = session.queue_stats().await;
2147 assert_eq!(stats.total_pending, 0);
2149 assert_eq!(stats.total_active, 0);
2150 }
2151
2152 #[tokio::test(flavor = "multi_thread")]
2153 async fn test_session_queue_stats_with_queue() {
2154 let agent = Agent::from_config(test_config()).await.unwrap();
2155 let qc = SessionQueueConfig::default();
2156 let opts = SessionOptions::new().with_queue_config(qc);
2157 let session = agent
2158 .session("/tmp/test-workspace-qstats", Some(opts))
2159 .unwrap();
2160 let stats = session.queue_stats().await;
2161 assert_eq!(stats.total_pending, 0);
2163 assert_eq!(stats.total_active, 0);
2164 }
2165
2166 #[tokio::test(flavor = "multi_thread")]
2167 async fn test_session_pending_external_tasks_empty() {
2168 let agent = Agent::from_config(test_config()).await.unwrap();
2169 let qc = SessionQueueConfig::default();
2170 let opts = SessionOptions::new().with_queue_config(qc);
2171 let session = agent
2172 .session("/tmp/test-workspace-ext", Some(opts))
2173 .unwrap();
2174 let tasks = session.pending_external_tasks().await;
2175 assert!(tasks.is_empty());
2176 }
2177
2178 #[tokio::test(flavor = "multi_thread")]
2179 async fn test_session_dead_letters_empty() {
2180 let agent = Agent::from_config(test_config()).await.unwrap();
2181 let qc = SessionQueueConfig::default().with_dlq(Some(100));
2182 let opts = SessionOptions::new().with_queue_config(qc);
2183 let session = agent
2184 .session("/tmp/test-workspace-dlq", Some(opts))
2185 .unwrap();
2186 let dead = session.dead_letters().await;
2187 assert!(dead.is_empty());
2188 }
2189
2190 #[tokio::test(flavor = "multi_thread")]
2191 async fn test_session_queue_metrics_disabled() {
2192 let agent = Agent::from_config(test_config()).await.unwrap();
2193 let qc = SessionQueueConfig::default();
2195 let opts = SessionOptions::new().with_queue_config(qc);
2196 let session = agent
2197 .session("/tmp/test-workspace-nomet", Some(opts))
2198 .unwrap();
2199 let metrics = session.queue_metrics().await;
2200 assert!(metrics.is_none());
2201 }
2202
2203 #[tokio::test(flavor = "multi_thread")]
2204 async fn test_session_queue_metrics_enabled() {
2205 let agent = Agent::from_config(test_config()).await.unwrap();
2206 let qc = SessionQueueConfig::default().with_metrics();
2207 let opts = SessionOptions::new().with_queue_config(qc);
2208 let session = agent
2209 .session("/tmp/test-workspace-met", Some(opts))
2210 .unwrap();
2211 let metrics = session.queue_metrics().await;
2212 assert!(metrics.is_some());
2213 }
2214
2215 #[tokio::test(flavor = "multi_thread")]
2216 async fn test_session_set_lane_handler() {
2217 let agent = Agent::from_config(test_config()).await.unwrap();
2218 let qc = SessionQueueConfig::default();
2219 let opts = SessionOptions::new().with_queue_config(qc);
2220 let session = agent
2221 .session("/tmp/test-workspace-handler", Some(opts))
2222 .unwrap();
2223
2224 session
2226 .set_lane_handler(
2227 SessionLane::Execute,
2228 LaneHandlerConfig {
2229 mode: crate::queue::TaskHandlerMode::External,
2230 timeout_ms: 30_000,
2231 },
2232 )
2233 .await;
2234
2235 }
2238
2239 #[tokio::test(flavor = "multi_thread")]
2244 async fn test_session_has_id() {
2245 let agent = Agent::from_config(test_config()).await.unwrap();
2246 let session = agent.session("/tmp/test-ws-id", None).unwrap();
2247 assert!(!session.session_id().is_empty());
2249 assert_eq!(session.session_id().len(), 36); }
2251
2252 #[tokio::test(flavor = "multi_thread")]
2253 async fn test_session_explicit_id() {
2254 let agent = Agent::from_config(test_config()).await.unwrap();
2255 let opts = SessionOptions::new().with_session_id("my-session-42");
2256 let session = agent.session("/tmp/test-ws-eid", Some(opts)).unwrap();
2257 assert_eq!(session.session_id(), "my-session-42");
2258 }
2259
2260 #[tokio::test(flavor = "multi_thread")]
2261 async fn test_session_save_no_store() {
2262 let agent = Agent::from_config(test_config()).await.unwrap();
2263 let session = agent.session("/tmp/test-ws-save", None).unwrap();
2264 session.save().await.unwrap();
2266 }
2267
2268 #[tokio::test(flavor = "multi_thread")]
2269 async fn test_session_save_and_load() {
2270 let store = Arc::new(crate::store::MemorySessionStore::new());
2271 let agent = Agent::from_config(test_config()).await.unwrap();
2272
2273 let opts = SessionOptions::new()
2274 .with_session_store(store.clone())
2275 .with_session_id("persist-test");
2276 let session = agent.session("/tmp/test-ws-persist", Some(opts)).unwrap();
2277
2278 session.save().await.unwrap();
2280
2281 assert!(store.exists("persist-test").await.unwrap());
2283
2284 let data = store.load("persist-test").await.unwrap().unwrap();
2285 assert_eq!(data.id, "persist-test");
2286 assert!(data.messages.is_empty());
2287 }
2288
2289 #[tokio::test(flavor = "multi_thread")]
2290 async fn test_session_save_with_history() {
2291 let store = Arc::new(crate::store::MemorySessionStore::new());
2292 let agent = Agent::from_config(test_config()).await.unwrap();
2293
2294 let opts = SessionOptions::new()
2295 .with_session_store(store.clone())
2296 .with_session_id("history-test");
2297 let session = agent.session("/tmp/test-ws-hist", Some(opts)).unwrap();
2298
2299 {
2301 let mut h = session.history.write().unwrap();
2302 h.push(Message::user("Hello"));
2303 h.push(Message::user("How are you?"));
2304 }
2305
2306 session.save().await.unwrap();
2307
2308 let data = store.load("history-test").await.unwrap().unwrap();
2309 assert_eq!(data.messages.len(), 2);
2310 }
2311
2312 #[tokio::test(flavor = "multi_thread")]
2313 async fn test_resume_session() {
2314 let store = Arc::new(crate::store::MemorySessionStore::new());
2315 let agent = Agent::from_config(test_config()).await.unwrap();
2316
2317 let opts = SessionOptions::new()
2319 .with_session_store(store.clone())
2320 .with_session_id("resume-test");
2321 let session = agent.session("/tmp/test-ws-resume", Some(opts)).unwrap();
2322 {
2323 let mut h = session.history.write().unwrap();
2324 h.push(Message::user("What is Rust?"));
2325 h.push(Message::user("Tell me more"));
2326 }
2327 session.save().await.unwrap();
2328
2329 let opts2 = SessionOptions::new().with_session_store(store.clone());
2331 let resumed = agent.resume_session("resume-test", opts2).unwrap();
2332
2333 assert_eq!(resumed.session_id(), "resume-test");
2334 let history = resumed.history();
2335 assert_eq!(history.len(), 2);
2336 assert_eq!(history[0].text(), "What is Rust?");
2337 }
2338
2339 #[tokio::test(flavor = "multi_thread")]
2340 async fn test_resume_session_not_found() {
2341 let store = Arc::new(crate::store::MemorySessionStore::new());
2342 let agent = Agent::from_config(test_config()).await.unwrap();
2343
2344 let opts = SessionOptions::new().with_session_store(store.clone());
2345 let result = agent.resume_session("nonexistent", opts);
2346 assert!(result.is_err());
2347 assert!(result.unwrap_err().to_string().contains("not found"));
2348 }
2349
2350 #[tokio::test(flavor = "multi_thread")]
2351 async fn test_resume_session_no_store() {
2352 let agent = Agent::from_config(test_config()).await.unwrap();
2353 let opts = SessionOptions::new();
2354 let result = agent.resume_session("any-id", opts);
2355 assert!(result.is_err());
2356 assert!(result.unwrap_err().to_string().contains("session_store"));
2357 }
2358
2359 #[tokio::test(flavor = "multi_thread")]
2360 async fn test_file_session_store_persistence() {
2361 let dir = tempfile::TempDir::new().unwrap();
2362 let store = Arc::new(
2363 crate::store::FileSessionStore::new(dir.path())
2364 .await
2365 .unwrap(),
2366 );
2367 let agent = Agent::from_config(test_config()).await.unwrap();
2368
2369 let opts = SessionOptions::new()
2371 .with_session_store(store.clone())
2372 .with_session_id("file-persist");
2373 let session = agent
2374 .session("/tmp/test-ws-file-persist", Some(opts))
2375 .unwrap();
2376 {
2377 let mut h = session.history.write().unwrap();
2378 h.push(Message::user("test message"));
2379 }
2380 session.save().await.unwrap();
2381
2382 let store2 = Arc::new(
2384 crate::store::FileSessionStore::new(dir.path())
2385 .await
2386 .unwrap(),
2387 );
2388 let data = store2.load("file-persist").await.unwrap().unwrap();
2389 assert_eq!(data.messages.len(), 1);
2390 }
2391
/// `with_session_id` and `with_auto_save` record their values on the options.
///
/// Runs as a plain synchronous test: it only exercises the options builder
/// and awaits nothing, so no async runtime is needed.
#[test]
fn test_session_options_builders() {
    let opts = SessionOptions::new()
        .with_session_id("test-id")
        .with_auto_save(true);
    assert_eq!(opts.session_id, Some("test-id".to_string()));
    assert!(opts.auto_save);
}
2400
/// `with_sandbox` stores the supplied sandbox configuration unchanged.
#[test]
fn test_session_options_with_sandbox_sets_config() {
    use crate::sandbox::SandboxConfig;

    let requested = SandboxConfig {
        image: "ubuntu:22.04".into(),
        memory_mb: 1024,
        ..SandboxConfig::default()
    };
    let options = SessionOptions::new().with_sandbox(requested);
    assert!(options.sandbox_config.is_some());

    let stored = options.sandbox_config.unwrap();
    assert_eq!(stored.image, "ubuntu:22.04");
    assert_eq!(stored.memory_mb, 1024);
}
2419
/// Default session options carry no sandbox configuration.
#[test]
fn test_session_options_default_has_no_sandbox() {
    let options = SessionOptions::default();
    let sandbox = options.sandbox_config;
    assert!(sandbox.is_none());
}
2425
/// The Debug output of `SessionOptions` mentions the `sandbox_config` field.
///
/// Runs as a plain synchronous test: it only builds options and formats
/// them, so no async runtime is needed.
#[test]
fn test_session_debug_includes_sandbox_config() {
    use crate::sandbox::SandboxConfig;
    let opts = SessionOptions::new().with_sandbox(SandboxConfig::default());
    let debug = format!("{:?}", opts);
    assert!(debug.contains("sandbox_config"));
}
2433
2434 #[tokio::test]
2435 async fn test_session_build_with_sandbox_config_no_feature_warn() {
2436 let agent = Agent::from_config(test_config()).await.unwrap();
2439 let opts = SessionOptions::new().with_sandbox(crate::sandbox::SandboxConfig::default());
2440 let session = agent.session("/tmp/test-sandbox-session", Some(opts));
2442 assert!(session.is_ok());
2443 }
2444
2445 #[tokio::test(flavor = "multi_thread")]
2450 async fn test_session_with_memory_store() {
2451 use a3s_memory::InMemoryStore;
2452 let store = Arc::new(InMemoryStore::new());
2453 let agent = Agent::from_config(test_config()).await.unwrap();
2454 let opts = SessionOptions::new().with_memory(store);
2455 let session = agent.session("/tmp/test-ws-memory", Some(opts)).unwrap();
2456 assert!(session.memory().is_some());
2457 }
2458
2459 #[tokio::test(flavor = "multi_thread")]
2460 async fn test_session_without_memory_store() {
2461 let agent = Agent::from_config(test_config()).await.unwrap();
2462 let session = agent.session("/tmp/test-ws-no-memory", None).unwrap();
2463 assert!(session.memory().is_none());
2464 }
2465
2466 #[tokio::test(flavor = "multi_thread")]
2467 async fn test_session_memory_wired_into_config() {
2468 use a3s_memory::InMemoryStore;
2469 let store = Arc::new(InMemoryStore::new());
2470 let agent = Agent::from_config(test_config()).await.unwrap();
2471 let opts = SessionOptions::new().with_memory(store);
2472 let session = agent
2473 .session("/tmp/test-ws-mem-config", Some(opts))
2474 .unwrap();
2475 assert!(session.memory().is_some());
2477 }
2478
2479 #[tokio::test(flavor = "multi_thread")]
2480 async fn test_session_with_file_memory() {
2481 let dir = tempfile::TempDir::new().unwrap();
2482 let agent = Agent::from_config(test_config()).await.unwrap();
2483 let opts = SessionOptions::new().with_file_memory(dir.path());
2484 let session = agent.session("/tmp/test-ws-file-mem", Some(opts)).unwrap();
2485 assert!(session.memory().is_some());
2486 }
2487
2488 #[tokio::test(flavor = "multi_thread")]
2489 async fn test_memory_remember_and_recall() {
2490 use a3s_memory::InMemoryStore;
2491 let store = Arc::new(InMemoryStore::new());
2492 let agent = Agent::from_config(test_config()).await.unwrap();
2493 let opts = SessionOptions::new().with_memory(store);
2494 let session = agent
2495 .session("/tmp/test-ws-mem-recall", Some(opts))
2496 .unwrap();
2497
2498 let memory = session.memory().unwrap();
2499 memory
2500 .remember_success("write a file", &["write".to_string()], "done")
2501 .await
2502 .unwrap();
2503
2504 let results = memory.recall_similar("write", 5).await.unwrap();
2505 assert!(!results.is_empty());
2506 let stats = memory.stats().await.unwrap();
2507 assert_eq!(stats.long_term_count, 1);
2508 }
2509
2510 #[tokio::test(flavor = "multi_thread")]
2515 async fn test_session_tool_timeout_configured() {
2516 let agent = Agent::from_config(test_config()).await.unwrap();
2517 let opts = SessionOptions::new().with_tool_timeout(5000);
2518 let session = agent.session("/tmp/test-ws-timeout", Some(opts)).unwrap();
2519 assert!(!session.id().is_empty());
2520 }
2521
2522 #[tokio::test(flavor = "multi_thread")]
2527 async fn test_session_without_queue_builds_ok() {
2528 let agent = Agent::from_config(test_config()).await.unwrap();
2529 let session = agent.session("/tmp/test-ws-no-queue", None).unwrap();
2530 assert!(!session.id().is_empty());
2531 }
2532
2533 #[tokio::test(flavor = "multi_thread")]
2538 async fn test_concurrent_history_reads() {
2539 let agent = Agent::from_config(test_config()).await.unwrap();
2540 let session = Arc::new(agent.session("/tmp/test-ws-concurrent", None).unwrap());
2541
2542 let handles: Vec<_> = (0..10)
2543 .map(|_| {
2544 let s = Arc::clone(&session);
2545 tokio::spawn(async move { s.history().len() })
2546 })
2547 .collect();
2548
2549 for h in handles {
2550 h.await.unwrap();
2551 }
2552 }
2553
2554 #[tokio::test(flavor = "multi_thread")]
2559 async fn test_session_no_init_warning_without_file_memory() {
2560 let agent = Agent::from_config(test_config()).await.unwrap();
2561 let session = agent.session("/tmp/test-ws-no-warn", None).unwrap();
2562 assert!(session.init_warning().is_none());
2563 }
2564
2565 #[tokio::test(flavor = "multi_thread")]
2566 async fn test_session_with_mcp_manager_builds_ok() {
2567 use crate::mcp::manager::McpManager;
2568 let mcp = Arc::new(McpManager::new());
2569 let agent = Agent::from_config(test_config()).await.unwrap();
2570 let opts = SessionOptions::new().with_mcp(mcp);
2571 let session = agent.session("/tmp/test-ws-mcp", Some(opts)).unwrap();
2573 assert!(!session.id().is_empty());
2574 }
2575
/// Compile-time check: `SessionCommand` must be nameable from the crate root
/// and usable as a trait object. There is nothing to assert at runtime.
#[test]
fn test_session_command_is_pub() {
    use crate::SessionCommand;

    // Naming the boxed trait object type is the whole test.
    let _: std::marker::PhantomData<Box<dyn SessionCommand>> = std::marker::PhantomData;
}
2582
2583 #[tokio::test(flavor = "multi_thread")]
2584 async fn test_session_submit_with_queue_executes() {
2585 let agent = Agent::from_config(test_config()).await.unwrap();
2586 let qc = SessionQueueConfig::default();
2587 let opts = SessionOptions::new().with_queue_config(qc);
2588 let session = agent
2589 .session("/tmp/test-ws-submit-exec", Some(opts))
2590 .unwrap();
2591
2592 struct Echo(serde_json::Value);
2593 #[async_trait::async_trait]
2594 impl crate::queue::SessionCommand for Echo {
2595 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
2596 Ok(self.0.clone())
2597 }
2598 fn command_type(&self) -> &str {
2599 "echo"
2600 }
2601 }
2602
2603 let rx = session
2604 .submit(
2605 SessionLane::Query,
2606 Box::new(Echo(serde_json::json!({"ok": true}))),
2607 )
2608 .await
2609 .expect("submit should succeed with queue configured");
2610
2611 let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx)
2612 .await
2613 .expect("timed out waiting for command result")
2614 .expect("channel closed before result")
2615 .expect("command returned an error");
2616
2617 assert_eq!(result["ok"], true);
2618 }
2619}